Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions src/openai/lib/streaming/responses/_responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,18 +328,20 @@ def accumulate_event(self, event: RawResponseStreamEvent) -> ParsedResponseSnaps
return self._create_initial_response(event)

if event.type == "response.output_item.added":
if event.item.type == "function_call":
item = event.item
if item is None:
return snapshot
Comment on lines +332 to +333
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Preserve output index when item is null

Returning early when a `response.output_item.added` event has `item=None` leaves `snapshot.output` shorter than the indices the stream has declared. Later handlers still index into `snapshot.output[event.output_index]` (for example in `response.content_part.added` and `response.output_text.delta`), so in a malformed stream where the null item is followed by events targeting later `output_index` values, an `IndexError` is still raised. The new guard therefore does not reliably let consumers recover from the malformed event — consider appending a placeholder to preserve index alignment instead of returning early.

Useful? React with 👍 / 👎.


if item.type == "function_call":
snapshot.output.append(
construct_type_unchecked(
type_=cast(Any, ParsedResponseFunctionToolCall), value=event.item.to_dict()
)
construct_type_unchecked(type_=cast(Any, ParsedResponseFunctionToolCall), value=item.to_dict())
)
elif event.item.type == "message":
elif item.type == "message":
snapshot.output.append(
construct_type_unchecked(type_=cast(Any, ParsedResponseOutputMessage), value=event.item.to_dict())
construct_type_unchecked(type_=cast(Any, ParsedResponseOutputMessage), value=item.to_dict())
)
else:
snapshot.output.append(event.item)
snapshot.output.append(item)
elif event.type == "response.content_part.added":
output = snapshot.output[event.output_index]
if output.type == "message":
Expand Down
41 changes: 41 additions & 0 deletions tests/lib/responses/test_responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@

from openai import OpenAI, AsyncOpenAI
from openai._utils import assert_signatures_in_sync
from openai._types import omit
from openai._models import construct_type_unchecked
from openai.types.responses import ResponseStreamEvent
from openai.lib.streaming.responses._responses import ResponseStreamState

from ...conftest import base_url
from ..snapshots import make_snapshot_request
Expand Down Expand Up @@ -61,3 +65,40 @@ def test_parse_method_definition_in_sync(sync: bool, client: OpenAI, async_clien
checking_client.responses.parse,
exclude_params={"tools"},
)


def test_stream_accumulator_ignores_missing_output_item() -> None:
    """A `response.output_item.added` event carrying a null item is yielded
    back to the consumer unchanged rather than crashing the accumulator."""

    def make_event(payload: dict) -> ResponseStreamEvent:
        # Build a typed stream event directly from a raw payload dict.
        return construct_type_unchecked(type_=ResponseStreamEvent, value=payload)

    stream_state = ResponseStreamState(input_tools=omit, text_format=omit)

    # Seed the accumulator with an initial in-progress response snapshot.
    created_payload = {
        "type": "response.created",
        "sequence_number": 0,
        "response": {
            "id": "resp_123",
            "object": "response",
            "created_at": 0,
            "status": "in_progress",
            "model": "gpt-4o-mini",
            "output": [],
            "parallel_tool_calls": True,
            "tool_choice": "auto",
            "tools": [],
        },
    }
    stream_state.handle_event(make_event(created_payload))

    null_item_event = make_event(
        {
            "type": "response.output_item.added",
            "sequence_number": 1,
            "output_index": 0,
            "item": None,
        }
    )

    # The malformed event passes through as-is; no exception is raised.
    assert stream_state.handle_event(null_item_event) == [null_item_event]