Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/google/adk/agents/parallel_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ async def process_an_agent(events_for_one_agent):
finally:
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)


class ParallelAgent(BaseAgent):
Expand Down
36 changes: 36 additions & 0 deletions tests/unittests/agents/test_parallel_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from google.adk.agents.base_agent import BaseAgent
from google.adk.agents.base_agent import BaseAgentState
from google.adk.agents.invocation_context import InvocationContext
from google.adk.agents.parallel_agent import _merge_agent_run_pre_3_11
from google.adk.agents.parallel_agent import ParallelAgent
from google.adk.agents.sequential_agent import SequentialAgent
from google.adk.agents.sequential_agent import SequentialAgentState
Expand Down Expand Up @@ -373,3 +374,38 @@ async def test_stop_agent_if_sub_agent_fails(
async for _ in agen:
# The infinite agent could iterate a few times depending on scheduling.
pass


async def _slow_agent_with_cleanup_delay():
"""Async generator that sleeps in its finally block to simulate cleanup."""
try:
await asyncio.sleep(10)
yield 'slow-event'
finally:
await asyncio.sleep(0.05)


async def _failing_agent():
"""Async generator that raises after a short delay."""
await asyncio.sleep(0.01)
raise ValueError('simulated sub-agent failure')
yield # pragma: no cover


@pytest.mark.asyncio
async def test_merge_agent_run_pre_3_11_no_aclose_error_on_failure():
"""Regression test for Python 3.10 RuntimeError: aclose() already running.

_merge_agent_run_pre_3_11 must await all cancelled tasks before returning so
that generators are fully released before the caller invokes aclose() on them.
"""
agent_runs = [_slow_agent_with_cleanup_delay(), _failing_agent()]

with pytest.raises(ValueError, match='simulated sub-agent failure'):
async for _ in _merge_agent_run_pre_3_11(agent_runs):
pass

# If tasks were not properly awaited, aclose() on a still-running generator
# would raise RuntimeError here.
for agen in agent_runs:
await agen.aclose()