From 2dc9255c6e848d5a320a83d23114b8af1cb01be0 Mon Sep 17 00:00:00 2001 From: Erica Pisani Date: Thu, 30 Apr 2026 14:22:04 -0400 Subject: [PATCH 1/7] feat(aiohttp): Add span streaming support Migrate the aiohttp integration to support the new span-first streaming model alongside the existing transaction-based approach. When trace_lifecycle is set to "stream", server and client spans use the StreamedSpan API with OTel-style attributes. The legacy path remains unchanged. Also adds AnnotatedValue.substituted_because_over_size_limit() for body-size-limit annotations and comprehensive tests covering headers, request bodies, transaction styles, error paths, and trace propagation under span streaming. It's being introduced in #6178 but added here in order to be able to start leveraging it right away. --- sentry_sdk/_types.py | 23 + sentry_sdk/integrations/aiohttp.py | 269 ++++++++--- tests/integrations/aiohttp/test_aiohttp.py | 528 ++++++++++++++++++++- 3 files changed, 763 insertions(+), 57 deletions(-) diff --git a/sentry_sdk/_types.py b/sentry_sdk/_types.py index baf5f6a2fd..3a139f6c91 100644 --- a/sentry_sdk/_types.py +++ b/sentry_sdk/_types.py @@ -13,6 +13,9 @@ SENSITIVE_DATA_SUBSTITUTE = "[Filtered]" BLOB_DATA_SUBSTITUTE = "[Blob substitute]" +OVER_SIZE_LIMIT_SUBSTITUTE = ( + "[Value removed due to size of field exceeding configured maximum size.]" +) class AnnotatedValue: @@ -77,6 +80,26 @@ def removed_because_over_size_limit(cls, value: "Any" = "") -> "AnnotatedValue": }, ) + @classmethod + def substituted_because_over_size_limit( + cls, value: "Any" = OVER_SIZE_LIMIT_SUBSTITUTE + ) -> "AnnotatedValue": + """ + The actual value was replaced because the size of the field exceeded the configured maximum size, + for example specified with the max_request_body_size sdk option. + """ + return AnnotatedValue( + value=value, + metadata={ + "rem": [ # Remark + [ + "!config", # Because of configured maximum size + "s", # The fields original value was substituted + ] + ] + }, + ) + @classmethod def substituted_because_contains_sensitive_data(cls) -> "AnnotatedValue": """The actual value was removed because it contained sensitive information.""" diff --git a/sentry_sdk/integrations/aiohttp.py b/sentry_sdk/integrations/aiohttp.py index 46ee5f67b6..0371ff2af3 100644 --- a/sentry_sdk/integrations/aiohttp.py +++ b/sentry_sdk/integrations/aiohttp.py @@ -12,17 +12,29 @@ DidNotEnable, ) from sentry_sdk.integrations.logging import ignore_logger +from sentry_sdk.scope import should_send_default_pii from sentry_sdk.sessions import track_session from sentry_sdk.integrations._wsgi_common import ( _filter_headers, request_body_within_bounds, ) +from sentry_sdk.traces import ( + NoOpStreamedSpan, + SegmentSource, + SpanStatus, + StreamedSpan, + SOURCE_FOR_STYLE as SEGMENT_SOURCE_FOR_STYLE, +) from sentry_sdk.tracing import ( BAGGAGE_HEADER_NAME, SOURCE_FOR_STYLE, TransactionSource, ) -from sentry_sdk.tracing_utils import should_propagate_trace, add_http_request_source +from sentry_sdk.tracing_utils import ( + add_http_request_source, + has_span_streaming_enabled, + should_propagate_trace, +) from sentry_sdk.utils import ( capture_internal_exceptions, ensure_integration_enabled, @@ -57,12 +69,14 @@ from collections.abc import Set from types import SimpleNamespace from typing import Any + from typing import ContextManager from typing import Optional from typing import Tuple from typing import Union + from sentry_sdk.tracing import Span from sentry_sdk.utils import ExcInfo - from sentry_sdk._types import Event, EventProcessor + from sentry_sdk._types import Attributes, Event, EventProcessor TRANSACTION_STYLE_VALUES = ("handler_name", "method_and_path_pattern") @@ -106,11 +120,13 @@ def setup_once() -> None: async def sentry_app_handle( self: "Any", request: "Request", *args: "Any", **kwargs: "Any" ) -> "Any": - integration = sentry_sdk.get_client().get_integration(AioHttpIntegration) + client = sentry_sdk.get_client() + integration = client.get_integration(AioHttpIntegration) if integration is None: return await old_handle(self, request, *args, **kwargs) weak_request = weakref.ref(request) + is_span_streaming_enabled = has_span_streaming_enabled(client.options) with sentry_sdk.isolation_scope() as scope: with track_session(scope, session_mode="request"): @@ -121,38 +137,127 @@ async def sentry_app_handle( scope.add_event_processor(_make_request_processor(weak_request)) headers = dict(request.headers) - transaction = continue_trace( - headers, - op=OP.HTTP_SERVER, - # If this transaction name makes it to the UI, AIOHTTP's - # URL resolver did not find a route or died trying. - name="generic AIOHTTP request", - source=TransactionSource.ROUTE, - origin=AioHttpIntegration.origin, - ) - with sentry_sdk.start_transaction( - transaction, - custom_sampling_context={"aiohttp_request": request}, - ): - try: - response = await old_handle(self, request) - except HTTPException as e: - transaction.set_http_status(e.status_code) - if ( - e.status_code - in integration._failed_request_status_codes + span_ctx: "ContextManager[Union[Span, StreamedSpan]]" + if is_span_streaming_enabled: + sentry_sdk.traces.continue_trace(headers) + sentry_sdk.get_current_scope().set_custom_sampling_context( + {"aiohttp_request": request} + ) + + header_dict: "dict[str, Any]" = {} + for header, header_value in _filter_headers( + headers, use_annotated_value=False + ).items(): + header_dict[f"http.request.header.{header.lower()}"] = ( + # header_value will always be a string because we set `use_annotated_value` to false above + header_value + ) + + url_query = ( + {"url.query": request.query_string} + if request.query_string + else {} + ) + + client_address = ( + { + "client.address": request.remote, + "user.ip_address": request.remote, + } + if should_send_default_pii() and request.remote + else {} + ) + + span_ctx = sentry_sdk.traces.start_span( + # If this name makes it to the UI, AIOHTTP's URL + # resolver did not find a route or died trying. + name="generic AIOHTTP request", + attributes={ + "sentry.op": OP.HTTP_SERVER, + "sentry.origin": AioHttpIntegration.origin, + "sentry.span.source": SegmentSource.ROUTE.value, + "url.full": "%s://%s%s" + % (request.scheme, request.host, request.path), + "http.request.method": request.method, + **url_query, + **client_address, + **header_dict, + }, + ) + else: + transaction = continue_trace( + headers, + op=OP.HTTP_SERVER, + # If this transaction name makes it to the UI, AIOHTTP's + # URL resolver did not find a route or died trying. + name="generic AIOHTTP request", + source=TransactionSource.ROUTE, + origin=AioHttpIntegration.origin, + ) + span_ctx = sentry_sdk.start_transaction( + transaction, + custom_sampling_context={"aiohttp_request": request}, + ) + + with span_ctx as span: + try: + try: + response = await old_handle(self, request) + except HTTPException as e: + if isinstance(span, StreamedSpan): + span.set_attribute( + "http.response.status_code", e.status_code + ) + + # There are a number of sub-exceptions that should have an "ok" status, but will + # actually have a status of error once the `raise` happens below + # and we pass through the `should_be_treated_as_error` method invoked + # within a span's `__exit__` method. + # + # As a result, made this behaviour explicit + # so people don't think that the "ok" status persists in that scenario. + # + # Although not obvious, this overwriting behaviour occurs in the legacy + # approach as well, so this matches what we do today. + span.status = SpanStatus.ERROR.value + else: + span.set_http_status(e.status_code) + + if ( + e.status_code + in integration._failed_request_status_codes + ): + _capture_exception() + raise + except (asyncio.CancelledError, ConnectionResetError): + if isinstance(span, StreamedSpan): + span.status = SpanStatus.ERROR.value + else: + span.set_status(SPANSTATUS.CANCELLED) + raise + except Exception: + # This will probably map to a 500 but seems like we + # have no way to tell. Do not set span status. + reraise(*_capture_exception()) + finally: + # The handler has had a chance to read the body, so + # request._read_bytes may now be populated. Capture + # body data on the segment regardless of outcome. + if isinstance(span, StreamedSpan) and not isinstance( + span, NoOpStreamedSpan ): - _capture_exception() - - raise - except (asyncio.CancelledError, ConnectionResetError): - transaction.set_status(SPANSTATUS.CANCELLED) - raise - except Exception: - # This will probably map to a 500 but seems like we - # have no way to tell. Do not set span status. - reraise(*_capture_exception()) + with capture_internal_exceptions(): + raw_data = get_aiohttp_request_data(request) + body_data = ( + raw_data.value + if isinstance(raw_data, AnnotatedValue) + else raw_data + ) + if body_data is not None: + span._segment.set_attribute( + "http.request.body.data", body_data + ) try: # A valid response handler will return a valid response with a status. But, if the handler @@ -163,7 +268,17 @@ async def sentry_app_handle( except AttributeError: pass else: - transaction.set_http_status(response_status) + if isinstance(span, StreamedSpan): + span.set_attribute( + "http.response.status_code", response_status + ) + span.status = ( + SpanStatus.ERROR.value + if response_status >= 400 + else SpanStatus.OK.value + ) + else: + span.set_http_status(response_status) return response @@ -194,10 +309,21 @@ async def sentry_urldispatcher_resolve( pass if name is not None: - sentry_sdk.get_current_scope().set_transaction_name( - name, - source=SOURCE_FOR_STYLE[integration.transaction_style], - ) + current_scope = sentry_sdk.get_current_scope() + current_span = current_scope.span + if isinstance(current_span, StreamedSpan) and not isinstance( + current_span, NoOpStreamedSpan + ): + current_span.name = name + current_span.set_attribute( + "sentry.span.source", + SEGMENT_SOURCE_FOR_STYLE[integration.transaction_style].value, + ) + else: + current_scope.set_transaction_name( + name, + source=SOURCE_FOR_STYLE[integration.transaction_style], + ) return rv @@ -223,7 +349,8 @@ async def on_request_start( trace_config_ctx: "SimpleNamespace", params: "TraceRequestStartParams", ) -> None: - if sentry_sdk.get_client().get_integration(AioHttpIntegration) is None: + client = sentry_sdk.get_client() + if client.get_integration(AioHttpIntegration) is None: return method = params.method.upper() @@ -232,19 +359,38 @@ async def on_request_start( with capture_internal_exceptions(): parsed_url = parse_url(str(params.url), sanitize=False) - span = sentry_sdk.start_span( - op=OP.HTTP_CLIENT, - name="%s %s" - % (method, parsed_url.url if parsed_url else SENSITIVE_DATA_SUBSTITUTE), - origin=AioHttpIntegration.origin, + span_name = "%s %s" % ( + method, + parsed_url.url if parsed_url else SENSITIVE_DATA_SUBSTITUTE, ) - span.set_data(SPANDATA.HTTP_METHOD, method) - if parsed_url is not None: - span.set_data("url", parsed_url.url) - span.set_data(SPANDATA.HTTP_QUERY, parsed_url.query) - span.set_data(SPANDATA.HTTP_FRAGMENT, parsed_url.fragment) - client = sentry_sdk.get_client() + span: "Union[Span, StreamedSpan]" + if has_span_streaming_enabled(client.options): + attributes: "Attributes" = { + "sentry.op": OP.HTTP_CLIENT, + "sentry.origin": AioHttpIntegration.origin, + "http.request.method": method, + } + if parsed_url is not None: + attributes["url.full"] = parsed_url.url + if parsed_url.query: + attributes["url.query"] = parsed_url.query + if parsed_url.fragment: + attributes["url.fragment"] = parsed_url.fragment + + span = sentry_sdk.traces.start_span(name=span_name, attributes=attributes) + else: + legacy_span = sentry_sdk.start_span( + op=OP.HTTP_CLIENT, + name=span_name, + origin=AioHttpIntegration.origin, + ) + legacy_span.set_data(SPANDATA.HTTP_METHOD, method) + if parsed_url is not None: + legacy_span.set_data("url", parsed_url.url) + legacy_span.set_data(SPANDATA.HTTP_QUERY, parsed_url.query) + legacy_span.set_data(SPANDATA.HTTP_FRAGMENT, parsed_url.fragment) + span = legacy_span if should_propagate_trace(client, str(params.url)): for ( @@ -277,12 +423,23 @@ async def on_request_end( return span = trace_config_ctx.span - span.set_http_status(int(params.response.status)) - span.set_data("reason", params.response.reason) - span.finish() + status = int(params.response.status) - with capture_internal_exceptions(): - add_http_request_source(span) + if isinstance(span, StreamedSpan): + span.set_attribute("http.response.status_code", status) + span.status = ( + SpanStatus.ERROR.value if status >= 400 else SpanStatus.OK.value + ) + + with capture_internal_exceptions(): + add_http_request_source(span) + span.end() + else: + span.set_http_status(status) + span.set_data("reason", params.response.reason) + span.finish() + with capture_internal_exceptions(): + add_http_request_source(span) trace_config = TraceConfig() @@ -349,7 +506,7 @@ def get_aiohttp_request_data( if bytes_body is not None: # we have body to show if not request_body_within_bounds(sentry_sdk.get_client(), len(bytes_body)): - return AnnotatedValue.removed_because_over_size_limit() + return AnnotatedValue.substituted_because_over_size_limit() encoding = request.charset or "utf-8" return bytes_body.decode(encoding, "replace") diff --git a/tests/integrations/aiohttp/test_aiohttp.py b/tests/integrations/aiohttp/test_aiohttp.py index 849f9d017b..fe3f0ec582 100644 --- a/tests/integrations/aiohttp/test_aiohttp.py +++ b/tests/integrations/aiohttp/test_aiohttp.py @@ -19,10 +19,13 @@ HTTPUnavailableForLegalReasons, ) +import sentry_sdk from sentry_sdk import capture_message, start_transaction -from sentry_sdk.integrations.aiohttp import AioHttpIntegration, create_trace_config from sentry_sdk.consts import SPANDATA +from sentry_sdk.integrations.aiohttp import AioHttpIntegration, create_trace_config +from sentry_sdk.utils import SENSITIVE_DATA_SUBSTITUTE from tests.conftest import ApproxDict +from sentry_sdk._types import OVER_SIZE_LIMIT_SUBSTITUTE @pytest.mark.asyncio @@ -1131,3 +1134,526 @@ async def handle(_): (event,) = events assert event["exception"]["values"][0]["type"] == "ZeroDivisionError" + + +@pytest.mark.asyncio +async def test_tracing_span_streaming(sentry_init, aiohttp_client, capture_items): + sentry_init( + integrations=[AioHttpIntegration()], + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream"}, + ) + + async def hello(request): + return web.Response(text="hello") + + app = web.Application() + app.router.add_get("/", hello) + + items = capture_items("span") + + client = await aiohttp_client(app) + resp = await client.get("/") + assert resp.status == 200 + + sentry_sdk.flush() + + # Spans finish inner-first, so the segment is the last item. + # The aiohttp_client fixture is itself sentry-instrumented and emits the + # outer http.client segment; the server-side http.server span is its only + # child. Asserting the exact length confirms no other spans leak in. + assert len(items) == 2 + + segment = items.pop().payload + (server_span,) = [item.payload for item in items] + + assert segment["is_segment"] is True + assert segment["attributes"]["sentry.op"] == "http.client" + + assert server_span["is_segment"] is False + assert server_span["name"] == ( + "tests.integrations.aiohttp.test_aiohttp." + "test_tracing_span_streaming..hello" + ) + assert server_span["attributes"]["sentry.op"] == "http.server" + assert server_span["attributes"]["sentry.origin"] == "auto.http.aiohttp" + assert server_span["attributes"]["sentry.span.source"] == "component" + assert server_span["attributes"]["http.response.status_code"] == 200 + assert server_span["status"] == "ok" + # No query string on the request, so the attribute should be omitted. + assert "url.query" not in server_span["attributes"] + + # Request attributes derived directly from the aiohttp request. + assert server_span["attributes"]["http.request.method"] == "GET" + # client.address and user.ip_address is gated on send_default_pii (default False), so it must + # not be captured here. + assert "client.address" not in server_span["attributes"] + assert "user.ip_address" not in server_span["attributes"] + url_full = server_span["attributes"]["url.full"] + assert url_full.startswith("http://127.0.0.1:") + assert url_full.endswith("/") + # aiohttp's test client always sends a Host header; we assert it propagates + # into the span attributes via _filter_headers. + assert "http.request.header.host" in server_span["attributes"] + + +@pytest.mark.asyncio +async def test_sensitive_header_scrubbing_span_streaming( + sentry_init, aiohttp_client, capture_items +): + sentry_init( + integrations=[AioHttpIntegration()], + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream"}, + ) + + async def hello(request): + return web.Response(text="hello") + + app = web.Application() + app.router.add_get("/", hello) + + items = capture_items("span") + + client = await aiohttp_client(app) + resp = await client.get( + "/", + headers={ + "Authorization": "Bearer secret-token", + "X-Custom-Header": "passthrough", + }, + ) + assert resp.status == 200 + + sentry_sdk.flush() + + items.pop() # drop the test client's outer segment + (server_span,) = [item.payload for item in items] + + # send_default_pii defaults to False, so _filter_headers substitutes + # sensitive headers with SENSITIVE_DATA_SUBSTITUTE ("[Filtered]"). The + # original token must not leak. + assert ( + server_span["attributes"]["http.request.header.authorization"] + == SENSITIVE_DATA_SUBSTITUTE + ) + # Non-sensitive headers pass through untouched. + assert ( + server_span["attributes"]["http.request.header.x-custom-header"] + == "passthrough" + ) + + +@pytest.mark.asyncio +async def test_sensitive_header_passthrough_with_pii_span_streaming( + sentry_init, aiohttp_client, capture_items +): + sentry_init( + integrations=[AioHttpIntegration()], + traces_sample_rate=1.0, + send_default_pii=True, + _experiments={"trace_lifecycle": "stream"}, + ) + + async def hello(request): + return web.Response(text="hello") + + app = web.Application() + app.router.add_get("/", hello) + + items = capture_items("span") + + client = await aiohttp_client(app) + await client.get("/", headers={"Authorization": "Bearer secret-token"}) + + sentry_sdk.flush() + + items.pop() # drop the test client's outer segment + (server_span,) = [item.payload for item in items] + + # With send_default_pii=True, _filter_headers is a no-op and the original + # value reaches the span attribute. + assert ( + server_span["attributes"]["http.request.header.authorization"] + == "Bearer secret-token" + ) + # client.address and user.ip_address is captured under send_default_pii=True. + assert server_span["attributes"]["client.address"] == "127.0.0.1" + assert server_span["attributes"]["user.ip_address"] == "127.0.0.1" + + +@pytest.mark.asyncio +async def test_request_body_captured_on_segment_span_streaming( + sentry_init, aiohttp_client, capture_items +): + sentry_init( + integrations=[AioHttpIntegration()], + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream"}, + ) + + body = {"some": "value"} + + async def hello(request): + # Reading the body populates request._read_bytes; the integration + # captures body data in a finally after the handler returns. + await request.json() + return web.Response(text="hello") + + app = web.Application() + app.router.add_post("/", hello) + + items = capture_items("span") + + client = await aiohttp_client(app) + resp = await client.post("/", json=body) + assert resp.status == 200 + + sentry_sdk.flush() + + # The integration sets http.request.body.data on the segment span. In this + # test the segment is the aiohttp_client's outgoing http.client span (the + # test client is itself sentry-instrumented). In production with separate + # processes, the server span is the segment; the assertion is the same. + segments = [item.payload for item in items if item.payload.get("is_segment")] + assert len(segments) == 1 + assert segments[0]["attributes"]["http.request.body.data"] == json.dumps(body) + + +@pytest.mark.asyncio +async def test_request_body_not_read_span_streaming( + sentry_init, aiohttp_client, capture_items +): + from sentry_sdk.integrations.aiohttp import BODY_NOT_READ_MESSAGE + + sentry_init( + integrations=[AioHttpIntegration()], + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream"}, + ) + + async def hello(request): + # Handler does not read the body; request._read_bytes stays None. + return web.Response(text="hello") + + app = web.Application() + app.router.add_post("/", hello) + + items = capture_items("span") + + client = await aiohttp_client(app) + resp = await client.post("/", json={"some": "value"}) + assert resp.status == 200 + + sentry_sdk.flush() + + segments = [item.payload for item in items if item.payload.get("is_segment")] + assert len(segments) == 1 + assert segments[0]["attributes"]["http.request.body.data"] == BODY_NOT_READ_MESSAGE + + +@pytest.mark.asyncio +async def test_request_body_over_size_limit_span_streaming( + sentry_init, aiohttp_client, capture_items +): + sentry_init( + integrations=[AioHttpIntegration()], + traces_sample_rate=1.0, + max_request_body_size="small", + _experiments={"trace_lifecycle": "stream"}, + ) + + async def hello(request): + await request.read() + return web.Response(text="hello") + + app = web.Application() + app.router.add_post("/", hello) + + items = capture_items("span") + + client = await aiohttp_client(app) + # "small" caps at 1 KB; send a body larger than that. + resp = await client.post("/", data=b"x" * 2000) + assert resp.status == 200 + + sentry_sdk.flush() + + segments = [item.payload for item in items if item.payload.get("is_segment")] + assert len(segments) == 1 + assert ( + segments[0]["attributes"]["http.request.body.data"] + == OVER_SIZE_LIMIT_SUBSTITUTE + ) + + +@pytest.mark.asyncio +async def test_url_query_attribute_span_streaming( + sentry_init, aiohttp_client, capture_items +): + sentry_init( + integrations=[AioHttpIntegration()], + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream"}, + ) + + async def hello(request): + return web.Response(text="hello") + + app = web.Application() + app.router.add_get("/", hello) + + items = capture_items("span") + + client = await aiohttp_client(app) + resp = await client.get("/?foo=bar&baz=qux") + assert resp.status == 200 + + sentry_sdk.flush() + + assert len(items) == 2 + items.pop() # drop the test client's outer segment + (server_span,) = [item.payload for item in items] + + assert server_span["attributes"]["url.query"] == "foo=bar&baz=qux" + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "url,transaction_style,expected_name,expected_source", + [ + ( + "/message", + "handler_name", + "tests.integrations.aiohttp.test_aiohttp." + "test_transaction_style_span_streaming..hello", + "component", + ), + ( + "/message", + "method_and_path_pattern", + "GET /{var}", + "route", + ), + ], +) +async def test_transaction_style_span_streaming( + sentry_init, + aiohttp_client, + capture_items, + url, + transaction_style, + expected_name, + expected_source, +): + sentry_init( + integrations=[AioHttpIntegration(transaction_style=transaction_style)], + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream"}, + ) + + async def hello(request): + return web.Response(text="hello") + + app = web.Application() + app.router.add_get(r"/{var}", hello) + + items = capture_items("span") + + client = await aiohttp_client(app) + resp = await client.get(url) + assert resp.status == 200 + + sentry_sdk.flush() + + assert len(items) == 2 + items.pop() # drop the test client's outer segment + (server_span,) = [item.payload for item in items] + + assert server_span["name"] == expected_name + assert server_span["attributes"]["sentry.span.source"] == expected_source + + +@pytest.mark.asyncio +async def test_server_error_span_streaming(sentry_init, aiohttp_client, capture_items): + sentry_init( + integrations=[AioHttpIntegration()], + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream"}, + ) + + async def hello(request): + 1 / 0 + + app = web.Application() + app.router.add_get("/", hello) + + items = capture_items("event", "span") + + client = await aiohttp_client(app) + resp = await client.get("/") + assert resp.status == 500 + + sentry_sdk.flush() + + # 1 error event + 2 spans (server http.server, test client http.client segment) + assert len(items) == 3 + + error_event = items[0] + assert error_event.type == "event" + assert error_event.payload["exception"]["values"][0]["type"] == "ZeroDivisionError" + + spans = items[1:] + assert spans[-1].type == "span" + segment = spans.pop().payload + (server_span,) = [item.payload for item in spans] + + assert segment["is_segment"] is True + assert segment["attributes"]["sentry.op"] == "http.client" + # The test client receives the 500 response that aiohttp's outer error + # handler synthesizes from the unhandled exception. + assert segment["attributes"]["http.response.status_code"] == 500 + assert segment["status"] == "error" + + # The integration's generic Exception path reraises without recording + # http.response.status_code on the server span. StreamedSpan.__exit__ + # observes the propagating exception and sets status to "error". + assert server_span["attributes"]["sentry.op"] == "http.server" + assert "http.response.status_code" not in server_span["attributes"] + assert server_span["status"] == "error" + + +@pytest.mark.asyncio +async def test_http_exception_span_streaming( + sentry_init, aiohttp_client, capture_items +): + sentry_init( + integrations=[AioHttpIntegration()], + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream"}, + ) + + async def hello(request): + raise web.HTTPForbidden() + + app = web.Application() + app.router.add_get("/", hello) + + items = capture_items("span") + + client = await aiohttp_client(app) + resp = await client.get("/") + assert resp.status == 403 + + sentry_sdk.flush() + + assert len(items) == 2 + segment = items.pop().payload + (server_span,) = [item.payload for item in items] + + assert segment["is_segment"] is True + assert segment["attributes"]["sentry.op"] == "http.client" + assert segment["attributes"]["http.response.status_code"] == 403 + assert segment["status"] == "error" + + assert server_span["attributes"]["sentry.op"] == "http.server" + assert server_span["attributes"]["http.response.status_code"] == 403 + assert server_span["status"] == "error" + + +@pytest.mark.asyncio +async def test_outgoing_client_span_span_streaming( + sentry_init, aiohttp_raw_server, aiohttp_client, capture_items +): + sentry_init( + integrations=[AioHttpIntegration()], + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream"}, + ) + + async def handler(request): + return web.Response(text="OK") + + raw_server = await aiohttp_raw_server(handler) + + async def hello(request): + span_client = await aiohttp_client(raw_server) + await span_client.get("/?foo=bar") + return web.Response(text="hello") + + app = web.Application() + app.router.add_get(r"/", hello) + + items = capture_items("span") + + client = await aiohttp_client(app) + await client.get("/") + + sentry_sdk.flush() + + # 3 spans, finished inner-first: + # #0 inner http.client (server -> raw_server) + # #1 server http.server + # #2 outer http.client segment (test client -> server) + assert len(items) == 3 + + inner_client_span = items[0].payload + server_span = items[1].payload + segment = items[2].payload + + assert segment["is_segment"] is True + assert segment["attributes"]["sentry.op"] == "http.client" + + assert server_span["attributes"]["sentry.op"] == "http.server" + + assert inner_client_span["is_segment"] is False + assert inner_client_span["name"].startswith("GET ") + assert inner_client_span["attributes"]["sentry.op"] == "http.client" + assert inner_client_span["attributes"]["sentry.origin"] == "auto.http.aiohttp" + assert inner_client_span["attributes"]["http.request.method"] == "GET" + assert inner_client_span["attributes"]["http.response.status_code"] == 200 + assert inner_client_span["attributes"]["url.query"] == "foo=bar" + url_full = inner_client_span["attributes"]["url.full"] + # parse_url() splits the URL — url.full is the base URL only, with the + # query string captured separately on url.query. + assert url_full.startswith("http://127.0.0.1:") + assert url_full.endswith("/") + assert inner_client_span["status"] == "ok" + + +@pytest.mark.asyncio +async def test_outgoing_trace_headers_span_streaming( + sentry_init, aiohttp_raw_server, aiohttp_client, capture_items +): + sentry_init( + integrations=[AioHttpIntegration()], + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream"}, + ) + + async def handler(request): + return web.Response(text="OK") + + raw_server = await aiohttp_raw_server(handler) + + items = capture_items("span") + + client = await aiohttp_client(raw_server) + resp = await client.get("/") + + sentry_sdk.flush() + + # raw_server bypasses Application._handle, so only the test client's + # outgoing http.client segment is emitted. + assert len(items) == 1 + client_span = items[0].payload + + assert client_span["is_segment"] is True + assert client_span["attributes"]["sentry.op"] == "http.client" + assert resp.request_info.headers[ + "sentry-trace" + ] == "{trace_id}-{span_id}-{sampled}".format( + trace_id=client_span["trace_id"], + span_id=client_span["span_id"], + sampled=1, + ) From 6aef3475c794908985f38cc727568875ce2b5fe1 Mon Sep 17 00:00:00 2001 From: Erica Pisani Date: Fri, 1 May 2026 10:26:44 -0400 Subject: [PATCH 2/7] Address warden comment --- sentry_sdk/integrations/aiohttp.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sentry_sdk/integrations/aiohttp.py b/sentry_sdk/integrations/aiohttp.py index 0371ff2af3..3518fa9d57 100644 --- a/sentry_sdk/integrations/aiohttp.py +++ b/sentry_sdk/integrations/aiohttp.py @@ -205,7 +205,9 @@ async def sentry_app_handle( try: response = await old_handle(self, request) except HTTPException as e: - if isinstance(span, StreamedSpan): + if isinstance(span, StreamedSpan) and not isinstance( + span, NoOpStreamedSpan + ): span.set_attribute( "http.response.status_code", e.status_code ) From 6e51935486fb11381a91f506e381aada90954b8f Mon Sep 17 00:00:00 2001 From: Erica Pisani Date: Fri, 1 May 2026 10:36:13 -0400 Subject: [PATCH 3/7] Guard against no-op streamed spans --- sentry_sdk/integrations/aiohttp.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sentry_sdk/integrations/aiohttp.py b/sentry_sdk/integrations/aiohttp.py index 3518fa9d57..efc0bbd13f 100644 --- a/sentry_sdk/integrations/aiohttp.py +++ b/sentry_sdk/integrations/aiohttp.py @@ -224,7 +224,8 @@ async def sentry_app_handle( # approach as well, so this matches what we do today. span.status = SpanStatus.ERROR.value else: - span.set_http_status(e.status_code) + if not isinstance(span, NoOpStreamedSpan): + span.set_http_status(e.status_code) if ( e.status_code From 7b77e3f5eb915478a4b8be65ae416847766ac802 Mon Sep 17 00:00:00 2001 From: Erica Pisani Date: Fri, 1 May 2026 11:15:36 -0400 Subject: [PATCH 4/7] Fix issue that a failing test caught --- sentry_sdk/integrations/aiohttp.py | 4 ++-- tests/integrations/aiohttp/test_aiohttp.py | 10 ++++++---- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/sentry_sdk/integrations/aiohttp.py b/sentry_sdk/integrations/aiohttp.py index efc0bbd13f..14614f2c81 100644 --- a/sentry_sdk/integrations/aiohttp.py +++ b/sentry_sdk/integrations/aiohttp.py @@ -317,8 +317,8 @@ async def sentry_urldispatcher_resolve( if isinstance(current_span, StreamedSpan) and not isinstance( current_span, NoOpStreamedSpan ): - current_span.name = name - current_span.set_attribute( + current_span._segment.name = name + current_span._segment.set_attribute( "sentry.span.source", SEGMENT_SOURCE_FOR_STYLE[integration.transaction_style].value, ) diff --git a/tests/integrations/aiohttp/test_aiohttp.py b/tests/integrations/aiohttp/test_aiohttp.py index fe3f0ec582..09a5662f43 100644 --- a/tests/integrations/aiohttp/test_aiohttp.py +++ b/tests/integrations/aiohttp/test_aiohttp.py @@ -1169,16 +1169,18 @@ async def hello(request): assert segment["is_segment"] is True assert segment["attributes"]["sentry.op"] == "http.client" - - assert server_span["is_segment"] is False - assert server_span["name"] == ( + assert segment["name"] == ( "tests.integrations.aiohttp.test_aiohttp." "test_tracing_span_streaming..hello" ) + assert segment["attributes"]["sentry.span.source"] == "component" + + assert server_span["is_segment"] is False + assert server_span["name"] == "generic AIOHTTP request" assert server_span["attributes"]["sentry.op"] == "http.server" assert server_span["attributes"]["sentry.origin"] == "auto.http.aiohttp" - assert server_span["attributes"]["sentry.span.source"] == "component" assert server_span["attributes"]["http.response.status_code"] == 200 + assert server_span["attributes"]["sentry.span.source"] == "route" assert server_span["status"] == "ok" # No query string on the request, so the attribute should be omitted. assert "url.query" not in server_span["attributes"] From b356795f542851603ced7a317478dcd0bba2d061 Mon Sep 17 00:00:00 2001 From: Erica Pisani Date: Fri, 1 May 2026 11:52:01 -0400 Subject: [PATCH 5/7] Update test --- tests/integrations/aiohttp/test_aiohttp.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/tests/integrations/aiohttp/test_aiohttp.py b/tests/integrations/aiohttp/test_aiohttp.py index 09a5662f43..af26e25c32 100644 --- a/tests/integrations/aiohttp/test_aiohttp.py +++ b/tests/integrations/aiohttp/test_aiohttp.py @@ -1469,11 +1469,15 @@ async def hello(request): sentry_sdk.flush() assert len(items) == 2 - items.pop() # drop the test client's outer segment + segment = items.pop().payload (server_span,) = [item.payload for item in items] - assert server_span["name"] == expected_name - assert server_span["attributes"]["sentry.span.source"] == expected_source + assert segment["name"] == expected_name + assert segment["attributes"]["sentry.span.source"] == expected_source + + assert server_span["name"] == "generic AIOHTTP request" + assert not server_span["is_segment"] + assert server_span["attributes"]["sentry.span.source"] == "route" @pytest.mark.asyncio From d9deb29ca56bc7be587163066b79f00a494b69cc Mon Sep 17 00:00:00 2001 From: Erica Pisani Date: Fri, 1 May 2026 15:17:59 -0400 Subject: [PATCH 6/7] Changes after doing a review --- sentry_sdk/integrations/aiohttp.py | 64 +++++++++++----------- tests/integrations/aiohttp/test_aiohttp.py | 45 ++++++++------- 2 files changed, 55 insertions(+), 54 deletions(-) diff --git a/sentry_sdk/integrations/aiohttp.py b/sentry_sdk/integrations/aiohttp.py index 14614f2c81..7acf1ecf63 100644 --- a/sentry_sdk/integrations/aiohttp.py +++ b/sentry_sdk/integrations/aiohttp.py @@ -4,26 +4,28 @@ import sentry_sdk from sentry_sdk.api import continue_trace -from sentry_sdk.consts import OP, SPANSTATUS, SPANDATA +from sentry_sdk.consts import OP, SPANDATA, SPANSTATUS from sentry_sdk.integrations import ( _DEFAULT_FAILED_REQUEST_STATUS_CODES, - _check_minimum_version, - Integration, DidNotEnable, + Integration, + _check_minimum_version, ) -from sentry_sdk.integrations.logging import ignore_logger -from sentry_sdk.scope import should_send_default_pii -from sentry_sdk.sessions import track_session from sentry_sdk.integrations._wsgi_common import ( _filter_headers, request_body_within_bounds, ) +from sentry_sdk.integrations.logging import ignore_logger +from sentry_sdk.scope import should_send_default_pii +from sentry_sdk.sessions import track_session +from sentry_sdk.traces import ( + SOURCE_FOR_STYLE as SEGMENT_SOURCE_FOR_STYLE, +) from sentry_sdk.traces import ( NoOpStreamedSpan, SegmentSource, SpanStatus, StreamedSpan, - SOURCE_FOR_STYLE as SEGMENT_SOURCE_FOR_STYLE, ) from sentry_sdk.tracing import ( BAGGAGE_HEADER_NAME, @@ -36,6 +38,10 @@ should_propagate_trace, ) from sentry_sdk.utils import ( + CONTEXTVARS_ERROR_MESSAGE, + HAS_REAL_CONTEXTVARS, + SENSITIVE_DATA_SUBSTITUTE, + AnnotatedValue, capture_internal_exceptions, ensure_integration_enabled, event_from_exception, @@ -44,17 +50,13 @@ parse_version, reraise, transaction_from_function, - HAS_REAL_CONTEXTVARS, - CONTEXTVARS_ERROR_MESSAGE, - SENSITIVE_DATA_SUBSTITUTE, - AnnotatedValue, ) try: import asyncio - from aiohttp import __version__ as AIOHTTP_VERSION from aiohttp import ClientSession, TraceConfig + from aiohttp import __version__ as AIOHTTP_VERSION from aiohttp.web import Application, HTTPException, UrlDispatcher except ImportError: raise DidNotEnable("AIOHTTP not installed") @@ -62,21 +64,17 @@ from typing import TYPE_CHECKING if TYPE_CHECKING: - from aiohttp.web_request import Request - from aiohttp.web_urldispatcher import UrlMappingMatchInfo - from aiohttp import TraceRequestStartParams, TraceRequestEndParams - from collections.abc import Set from types import SimpleNamespace - from typing import Any - from typing import ContextManager - from typing import Optional - from typing import Tuple - from typing import Union + from typing import Any, ContextManager, Optional, Tuple, Union + + from aiohttp import TraceRequestEndParams, TraceRequestStartParams + from aiohttp.web_request import Request + from aiohttp.web_urldispatcher import UrlMappingMatchInfo + from sentry_sdk._types import Attributes, Event, EventProcessor from sentry_sdk.tracing import Span from sentry_sdk.utils import ExcInfo - from sentry_sdk._types import Attributes, Event, EventProcessor TRANSACTION_STYLE_VALUES = ("handler_name", "method_and_path_pattern") @@ -145,22 +143,24 @@ async def sentry_app_handle( {"aiohttp_request": request} ) - header_dict: "dict[str, Any]" = {} + header_attributes: "dict[str, Any]" = {} for header, header_value in _filter_headers( headers, use_annotated_value=False ).items(): - header_dict[f"http.request.header.{header.lower()}"] = ( + header_attributes[ + f"http.request.header.{header.lower()}" + ] = ( # header_value will always be a string because we set `use_annotated_value` to false above header_value ) - url_query = ( + url_query_attribute = ( {"url.query": request.query_string} if request.query_string else {} ) - client_address = ( + client_address_attributes = ( { "client.address": request.remote, "user.ip_address": request.remote, @@ -180,9 +180,9 @@ async def sentry_app_handle( "url.full": "%s://%s%s" % (request.scheme, request.host, request.path), "http.request.method": request.method, - **url_query, - **client_address, - **header_dict, + **url_query_attribute, + **client_address_attributes, + **header_attributes, }, ) else: @@ -212,8 +212,8 @@ async def sentry_app_handle( "http.response.status_code", e.status_code ) - # There are a number of sub-exceptions that should have an "ok" status, but will - # actually have a status of error once the `raise` happens below + # There are a number of sub-exceptions that should have an "ok" status, + # but will actually have a status of error once the `raise` happens below # and we pass through the `should_be_treated_as_error` method invoked # within a span's `__exit__` method. # @@ -224,6 +224,8 @@ async def sentry_app_handle( # approach as well, so this matches what we do today. span.status = SpanStatus.ERROR.value else: + # Since a NoOpStreamedSpan can end up here, we have to guard against it + # so this only gets set in the legacy transaction approach. if not isinstance(span, NoOpStreamedSpan): span.set_http_status(e.status_code) diff --git a/tests/integrations/aiohttp/test_aiohttp.py b/tests/integrations/aiohttp/test_aiohttp.py index af26e25c32..57dddcb964 100644 --- a/tests/integrations/aiohttp/test_aiohttp.py +++ b/tests/integrations/aiohttp/test_aiohttp.py @@ -1,31 +1,33 @@ -import os -import datetime import asyncio +import datetime import json - +import os from contextlib import suppress from unittest import mock import pytest - from aiohttp import web from aiohttp.client import ServerDisconnectedError -from aiohttp.web_request import Request from aiohttp.web_exceptions import ( + HTTPBadRequest, HTTPInternalServerError, HTTPNetworkAuthenticationRequired, - HTTPBadRequest, HTTPNotFound, HTTPUnavailableForLegalReasons, ) +from aiohttp.web_request import Request import sentry_sdk from sentry_sdk import capture_message, start_transaction +from sentry_sdk._types import OVER_SIZE_LIMIT_SUBSTITUTE from sentry_sdk.consts import SPANDATA -from sentry_sdk.integrations.aiohttp import AioHttpIntegration, create_trace_config +from sentry_sdk.integrations.aiohttp import ( + BODY_NOT_READ_MESSAGE, + AioHttpIntegration, + create_trace_config, +) from sentry_sdk.utils import SENSITIVE_DATA_SUBSTITUTE from tests.conftest import ApproxDict -from sentry_sdk._types import OVER_SIZE_LIMIT_SUBSTITUTE @pytest.mark.asyncio @@ -1317,17 +1319,15 @@ async def hello(request): # test the segment is the aiohttp_client's outgoing http.client span (the # test client is itself sentry-instrumented). In production with separate # processes, the server span is the segment; the assertion is the same. - segments = [item.payload for item in items if item.payload.get("is_segment")] - assert len(segments) == 1 - assert segments[0]["attributes"]["http.request.body.data"] == json.dumps(body) + segment = items.pop().payload + assert segment["is_segment"] is True + assert segment["attributes"]["http.request.body.data"] == json.dumps(body) @pytest.mark.asyncio async def test_request_body_not_read_span_streaming( sentry_init, aiohttp_client, capture_items ): - from sentry_sdk.integrations.aiohttp import BODY_NOT_READ_MESSAGE - sentry_init( integrations=[AioHttpIntegration()], traces_sample_rate=1.0, @@ -1349,9 +1349,9 @@ async def hello(request): sentry_sdk.flush() - segments = [item.payload for item in items if item.payload.get("is_segment")] - assert len(segments) == 1 - assert segments[0]["attributes"]["http.request.body.data"] == BODY_NOT_READ_MESSAGE + segment = items.pop().payload + assert segment["is_segment"] is True + assert segment["attributes"]["http.request.body.data"] == BODY_NOT_READ_MESSAGE @pytest.mark.asyncio @@ -1381,12 +1381,9 @@ async def hello(request): sentry_sdk.flush() - segments = [item.payload for item in items if item.payload.get("is_segment")] - assert len(segments) == 1 - assert ( - segments[0]["attributes"]["http.request.body.data"] - == OVER_SIZE_LIMIT_SUBSTITUTE - ) + segment = items.pop().payload + assert segment["is_segment"] is True + assert segment["attributes"]["http.request.body.data"] == OVER_SIZE_LIMIT_SUBSTITUTE @pytest.mark.asyncio @@ -1619,12 +1616,13 @@ async def hello(request): assert inner_client_span["attributes"]["http.request.method"] == "GET" assert inner_client_span["attributes"]["http.response.status_code"] == 200 assert inner_client_span["attributes"]["url.query"] == "foo=bar" + assert inner_client_span["status"] == "ok" + url_full = inner_client_span["attributes"]["url.full"] # parse_url() splits the URL — url.full is the base URL only, with the # query string captured separately on url.query. assert url_full.startswith("http://127.0.0.1:") assert url_full.endswith("/") - assert inner_client_span["status"] == "ok" @pytest.mark.asyncio @@ -1656,6 +1654,7 @@ async def handler(request): assert client_span["is_segment"] is True assert client_span["attributes"]["sentry.op"] == "http.client" + assert client_span["name"].startswith("GET http://127.0.0.1:") assert resp.request_info.headers[ "sentry-trace" ] == "{trace_id}-{span_id}-{sampled}".format( From 299e300679dea41399e54d0b9fd9303617ba9d05 Mon Sep 17 00:00:00 2001 From: Erica Pisani Date: Fri, 1 May 2026 15:55:34 -0400 Subject: [PATCH 7/7] Use get_current_scope to see if it makes the bot happy --- sentry_sdk/integrations/aiohttp.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sentry_sdk/integrations/aiohttp.py b/sentry_sdk/integrations/aiohttp.py index 7acf1ecf63..95ced490bd 100644 --- a/sentry_sdk/integrations/aiohttp.py +++ b/sentry_sdk/integrations/aiohttp.py @@ -314,8 +314,7 @@ async def sentry_urldispatcher_resolve( pass if name is not None: - current_scope = sentry_sdk.get_current_scope() - current_span = current_scope.span + current_span = sentry_sdk.get_current_span() if isinstance(current_span, StreamedSpan) and not isinstance( current_span, NoOpStreamedSpan ): @@ -325,6 +324,7 @@ async def sentry_urldispatcher_resolve( SEGMENT_SOURCE_FOR_STYLE[integration.transaction_style].value, ) else: + current_scope = sentry_sdk.get_current_scope() current_scope.set_transaction_name( name, source=SOURCE_FOR_STYLE[integration.transaction_style],