fix(agent): resolve serializeOnKey gate leak in Flux.create callbacks #1796
When a middleware throws an Error during agent event streaming, the external Flux is cancelled, but the internal lifecycle Mono subscription inside Flux.create survives independently and holds the per-session serializeOnKey gate open indefinitely. Subsequent calls on the same session then block forever.

Fix: capture the Disposable from each internal .subscribe() call and register it via sink.onCancel(disposable), so the lifecycle subscription is disposed when the external Flux is cancelled.

Affected locations:
- ReActAgent.buildAgentStream() (line 800)
- ReActAgent.executeApprovedTools() (line 2404)
- AgentBase.createEventStream() (line 984)

Adds a unit test verifying gate release after a middleware error.
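The mechanism described above can be reproduced in isolation. The following is a minimal sketch, not the project's code: `GateLeakSketch`, `gateHeldAfterCancel`, and the `AtomicBoolean` standing in for the serializeOnKey gate are all hypothetical, and cancellation is triggered by disposing the outer subscription rather than by a failing middleware. It assumes reactor-core is on the classpath.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class GateLeakSketch {

    // Returns true if the stand-in gate is still held after the outer Flux is cancelled.
    static boolean gateHeldAfterCancel(boolean registerOnCancel) {
        // Hypothetical stand-in for the per-session serializeOnKey gate.
        AtomicBoolean gateHeld = new AtomicBoolean(false);

        // Stand-in for the lifecycle Mono: it takes the gate on subscribe and
        // releases it on any terminal signal or on cancellation. It never
        // completes on its own, so only a cancel can release the gate here.
        Mono<String> lifecycle = Mono.<String>never()
                .doOnSubscribe(subscription -> gateHeld.set(true))
                .doFinally(signal -> gateHeld.set(false));

        Flux<String> events = Flux.<String>create(sink -> {
            // The inner subscription is independent of the outer one.
            Disposable inner = lifecycle.subscribe(sink::next, sink::error, sink::complete);
            if (registerOnCancel) {
                // Tie the inner subscription to cancellation of the outer Flux.
                sink.onCancel(inner);
            }
        });

        Disposable outer = events.subscribe();
        outer.dispose(); // cancels the outer Flux
        return gateHeld.get();
    }

    public static void main(String[] args) {
        System.out.println("with onCancel:    gate held = " + gateHeldAfterCancel(true));
        System.out.println("without onCancel: gate held = " + gateHeldAfterCancel(false));
    }
}
```

Without the `sink.onCancel(inner)` registration, nothing ever cancels the inner subscription, so the `doFinally` release never runs. That is the same shape as the leak described in this PR.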
AgentScopeJavaBot left a comment
🤖 AI Review
This PR fixes a serializeOnKey gate leak in Flux.create callbacks. When a custom MiddlewareBase throws an uncaught exception, the outer Flux is cancelled but the inner lifecycle Mono subscription continues holding the gate, blocking subsequent calls for the same session. The fix adds sink.onCancel(disposable) at three Flux.create locations, ensuring the inner subscription is disposed when the sink is cancelled. The root cause analysis is accurate, the fix follows Reactor best practices, and the test precisely reproduces the problem scenario. AgentBase hook cleanup is also improved to use doFinally instead of onDispose.
Inline comment on the Javadoc (/**) that follows the assertion message "reasoning and modelCall enter counts must match":
[nit] The Javadoc states "throws an {@link Error} (not {@code Exception})", but the test actually throws a RuntimeException. Consider updating the Javadoc to match the actual code.
…ncellation fix

After agentscope-ai#1796 added sink.onCancel(lifecycleDisposable), cancellation now properly propagates to inner tool executions. The slow_tool's sleep is interrupted before it can write to the file, so the test now expects 0 lines instead of 1.
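The change in expected line count follows from interruption semantics. Here is a rough sketch in plain Java, independent of Reactor and of the project's actual test: `SlowToolCancelSketch` and `linesWrittenAfterCancel` are hypothetical names, and an in-memory list stands in for the file.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class SlowToolCancelSketch {

    // Runs a slow "tool", cancels it while it sleeps, and reports how many lines it wrote.
    static int linesWrittenAfterCancel() throws Exception {
        List<String> lines = new CopyOnWriteArrayList<>(); // stand-in for the output file
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        ExecutorService pool = Executors.newSingleThreadExecutor();

        Future<?> tool = pool.submit(() -> {
            try {
                started.countDown();
                Thread.sleep(5_000);             // the slow part of the tool
                lines.add("slow_tool finished"); // only reached if the sleep completes
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // cancelled: skip the write
            } finally {
                finished.countDown();
            }
        });

        started.await();   // make sure the tool is running
        tool.cancel(true); // propagate cancellation as a thread interrupt
        finished.await(2, TimeUnit.SECONDS);
        pool.shutdownNow();
        return lines.size();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("lines written: " + linesWrittenAfterCancel());
    }
}
```

Because the interrupt ends the sleep with an InterruptedException, the write is never reached. Before cancellation propagated to the tool, the sleep would have run to completion and produced one line.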
Closes #1798
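Symptom

When a user implements a custom MiddlewareBase and throws an uncaught exception in a method such as onAgent, the external Flux returned by streamEvents() enters the error state, but the independent subscription created by lifecycle.subscribe() inside Flux.create keeps running, so the serializeOnKey gate is never released. Subsequent calls on the same session block forever.

Root cause

In ReActAgent.buildAgentStream() / AgentBase.createEventStream(), the Disposable returned by the .subscribe() call inside Flux.create is neither saved nor registered via sink.onCancel(disposable). Cancellation of the external Flux therefore cannot propagate to the internal lifecycle Mono subscription.

Fix

Add one line at each of the three Flux.create locations: sink.onCancel(disposable)

Verification

- mvn spotless:apply
- mvn test -pl agentscope-core: 3395 tests, 0 failures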