From ed25fea907d77f8d38bb7d2f8acf7a130f8c382b Mon Sep 17 00:00:00 2001 From: guslegend <1670547022@qq.com> Date: Sat, 6 Jun 2026 10:08:19 +0800 Subject: [PATCH] fix: correct v2 stream event ordering --- .../java/io/agentscope/core/ReActAgent.java | 103 +++++++++++++----- .../agent/ReActAgentNewLoopReplyTest.java | 29 +++++ 2 files changed, 107 insertions(+), 25 deletions(-) diff --git a/agentscope-core/src/main/java/io/agentscope/core/ReActAgent.java b/agentscope-core/src/main/java/io/agentscope/core/ReActAgent.java index a9d4a8680..35e807c57 100644 --- a/agentscope-core/src/main/java/io/agentscope/core/ReActAgent.java +++ b/agentscope-core/src/main/java/io/agentscope/core/ReActAgent.java @@ -1993,6 +1993,7 @@ private Flux modelCallStream( String replyId = UUID.randomUUID().toString().replace("-", ""); AtomicBoolean textStarted = new AtomicBoolean(false); AtomicBoolean thinkingStarted = new AtomicBoolean(false); + AtomicBoolean thinkingEnded = new AtomicBoolean(false); Set startedToolCalls = ConcurrentHashMap.newKeySet(); Flux modelEvents = @@ -2018,6 +2019,7 @@ private Flux modelCallStream( context, textStarted, thinkingStarted, + thinkingEnded, withToolEvents ? startedToolCalls : ConcurrentHashMap.newKeySet(), @@ -2033,9 +2035,8 @@ private Flux modelCallStream( if (textStarted.get()) { events.add(new TextBlockEndEvent(replyId, "text")); } - if (thinkingStarted.get()) { - events.add(new ThinkingBlockEndEvent(replyId, "thinking")); - } + emitThinkingBlockEndIfNeeded( + replyId, thinkingStarted, thinkingEnded, events); for (String toolId : startedToolCalls) { events.add(new ToolCallEndEvent(replyId, toolId)); } @@ -2052,9 +2053,14 @@ private void emitBlockEvents( ReasoningContext context, AtomicBoolean textStarted, AtomicBoolean thinkingStarted, + AtomicBoolean thinkingEnded, Set startedToolCalls, List events) { + if (!(block instanceof ThinkingBlock)) { + emitThinkingBlockEndIfNeeded(replyId, thinkingStarted, thinkingEnded, events); + } + if (block instanceof TextBlock tb) { if (textStarted.compareAndSet(false, true)) { events.add(new TextBlockStartEvent(replyId, "text")); @@ -2085,6 +2091,16 @@ private void emitBlockEvents( } } + private void emitThinkingBlockEndIfNeeded( + String replyId, + AtomicBoolean thinkingStarted, + AtomicBoolean thinkingEnded, + List events) { + if (thinkingStarted.get() && thinkingEnded.compareAndSet(false, true)) { + events.add(new ThinkingBlockEndEvent(replyId, "thinking")); + } + } + private String resolveToolCallId(ToolUseBlock tub, ReasoningContext context) { if (tub.getId() != null && !tub.getId().isEmpty()) { return tub.getId(); @@ -2301,6 +2317,7 @@ private Flux runToolBatch( List> deniedEntries = new ArrayList<>(); List approved = new ArrayList<>(); + Set emittedToolResultIds = ConcurrentHashMap.newKeySet(); for (ToolUseBlock tc : toolCalls) { if (deniedIds.contains(tc.getId())) { ToolResultBlock denied = @@ -2347,6 +2364,11 @@ private Flux runToolBatch( parentCtx -> Flux.create( sink -> { + sink.onDispose( + () -> + toolkit.setInternalChunkCallback( + null)); + for (ToolUseBlock tool : approved) { sink.next( new ToolResultStartEvent( @@ -2357,27 +2379,18 @@ private Flux runToolBatch( toolkit.setInternalChunkCallback( (toolUse, chunk) -> { - if (chunk.getOutput() != null) { + if (chunk.getOutput() != null + && !chunk.getOutput() + .isEmpty()) { + emittedToolResultIds.add( + toolUse.getId()); for (ContentBlock block : chunk.getOutput()) { - if (block - instanceof - TextBlock tb) { - sink.next( - new ToolResultTextDeltaEvent( - replyId, - toolUse - .getId(), - tb - .getText())); - } else { - sink.next( - new ToolResultDataDeltaEvent( - replyId, - toolUse - .getId(), - block)); - } + emitToolResultBlock( + sink, + replyId, + toolUse.getId(), + block); } } hookDispatcher @@ -2403,6 +2416,17 @@ private Flux runToolBatch( ToolUseBlock, ToolResultBlock> entry : results) { + if (emittedToolResultIds + .add( + entry.getKey() + .getId())) { + emitToolResultOutput( + sink, + replyId, + entry.getKey(), + entry.getValue() + .getOutput()); + } ToolResultState state = determineToolResultState( entry @@ -2422,6 +2446,28 @@ private Flux runToolBatch( return deniedEvents.concatWith(approvedEvents); } + private void emitToolResultOutput( + FluxSink sink, + String replyId, + ToolUseBlock toolUse, + List output) { + if (toolUse == null || output == null || output.isEmpty()) { + return; + } + for (ContentBlock block : output) { + emitToolResultBlock(sink, replyId, toolUse.getId(), block); + } + } + + private void emitToolResultBlock( + FluxSink sink, String replyId, String toolCallId, ContentBlock block) { + if (block instanceof TextBlock tb) { + sink.next(new ToolResultTextDeltaEvent(replyId, toolCallId, tb.getText())); + } else { + sink.next(new ToolResultDataDeltaEvent(replyId, toolCallId, block)); + } + } + /** * Outcome of running every {@link ToolBase} call through the {@link PermissionEngine}. * @@ -2838,6 +2884,7 @@ private Flux summaryModelCallStream( String replyId = UUID.randomUUID().toString().replace("-", ""); AtomicBoolean textStarted = new AtomicBoolean(false); AtomicBoolean thinkingStarted = new AtomicBoolean(false); + AtomicBoolean thinkingEnded = new AtomicBoolean(false); Flux modelEvents = mci.model().stream(mci.messages(), mci.tools(), mci.options()) @@ -2857,6 +2904,13 @@ private Flux summaryModelCallStream( List events = new ArrayList<>(); for (ContentBlock block : chunk.getContent()) { + if (!(block instanceof ThinkingBlock)) { + emitThinkingBlockEndIfNeeded( + replyId, + thinkingStarted, + thinkingEnded, + events); + } if (block instanceof TextBlock tb) { if (textStarted.compareAndSet(false, true)) { events.add( @@ -2895,9 +2949,8 @@ private Flux summaryModelCallStream( if (textStarted.get()) { events.add(new TextBlockEndEvent(replyId, "text")); } - if (thinkingStarted.get()) { - events.add(new ThinkingBlockEndEvent(replyId, "thinking")); - } + emitThinkingBlockEndIfNeeded( + replyId, thinkingStarted, thinkingEnded, events); events.add(new ModelCallEndEvent(replyId, context.getChatUsage())); return Flux.fromIterable(events); }); diff --git a/agentscope-core/src/test/java/io/agentscope/core/agent/ReActAgentNewLoopReplyTest.java b/agentscope-core/src/test/java/io/agentscope/core/agent/ReActAgentNewLoopReplyTest.java index 37f5d68a8..76fda455b 100644 --- a/agentscope-core/src/test/java/io/agentscope/core/agent/ReActAgentNewLoopReplyTest.java +++ b/agentscope-core/src/test/java/io/agentscope/core/agent/ReActAgentNewLoopReplyTest.java @@ -20,8 +20,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import io.agentscope.core.ReActAgent; +import io.agentscope.core.agent.test.MockModel; import io.agentscope.core.event.AgentEndEvent; import io.agentscope.core.event.AgentEvent; +import io.agentscope.core.event.AgentEventType; import io.agentscope.core.event.AgentStartEvent; import io.agentscope.core.event.ExceedMaxItersEvent; import io.agentscope.core.event.ModelCallEndEvent; @@ -163,6 +165,33 @@ void textOnlyReplyEmitsExpectedEventOrder() { assertEquals(1L, modelEnds); } + @Test + void thinkingBlockEndsBeforeTextBlockStarts() { + ReActAgent agent = + ReActAgent.builder() + .name("asst") + .model(MockModel.withThinking("I am thinking.", "Final answer")) + .toolkit(new Toolkit()) + .build(); + + List events = agent.streamEvents(List.of()).collectList().block(); + assertNotNull(events); + + assertEquals( + List.of( + AgentEventType.AGENT_START, + AgentEventType.MODEL_CALL_START, + AgentEventType.THINKING_BLOCK_START, + AgentEventType.THINKING_BLOCK_DELTA, + AgentEventType.THINKING_BLOCK_END, + AgentEventType.TEXT_BLOCK_START, + AgentEventType.TEXT_BLOCK_DELTA, + AgentEventType.TEXT_BLOCK_END, + AgentEventType.MODEL_CALL_END, + AgentEventType.AGENT_END), + events.stream().map(AgentEvent::getType).toList()); + } + @Test void callResolvesToFinalAssistantMsg() { ChatModelBase model = new ScriptedModel(List.of(() -> Flux.just(textResponse("answer"))));