diff --git a/src/webwright/tools/network_timings.py b/src/webwright/tools/network_timings.py
new file mode 100644
index 0000000..66e0caa
--- /dev/null
+++ b/src/webwright/tools/network_timings.py
@@ -0,0 +1,315 @@
+"""Network timings capture tool for monitoring HTTP requests during page interactions.
+
+Captures request/response metadata including:
+- URL, HTTP method, status code
+- Request duration and size
+- Start/end timestamps of each request
+
+Stores data as JSON for analysis and exports summary statistics.
+
+Usage:
+    python -m webwright.tools.network_timings start --workspace-dir DIR
+    python -m webwright.tools.network_timings get-summary --session-file FILE
+    python -m webwright.tools.network_timings export --session-file FILE --output PATH
+"""
+
+from __future__ import annotations
+
+import argparse
+import json
+import sys
+import time
+from datetime import datetime
+from pathlib import Path
+from typing import Any
+
+# File name, inside the workspace directory, that holds exported timings.
+TIMINGS_FILENAME = "network_timings.json"
+
+
+class NetworkTimingsCapture:
+    """Captures and analyzes network timings during page interactions.
+
+    Every logged request is a dict in ``timings``; ``log_response`` completes
+    the matching entry in place with status, sizes and duration.
+    """
+
+    def __init__(self, workspace_dir: str = ""):
+        self.workspace_dir = Path(workspace_dir) if workspace_dir else Path.cwd()
+        self.timings: list[dict[str, Any]] = []
+        self.is_capturing = False
+        self.start_time: float | None = None
+        self.session_file: Path | None = None
+
+    def start_capture(self) -> None:
+        """Start capturing network timings, discarding earlier entries."""
+        self.is_capturing = True
+        self.start_time = time.time()
+        self.timings = []
+
+    def stop_capture(self) -> None:
+        """Stop capturing network timings; recorded entries are kept."""
+        self.is_capturing = False
+
+    def log_request(
+        self,
+        url: str,
+        method: str = "GET",
+        start_time: float | None = None,
+        request_size: int = 0,
+    ) -> None:
+        """Log the start of a network request.
+
+        Does nothing while capture is stopped. ``start_time`` is a Unix
+        timestamp in seconds and defaults to the current time.
+        """
+        if not self.is_capturing:
+            return
+
+        if start_time is None:
+            start_time = time.time()
+
+        self.timings.append({
+            "type": "request",
+            "url": url,
+            "method": method,
+            "start_time": start_time,
+            "request_size": request_size,
+            "timestamp": datetime.fromtimestamp(start_time).isoformat(),
+        })
+
+    def log_response(
+        self,
+        url: str,
+        status: int = 200,
+        end_time: float | None = None,
+        response_size: int = 0,
+    ) -> None:
+        """Log the completion of a network request.
+
+        The response is attached to the most recent request for ``url`` that
+        is still pending. Does nothing while capture is stopped or when no
+        pending request matches.
+        """
+        if not self.is_capturing:
+            return
+
+        if end_time is None:
+            end_time = time.time()
+
+        for timing in reversed(self.timings):
+            if timing.get("type") != "request" or timing.get("url") != url:
+                continue
+            if "end_time" in timing:
+                # Already completed: a later response for the same URL must
+                # not overwrite it.
+                continue
+            timing["status"] = status
+            timing["end_time"] = end_time
+            timing["response_size"] = response_size
+            timing["duration_ms"] = (end_time - timing["start_time"]) * 1000
+            timing["total_size"] = timing.get("request_size", 0) + response_size
+            break
+
+    def get_timings(self) -> list[dict[str, Any]]:
+        """Get all captured timings (the live list, not a copy)."""
+        return self.timings
+
+    def get_summary(self) -> dict[str, Any]:
+        """Get summary statistics of captured network timings.
+
+        Durations and sizes cover completed requests only. ``total_requests``
+        counts completed requests, or every logged request when none has
+        completed yet.
+        """
+        completed = [
+            t for t in self.timings
+            if "duration_ms" in t and t.get("end_time") is not None
+        ]
+        durations = [t["duration_ms"] for t in completed]
+
+        summary: dict[str, Any] = {
+            "total_requests": len(completed) if completed else len(self.timings),
+            "total_time_ms": 0,
+            "average_request_time_ms": 0,
+            "min_request_time_ms": 0,
+            "max_request_time_ms": 0,
+            "total_size_bytes": 0,
+            "captured_at": datetime.now().isoformat(),
+        }
+        if durations:
+            summary.update(
+                total_time_ms=sum(durations),
+                average_request_time_ms=sum(durations) / len(durations),
+                min_request_time_ms=min(durations),
+                max_request_time_ms=max(durations),
+                total_size_bytes=sum(t.get("total_size", 0) for t in completed),
+            )
+        return summary
+
+    def export_to_file(self, filename: str = "network_timings.json") -> str:
+        """Export captured timings to a JSON file in the workspace directory.
+
+        Missing parent directories are created. Returns the output path.
+        """
+        output_path = self.workspace_dir / filename
+        output_path.parent.mkdir(parents=True, exist_ok=True)
+
+        export_data = {
+            "metadata": {
+                "captured_at": datetime.now().isoformat(),
+                "total_entries": len(self.timings),
+            },
+            "summary": self.get_summary(),
+            "timings": self.timings,
+        }
+
+        with open(output_path, "w", encoding="utf-8") as f:
+            json.dump(export_data, f, indent=2)
+
+        return str(output_path)
+
+
+def _timings_file_for_session(session_file: Path) -> Path | None:
+    """Return the timings file recorded for ``session_file``.
+
+    Prints an error to stderr and returns ``None`` when the session file is
+    missing or cannot be parsed.
+    """
+    if not session_file.exists():
+        print(f"error: session file not found: {session_file}", file=sys.stderr)
+        return None
+
+    try:
+        with open(session_file, encoding="utf-8") as f:
+            session_data = json.load(f)
+    except (OSError, json.JSONDecodeError) as exc:
+        print(f"error: cannot read session file: {exc}", file=sys.stderr)
+        return None
+
+    return Path(session_data.get("workspace_dir", "")) / TIMINGS_FILENAME
+
+
+def _cmd_start(args: argparse.Namespace) -> int:
+    """Create the session file that marks a capture session as started."""
+    workspace_dir = Path(args.workspace_dir) if args.workspace_dir else Path.cwd()
+    session_file = workspace_dir / args.out
+    session_file.parent.mkdir(parents=True, exist_ok=True)
+
+    session_data = {
+        "id": args.session_id or "default",
+        "workspace_dir": str(workspace_dir.resolve()),
+        "started_at": datetime.now().isoformat(),
+        "is_capturing": True,
+    }
+
+    with open(session_file, "w", encoding="utf-8") as f:
+        json.dump(session_data, f, indent=2)
+
+    print("Network timings capture started")
+    print(f"Session file: {session_file}")
+    return 0
+
+
+def _cmd_get_summary(args: argparse.Namespace) -> int:
+    """Print the summary stored in the session's timings file."""
+    timings_file = _timings_file_for_session(Path(args.session_file))
+    if timings_file is None:
+        return 1
+
+    if not timings_file.exists():
+        print("No timings data available yet")
+        return 0
+
+    with open(timings_file, encoding="utf-8") as f:
+        timings_data = json.load(f)
+
+    print(json.dumps(timings_data.get("summary", {}), indent=2))
+    return 0
+
+
+def _cmd_export(args: argparse.Namespace) -> int:
+    """Copy the session's timings file to ``--output``."""
+    timings_file = _timings_file_for_session(Path(args.session_file))
+    if timings_file is None:
+        return 1
+
+    if not timings_file.exists():
+        print("No timings data available to export", file=sys.stderr)
+        return 1
+
+    output_path = Path(args.output) if args.output else timings_file
+
+    with open(timings_file, encoding="utf-8") as f:
+        timings_data = json.load(f)
+
+    # The destination directory may not exist yet.
+    output_path.parent.mkdir(parents=True, exist_ok=True)
+    with open(output_path, "w", encoding="utf-8") as f:
+        json.dump(timings_data, f, indent=2)
+
+    print(f"Timings exported to: {output_path}")
+    return 0
+
+
+def main(argv: list[str] | None = None) -> int:
+    """CLI entry point for network timings capture.
+
+    ``argv`` defaults to ``sys.argv[1:]``; pass a list to drive the CLI from
+    code or tests. Returns the process exit code.
+    """
+    parser = argparse.ArgumentParser(
+        description="Capture and analyze network timings during page interactions"
+    )
+    subparsers = parser.add_subparsers(dest="command", help="Subcommand to execute")
+
+    start_parser = subparsers.add_parser("start", help="Start network timings capture")
+    start_parser.add_argument(
+        "--workspace-dir",
+        default="",
+        help="Workspace directory for storing timings data",
+    )
+    start_parser.add_argument(
+        "--out",
+        default=".network_timings_session.json",
+        help="Session file name",
+    )
+    start_parser.add_argument(
+        "--session-id",
+        default="",
+        help="Unique session identifier",
+    )
+    start_parser.set_defaults(func=_cmd_start)
+
+    summary_parser = subparsers.add_parser("get-summary", help="Get timings summary")
+    summary_parser.add_argument(
+        "--session-file",
+        required=True,
+        help="Path to session file",
+    )
+    summary_parser.set_defaults(func=_cmd_get_summary)
+
+    export_parser = subparsers.add_parser("export", help="Export timings data")
+    export_parser.add_argument(
+        "--session-file",
+        required=True,
+        help="Path to session file",
+    )
+    export_parser.add_argument(
+        "--output",
+        default="",
+        help="Output file path",
+    )
+    export_parser.set_defaults(func=_cmd_export)
+
+    args = parser.parse_args(argv)
+
+    if not hasattr(args, "func"):
+        parser.print_help()
+        return 1
+
+    return args.func(args)
+
+
+if __name__ == "__main__":
+    sys.exit(main())
diff --git a/tests/unit/test_network_timings.py b/tests/unit/test_network_timings.py
new file mode 100644
index 0000000..f0548ed
--- /dev/null
+++ b/tests/unit/test_network_timings.py
@@ -0,0 +1,204 @@
+"""Unit tests for network_timings.py tool."""
+
+import json
+import tempfile
+from pathlib import Path
+
+import pytest
+
+from webwright.tools.network_timings import NetworkTimingsCapture
+
+# Fixed Unix timestamp so duration assertions never depend on the wall clock.
+T0 = 1_700_000_000.0
+
+
+def test_network_timings_initialization():
+    """Test NetworkTimingsCapture initialization."""
+    with tempfile.TemporaryDirectory() as tmpdir:
+        capture = NetworkTimingsCapture(tmpdir)
+        assert capture.workspace_dir == Path(tmpdir)
+        assert capture.timings == []
+        assert not capture.is_capturing
+
+
+def test_start_and_stop_capture():
+    """Test starting and stopping capture."""
+    with tempfile.TemporaryDirectory() as tmpdir:
+        capture = NetworkTimingsCapture(tmpdir)
+        assert not capture.is_capturing
+
+        capture.start_capture()
+        assert capture.is_capturing
+        assert capture.start_time is not None
+
+        capture.stop_capture()
+        assert not capture.is_capturing
+
+
+def test_log_request():
+    """Test logging requests."""
+    with tempfile.TemporaryDirectory() as tmpdir:
+        capture = NetworkTimingsCapture(tmpdir)
+        capture.start_capture()
+
+        capture.log_request("https://example.com", method="GET", request_size=100)
+
+        assert len(capture.timings) == 1
+        timing = capture.timings[0]
+        assert timing["type"] == "request"
+        assert timing["url"] == "https://example.com"
+        assert timing["method"] == "GET"
+        assert timing["request_size"] == 100
+
+
+def test_log_response():
+    """Test logging responses."""
+    with tempfile.TemporaryDirectory() as tmpdir:
+        capture = NetworkTimingsCapture(tmpdir)
+        capture.start_capture()
+
+        capture.log_request("https://example.com", method="GET", start_time=T0)
+        capture.log_response(
+            "https://example.com", status=200, end_time=T0 + 0.25, response_size=1024
+        )
+
+        assert len(capture.timings) == 1
+        timing = capture.timings[0]
+        assert timing["status"] == 200
+        assert timing["response_size"] == 1024
+        assert timing["duration_ms"] == pytest.approx(250.0)
+
+
+def test_get_summary_empty():
+    """Test summary with no timings."""
+    with tempfile.TemporaryDirectory() as tmpdir:
+        capture = NetworkTimingsCapture(tmpdir)
+        summary = capture.get_summary()
+
+        assert summary["total_requests"] == 0
+        assert summary["total_time_ms"] == 0
+        assert summary["average_request_time_ms"] == 0
+
+
+def test_get_summary_with_timings():
+    """Test summary calculation with captured timings."""
+    with tempfile.TemporaryDirectory() as tmpdir:
+        capture = NetworkTimingsCapture(tmpdir)
+        capture.start_capture()
+
+        # Log 3 requests that each take exactly 250 ms.
+        for i in range(3):
+            url = f"https://example.com/api/{i}"
+            capture.log_request(url, method="GET", start_time=T0 + i)
+            capture.log_response(url, status=200, end_time=T0 + i + 0.25, response_size=500)
+
+        summary = capture.get_summary()
+        assert summary["total_requests"] == 3
+        assert summary["total_time_ms"] == pytest.approx(750.0)
+        assert summary["average_request_time_ms"] == pytest.approx(250.0)
+        assert summary["min_request_time_ms"] == pytest.approx(250.0)
+        assert summary["max_request_time_ms"] == pytest.approx(250.0)
+        assert summary["total_size_bytes"] == 1500  # 3 * 500
+
+
+def test_export_to_file():
+    """Test exporting timings to JSON file."""
+    with tempfile.TemporaryDirectory() as tmpdir:
+        capture = NetworkTimingsCapture(tmpdir)
+        capture.start_capture()
+
+        capture.log_request("https://example.com", method="GET")
+        capture.log_response("https://example.com", status=200, response_size=1024)
+
+        output_file = capture.export_to_file("test_timings.json")
+        assert Path(output_file).exists()
+
+        with open(output_file, encoding="utf-8") as f:
+            data = json.load(f)
+
+        assert "metadata" in data
+        assert "summary" in data
+        assert "timings" in data
+        assert len(data["timings"]) == 1
+        assert data["summary"]["total_requests"] == 1
+
+
+def test_get_timings():
+    """Test retrieving captured timings."""
+    with tempfile.TemporaryDirectory() as tmpdir:
+        capture = NetworkTimingsCapture(tmpdir)
+        capture.start_capture()
+
+        capture.log_request("https://example.com/1", method="GET")
+        capture.log_request("https://example.com/2", method="POST")
+
+        timings = capture.get_timings()
+        assert len(timings) == 2
+        assert timings[0]["url"] == "https://example.com/1"
+        assert timings[1]["url"] == "https://example.com/2"
+
+
+def test_multiple_requests_same_url():
+    """Test logging multiple requests to the same URL."""
+    with tempfile.TemporaryDirectory() as tmpdir:
+        capture = NetworkTimingsCapture(tmpdir)
+        capture.start_capture()
+
+        url = "https://example.com/api"
+        for i in range(3):
+            capture.log_request(url, method="GET", start_time=T0 + i)
+
+        assert len(capture.timings) == 3
+        for timing in capture.timings:
+            assert timing["url"] == url
+
+
+def test_responses_complete_distinct_requests_for_same_url():
+    """Test that each response completes a different pending request."""
+    with tempfile.TemporaryDirectory() as tmpdir:
+        capture = NetworkTimingsCapture(tmpdir)
+        capture.start_capture()
+
+        url = "https://example.com/api"
+        capture.log_request(url, start_time=T0)
+        capture.log_request(url, start_time=T0 + 1)
+        capture.log_response(url, status=200, end_time=T0 + 2)
+        capture.log_response(url, status=404, end_time=T0 + 3)
+
+        first, second = capture.timings
+        # The newest pending request is completed first; the second response
+        # must then complete the older one instead of overwriting the newest.
+        assert second["status"] == 200
+        assert first["status"] == 404
+        assert capture.get_summary()["total_requests"] == 2
+
+
+def test_capture_not_logging_when_disabled():
+    """Test that requests are not logged when capture is stopped."""
+    with tempfile.TemporaryDirectory() as tmpdir:
+        capture = NetworkTimingsCapture(tmpdir)
+
+        capture.log_request("https://example.com", method="GET")
+        assert len(capture.timings) == 0
+
+        capture.start_capture()
+        capture.log_request("https://example.com", method="GET")
+        assert len(capture.timings) == 1
+
+        capture.stop_capture()
+        capture.log_request("https://example.com", method="GET")
+        assert len(capture.timings) == 1  # Still 1, not logged
+
+
+def test_export_creates_parent_directories():
+    """Test that export creates parent directories if needed."""
+    with tempfile.TemporaryDirectory() as tmpdir:
+        subdir = Path(tmpdir) / "subdir" / "nested"
+        capture = NetworkTimingsCapture(str(subdir))
+        capture.start_capture()
+
+        capture.log_request("https://example.com", method="GET")
+
+        output_file = capture.export_to_file("timings.json")
+        assert Path(output_file).exists()
+        assert Path(output_file).parent == subdir