Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 3 additions & 8 deletions detekt-baseline.xml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<?xml version="1.0" ?>
<SmellBaseline>
<ManuallySuppressedIssues></ManuallySuppressedIssues>
<ManuallySuppressedIssues/>
<CurrentIssues>
<ID>ClassNaming:GeneratedMetaCacheTest.kt$MetaCacheBadTypeBaz__GeneratedSchema</ID>
<ID>ClassNaming:GeneratedMetaCacheTest.kt$MetaCacheCleanFoo__GeneratedSchema</ID>
Expand All @@ -13,11 +13,11 @@
<ID>ClassNaming:GeneratedSchemaLookupTest.kt$HandGeneratedUser__GeneratedSchema</ID>
<ID>ComplexCondition:ClaudeClient.kt$ClaudeClient$toolDefs.isNotEmpty() &amp;&amp; mainSystemHint != null &amp;&amp; mainSystemHint.segment == CacheSegment.SystemPrompt &amp;&amp; consumeBreakpoint()</ID>
<ID>CyclomaticComplexMethod:AgenticLoop.kt$internal suspend fun &lt;IN&gt; executeAgentic( agent: Agent&lt;IN, *&gt;, skill: Skill&lt;*, *&gt;, input: IN, /** * #3376 batch 5 — the per-invocation execution parameters (prompt override, resume/HITL state, * checkpoint callback, manifest-mismatch opt-out, attachments) bundled into one [RunRequest], * shared with [Agent.invokeSuspendForSession]. Defaults to a fresh invocation. */ request: agents_engine.core.RunRequest = agents_engine.core.RunRequest(), /** * #1739: optional AgentEvent emitter. When non-null, the loop streams * via `client.chatStream(...)`, surfaces `Token` / `ToolCallStarted` / * `ToolCallArgumentsDelta` events from chunks, and emits * `ToolCallFinished` after each tool executor runs. When null, the * loop uses `client.chat(...)` byte-for-byte as before — non-streaming * callers (`Agent.invoke`, `Agent.invokeSuspend`) pay no overhead. */ emitter: AgentEventEmitter? = null, runtimeContext: AgentRuntimeContext = AgentRuntimeContext.currentOrNew(), ): AgenticResult</ID>
<ID>CyclomaticComplexMethod:ClaudeClient.kt$ClaudeClient$@Suppress("UNCHECKED_CAST") private suspend fun dispatchSseEvent( event: String, dataJson: String, blocks: MutableMap&lt;Int, BlockState&gt;, collector: kotlinx.coroutines.flow.FlowCollector&lt;LlmChunk&gt;, onInputTokens: (Int) -&gt; Unit, onCachedInputTokens: (Int) -&gt; Unit, onOutputTokens: (Int) -&gt; Unit, onMessageStop: suspend () -&gt; Unit, )</ID>
<ID>CyclomaticComplexMethod:ClaudeClient.kt$ClaudeClient$@Suppress("UNCHECKED_CAST") private suspend fun dispatchSseEvent( event: String, dataJson: String, blocks: MutableMap&lt;Int, BlockState&gt;, collector: FlowCollector&lt;LlmChunk&gt;, onInputTokens: (Int) -&gt; Unit, onCachedInputTokens: (Int) -&gt; Unit, onOutputTokens: (Int) -&gt; Unit, onMessageStop: suspend () -&gt; Unit, )</ID>
<ID>CyclomaticComplexMethod:ClaudeClient.kt$ClaudeClient$internal fun buildRequestJson( messages: List&lt;LlmMessage&gt;, stream: Boolean = false, jsonSchema: JsonSchema? = null, ): String</ID>
<ID>CyclomaticComplexMethod:LenientJsonParser.kt$LenientJsonParser.Parser$private fun parseNumber(): Number</ID>
<ID>CyclomaticComplexMethod:OllamaClient.kt$OllamaClient$override suspend fun chatStream(messages: List&lt;LlmMessage&gt;, jsonSchema: JsonSchema?): Flow&lt;LlmChunk&gt;</ID>
<ID>CyclomaticComplexMethod:OpenAiClient.kt$OpenAiClient$private suspend fun parseSseStream(stream: InputStream, collector: kotlinx.coroutines.flow.FlowCollector&lt;LlmChunk&gt;)</ID>
<ID>CyclomaticComplexMethod:OpenAiClient.kt$OpenAiClient$private suspend fun parseSseStream(stream: InputStream, collector: FlowCollector&lt;LlmChunk&gt;)</ID>
<ID>DestructuringDeclarationWithTooManyEntries:AgentSessionIntegrationTest.kt$AgentSessionIntegrationTest$val (eventsA, outputA, eventsB, outputB) = coroutineScope { val sessionA = echoAgent.session("alpha") val sessionB = echoAgent.session("bravo") val a = async { sessionA.events.toList() } val b = async { sessionB.events.toList() } val outA = sessionA.await() val outB = sessionB.await() Quad(a.await(), outA, b.await(), outB) }</ID>
<ID>EmptyDefaultConstructor:AllSpecsDeepIntegrationTest.kt$AllSpecsDeepIntegrationTest.Spec$()</ID>
<ID>EmptyElseBlock:SnapshotResumeTest.kt$SnapshotResumeTest$if (firstSeenMessages == null) firstSeenMessages = msgs.toList()</ID>
Expand Down Expand Up @@ -153,9 +153,7 @@
<ID>MaxLineLength:ClaudeClient.kt$ClaudeClient$"""{"type":"tool_result","tool_use_id":${id.toJsonString()},"content":${msg.content.toJsonString()}}"""</ID>
<ID>MaxLineLength:ClaudeClient.kt$ClaudeClient$add("""{"name":${t.name.toJsonString()},"description":${t.description.toJsonString()},"input_schema":$schema}""")</ID>
<ID>MaxLineLength:ClaudeClient.kt$ClaudeClient$blocks += """{"type":"tool_use","id":${id.toJsonString()},"name":${call.name.toJsonString()},"input":$argsJson}"""</ID>
<ID>MaxLineLength:ClaudeClient.kt$ClaudeClient$private suspend</ID>
<ID>MaxLineLength:ClaudeClient.kt$ClaudeClient$return """{"model":${model.toJsonString()},"max_tokens":$maxTokens,"temperature":$effectiveTemperature$thinkingField$streamField$systemField,"messages":[${messageObjects.joinToString(",")}]$toolsField$toolChoiceField}"""</ID>
<ID>MaxLineLength:ClaudeClient.kt$ClaudeClient$val cacheControl = if (msg.cacheHint != null &amp;&amp; consumeBreakpoint()) cacheControlJson(msg.cacheHint) else null</ID>
<ID>MaxLineLength:ClaudeClientCancellationTest.kt$ClaudeClientCancellationTest$fun</ID>
<ID>MaxLineLength:ClaudeClientCancellationTest.kt$ClaudeClientCancellationTest.SlowSseStream$add("event: content_block_delta\ndata: {\"type\":\"content_block_delta\",\"index\":0,\"delta\":{\"type\":\"text_delta\",\"text\":\"chunk$i \"}}\n\n")</ID>
<ID>MaxLineLength:ClaudeClientCancellationTest.kt$ClaudeClientCancellationTest.SlowSseStream$add("event: content_block_start\ndata: {\"type\":\"content_block_start\",\"index\":0,\"content_block\":{\"type\":\"text\"}}\n\n")</ID>
Expand Down Expand Up @@ -262,9 +260,7 @@
<ID>MaxLineLength:OpenAiClient.kt$OpenAiClient$""","response_format":{"type":"json_schema","json_schema":{"name":${schema.wireName().toJsonString()},"schema":${schema.schema},"strict":true}}"""</ID>
<ID>MaxLineLength:OpenAiClient.kt$OpenAiClient$"""{"id":${id.toJsonString()},"type":"function","function":{"name":${call.name.toJsonString()},"arguments":${argsString.toJsonString()}}}"""</ID>
<ID>MaxLineLength:OpenAiClient.kt$OpenAiClient$"""{"type":"function","function":{"name":${t.name.toJsonString()},"description":${t.description.toJsonString()},"parameters":$schema}}"""</ID>
<ID>MaxLineLength:OpenAiClient.kt$OpenAiClient$private suspend</ID>
<ID>MaxLineLength:OpenAiClient.kt$OpenAiClient$return """{"model":${model.toJsonString()},"max_tokens":$maxTokens,"temperature":$temperature$additionalFields$cacheKeyField$streamField,"messages":[${messageObjects.joinToString(",")}]$effectiveToolsField$toolChoiceField$responseFormatField}"""</ID>
<ID>MaxLineLength:OpenAiClient.kt$OpenAiClient$throw LlmProviderException("$providerLabel returned an error: ${type ?: "unknown"}: ${message ?: "no message"}")</ID>
<ID>MaxLineLength:OpenAiClientCancellationTest.kt$OpenAiClientCancellationTest$fun</ID>
<ID>MaxLineLength:OpenAiClientCancellationTest.kt$OpenAiClientCancellationTest.SlowOpenAiSseStream$add("data: {\"id\":\"x\",\"choices\":[],\"usage\":{\"prompt_tokens\":3,\"completion_tokens\":12,\"total_tokens\":15}}\n\n")</ID>
<ID>MaxLineLength:OpenAiClientCancellationTest.kt$OpenAiClientCancellationTest.SlowOpenAiSseStream$add("data: {\"id\":\"x\",\"choices\":[{\"index\":0,\"delta\":{\"content\":\"chunk$i \"},\"finish_reason\":null}]}\n\n")</ID>
Expand Down Expand Up @@ -409,7 +405,6 @@
<ID>UnusedPrivateProperty:AgentsPipelineTest.kt$AgentsPipelineTest$val totalPipeline = pipelinePt1 then pipelinePt2</ID>
<ID>UnusedPrivateProperty:AgentsPipelineTest.kt$AgentsPipelineTest$val totalPipelineWithType: Pipeline&lt;SomeSpecAsk, SomeProductionMachineryManagement&gt; = pipelinePt1 then pipelinePt2</ID>
<ID>UnusedPrivateProperty:LoopAgenticIntegrationTest.kt$LoopAgenticIntegrationTest$var bestDraft = ""</ID>
<ID>UnusedPrivateProperty:MockTcpMcpServer.kt$MockTcpMcpServer$private val acceptThread = Thread({ acceptLoop() }, "MockTcpMcpServer-accept-${server.localPort}").apply { isDaemon = true start() }</ID>
<ID>UseCheckOrError:LenientJsonParser.kt$LenientJsonParser.Parser$throw IllegalStateException( "LenientJsonParser: unexpected character '${s[pos]}' at pos $pos" )</ID>
<ID>UseCheckOrError:LenientJsonParser.kt$LenientJsonParser.Parser$throw IllegalStateException( "LenientJsonParser: zero-progress at pos $pos in array" )</ID>
<ID>UseCheckOrError:LenientJsonParser.kt$LenientJsonParser.Parser$throw IllegalStateException( "LenientJsonParser: zero-progress at pos $pos in object" )</ID>
Expand Down
8 changes: 5 additions & 3 deletions src/main/kotlin/agents_engine/model/ClaudeClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import kotlin.time.Duration.Companion.seconds
import kotlin.time.toJavaDuration
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn

Expand Down Expand Up @@ -167,7 +168,7 @@ open class ClaudeClient(
val argsBuilder: StringBuilder,
)

private suspend fun parseSseStream(stream: InputStream, collector: kotlinx.coroutines.flow.FlowCollector<LlmChunk>) {
private suspend fun parseSseStream(stream: InputStream, collector: FlowCollector<LlmChunk>) {
val blocks = mutableMapOf<Int, BlockState>()
var inputTokens: Int? = null
var outputTokens: Int? = null
Expand Down Expand Up @@ -229,7 +230,7 @@ open class ClaudeClient(
event: String,
dataJson: String,
blocks: MutableMap<Int, BlockState>,
collector: kotlinx.coroutines.flow.FlowCollector<LlmChunk>,
collector: FlowCollector<LlmChunk>,
onInputTokens: (Int) -> Unit,
onCachedInputTokens: (Int) -> Unit,
onOutputTokens: (Int) -> Unit,
Expand Down Expand Up @@ -408,7 +409,8 @@ open class ClaudeClient(
// #2658 — when an assistant/user message carries a CacheHint
// (typically segment=Conversation for rolling mode), attach
// cache_control to the LAST content block on the wire.
val cacheControl = if (msg.cacheHint != null && consumeBreakpoint()) cacheControlJson(msg.cacheHint) else null
val cacheControl =
if (msg.cacheHint != null && consumeBreakpoint()) cacheControlJson(msg.cacheHint) else null
when (msg.role) {
"user" -> {
val images = msg.images
Expand Down
7 changes: 5 additions & 2 deletions src/main/kotlin/agents_engine/model/OpenAiClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import kotlin.time.Duration.Companion.seconds
import kotlin.time.toJavaDuration
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn

Expand Down Expand Up @@ -167,7 +168,7 @@ open class OpenAiClient(
val argsBuilder: StringBuilder = StringBuilder(),
)

private suspend fun parseSseStream(stream: InputStream, collector: kotlinx.coroutines.flow.FlowCollector<LlmChunk>) {
private suspend fun parseSseStream(stream: InputStream, collector: FlowCollector<LlmChunk>) {
// Keyed by `tool_calls[].index` within the choice.
val toolStates = mutableMapOf<Int, ToolCallState>()
var usage: TokenUsage? = null
Expand Down Expand Up @@ -377,7 +378,9 @@ open class OpenAiClient(
(root["error"] as? Map<*, *>)?.let { err ->
val type = err["type"] as? String
val message = err["message"] as? String
throw LlmProviderException("$providerLabel returned an error: ${type ?: "unknown"}: ${message ?: "no message"}")
throw LlmProviderException(
"$providerLabel returned an error: ${type ?: "unknown"}: ${message ?: "no message"}"
)
}

val tokenUsage = extractTokenUsage(root)
Expand Down
10 changes: 7 additions & 3 deletions src/test/kotlin/agents_engine/mcp/MockTcpMcpServer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,13 @@ class MockTcpMcpServer internal constructor(
private val workers = Executors.newCachedThreadPool { r ->
Thread(r, "MockTcpMcpServer-${server.localPort}").apply { isDaemon = true }
}
private val acceptThread = Thread({ acceptLoop() }, "MockTcpMcpServer-accept-${server.localPort}").apply {
isDaemon = true
start()
init {
// Accept loop runs on its own daemon thread; we don't retain a handle (stop() closes the
// ServerSocket, which unblocks accept()).
Thread({ acceptLoop() }, "MockTcpMcpServer-accept-${server.localPort}").apply {
isDaemon = true
start()
}
}

fun stop() {
Expand Down
Loading