diff --git a/sentry_sdk/_types.py b/sentry_sdk/_types.py index baf5f6a2fd..3a139f6c91 100644 --- a/sentry_sdk/_types.py +++ b/sentry_sdk/_types.py @@ -13,6 +13,9 @@ SENSITIVE_DATA_SUBSTITUTE = "[Filtered]" BLOB_DATA_SUBSTITUTE = "[Blob substitute]" +OVER_SIZE_LIMIT_SUBSTITUTE = ( + "[Value removed due to size of field exceeding configured maximum size.]" +) class AnnotatedValue: @@ -77,6 +80,26 @@ def removed_because_over_size_limit(cls, value: "Any" = "") -> "AnnotatedValue": }, ) + @classmethod + def substituted_because_over_size_limit( + cls, value: "Any" = OVER_SIZE_LIMIT_SUBSTITUTE + ) -> "AnnotatedValue": + """ + The actual value was replaced because the size of the field exceeded the configured maximum size, + for example specified with the max_request_body_size sdk option. + """ + return AnnotatedValue( + value=value, + metadata={ + "rem": [ # Remark + [ + "!config", # Because of configured maximum size + "s", # The fields original value was substituted + ] + ] + }, + ) + @classmethod def substituted_because_contains_sensitive_data(cls) -> "AnnotatedValue": """The actual value was removed because it contained sensitive information.""" diff --git a/sentry_sdk/integrations/aiohttp.py b/sentry_sdk/integrations/aiohttp.py index 46ee5f67b6..95ced490bd 100644 --- a/sentry_sdk/integrations/aiohttp.py +++ b/sentry_sdk/integrations/aiohttp.py @@ -4,26 +4,44 @@ import sentry_sdk from sentry_sdk.api import continue_trace -from sentry_sdk.consts import OP, SPANSTATUS, SPANDATA +from sentry_sdk.consts import OP, SPANDATA, SPANSTATUS from sentry_sdk.integrations import ( _DEFAULT_FAILED_REQUEST_STATUS_CODES, - _check_minimum_version, - Integration, DidNotEnable, + Integration, + _check_minimum_version, ) -from sentry_sdk.integrations.logging import ignore_logger -from sentry_sdk.sessions import track_session from sentry_sdk.integrations._wsgi_common import ( _filter_headers, request_body_within_bounds, ) +from sentry_sdk.integrations.logging import ignore_logger +from sentry_sdk.scope import should_send_default_pii +from sentry_sdk.sessions import track_session +from sentry_sdk.traces import ( + SOURCE_FOR_STYLE as SEGMENT_SOURCE_FOR_STYLE, +) +from sentry_sdk.traces import ( + NoOpStreamedSpan, + SegmentSource, + SpanStatus, + StreamedSpan, +) from sentry_sdk.tracing import ( BAGGAGE_HEADER_NAME, SOURCE_FOR_STYLE, TransactionSource, ) -from sentry_sdk.tracing_utils import should_propagate_trace, add_http_request_source +from sentry_sdk.tracing_utils import ( + add_http_request_source, + has_span_streaming_enabled, + should_propagate_trace, +) from sentry_sdk.utils import ( + CONTEXTVARS_ERROR_MESSAGE, + HAS_REAL_CONTEXTVARS, + SENSITIVE_DATA_SUBSTITUTE, + AnnotatedValue, capture_internal_exceptions, ensure_integration_enabled, event_from_exception, @@ -32,17 +50,13 @@ parse_version, reraise, transaction_from_function, - HAS_REAL_CONTEXTVARS, - CONTEXTVARS_ERROR_MESSAGE, - SENSITIVE_DATA_SUBSTITUTE, - AnnotatedValue, ) try: import asyncio - from aiohttp import __version__ as AIOHTTP_VERSION from aiohttp import ClientSession, TraceConfig + from aiohttp import __version__ as AIOHTTP_VERSION from aiohttp.web import Application, HTTPException, UrlDispatcher except ImportError: raise DidNotEnable("AIOHTTP not installed") @@ -50,19 +64,17 @@ from typing import TYPE_CHECKING if TYPE_CHECKING: - from aiohttp.web_request import Request - from aiohttp.web_urldispatcher import UrlMappingMatchInfo - from aiohttp import TraceRequestStartParams, TraceRequestEndParams - from collections.abc import Set from types import SimpleNamespace - from typing import Any - from typing import Optional - from typing import Tuple - from typing import Union + from typing import Any, ContextManager, Optional, Tuple, Union + + from aiohttp import TraceRequestEndParams, TraceRequestStartParams + from aiohttp.web_request import Request + from aiohttp.web_urldispatcher import UrlMappingMatchInfo + from sentry_sdk._types import Attributes, Event, EventProcessor + from sentry_sdk.tracing import Span from sentry_sdk.utils import ExcInfo - from sentry_sdk._types import Event, EventProcessor TRANSACTION_STYLE_VALUES = ("handler_name", "method_and_path_pattern") @@ -106,11 +118,13 @@ def setup_once() -> None: async def sentry_app_handle( self: "Any", request: "Request", *args: "Any", **kwargs: "Any" ) -> "Any": - integration = sentry_sdk.get_client().get_integration(AioHttpIntegration) + client = sentry_sdk.get_client() + integration = client.get_integration(AioHttpIntegration) if integration is None: return await old_handle(self, request, *args, **kwargs) weak_request = weakref.ref(request) + is_span_streaming_enabled = has_span_streaming_enabled(client.options) with sentry_sdk.isolation_scope() as scope: with track_session(scope, session_mode="request"): @@ -121,38 +135,134 @@ async def sentry_app_handle( scope.add_event_processor(_make_request_processor(weak_request)) headers = dict(request.headers) - transaction = continue_trace( - headers, - op=OP.HTTP_SERVER, - # If this transaction name makes it to the UI, AIOHTTP's - # URL resolver did not find a route or died trying. - name="generic AIOHTTP request", - source=TransactionSource.ROUTE, - origin=AioHttpIntegration.origin, - ) - with sentry_sdk.start_transaction( - transaction, - custom_sampling_context={"aiohttp_request": request}, - ): - try: - response = await old_handle(self, request) - except HTTPException as e: - transaction.set_http_status(e.status_code) - if ( - e.status_code - in integration._failed_request_status_codes + span_ctx: "ContextManager[Union[Span, StreamedSpan]]" + if is_span_streaming_enabled: + sentry_sdk.traces.continue_trace(headers) + sentry_sdk.get_current_scope().set_custom_sampling_context( + {"aiohttp_request": request} + ) + + header_attributes: "dict[str, Any]" = {} + for header, header_value in _filter_headers( + headers, use_annotated_value=False + ).items(): + header_attributes[ + f"http.request.header.{header.lower()}" + ] = ( + # header_value will always be a string because we set `use_annotated_value` to false above + header_value + ) + + url_query_attribute = ( + {"url.query": request.query_string} + if request.query_string + else {} + ) + + client_address_attributes = ( + { + "client.address": request.remote, + "user.ip_address": request.remote, + } + if should_send_default_pii() and request.remote + else {} + ) + + span_ctx = sentry_sdk.traces.start_span( + # If this name makes it to the UI, AIOHTTP's URL + # resolver did not find a route or died trying. + name="generic AIOHTTP request", + attributes={ + "sentry.op": OP.HTTP_SERVER, + "sentry.origin": AioHttpIntegration.origin, + "sentry.span.source": SegmentSource.ROUTE.value, + "url.full": "%s://%s%s" + % (request.scheme, request.host, request.path), + "http.request.method": request.method, + **url_query_attribute, + **client_address_attributes, + **header_attributes, + }, + ) + else: + transaction = continue_trace( + headers, + op=OP.HTTP_SERVER, + # If this transaction name makes it to the UI, AIOHTTP's + # URL resolver did not find a route or died trying. + name="generic AIOHTTP request", + source=TransactionSource.ROUTE, + origin=AioHttpIntegration.origin, + ) + span_ctx = sentry_sdk.start_transaction( + transaction, + custom_sampling_context={"aiohttp_request": request}, + ) + + with span_ctx as span: + try: + try: + response = await old_handle(self, request) + except HTTPException as e: + if isinstance(span, StreamedSpan) and not isinstance( + span, NoOpStreamedSpan + ): + span.set_attribute( + "http.response.status_code", e.status_code + ) + + # There are a number of sub-exceptions that should have an "ok" status, + # but will actually have a status of error once the `raise` happens below + # and we pass through the `should_be_treated_as_error` method invoked + # within a span's `__exit__` method. + # + # As a result, made this behaviour explicit + # so people don't think that the "ok" status persists in that scenario. + # + # Although not obvious, this overwriting behaviour occurs in the legacy + # approach as well, so this matches what we do today. + span.status = SpanStatus.ERROR.value + else: + # Since a NoOpStreamedSpan can end up here, we have to guard against it + # so this only gets set in the legacy transaction approach. + if not isinstance(span, NoOpStreamedSpan): + span.set_http_status(e.status_code) + + if ( + e.status_code + in integration._failed_request_status_codes + ): + _capture_exception() + raise + except (asyncio.CancelledError, ConnectionResetError): + if isinstance(span, StreamedSpan): + span.status = SpanStatus.ERROR.value + else: + span.set_status(SPANSTATUS.CANCELLED) + raise + except Exception: + # This will probably map to a 500 but seems like we + # have no way to tell. Do not set span status. + reraise(*_capture_exception()) + finally: + # The handler has had a chance to read the body, so + # request._read_bytes may now be populated. Capture + # body data on the segment regardless of outcome. + if isinstance(span, StreamedSpan) and not isinstance( + span, NoOpStreamedSpan ): - _capture_exception() - - raise - except (asyncio.CancelledError, ConnectionResetError): - transaction.set_status(SPANSTATUS.CANCELLED) - raise - except Exception: - # This will probably map to a 500 but seems like we - # have no way to tell. Do not set span status. - reraise(*_capture_exception()) + with capture_internal_exceptions(): + raw_data = get_aiohttp_request_data(request) + body_data = ( + raw_data.value + if isinstance(raw_data, AnnotatedValue) + else raw_data + ) + if body_data is not None: + span._segment.set_attribute( + "http.request.body.data", body_data + ) try: # A valid response handler will return a valid response with a status. But, if the handler @@ -163,7 +273,17 @@ async def sentry_app_handle( except AttributeError: pass else: - transaction.set_http_status(response_status) + if isinstance(span, StreamedSpan): + span.set_attribute( + "http.response.status_code", response_status + ) + span.status = ( + SpanStatus.ERROR.value + if response_status >= 400 + else SpanStatus.OK.value + ) + else: + span.set_http_status(response_status) return response @@ -194,10 +314,21 @@ async def sentry_urldispatcher_resolve( pass if name is not None: - sentry_sdk.get_current_scope().set_transaction_name( - name, - source=SOURCE_FOR_STYLE[integration.transaction_style], - ) + current_span = sentry_sdk.get_current_span() + if isinstance(current_span, StreamedSpan) and not isinstance( + current_span, NoOpStreamedSpan + ): + current_span._segment.name = name + current_span._segment.set_attribute( + "sentry.span.source", + SEGMENT_SOURCE_FOR_STYLE[integration.transaction_style].value, + ) + else: + current_scope = sentry_sdk.get_current_scope() + current_scope.set_transaction_name( + name, + source=SOURCE_FOR_STYLE[integration.transaction_style], + ) return rv @@ -223,7 +354,8 @@ async def on_request_start( trace_config_ctx: "SimpleNamespace", params: "TraceRequestStartParams", ) -> None: - if sentry_sdk.get_client().get_integration(AioHttpIntegration) is None: + client = sentry_sdk.get_client() + if client.get_integration(AioHttpIntegration) is None: return method = params.method.upper() @@ -232,19 +364,38 @@ async def on_request_start( with capture_internal_exceptions(): parsed_url = parse_url(str(params.url), sanitize=False) - span = sentry_sdk.start_span( - op=OP.HTTP_CLIENT, - name="%s %s" - % (method, parsed_url.url if parsed_url else SENSITIVE_DATA_SUBSTITUTE), - origin=AioHttpIntegration.origin, + span_name = "%s %s" % ( + method, + parsed_url.url if parsed_url else SENSITIVE_DATA_SUBSTITUTE, ) - span.set_data(SPANDATA.HTTP_METHOD, method) - if parsed_url is not None: - span.set_data("url", parsed_url.url) - span.set_data(SPANDATA.HTTP_QUERY, parsed_url.query) - span.set_data(SPANDATA.HTTP_FRAGMENT, parsed_url.fragment) - client = sentry_sdk.get_client() + span: "Union[Span, StreamedSpan]" + if has_span_streaming_enabled(client.options): + attributes: "Attributes" = { + "sentry.op": OP.HTTP_CLIENT, + "sentry.origin": AioHttpIntegration.origin, + "http.request.method": method, + } + if parsed_url is not None: + attributes["url.full"] = parsed_url.url + if parsed_url.query: + attributes["url.query"] = parsed_url.query + if parsed_url.fragment: + attributes["url.fragment"] = parsed_url.fragment + + span = sentry_sdk.traces.start_span(name=span_name, attributes=attributes) + else: + legacy_span = sentry_sdk.start_span( + op=OP.HTTP_CLIENT, + name=span_name, + origin=AioHttpIntegration.origin, + ) + legacy_span.set_data(SPANDATA.HTTP_METHOD, method) + if parsed_url is not None: + legacy_span.set_data("url", parsed_url.url) + legacy_span.set_data(SPANDATA.HTTP_QUERY, parsed_url.query) + legacy_span.set_data(SPANDATA.HTTP_FRAGMENT, parsed_url.fragment) + span = legacy_span if should_propagate_trace(client, str(params.url)): for ( @@ -277,12 +428,23 @@ async def on_request_end( return span = trace_config_ctx.span - span.set_http_status(int(params.response.status)) - span.set_data("reason", params.response.reason) - span.finish() + status = int(params.response.status) - with capture_internal_exceptions(): - add_http_request_source(span) + if isinstance(span, StreamedSpan): + span.set_attribute("http.response.status_code", status) + span.status = ( + SpanStatus.ERROR.value if status >= 400 else SpanStatus.OK.value + ) + + with capture_internal_exceptions(): + add_http_request_source(span) + span.end() + else: + span.set_http_status(status) + span.set_data("reason", params.response.reason) + span.finish() + with capture_internal_exceptions(): + add_http_request_source(span) trace_config = TraceConfig() @@ -349,7 +511,7 @@ def get_aiohttp_request_data( if bytes_body is not None: # we have body to show if not request_body_within_bounds(sentry_sdk.get_client(), len(bytes_body)): - return AnnotatedValue.removed_because_over_size_limit() + return AnnotatedValue.substituted_because_over_size_limit() encoding = request.charset or "utf-8" return bytes_body.decode(encoding, "replace") diff --git a/tests/integrations/aiohttp/test_aiohttp.py b/tests/integrations/aiohttp/test_aiohttp.py index 849f9d017b..57dddcb964 100644 --- a/tests/integrations/aiohttp/test_aiohttp.py +++ b/tests/integrations/aiohttp/test_aiohttp.py @@ -1,27 +1,32 @@ -import os -import datetime import asyncio +import datetime import json - +import os from contextlib import suppress from unittest import mock import pytest - from aiohttp import web from aiohttp.client import ServerDisconnectedError -from aiohttp.web_request import Request from aiohttp.web_exceptions import ( + HTTPBadRequest, HTTPInternalServerError, HTTPNetworkAuthenticationRequired, - HTTPBadRequest, HTTPNotFound, HTTPUnavailableForLegalReasons, ) +from aiohttp.web_request import Request +import sentry_sdk from sentry_sdk import capture_message, start_transaction -from sentry_sdk.integrations.aiohttp import AioHttpIntegration, create_trace_config +from sentry_sdk._types import OVER_SIZE_LIMIT_SUBSTITUTE from sentry_sdk.consts import SPANDATA +from sentry_sdk.integrations.aiohttp import ( + BODY_NOT_READ_MESSAGE, + AioHttpIntegration, + create_trace_config, +) +from sentry_sdk.utils import SENSITIVE_DATA_SUBSTITUTE from tests.conftest import ApproxDict @@ -1131,3 +1136,529 @@ async def handle(_): (event,) = events assert event["exception"]["values"][0]["type"] == "ZeroDivisionError" + + +@pytest.mark.asyncio +async def test_tracing_span_streaming(sentry_init, aiohttp_client, capture_items): + sentry_init( + integrations=[AioHttpIntegration()], + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream"}, + ) + + async def hello(request): + return web.Response(text="hello") + + app = web.Application() + app.router.add_get("/", hello) + + items = capture_items("span") + + client = await aiohttp_client(app) + resp = await client.get("/") + assert resp.status == 200 + + sentry_sdk.flush() + + # Spans finish inner-first, so the segment is the last item. + # The aiohttp_client fixture is itself sentry-instrumented and emits the + # outer http.client segment; the server-side http.server span is its only + # child. Asserting the exact length confirms no other spans leak in. + assert len(items) == 2 + + segment = items.pop().payload + (server_span,) = [item.payload for item in items] + + assert segment["is_segment"] is True + assert segment["attributes"]["sentry.op"] == "http.client" + assert segment["name"] == ( + "tests.integrations.aiohttp.test_aiohttp." + "test_tracing_span_streaming..hello" + ) + assert segment["attributes"]["sentry.span.source"] == "component" + + assert server_span["is_segment"] is False + assert server_span["name"] == "generic AIOHTTP request" + assert server_span["attributes"]["sentry.op"] == "http.server" + assert server_span["attributes"]["sentry.origin"] == "auto.http.aiohttp" + assert server_span["attributes"]["http.response.status_code"] == 200 + assert server_span["attributes"]["sentry.span.source"] == "route" + assert server_span["status"] == "ok" + # No query string on the request, so the attribute should be omitted. + assert "url.query" not in server_span["attributes"] + + # Request attributes derived directly from the aiohttp request. + assert server_span["attributes"]["http.request.method"] == "GET" + # client.address and user.ip_address is gated on send_default_pii (default False), so it must + # not be captured here. + assert "client.address" not in server_span["attributes"] + assert "user.ip_address" not in server_span["attributes"] + url_full = server_span["attributes"]["url.full"] + assert url_full.startswith("http://127.0.0.1:") + assert url_full.endswith("/") + # aiohttp's test client always sends a Host header; we assert it propagates + # into the span attributes via _filter_headers. + assert "http.request.header.host" in server_span["attributes"] + + +@pytest.mark.asyncio +async def test_sensitive_header_scrubbing_span_streaming( + sentry_init, aiohttp_client, capture_items +): + sentry_init( + integrations=[AioHttpIntegration()], + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream"}, + ) + + async def hello(request): + return web.Response(text="hello") + + app = web.Application() + app.router.add_get("/", hello) + + items = capture_items("span") + + client = await aiohttp_client(app) + resp = await client.get( + "/", + headers={ + "Authorization": "Bearer secret-token", + "X-Custom-Header": "passthrough", + }, + ) + assert resp.status == 200 + + sentry_sdk.flush() + + items.pop() # drop the test client's outer segment + (server_span,) = [item.payload for item in items] + + # send_default_pii defaults to False, so _filter_headers substitutes + # sensitive headers with SENSITIVE_DATA_SUBSTITUTE ("[Filtered]"). The + # original token must not leak. + assert ( + server_span["attributes"]["http.request.header.authorization"] + == SENSITIVE_DATA_SUBSTITUTE + ) + # Non-sensitive headers pass through untouched. + assert ( + server_span["attributes"]["http.request.header.x-custom-header"] + == "passthrough" + ) + + +@pytest.mark.asyncio +async def test_sensitive_header_passthrough_with_pii_span_streaming( + sentry_init, aiohttp_client, capture_items +): + sentry_init( + integrations=[AioHttpIntegration()], + traces_sample_rate=1.0, + send_default_pii=True, + _experiments={"trace_lifecycle": "stream"}, + ) + + async def hello(request): + return web.Response(text="hello") + + app = web.Application() + app.router.add_get("/", hello) + + items = capture_items("span") + + client = await aiohttp_client(app) + await client.get("/", headers={"Authorization": "Bearer secret-token"}) + + sentry_sdk.flush() + + items.pop() # drop the test client's outer segment + (server_span,) = [item.payload for item in items] + + # With send_default_pii=True, _filter_headers is a no-op and the original + # value reaches the span attribute. + assert ( + server_span["attributes"]["http.request.header.authorization"] + == "Bearer secret-token" + ) + # client.address and user.ip_address is captured under send_default_pii=True. + assert server_span["attributes"]["client.address"] == "127.0.0.1" + assert server_span["attributes"]["user.ip_address"] == "127.0.0.1" + + +@pytest.mark.asyncio +async def test_request_body_captured_on_segment_span_streaming( + sentry_init, aiohttp_client, capture_items +): + sentry_init( + integrations=[AioHttpIntegration()], + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream"}, + ) + + body = {"some": "value"} + + async def hello(request): + # Reading the body populates request._read_bytes; the integration + # captures body data in a finally after the handler returns. + await request.json() + return web.Response(text="hello") + + app = web.Application() + app.router.add_post("/", hello) + + items = capture_items("span") + + client = await aiohttp_client(app) + resp = await client.post("/", json=body) + assert resp.status == 200 + + sentry_sdk.flush() + + # The integration sets http.request.body.data on the segment span. In this + # test the segment is the aiohttp_client's outgoing http.client span (the + # test client is itself sentry-instrumented). In production with separate + # processes, the server span is the segment; the assertion is the same. + segment = items.pop().payload + assert segment["is_segment"] is True + assert segment["attributes"]["http.request.body.data"] == json.dumps(body) + + +@pytest.mark.asyncio +async def test_request_body_not_read_span_streaming( + sentry_init, aiohttp_client, capture_items +): + sentry_init( + integrations=[AioHttpIntegration()], + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream"}, + ) + + async def hello(request): + # Handler does not read the body; request._read_bytes stays None. + return web.Response(text="hello") + + app = web.Application() + app.router.add_post("/", hello) + + items = capture_items("span") + + client = await aiohttp_client(app) + resp = await client.post("/", json={"some": "value"}) + assert resp.status == 200 + + sentry_sdk.flush() + + segment = items.pop().payload + assert segment["is_segment"] is True + assert segment["attributes"]["http.request.body.data"] == BODY_NOT_READ_MESSAGE + + +@pytest.mark.asyncio +async def test_request_body_over_size_limit_span_streaming( + sentry_init, aiohttp_client, capture_items +): + sentry_init( + integrations=[AioHttpIntegration()], + traces_sample_rate=1.0, + max_request_body_size="small", + _experiments={"trace_lifecycle": "stream"}, + ) + + async def hello(request): + await request.read() + return web.Response(text="hello") + + app = web.Application() + app.router.add_post("/", hello) + + items = capture_items("span") + + client = await aiohttp_client(app) + # "small" caps at 1 KB; send a body larger than that. + resp = await client.post("/", data=b"x" * 2000) + assert resp.status == 200 + + sentry_sdk.flush() + + segment = items.pop().payload + assert segment["is_segment"] is True + assert segment["attributes"]["http.request.body.data"] == OVER_SIZE_LIMIT_SUBSTITUTE + + +@pytest.mark.asyncio +async def test_url_query_attribute_span_streaming( + sentry_init, aiohttp_client, capture_items +): + sentry_init( + integrations=[AioHttpIntegration()], + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream"}, + ) + + async def hello(request): + return web.Response(text="hello") + + app = web.Application() + app.router.add_get("/", hello) + + items = capture_items("span") + + client = await aiohttp_client(app) + resp = await client.get("/?foo=bar&baz=qux") + assert resp.status == 200 + + sentry_sdk.flush() + + assert len(items) == 2 + items.pop() # drop the test client's outer segment + (server_span,) = [item.payload for item in items] + + assert server_span["attributes"]["url.query"] == "foo=bar&baz=qux" + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "url,transaction_style,expected_name,expected_source", + [ + ( + "/message", + "handler_name", + "tests.integrations.aiohttp.test_aiohttp." + "test_transaction_style_span_streaming..hello", + "component", + ), + ( + "/message", + "method_and_path_pattern", + "GET /{var}", + "route", + ), + ], +) +async def test_transaction_style_span_streaming( + sentry_init, + aiohttp_client, + capture_items, + url, + transaction_style, + expected_name, + expected_source, +): + sentry_init( + integrations=[AioHttpIntegration(transaction_style=transaction_style)], + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream"}, + ) + + async def hello(request): + return web.Response(text="hello") + + app = web.Application() + app.router.add_get(r"/{var}", hello) + + items = capture_items("span") + + client = await aiohttp_client(app) + resp = await client.get(url) + assert resp.status == 200 + + sentry_sdk.flush() + + assert len(items) == 2 + segment = items.pop().payload + (server_span,) = [item.payload for item in items] + + assert segment["name"] == expected_name + assert segment["attributes"]["sentry.span.source"] == expected_source + + assert server_span["name"] == "generic AIOHTTP request" + assert not server_span["is_segment"] + assert server_span["attributes"]["sentry.span.source"] == "route" + + +@pytest.mark.asyncio +async def test_server_error_span_streaming(sentry_init, aiohttp_client, capture_items): + sentry_init( + integrations=[AioHttpIntegration()], + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream"}, + ) + + async def hello(request): + 1 / 0 + + app = web.Application() + app.router.add_get("/", hello) + + items = capture_items("event", "span") + + client = await aiohttp_client(app) + resp = await client.get("/") + assert resp.status == 500 + + sentry_sdk.flush() + + # 1 error event + 2 spans (server http.server, test client http.client segment) + assert len(items) == 3 + + error_event = items[0] + assert error_event.type == "event" + assert error_event.payload["exception"]["values"][0]["type"] == "ZeroDivisionError" + + spans = items[1:] + assert spans[-1].type == "span" + segment = spans.pop().payload + (server_span,) = [item.payload for item in spans] + + assert segment["is_segment"] is True + assert segment["attributes"]["sentry.op"] == "http.client" + # The test client receives the 500 response that aiohttp's outer error + # handler synthesizes from the unhandled exception. + assert segment["attributes"]["http.response.status_code"] == 500 + assert segment["status"] == "error" + + # The integration's generic Exception path reraises without recording + # http.response.status_code on the server span. StreamedSpan.__exit__ + # observes the propagating exception and sets status to "error". + assert server_span["attributes"]["sentry.op"] == "http.server" + assert "http.response.status_code" not in server_span["attributes"] + assert server_span["status"] == "error" + + +@pytest.mark.asyncio +async def test_http_exception_span_streaming( + sentry_init, aiohttp_client, capture_items +): + sentry_init( + integrations=[AioHttpIntegration()], + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream"}, + ) + + async def hello(request): + raise web.HTTPForbidden() + + app = web.Application() + app.router.add_get("/", hello) + + items = capture_items("span") + + client = await aiohttp_client(app) + resp = await client.get("/") + assert resp.status == 403 + + sentry_sdk.flush() + + assert len(items) == 2 + segment = items.pop().payload + (server_span,) = [item.payload for item in items] + + assert segment["is_segment"] is True + assert segment["attributes"]["sentry.op"] == "http.client" + assert segment["attributes"]["http.response.status_code"] == 403 + assert segment["status"] == "error" + + assert server_span["attributes"]["sentry.op"] == "http.server" + assert server_span["attributes"]["http.response.status_code"] == 403 + assert server_span["status"] == "error" + + +@pytest.mark.asyncio +async def test_outgoing_client_span_span_streaming( + sentry_init, aiohttp_raw_server, aiohttp_client, capture_items +): + sentry_init( + integrations=[AioHttpIntegration()], + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream"}, + ) + + async def handler(request): + return web.Response(text="OK") + + raw_server = await aiohttp_raw_server(handler) + + async def hello(request): + span_client = await aiohttp_client(raw_server) + await span_client.get("/?foo=bar") + return web.Response(text="hello") + + app = web.Application() + app.router.add_get(r"/", hello) + + items = capture_items("span") + + client = await aiohttp_client(app) + await client.get("/") + + sentry_sdk.flush() + + # 3 spans, finished inner-first: + # #0 inner http.client (server -> raw_server) + # #1 server http.server + # #2 outer http.client segment (test client -> server) + assert len(items) == 3 + + inner_client_span = items[0].payload + server_span = items[1].payload + segment = items[2].payload + + assert segment["is_segment"] is True + assert segment["attributes"]["sentry.op"] == "http.client" + + assert server_span["attributes"]["sentry.op"] == "http.server" + + assert inner_client_span["is_segment"] is False + assert inner_client_span["name"].startswith("GET ") + assert inner_client_span["attributes"]["sentry.op"] == "http.client" + assert inner_client_span["attributes"]["sentry.origin"] == "auto.http.aiohttp" + assert inner_client_span["attributes"]["http.request.method"] == "GET" + assert inner_client_span["attributes"]["http.response.status_code"] == 200 + assert inner_client_span["attributes"]["url.query"] == "foo=bar" + assert inner_client_span["status"] == "ok" + + url_full = inner_client_span["attributes"]["url.full"] + # parse_url() splits the URL — url.full is the base URL only, with the + # query string captured separately on url.query. + assert url_full.startswith("http://127.0.0.1:") + assert url_full.endswith("/") + + +@pytest.mark.asyncio +async def test_outgoing_trace_headers_span_streaming( + sentry_init, aiohttp_raw_server, aiohttp_client, capture_items +): + sentry_init( + integrations=[AioHttpIntegration()], + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream"}, + ) + + async def handler(request): + return web.Response(text="OK") + + raw_server = await aiohttp_raw_server(handler) + + items = capture_items("span") + + client = await aiohttp_client(raw_server) + resp = await client.get("/") + + sentry_sdk.flush() + + # raw_server bypasses Application._handle, so only the test client's + # outgoing http.client segment is emitted. + assert len(items) == 1 + client_span = items[0].payload + + assert client_span["is_segment"] is True + assert client_span["attributes"]["sentry.op"] == "http.client" + assert client_span["name"].startswith("GET http://127.0.0.1:") + assert resp.request_info.headers[ + "sentry-trace" + ] == "{trace_id}-{span_id}-{sampled}".format( + trace_id=client_span["trace_id"], + span_id=client_span["span_id"], + sampled=1, + )