Skip to content

Commit a0ee2b2

Browse files
rgarciaclaude
andcommitted
fix(streaming): don't dispatch empty SSE keepalive comment frames
The SSE decoder's empty-block guard included last_event_id, which is sticky across events per the SSE spec. Once any event carried an id, every subsequent comment-only block (e.g. the server's ":\n\n" keepalive, sent every 15s on an idle stream) fell through the guard and dispatched an empty-data event. The typed Stream wrapper then calls .json() on it unconditionally, raising JSONDecodeError and killing the stream. This made idle browser telemetry streams crash after ~15s. Drop last_event_id from the guard so dispatch depends only on the current block's event/data/retry fields. Event-typed empty-data frames still dispatch (unchanged). Adds a regression test for a keepalive comment following an id-bearing event. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 4c29993 commit a0ee2b2

2 files changed

Lines changed: 32 additions & 1 deletion

File tree

src/kernel/_streaming.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,12 @@ def decode(self, line: str) -> ServerSentEvent | None:
251251
# See: https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation # noqa: E501
252252

253253
if not line:
254-
if not self._event and not self._data and not self._last_event_id and self._retry is None:
254+
# Whether to dispatch depends only on what was set in the *current* block. last_event_id
255+
# is sticky across events (per the SSE spec, it is intentionally not reset below), so it
256+
# must not be part of this check -- otherwise, once any event carries an id, every
257+
# subsequent comment-only block (e.g. a ``:\n\n`` keepalive) would dispatch an empty
258+
# event, which then fails to JSON-decode in the typed Stream wrapper.
259+
if not self._event and not self._data and self._retry is None:
255260
return None
256261

257262
sse = ServerSentEvent(

tests/test_streaming.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,32 @@ def body() -> Iterator[bytes]:
2626
await assert_empty_iter(iterator)
2727

2828

29+
@pytest.mark.asyncio
30+
@pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"])
31+
async def test_keepalive_comment_after_event_with_id(sync: bool, client: Kernel, async_client: AsyncKernel) -> None:
32+
# A ``:`` comment frame (the server's SSE keepalive) that arrives after an event which set an
33+
# id must be ignored, not dispatched as an empty event. last_event_id is sticky, so this is a
34+
# regression guard against it leaking an undecodable empty frame into the typed stream.
35+
def body() -> Iterator[bytes]:
36+
yield b"id: 1\n"
37+
yield b'data: {"foo":true}\n'
38+
yield b"\n"
39+
yield b":\n"
40+
yield b"\n"
41+
yield b'data: {"bar":false}\n'
42+
yield b"\n"
43+
44+
iterator = make_event_iterator(content=body(), sync=sync, client=client, async_client=async_client)
45+
46+
sse = await iter_next(iterator)
47+
assert sse.json() == {"foo": True}
48+
49+
sse = await iter_next(iterator)
50+
assert sse.json() == {"bar": False}
51+
52+
await assert_empty_iter(iterator)
53+
54+
2955
@pytest.mark.asyncio
3056
@pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"])
3157
async def test_data_missing_event(sync: bool, client: Kernel, async_client: AsyncKernel) -> None:

0 commit comments

Comments
 (0)