diff --git a/packages/ai-providers/server-ai-langchain/src/ldai_langchain/langgraph_agent_graph_runner.py b/packages/ai-providers/server-ai-langchain/src/ldai_langchain/langgraph_agent_graph_runner.py index 3f31ffe8..99d50285 100644 --- a/packages/ai-providers/server-ai-langchain/src/ldai_langchain/langgraph_agent_graph_runner.py +++ b/packages/ai-providers/server-ai-langchain/src/ldai_langchain/langgraph_agent_graph_runner.py @@ -1,12 +1,12 @@ """LangGraph agent graph runner for LaunchDarkly AI SDK.""" import time -from typing import Annotated, Any, Dict, List, Set, Tuple +from typing import Annotated, Any, Dict, FrozenSet, List, Set, Tuple from ldai import log from ldai.agent_graph import AgentGraphDefinition, AgentGraphNode from ldai.providers import AgentGraphRunner, ToolRegistry -from ldai.providers.types import AgentGraphRunnerResult, AIGraphMetrics +from ldai.providers.types import AgentGraphRunnerResult, AIGraphMetrics, EvalRequest from ldai_langchain.langchain_helper import ( build_structured_tools, @@ -17,6 +17,62 @@ from ldai_langchain.langgraph_callback_handler import LDMetricsCallbackHandler +def _message_content_to_str(content: Any) -> str: + """Normalize a LangChain message ``content`` (string or list of parts) to a string.""" + if isinstance(content, str): + return content + if isinstance(content, list): + parts: List[str] = [] + for item in content: + if isinstance(item, str): + parts.append(item) + elif isinstance(item, dict): + text = item.get('text') + if isinstance(text, str): + parts.append(text) + return '\r\n'.join(parts) + return str(content) + + +def _maybe_record_eval_request( + eval_requests: List[EvalRequest], + node_key: str, + msgs: List[Any], + response: Any, + handoff_tool_names: FrozenSet[str], +) -> None: + """ + Append an :class:`EvalRequest` to ``eval_requests`` when ``response`` + represents the agent's final output for this activation. + + Skips emission when the response only requests further tool calls (still + working in a tool loop) or when there is no content to evaluate. Tool + calls limited to handoff tools are treated as the agent terminating with + a transfer, so the response is still emitted. + """ + tool_calls = getattr(response, 'tool_calls', None) or [] + if tool_calls: + # If every tool call is a handoff, the agent is terminating with a + # transfer; otherwise it is still working through a tool loop. + for tc in tool_calls: + name = tc.get('name') if isinstance(tc, dict) else getattr(tc, 'name', None) + if name not in handoff_tool_names: + return + + response_content = getattr(response, 'content', response) + output_text = _message_content_to_str(response_content) + if not output_text or not output_text.strip(): + return + + input_text = '\r\n'.join( + _message_content_to_str(getattr(m, 'content', m)) for m in msgs + ) if msgs else '' + + eval_requests.append( + EvalRequest(node_key=node_key, input=input_text, output=output_text) + ) + + def _make_handoff_tool(child_key: str, description: str) -> Any: """ Create a tool that transfers control to ``child_key``. @@ -81,19 +137,10 @@ def __init__( """ self._graph = graph self._tools = tools - self._compiled: Any = None - self._fn_name_to_config_key: Dict[str, str] = {} - self._node_keys: Set[str] = set() - - def _ensure_compiled(self) -> None: - """Build and cache the compiled graph if not already done.""" - if self._compiled is None: - compiled, fn_name_to_config_key, node_keys = self._build_graph() - self._compiled = compiled - self._fn_name_to_config_key = fn_name_to_config_key - self._node_keys = node_keys - - def _build_graph(self) -> Tuple[Any, Dict[str, str], Set[str]]: + + def _build_graph( + self, eval_requests: List[EvalRequest] + ) -> Tuple[Any, Dict[str, str], Set[str]]: """ Build and compile the LangGraph StateGraph from the AgentGraphDefinition. @@ -169,7 +216,25 @@ def handle_traversal(node: AgentGraphNode, ctx: dict) -> None: else: model = lc_model - def make_node_fn(bound_model: Any, node_instructions: Any, nk: str): + # Names of the handoff tools attached to this node. Tool calls + # against these are control-flow signals, not the agent doing work, + # so they must not block emission of an EvalRequest. + handoff_tool_names: FrozenSet[str] = frozenset( + getattr(t, 'name', '') for t in handoff_fns + ) + + # Whether this node has at least one judge configured. Nodes without + # judges contribute zero EvalRequest entries. + jc = getattr(node_config, 'judge_configuration', None) + node_has_judges = bool(jc is not None and getattr(jc, 'judges', None)) + + def make_node_fn( + bound_model: Any, + node_instructions: Any, + nk: str, + ht_names: FrozenSet[str], + emit_eval: bool, + ): async def invoke(state: WorkflowState) -> dict: if not bound_model: return {'messages': []} @@ -177,12 +242,20 @@ async def invoke(state: WorkflowState) -> dict: if node_instructions: msgs = [SystemMessage(content=node_instructions)] + msgs response = await bound_model.ainvoke(msgs) + + if emit_eval: + _maybe_record_eval_request( + eval_requests, nk, msgs, response, ht_names + ) + return {'messages': [response]} invoke.__name__ = nk return invoke - invoke_fn = make_node_fn(model, instructions, node_key) + invoke_fn = make_node_fn( + model, instructions, node_key, handoff_tool_names, node_has_judges + ) agent_builder.add_node(node_key, invoke_fn) if node_key == root_key: @@ -287,14 +360,16 @@ async def run(self, input: str) -> AgentGraphRunnerResult: :return: AgentGraphRunnerResult with the final content and AIGraphMetrics """ start_ns = time.perf_counter_ns() + # Per-run state — kept local so concurrent run() calls do not share it. + eval_requests: List[EvalRequest] = [] try: from langchain_core.messages import HumanMessage - self._ensure_compiled() - handler = LDMetricsCallbackHandler(self._node_keys, self._fn_name_to_config_key) + compiled, fn_name_to_config_key, node_keys = self._build_graph(eval_requests) + handler = LDMetricsCallbackHandler(node_keys, fn_name_to_config_key) - result = await self._compiled.ainvoke( # type: ignore[call-overload] + result = await compiled.ainvoke( # type: ignore[call-overload] {'messages': [HumanMessage(content=input)]}, config={'callbacks': [handler], 'recursion_limit': 25}, ) @@ -316,6 +391,7 @@ async def run(self, input: str) -> AgentGraphRunnerResult: tokens=total_usage if (total_usage is not None and total_usage.total > 0) else None, node_metrics=node_metrics, ), + eval_requests=eval_requests if eval_requests else None, ) except Exception as exc: @@ -334,4 +410,5 @@ async def run(self, input: str) -> AgentGraphRunnerResult: success=False, duration_ms=duration_ms, ), + eval_requests=eval_requests if eval_requests else None, ) diff --git a/packages/ai-providers/server-ai-langchain/tests/test_langgraph_agent_graph_runner.py b/packages/ai-providers/server-ai-langchain/tests/test_langgraph_agent_graph_runner.py index 49ab0159..184ae77d 100644 --- a/packages/ai-providers/server-ai-langchain/tests/test_langgraph_agent_graph_runner.py +++ b/packages/ai-providers/server-ai-langchain/tests/test_langgraph_agent_graph_runner.py @@ -6,12 +6,21 @@ import pytest from ldai.agent_graph import AgentGraphDefinition from ldai.evaluator import Evaluator -from ldai.models import AIAgentConfig, AIAgentGraphConfig, ModelConfig, ProviderConfig +from ldai.models import ( + AIAgentConfig, + AIAgentGraphConfig, + JudgeConfiguration, + ModelConfig, + ProviderConfig, +) from ldai.providers import ToolRegistry -from ldai.providers.types import AgentGraphRunnerResult +from ldai.providers.types import AgentGraphRunnerResult, EvalRequest from ldai_langchain.langchain_runner_factory import LangChainRunnerFactory -from ldai_langchain.langgraph_agent_graph_runner import LangGraphAgentGraphRunner +from ldai_langchain.langgraph_agent_graph_runner import ( + LangGraphAgentGraphRunner, + _maybe_record_eval_request, +) def _make_graph(enabled: bool = True) -> AgentGraphDefinition: @@ -167,11 +176,10 @@ async def test_langgraph_runner_run_resets_node_metrics_between_runs(): in the OpenAI provider tests. Each ``run()`` invocation must produce its own fresh ``node_metrics`` rather than a union of all prior runs' metrics. - Strategy: bypass ``_build_graph()`` by pre-populating ``_compiled`` and - ``_node_keys`` on the runner. The mock compiled graph's ``ainvoke`` is a - side-effect coroutine that fires callbacks on the handler passed in via + Strategy: stub ``_build_graph`` to return a mock compiled graph whose + ``ainvoke`` fires callbacks on the handler the runner passes via ``config['callbacks']`` — the same handler the real LangGraph executor - would invoke. Each call fires events for only ``root-agent`` so we can + would invoke. Each call fires events for only ``root-agent`` so we can assert the second result's ``node_metrics`` reflects only the second run. """ graph = _make_graph() @@ -199,11 +207,11 @@ async def fire_callbacks(_payload, *, config): mock_lc_core_messages.HumanMessage = MagicMock(return_value=mock_human_message) runner = LangGraphAgentGraphRunner(graph, {}) - # Bypass _build_graph(): provide a pre-compiled graph and the node keys + # Stub _build_graph(): return a pre-compiled mock plus the node keys # that the callback handler would otherwise be initialised with. - runner._compiled = mock_compiled - runner._node_keys = {'root-agent'} - runner._fn_name_to_config_key = {} + runner._build_graph = MagicMock( # type: ignore[method-assign] + return_value=(mock_compiled, {}, {'root-agent'}), + ) with patch.dict('sys.modules', { 'langchain_core': MagicMock(), @@ -226,3 +234,259 @@ async def fire_callbacks(_payload, *, config): # Path and node_metrics keys reflect only the second invocation. assert second.metrics.path == ['root-agent'] assert set(second.metrics.node_metrics.keys()) == {'root-agent'} + + +# --- _maybe_record_eval_request unit tests --- + +def _msg(content): + m = MagicMock() + m.content = content + return m + + +def _response(content, tool_calls=None): + r = MagicMock() + r.content = content + r.tool_calls = tool_calls + return r + + +def test_maybe_record_eval_request_emits_for_plain_response(): + out = [] + _maybe_record_eval_request( + out, + node_key='root', + msgs=[_msg('user prompt')], + response=_response('final answer'), + handoff_tool_names=frozenset(), + ) + assert len(out) == 1 + req = out[0] + assert isinstance(req, EvalRequest) + assert req.node_key == 'root' + assert req.input == 'user prompt' + assert req.output == 'final answer' + + +def test_maybe_record_eval_request_skips_when_response_has_functional_tool_call(): + out = [] + _maybe_record_eval_request( + out, + node_key='root', + msgs=[_msg('user prompt')], + response=_response('', tool_calls=[{'name': 'search', 'args': {}}]), + handoff_tool_names=frozenset(['transfer_to_x']), + ) + assert out == [] + + +def test_maybe_record_eval_request_emits_when_only_handoff_tool_calls(): + out = [] + _maybe_record_eval_request( + out, + node_key='root', + msgs=[_msg('user prompt')], + response=_response( + 'handing off now', + tool_calls=[{'name': 'transfer_to_specialist', 'args': {}}], + ), + handoff_tool_names=frozenset(['transfer_to_specialist']), + ) + assert len(out) == 1 + assert out[0].output == 'handing off now' + + +def test_maybe_record_eval_request_skips_when_output_is_blank(): + out = [] + _maybe_record_eval_request( + out, + node_key='root', + msgs=[_msg('user prompt')], + response=_response(' '), + handoff_tool_names=frozenset(), + ) + assert out == [] + + +def test_maybe_record_eval_request_joins_msgs_with_crlf(): + out = [] + _maybe_record_eval_request( + out, + node_key='root', + msgs=[_msg('system'), _msg('user')], + response=_response('answer'), + handoff_tool_names=frozenset(), + ) + assert out[0].input == 'system\r\nuser' + + +# --- Runner-level eval_requests behavior --- + + +def _make_graph_with_judge(node_keys_with_judges=None) -> AgentGraphDefinition: + """Build a 2-node graph (root -> specialist) with judges optionally configured.""" + if node_keys_with_judges is None: + node_keys_with_judges = {'root-agent'} + graph_tracker = MagicMock() + + def _agent_config(key: str) -> AIAgentConfig: + jc = ( + JudgeConfiguration(judges=[JudgeConfiguration.Judge(key='j1', sampling_rate=1.0)]) + if key in node_keys_with_judges + else None + ) + return AIAgentConfig( + key=key, + enabled=True, + create_tracker=MagicMock(return_value=MagicMock()), + model=ModelConfig(name='gpt-4'), + provider=ProviderConfig(name='openai'), + instructions=f'You are {key}.', + evaluator=Evaluator.noop(), + judge_configuration=jc, + ) + + from ldai.models import Edge + graph_config = AIAgentGraphConfig( + key='judge-graph', + root_config_key='root-agent', + edges=[Edge(key='e1', source_config='root-agent', target_config='specialist-agent')], + enabled=True, + ) + configs = { + 'root-agent': _agent_config('root-agent'), + 'specialist-agent': _agent_config('specialist-agent'), + } + nodes = AgentGraphDefinition.build_nodes(graph_config, configs) + return AgentGraphDefinition( + agent_graph=graph_config, + nodes=nodes, + context=MagicMock(), + enabled=True, + create_tracker=lambda: graph_tracker, + ) + + +@pytest.mark.asyncio +async def test_runner_eval_requests_absent_when_no_judges_configured(): + """A graph whose nodes have no judge_configuration must produce no EvalRequests.""" + graph = _make_graph() # nodes use Evaluator.noop() and no judge_configuration + + mock_message = MagicMock() + mock_message.content = "answer" + mock_message.usage_metadata = None + mock_message.response_metadata = None + + async def fire(_payload, *, config): + return {'messages': [mock_message]} + + mock_compiled = MagicMock() + mock_compiled.ainvoke = AsyncMock(side_effect=fire) + + mock_human_message = MagicMock() + mock_lc_core_messages = MagicMock() + mock_lc_core_messages.HumanMessage = MagicMock(return_value=mock_human_message) + + runner = LangGraphAgentGraphRunner(graph, {}) + runner._build_graph = MagicMock( # type: ignore[method-assign] + return_value=(mock_compiled, {}, {'root-agent'}), + ) + + with patch.dict('sys.modules', { + 'langchain_core': MagicMock(), + 'langchain_core.messages': mock_lc_core_messages, + }): + result = await runner.run("hello") + + assert result.eval_requests is None or result.eval_requests == [] + + +@pytest.mark.asyncio +async def test_runner_eval_requests_populated_for_node_with_judges(): + """When a node has judges configured, its activation emits an EvalRequest with input/output captured.""" + graph = _make_graph_with_judge(node_keys_with_judges={'root-agent'}) + + captured_eval_requests = [] + + def fake_build_graph(eval_requests_list): + # Capture the runner-provided list and emulate the closure appending to it. + captured_eval_requests.append(eval_requests_list) + mock_compiled = MagicMock() + + async def fire(_payload, *, config): + # Pretend the LangGraph node closure invoked _maybe_record_eval_request. + eval_requests_list.append( + EvalRequest(node_key='root-agent', input='hello', output='final answer') + ) + mock_message = MagicMock() + mock_message.content = "final answer" + mock_message.usage_metadata = None + mock_message.response_metadata = None + return {'messages': [mock_message]} + + mock_compiled.ainvoke = AsyncMock(side_effect=fire) + return mock_compiled, {}, {'root-agent'} + + mock_human_message = MagicMock() + mock_lc_core_messages = MagicMock() + mock_lc_core_messages.HumanMessage = MagicMock(return_value=mock_human_message) + + runner = LangGraphAgentGraphRunner(graph, {}) + runner._build_graph = fake_build_graph # type: ignore[method-assign] + + with patch.dict('sys.modules', { + 'langchain_core': MagicMock(), + 'langchain_core.messages': mock_lc_core_messages, + }): + result = await runner.run("hello") + + assert result.eval_requests is not None + assert len(result.eval_requests) == 1 + assert result.eval_requests[0].node_key == 'root-agent' + assert result.eval_requests[0].input == 'hello' + assert result.eval_requests[0].output == 'final answer' + + +@pytest.mark.asyncio +async def test_runner_eval_requests_isolated_between_runs(): + """Concurrent / successive runs must each receive a fresh eval_requests list.""" + graph = _make_graph_with_judge(node_keys_with_judges={'root-agent'}) + + seen_lists = [] + + def fake_build_graph(eval_requests_list): + seen_lists.append(id(eval_requests_list)) + mock_compiled = MagicMock() + + async def fire(_payload, *, config): + eval_requests_list.append( + EvalRequest(node_key='root-agent', input='x', output='y') + ) + mock_message = MagicMock() + mock_message.content = "y" + mock_message.usage_metadata = None + mock_message.response_metadata = None + return {'messages': [mock_message]} + + mock_compiled.ainvoke = AsyncMock(side_effect=fire) + return mock_compiled, {}, {'root-agent'} + + mock_human_message = MagicMock() + mock_lc_core_messages = MagicMock() + mock_lc_core_messages.HumanMessage = MagicMock(return_value=mock_human_message) + + runner = LangGraphAgentGraphRunner(graph, {}) + runner._build_graph = fake_build_graph # type: ignore[method-assign] + + with patch.dict('sys.modules', { + 'langchain_core': MagicMock(), + 'langchain_core.messages': mock_lc_core_messages, + }): + first = await runner.run("a") + second = await runner.run("b") + + # Each call gets a fresh list object. + assert len(seen_lists) == 2 + assert seen_lists[0] != seen_lists[1] + assert len(first.eval_requests) == 1 + assert len(second.eval_requests) == 1 diff --git a/packages/ai-providers/server-ai-openai/src/ldai_openai/openai_agent_graph_runner.py b/packages/ai-providers/server-ai-openai/src/ldai_openai/openai_agent_graph_runner.py index 4347d3d9..d2ed07a5 100644 --- a/packages/ai-providers/server-ai-openai/src/ldai_openai/openai_agent_graph_runner.py +++ b/packages/ai-providers/server-ai-openai/src/ldai_openai/openai_agent_graph_runner.py @@ -5,7 +5,12 @@ from ldai import log from ldai.agent_graph import AgentGraphDefinition, AgentGraphNode from ldai.providers import AgentGraphRunner, ToolRegistry -from ldai.providers.types import AgentGraphRunnerResult, AIGraphMetrics, LDAIMetrics +from ldai.providers.types import ( + AgentGraphRunnerResult, + AIGraphMetrics, + EvalRequest, + LDAIMetrics, +) from ldai_openai.openai_helper import ( extract_usage_from_request_entry, @@ -91,6 +96,7 @@ async def run(self, input: str) -> AgentGraphRunnerResult: result = await Runner.run(root_agent, input) self._flush_final_segment(state, result) self._collect_tool_calls(result) + eval_requests = self._extract_eval_requests(result, input) duration_ms = (time.perf_counter_ns() - start_ns) // 1_000_000 token_usage = get_ai_usage_from_response(result) @@ -105,6 +111,7 @@ async def run(self, input: str) -> AgentGraphRunnerResult: tokens=token_usage, node_metrics=self._node_metrics, ), + eval_requests=eval_requests if eval_requests else None, ) except Exception as exc: if isinstance(exc, ImportError): @@ -270,6 +277,105 @@ def _flush_final_segment(self, state: _RunState, result: Any) -> None: except Exception: pass + def _extract_eval_requests( + self, result: Any, user_input: str + ) -> List[EvalRequest]: + """ + Extract per-node input/output pairs from a finished run result. + + Walks ``result.new_items`` in order. Each ``MessageOutputItem`` is the + text produced by an agent on a particular activation; when followed by + a ``HandoffOutputItem`` (or end of run) the message is treated as that + agent's final output. Nodes whose ``AIAgentConfig`` has no + ``judge_configuration`` with at least one judge contribute no entries. + + Returns an empty list when no items match (import failure, no + configured judges, empty outputs, etc.). + """ + try: + from agents.items import ( + HandoffOutputItem, + ItemHelpers, + MessageOutputItem, + ) + except ImportError: + return [] + + new_items = getattr(result, 'new_items', None) or [] + requests: List[EvalRequest] = [] + + # Last MessageOutputItem text seen per agent name (sanitized). + last_output_by_agent: Dict[str, str] = {} + # Pending node activations waiting to be flushed when the agent + # transitions away (via handoff) or the run ends. + pending: List[tuple] = [] # list of (node_key, input_text) + current_agent_name: Any = None + # Prompt the next activation will receive. Starts as the user input; + # after a handoff becomes the source agent's last message — that is + # what the target agent sees as its trigger. + current_input: str = user_input + + for item in new_items: + if isinstance(item, MessageOutputItem): + agent_name = getattr(item.agent, 'name', None) + if not agent_name: + continue + if current_agent_name != agent_name: + # New activation — record input for this node. + node_key = self._agent_name_map.get(agent_name, agent_name) + pending.append((node_key, current_input)) + current_agent_name = agent_name + text = ItemHelpers.extract_text(item.raw_item) or '' + last_output_by_agent[agent_name] = text + elif isinstance(item, HandoffOutputItem): + src_name = getattr(item.source_agent, 'name', None) + if src_name and src_name in last_output_by_agent: + src_output = last_output_by_agent[src_name] + self._flush_eval_request(requests, pending, src_name, src_output) + # The target agent's input is the source agent's final + # output that triggered the handoff. + current_input = src_output or current_input + current_agent_name = None + + # Flush any agent activation that did not end in a handoff (end of run). + if current_agent_name is not None: + final_output = last_output_by_agent.get(current_agent_name) or str( + getattr(result, 'final_output', '') or '' + ) + self._flush_eval_request( + requests, pending, current_agent_name, final_output + ) + + return requests + + def _flush_eval_request( + self, + out: List[EvalRequest], + pending: List[tuple], + agent_name: str, + output_text: str, + ) -> None: + """Append the pending activation for ``agent_name`` if the node has judges and output is non-empty.""" + if not pending: + return + node_key, input_text = pending.pop() + if not output_text or not output_text.strip(): + return + node = self._graph.get_node(node_key) + if node is None: + return + cfg = node.get_config() + jc = getattr(cfg, 'judge_configuration', None) + if jc is None or not getattr(jc, 'judges', None): + return + out.append( + EvalRequest( + node_key=node_key, + input=input_text, + output=output_text, + ) + ) + def _collect_tool_calls(self, result: Any) -> None: """Collect all tool calls from the run result, attributed to the node that called them.""" for agent_name, tool_fn_name in get_tool_calls_from_run_items(result.new_items): diff --git a/packages/ai-providers/server-ai-openai/tests/test_openai_agent_graph_runner.py b/packages/ai-providers/server-ai-openai/tests/test_openai_agent_graph_runner.py index f6b03b51..72baa4fe 100644 --- a/packages/ai-providers/server-ai-openai/tests/test_openai_agent_graph_runner.py +++ b/packages/ai-providers/server-ai-openai/tests/test_openai_agent_graph_runner.py @@ -4,9 +4,16 @@ from unittest.mock import MagicMock, AsyncMock, patch from ldai.agent_graph import AgentGraphDefinition -from ldai.models import AIAgentGraphConfig, AIAgentConfig, Edge, ModelConfig, ProviderConfig +from ldai.models import ( + AIAgentConfig, + AIAgentGraphConfig, + Edge, + JudgeConfiguration, + ModelConfig, + ProviderConfig, +) from ldai.providers import ToolRegistry -from ldai.providers.types import AgentGraphRunnerResult, AIGraphMetrics +from ldai.providers.types import AgentGraphRunnerResult, AIGraphMetrics, EvalRequest from ldai_openai.openai_agent_graph_runner import OpenAIAgentGraphRunner from ldai_openai.openai_runner_factory import OpenAIRunnerFactory from ldai.evaluator import Evaluator @@ -224,3 +231,242 @@ async def test_openai_agent_graph_runner_run_resets_node_metrics_between_runs(): assert second.metrics.success is True assert second.metrics.node_metrics['root-agent'].success is True assert second.metrics.node_metrics['root-agent'] is not failed_metrics + + +# --- _extract_eval_requests unit tests --- + + +def _make_judge_graph(node_keys_with_judges=None) -> AgentGraphDefinition: + """Build a 2-node graph (root -> specialist) with judges optionally configured.""" + if node_keys_with_judges is None: + node_keys_with_judges = {'root-agent'} + + def _agent_config(key: str) -> AIAgentConfig: + jc = ( + JudgeConfiguration(judges=[JudgeConfiguration.Judge(key='j1', sampling_rate=1.0)]) + if key in node_keys_with_judges + else None + ) + return AIAgentConfig( + key=key, + enabled=True, + create_tracker=MagicMock(), + model=ModelConfig(name='gpt-4'), + provider=ProviderConfig(name='openai'), + instructions=f'You are {key}.', + evaluator=Evaluator.noop(), + judge_configuration=jc, + ) + + graph_config = AIAgentGraphConfig( + key='judge-graph', + root_config_key='root-agent', + edges=[Edge(key='e1', source_config='root-agent', target_config='specialist-agent')], + enabled=True, + ) + configs = { + 'root-agent': _agent_config('root-agent'), + 'specialist-agent': _agent_config('specialist-agent'), + } + nodes = AgentGraphDefinition.build_nodes(graph_config, configs) + return AgentGraphDefinition( + agent_graph=graph_config, + nodes=nodes, + context=MagicMock(), + enabled=True, + create_tracker=MagicMock(), + ) + + +def _stub_message_item(agent_name: str, text: str): + """Build a stand-in for a MessageOutputItem (just enough for _extract_eval_requests).""" + from agents.items import MessageOutputItem + from openai.types.responses import ResponseOutputMessage, ResponseOutputText + + raw = ResponseOutputMessage( + id='m', + content=[ResponseOutputText(annotations=[], text=text, type='output_text')], + role='assistant', + status='completed', + type='message', + ) + agent = MagicMock() + agent.name = agent_name + return MessageOutputItem(agent=agent, raw_item=raw) + + +def _stub_handoff_item(source_name: str, target_name: str): + """Build a stand-in for a HandoffOutputItem.""" + from agents.items import HandoffOutputItem + + src = MagicMock() + src.name = source_name + tgt = MagicMock() + tgt.name = target_name + return HandoffOutputItem( + agent=src, + raw_item={'type': 'function_call_output', 'call_id': 'c', 'output': ''}, + source_agent=src, + target_agent=tgt, + ) + + +def test_extract_eval_requests_single_node_with_judges(): + graph = _make_judge_graph(node_keys_with_judges={'root-agent'}) + runner = OpenAIAgentGraphRunner(graph, {}) + runner._agent_name_map = {'root_agent': 'root-agent'} + + result = MagicMock() + result.new_items = [ + _stub_message_item('root_agent', 'final answer'), + ] + result.final_output = 'final answer' + + requests = runner._extract_eval_requests(result, 'user input') + assert len(requests) == 1 + assert requests[0].node_key == 'root-agent' + assert requests[0].input == 'user input' + assert requests[0].output == 'final answer' + + +def test_extract_eval_requests_handoff_chain(): + """Root hands off to specialist; both nodes have judges → two eval requests.""" + graph = _make_judge_graph(node_keys_with_judges={'root-agent', 'specialist-agent'}) + runner = OpenAIAgentGraphRunner(graph, {}) + runner._agent_name_map = { + 'root_agent': 'root-agent', + 'specialist_agent': 'specialist-agent', + } + + result = MagicMock() + result.new_items = [ + _stub_message_item('root_agent', 'I will hand this off'), + _stub_handoff_item('root_agent', 'specialist_agent'), + _stub_message_item('specialist_agent', 'specialist answer'), + ] + result.final_output = 'specialist answer' + + requests = runner._extract_eval_requests(result, 'user input') + assert len(requests) == 2 + + by_key = {r.node_key: r for r in requests} + assert by_key['root-agent'].input == 'user input' + assert by_key['root-agent'].output == 'I will hand this off' + assert by_key['specialist-agent'].input == 'I will hand this off' + assert by_key['specialist-agent'].output == 'specialist answer' + + +def test_extract_eval_requests_skips_nodes_without_judges(): + """Only the root has judges configured; specialist must contribute nothing.""" + graph = _make_judge_graph(node_keys_with_judges={'root-agent'}) + runner = OpenAIAgentGraphRunner(graph, {}) + runner._agent_name_map = { + 'root_agent': 'root-agent', + 'specialist_agent': 'specialist-agent', + } + + result = MagicMock() + result.new_items = [ + _stub_message_item('root_agent', 'hand off'), + _stub_handoff_item('root_agent', 'specialist_agent'), + _stub_message_item('specialist_agent', 'answer'), + ] + result.final_output = 'answer' + + requests = runner._extract_eval_requests(result, 'user') + assert len(requests) == 1 + assert requests[0].node_key == 'root-agent' + + +def test_extract_eval_requests_empty_when_no_judges(): + graph = _make_judge_graph(node_keys_with_judges=set()) + runner = OpenAIAgentGraphRunner(graph, {}) + runner._agent_name_map = {'root_agent': 'root-agent'} + + result = MagicMock() + result.new_items = [_stub_message_item('root_agent', 'answer')] + result.final_output = 'answer' + + assert runner._extract_eval_requests(result, 'user') == [] + + +def test_extract_eval_requests_empty_when_no_items(): + graph = _make_judge_graph() + runner = OpenAIAgentGraphRunner(graph, {}) + runner._agent_name_map = {} + + result = MagicMock() + result.new_items = [] + result.final_output = '' + + assert runner._extract_eval_requests(result, 'user') == [] + + +@pytest.mark.asyncio +async def test_run_populates_eval_requests_for_judge_nodes(): + """End-to-end: run() must attach eval_requests to AgentGraphRunnerResult.""" + graph = _make_judge_graph(node_keys_with_judges={'root-agent'}) + + mock_result = MagicMock() + mock_result.final_output = "answer" + mock_result.context_wrapper.usage.request_usage_entries = [] + + # Build a single MessageOutputItem-like new_item. + item = _stub_message_item('root_agent', 'answer') + mock_result.new_items = [item] + + mock_agents = MagicMock() + mock_agents.Runner.run = AsyncMock(return_value=mock_result) + mock_agents.Agent = MagicMock(return_value=MagicMock()) + mock_agents.Handoff = MagicMock() + mock_agents.handoff = MagicMock() + + mock_agents_ext = MagicMock() + mock_agents_ext.RECOMMENDED_PROMPT_PREFIX = '[PREFIX]' + + with patch.dict('sys.modules', { + 'agents': mock_agents, + 'agents.extensions': MagicMock(), + 'agents.extensions.handoff_prompt': mock_agents_ext, + 'agents.tool_context': MagicMock(), + }): + runner = OpenAIAgentGraphRunner(graph, {}) + # Force the sanitized agent-name -> LD key mapping so extraction works + # without re-running the full build path under heavy mocking. + runner._agent_name_map = {'root_agent': 'root-agent'} + result = await runner.run("user input") + + assert result.eval_requests is not None + assert len(result.eval_requests) == 1 + assert result.eval_requests[0].node_key == 'root-agent' + + +@pytest.mark.asyncio +async def test_run_does_not_populate_eval_requests_without_judges(): + graph = _make_graph() # default _make_graph has no judges + + mock_result = MagicMock() + mock_result.final_output = "answer" + mock_result.context_wrapper.usage.request_usage_entries = [] + mock_result.new_items = [_stub_message_item('root_agent', 'answer')] + + mock_agents = MagicMock() + mock_agents.Runner.run = AsyncMock(return_value=mock_result) + mock_agents.Agent = MagicMock(return_value=MagicMock()) + mock_agents.Handoff = MagicMock() + mock_agents.handoff = MagicMock() + + mock_agents_ext = MagicMock() + mock_agents_ext.RECOMMENDED_PROMPT_PREFIX = '[PREFIX]' + + with patch.dict('sys.modules', { + 'agents': mock_agents, + 'agents.extensions': MagicMock(), + 'agents.extensions.handoff_prompt': mock_agents_ext, + 'agents.tool_context': MagicMock(), + }): + runner = OpenAIAgentGraphRunner(graph, {}) + runner._agent_name_map = {'root_agent': 'root-agent'} + result = await runner.run("user") + + assert result.eval_requests is None or result.eval_requests == [] diff --git a/packages/sdk/server-ai/src/ldai/__init__.py b/packages/sdk/server-ai/src/ldai/__init__.py index d95bb706..a5a0edad 100644 --- a/packages/sdk/server-ai/src/ldai/__init__.py +++ b/packages/sdk/server-ai/src/ldai/__init__.py @@ -31,6 +31,7 @@ AgentGraphRunnerResult, AIGraphMetrics, AIGraphMetricSummary, + EvalRequest, ManagedGraphResult, ManagedResult, Runner, @@ -47,6 +48,7 @@ 'AgentGraphRunnerResult', 'AIGraphMetrics', 'AIGraphMetricSummary', + 'EvalRequest', 'ManagedGraphResult', 'ManagedResult', 'Runner', diff --git a/packages/sdk/server-ai/src/ldai/managed_agent_graph.py b/packages/sdk/server-ai/src/ldai/managed_agent_graph.py index 73a02ce6..7b940e11 100644 --- a/packages/sdk/server-ai/src/ldai/managed_agent_graph.py +++ b/packages/sdk/server-ai/src/ldai/managed_agent_graph.py @@ -1,11 +1,14 @@ """ManagedAgentGraph — LaunchDarkly managed wrapper for agent graph execution.""" -from typing import Dict +import asyncio +from typing import Dict, List +from ldai import log from ldai.agent_graph import AgentGraphDefinition from ldai.providers import AgentGraphRunner from ldai.providers.types import ( AgentGraphRunnerResult, + JudgeResult, LDAIMetrics, ManagedGraphResult, ) @@ -58,13 +61,58 @@ async def run(self, input: str) -> ManagedGraphResult: summary = graph_tracker.get_summary() summary.node_metrics = self._track_node_metrics(result.metrics.node_metrics) + evaluations_task = self._track_judge_results(result) + return ManagedGraphResult( content=result.content, metrics=summary, + evaluations=evaluations_task, raw=result.raw, - evaluations=None, ) + def _track_judge_results( + self, + runner_result: AgentGraphRunnerResult, + ) -> asyncio.Task[List[JudgeResult]]: + """ + Start judge evaluations for every captured :class:`EvalRequest` as a + single background asyncio Task and return it. + + The returned task awaits each node's evaluator, fires + ``track_judge_result`` on the matching per-node tracker for every + sampled+successful judge result, and resolves to the combined list of + :class:`JudgeResult` across all nodes. When the runner produced no + eval requests, the task resolves immediately to an empty list. + """ + eval_requests = runner_result.eval_requests or [] + + async def _run_and_track() -> List[JudgeResult]: + combined: List[JudgeResult] = [] + for req in eval_requests: + node = self._graph.get_node(req.node_key) + if node is None: + continue + node_config = node.get_config() + eval_task = node_config.evaluator.evaluate(req.input, req.output) + results = await eval_task + if not results: + continue + node_tracker = node_config.create_tracker() + for r in results: + combined.append(r) + if not r.sampled: + continue + if r.success: + try: + node_tracker.track_judge_result(r) + except Exception as exc: + log.warning("Judge evaluation failed: %s", exc) + else: + log.warning("Judge evaluation failed: %s", r.error_message) + return combined + + return asyncio.create_task(_run_and_track()) + def _track_node_metrics( self, node_metrics: Dict[str, LDAIMetrics] ) -> Dict[str, LDAIMetricSummary]: diff --git a/packages/sdk/server-ai/src/ldai/providers/__init__.py b/packages/sdk/server-ai/src/ldai/providers/__init__.py index 91bdb3fa..e1eb00f6 100644 --- a/packages/sdk/server-ai/src/ldai/providers/__init__.py +++ b/packages/sdk/server-ai/src/ldai/providers/__init__.py @@ -6,6 +6,7 @@ AgentGraphRunnerResult, AIGraphMetrics, AIGraphMetricSummary, + EvalRequest, JudgeResult, LDAIMetrics, ManagedGraphResult, @@ -20,6 +21,7 @@ 'AgentGraphRunnerResult', 'AIGraphMetrics', 'AIGraphMetricSummary', + 'EvalRequest', 'JudgeResult', 'LDAIMetrics', 'ManagedGraphResult', diff --git a/packages/sdk/server-ai/src/ldai/providers/types.py b/packages/sdk/server-ai/src/ldai/providers/types.py index bd3bf23b..b5fd1bb1 100644 --- a/packages/sdk/server-ai/src/ldai/providers/types.py +++ b/packages/sdk/server-ai/src/ldai/providers/types.py @@ -86,6 +86,29 @@ class ManagedResult: """Optional asyncio Task that resolves to the list of :class:`JudgeResult` instances when awaited.""" +@dataclass +class EvalRequest: + """ + Per-node input/output pair captured by an :class:`AgentGraphRunner` for later judge evaluation. + + The agent graph runner is the only layer with access to a node's prompt and + response, but it does not invoke judges itself. Instead it records one + ``EvalRequest`` per executed node whose ``AIAgentConfig`` has a + ``judge_configuration`` so the managed layer + (:class:`~ldai.managed_agent_graph.ManagedAgentGraph`) can fire evaluations + as background work after the runner returns. + """ + + node_key: str + """The config key of the node being evaluated.""" + + input: str + """The prompt or input the node received.""" + + output: str + """The content the node produced.""" + + @dataclass class AIGraphMetrics: """Contains raw metrics from a single agent graph run.""" @@ -131,7 +154,7 @@ class AIGraphMetricSummary: @dataclass class ManagedGraphResult: - """Contains the result of a managed agent graph run, including metrics and optional judge evaluations.""" + """Contains the result of a managed agent graph run, including metrics and judge evaluations.""" content: str """The graph's final output content.""" @@ -139,12 +162,18 @@ class ManagedGraphResult: metrics: AIGraphMetricSummary """Aggregated metric summary from the graph tracker for this run.""" + evaluations: asyncio.Task[List[JudgeResult]] + """ + An asyncio Task that resolves to the combined list of :class:`JudgeResult` + instances across every node that had judges configured. The managed layer + always populates this field; when no nodes had judges (or + ``AgentGraphRunnerResult.eval_requests`` was absent or empty) the task + resolves immediately to an empty list. + """ + raw: Optional[Any] = None """Optional provider-native response object for advanced consumers.""" - evaluations: Optional[asyncio.Task[List[JudgeResult]]] = None - """Optional asyncio Task that resolves to the list of :class:`JudgeResult` instances when awaited.""" - @dataclass class AgentGraphRunnerResult: @@ -159,6 +188,15 @@ class AgentGraphRunnerResult: raw: Optional[Any] = None """Optional provider-native response object for advanced consumers.""" + eval_requests: Optional[List[EvalRequest]] = None + """ + Per-node input/output pairs captured for nodes that have judges configured. + + The managed layer consumes these to fire judge evaluations after the + runner returns. Absent or empty when no executed node has a + ``judge_configuration`` with at least one judge. + """ + @dataclass class JudgeResult: diff --git a/packages/sdk/server-ai/tests/test_managed_agent_graph.py b/packages/sdk/server-ai/tests/test_managed_agent_graph.py index 1de164dc..f444c9d7 100644 --- a/packages/sdk/server-ai/tests/test_managed_agent_graph.py +++ b/packages/sdk/server-ai/tests/test_managed_agent_graph.py @@ -1,5 +1,6 @@ """Tests for ManagedAgentGraph and LDAIClient.create_agent_graph().""" +import asyncio from unittest.mock import AsyncMock, MagicMock, patch import pytest @@ -12,6 +13,8 @@ AgentGraphRunnerResult, AIGraphMetrics, AIGraphMetricSummary, + EvalRequest, + JudgeResult, LDAIMetrics, ) from ldai.tracker import TokenUsage @@ -232,6 +235,186 @@ async def run(self, input) -> AgentGraphRunnerResult: mock_tracker.track_graph_metrics_of_async.assert_called_once() +# --- evaluations awaitable --- + + +class _StubRunnerWithEvalRequests(AgentGraphRunner): + """Runner that produces eval_requests for one or more nodes.""" + def __init__(self, eval_requests): + self._eval_requests = eval_requests + + async def run(self, input) -> AgentGraphRunnerResult: + return AgentGraphRunnerResult( + content='final output', + metrics=AIGraphMetrics(success=True), + eval_requests=self._eval_requests, + raw={'input': input}, + ) + + +def _make_node_with_evaluator(node_tracker, eval_results): + """Build a mock node whose config.evaluator.evaluate returns a Task resolving to eval_results.""" + cfg = MagicMock() + cfg.create_tracker = MagicMock(return_value=node_tracker) + + async def fake_evaluate_coro(): + return list(eval_results) + + def fake_evaluate(input_text, output_text): + return asyncio.create_task(fake_evaluate_coro()) + + cfg.evaluator.evaluate = fake_evaluate + node = MagicMock() + node.get_config = MagicMock(return_value=cfg) + return node + + +@pytest.mark.asyncio +async def test_evaluations_field_is_always_a_task(): + """ManagedGraphResult.evaluations is always an asyncio.Task, never None.""" + runner = StubAgentGraphRunner() + runner_result = await runner.run('input') + + mock_graph = MagicMock() + mock_graph.create_tracker = MagicMock(return_value=_make_graph_tracker_mock(runner_result)) + mock_graph.get_node = MagicMock(return_value=None) + + managed = ManagedAgentGraph(mock_graph, runner) + result = await managed.run('input') + assert result.evaluations is not None + assert isinstance(result.evaluations, asyncio.Task) + + +@pytest.mark.asyncio +async def test_evaluations_resolves_to_empty_when_no_eval_requests(): + """eval_requests absent → evaluations awaitable resolves to empty list.""" + runner = _StubRunnerWithEvalRequests(eval_requests=None) + runner_result = await runner.run('input') + + mock_graph = MagicMock() + mock_graph.create_tracker = MagicMock(return_value=_make_graph_tracker_mock(runner_result)) + mock_graph.get_node = MagicMock(return_value=None) + + managed = ManagedAgentGraph(mock_graph, runner) + result = await managed.run('input') + results = await result.evaluations + assert results == [] + + +@pytest.mark.asyncio +async def test_evaluations_resolves_to_empty_when_eval_requests_empty(): + """eval_requests empty list → evaluations awaitable resolves to empty list.""" + runner = _StubRunnerWithEvalRequests(eval_requests=[]) + runner_result = await runner.run('input') + + mock_graph = MagicMock() + mock_graph.create_tracker = MagicMock(return_value=_make_graph_tracker_mock(runner_result)) + mock_graph.get_node = MagicMock(return_value=None) + + managed = ManagedAgentGraph(mock_graph, runner) + result = await managed.run('input') + results = await result.evaluations + assert results == [] + + +@pytest.mark.asyncio +async def test_evaluations_dispatches_and_tracks_per_node(): + """Each EvalRequest is dispatched to its node's evaluator and tracked on the per-node tracker.""" + eval_requests = [ + EvalRequest(node_key='root', input='hi', output='hello'), + EvalRequest(node_key='specialist', input='hello', output='specialist out'), + ] + runner = _StubRunnerWithEvalRequests(eval_requests=eval_requests) + runner_result = await runner.run('input') + + mock_graph = MagicMock() + mock_graph.create_tracker = MagicMock(return_value=_make_graph_tracker_mock(runner_result)) + + root_tracker = MagicMock() + specialist_tracker = MagicMock() + root_results = [JudgeResult(judge_config_key='j1', success=True, sampled=True, score=0.9)] + specialist_results = [JudgeResult(judge_config_key='j2', success=True, sampled=True, score=0.7)] + + root_node = _make_node_with_evaluator(root_tracker, root_results) + specialist_node = _make_node_with_evaluator(specialist_tracker, specialist_results) + + def get_node(key): + return {'root': root_node, 'specialist': specialist_node}.get(key) + + mock_graph.get_node = get_node + + managed = ManagedAgentGraph(mock_graph, runner) + result = await managed.run('input') + + combined = await result.evaluations + assert len(combined) == 2 + keys = {r.judge_config_key for r in combined} + assert keys == {'j1', 'j2'} + + root_tracker.track_judge_result.assert_called_once_with(root_results[0]) + specialist_tracker.track_judge_result.assert_called_once_with(specialist_results[0]) + + +@pytest.mark.asyncio +async def test_evaluations_skips_unsampled_results(): + """Unsampled judge results are returned but not tracked.""" + eval_requests = [EvalRequest(node_key='root', input='hi', output='hello')] + runner = _StubRunnerWithEvalRequests(eval_requests=eval_requests) + runner_result = await runner.run('input') + + mock_graph = MagicMock() + mock_graph.create_tracker = MagicMock(return_value=_make_graph_tracker_mock(runner_result)) + + root_tracker = MagicMock() + root_results = [JudgeResult(judge_config_key='j1', success=True, sampled=False)] + root_node = _make_node_with_evaluator(root_tracker, root_results) + mock_graph.get_node = MagicMock(return_value=root_node) + + managed = ManagedAgentGraph(mock_graph, runner) + result = await managed.run('input') + + combined = await result.evaluations + assert combined == root_results + root_tracker.track_judge_result.assert_not_called() + + +@pytest.mark.asyncio +async def test_run_returns_before_evaluations_resolve(): + """run() must return without awaiting the evaluations task.""" + eval_started = asyncio.Event() + eval_finish = asyncio.Event() + + async def slow_evaluate_coro(): + eval_started.set() + await eval_finish.wait() + return [JudgeResult(judge_config_key='j1', success=True, sampled=True, score=0.5)] + + eval_requests = [EvalRequest(node_key='root', input='hi', output='hello')] + runner = _StubRunnerWithEvalRequests(eval_requests=eval_requests) + runner_result = await runner.run('input') + + mock_graph = MagicMock() + mock_graph.create_tracker = MagicMock(return_value=_make_graph_tracker_mock(runner_result)) + + root_tracker = MagicMock() + cfg = MagicMock() + cfg.create_tracker = MagicMock(return_value=root_tracker) + cfg.evaluator.evaluate = MagicMock(return_value=asyncio.create_task(slow_evaluate_coro())) + root_node = MagicMock() + root_node.get_config = MagicMock(return_value=cfg) + mock_graph.get_node = MagicMock(return_value=root_node) + + managed = ManagedAgentGraph(mock_graph, runner) + result = await managed.run('input') + + # run() returned before the evaluations task resolved. + assert not result.evaluations.done() + eval_finish.set() + combined = await result.evaluations + assert len(combined) == 1 + root_tracker.track_judge_result.assert_called_once() + + # --- LDAIClient.create_agent_graph() integration tests --- @pytest.fixture