Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 25 additions & 7 deletions src/app/endpoints/streaming_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
APIStatusError as LLSApiStatusError,
)
from openai._exceptions import APIStatusError as OpenAIAPIStatusError
from typing_extensions import deprecated

from authentication import get_auth_dependency
from authentication.interface import AuthTuple
Expand Down Expand Up @@ -74,6 +75,10 @@
from models.common.responses.types import ResponseInput
from models.common.turn_summary import TurnSummary
from models.config import Action
from utils.agents.streaming import (
generate_agent_response,
retrieve_agent_response_generator,
)
from utils.conversation_compaction import (
CompactionResult,
CompactionStartedEvent,
Expand Down Expand Up @@ -329,7 +334,7 @@ async def streaming_query_endpoint_handler( # pylint: disable=too-many-locals
media_type=response_media_type,
)

generator, turn_summary = await retrieve_response_generator(
generator, turn_summary = await retrieve_agent_response_generator(
responses_params=responses_params,
context=context,
endpoint_path=endpoint_path,
Expand All @@ -342,16 +347,21 @@ async def streaming_query_endpoint_handler( # pylint: disable=too-many-locals
)

return StreamingResponse(
generate_response(
generate_agent_response(
generator=generator,
context=context,
responses_params=responses_params,
turn_summary=turn_summary,
background_topic_summary_tasks=_background_topic_summary_tasks,
),
media_type=response_media_type,
)


@deprecated(
"Deprecated in favor of utils.agents.streaming.retrieve_agent_response_generator.",
stacklevel=2,
)
async def retrieve_response_generator(
responses_params: ResponsesApiParams,
context: ResponseGeneratorContext,
Expand Down Expand Up @@ -474,7 +484,7 @@ async def generate_response_with_compaction(
request_id=context.request_id,
)

compacted = False
_compacted = False

@asimurka asimurka Jun 17, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Storing compacted turns is disconnected and will be addressed in a separate PR.

compacted_original_input: Optional[ResponseInput] = None
try:
async for item in apply_compaction(
Expand All @@ -491,10 +501,10 @@ async def generate_response_with_compaction(
yield stream_compaction_event(context.conversation_id)
elif isinstance(item, CompactionResult):
responses_params = item.params
compacted = item.compacted
_compacted = item.compacted
compacted_original_input = item.original_input

generator, turn_summary = await retrieve_response_generator(
generator, turn_summary = await retrieve_agent_response_generator(
responses_params=responses_params,
context=context,
endpoint_path=endpoint_path,
Expand Down Expand Up @@ -531,18 +541,22 @@ async def generate_response_with_compaction(

# The start event was already emitted above; delegate the rest (re-yield,
# finalization, compacted-turn storage) to the shared generator.
async for event in generate_response(
async for event in generate_agent_response(
generator,
context,
responses_params,
turn_summary,
background_topic_summary_tasks=_background_topic_summary_tasks,
emit_start=False,
compacted=compacted,
original_input=compacted_original_input,
):
yield event


@deprecated(
"Deprecated in favor of utils.agents.streaming.generate_agent_response.",
stacklevel=2,
)
async def generate_response( # pylint: disable=too-many-arguments,too-many-positional-arguments,too-many-locals,too-many-branches,too-many-statements
generator: AsyncIterator[str],
context: ResponseGeneratorContext,
Expand Down Expand Up @@ -711,6 +725,10 @@ async def generate_response( # pylint: disable=too-many-arguments,too-many-posi
)


@deprecated(
"Deprecated in favor of utils.agents.streaming.agent_response_generator.",
stacklevel=2,
)
async def response_generator( # pylint: disable=too-many-branches,too-many-statements,too-many-locals
turn_response: AsyncIterator[OpenAIResponseObjectStream],
context: ResponseGeneratorContext,
Expand Down
3 changes: 2 additions & 1 deletion src/pydantic_ai_lightspeed/llamastack/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Pydantic AI provider for Llama Stack."""

from pydantic_ai_lightspeed.llamastack._model import LlamaStackResponsesModel
from pydantic_ai_lightspeed.llamastack._provider import LlamaStackProvider

__all__ = ["LlamaStackProvider"]
__all__ = ["LlamaStackProvider", "LlamaStackResponsesModel"]
4 changes: 2 additions & 2 deletions src/utils/agents/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
TextPartDelta,
)

from app.endpoints.streaming_query import shield_violation_generator
from configuration import configuration
from constants import INTERRUPTED_RESPONSE_MESSAGE, MEDIA_TYPE_JSON
from log import get_logger
Expand Down Expand Up @@ -70,6 +69,7 @@
persist_interrupted_turn,
register_interrupt_callback,
)
from utils.streaming_sse import shield_violation_generator

AgentDispatchEvent: TypeAlias = AgentStreamEvent | AgentRunResultEvent

Expand Down Expand Up @@ -117,7 +117,7 @@ async def retrieve_agent_response_generator(
turn_summary,
)

agent = build_agent(context.client, responses_params)
agent = build_agent(context.client, responses_params, configuration.skills)

return (
agent_response_generator(
Expand Down
9 changes: 6 additions & 3 deletions src/utils/pydantic_ai.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@
from llama_stack.core.library_client import AsyncLlamaStackAsLibraryClient
from llama_stack_client import AsyncLlamaStackClient
from pydantic_ai import Agent, AgentCapability
from pydantic_ai.models.openai import OpenAIResponsesModel, OpenAIResponsesModelSettings
from pydantic_ai.models.openai import OpenAIResponsesModelSettings
from pydantic_ai_skills import SkillsCapability

from models.common.responses.responses_api_params import ResponsesApiParams
from models.config import SkillsConfiguration
from pydantic_ai_lightspeed.llamastack import LlamaStackProvider
from pydantic_ai_lightspeed.llamastack import (
LlamaStackProvider,
LlamaStackResponsesModel,
)

_LLS_RESPONSES_EXTRA_FIELDS: Final[frozenset[str]] = frozenset(
{
Expand Down Expand Up @@ -132,7 +135,7 @@ def build_agent(
provider = _llama_stack_provider_from_client(client)
settings = _model_settings_from_responses_params(responses_params)

model = OpenAIResponsesModel(
model = LlamaStackResponsesModel(
responses_params.model,
provider=provider,
settings=settings,
Expand Down
5 changes: 0 additions & 5 deletions tests/e2e/features/steps/llm_query_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,6 @@ def _parse_streaming_response(response_text: str) -> dict:
full_response = ""
full_response_split = []
finished = False
first_token = True
stream_error = (
None # {"status_code": int, "response": str, "cause": str} if event "error"
)
Expand All @@ -380,10 +379,6 @@ def _parse_streaming_response(response_text: str) -> dict:
if event == "start":
conversation_id = data["data"]["conversation_id"]
elif event == "token":
# Skip the first token (shield status message)
if first_token:
first_token = False
continue

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed workaround from old Agents API that was introduced for Responses API
First token is no longer a shield status nor an empty token but a real token.

full_response_split.append(data["data"]["token"])
elif event == "turn_complete":
full_response = data["data"]["token"]
Expand Down
Loading
Loading