Servable framework refactor - phase 1#4219
Conversation
4a40996 to
6b07f7f
Compare
There was a problem hiding this comment.
Pull request overview
Phase 1 of a servable-framework refactor that moves streaming from “raw text chunks” to “pre-parsed delta documents”, introducing a shared parsed-delta channel and a new OVMSTextStreamer to unify streaming behavior across continuous-batching and legacy pipelines.
Changes:
- Introduces
OVMSTextStreamerto detokenize and invokeOutputParser::parseChunk()with both text and the corresponding token slice, producingrapidjson::Documentdeltas. - Replaces legacy mutex/CV “last text chunk” flow with a shared
DeltaChanneland updates servables/executors to signal completion via the channel. - Refactors OpenAI streaming serialization APIs to accept already-parsed delta
Documents and updates tests/parsers for the newparseChunk()signature.
Reviewed changes
Copilot reviewed 54 out of 54 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| src/test/llm/output_parsers/qwen3coder_output_parser_test.cpp | Updates tests to call OutputParser::parseChunk() with the new tokens argument. |
| src/test/llm/output_parsers/qwen3_output_parser_test.cpp | Updates streaming parser tests for the new parseChunk() signature and error paths. |
| src/test/llm/output_parsers/phi4_output_parser_test.cpp | Updates streaming tests to pass tokens to parseChunk(). |
| src/test/llm/output_parsers/mistral_output_parser_test.cpp | Updates streaming tests to pass tokens to parseChunk(). |
| src/test/llm/output_parsers/llama3_output_parser_test.cpp | Updates streaming tests to pass tokens to parseChunk(). |
| src/test/llm/output_parsers/lfm2_output_parser_test.cpp | Updates helper/assert flows to pass tokens to parseChunk(). |
| src/test/llm/output_parsers/hermes3_output_parser_test.cpp | Updates streaming tests to pass tokens to parseChunk(). |
| src/test/llm/output_parsers/gptoss_output_parser_test.cpp | Updates streaming tests to pass tokens to parseChunk(). |
| src/test/llm/output_parsers/gemma4_output_parser_test.cpp | Updates streaming tests to pass tokens to parseChunk(). |
| src/test/llm/output_parsers/devstral_output_parser_test.cpp | Updates streaming tests to pass tokens to parseChunk(). |
| src/test/http_openai_handler_test.cpp | Adds migration helper to build parsed deltas and adapts tests to new serializeStreamingChunk(Document, reason) API. |
| src/llm/visual_language_model/legacy/servable.hpp | Updates legacy VLM execution context for the new delta-channel streaming approach and unary accumulation. |
| src/llm/visual_language_model/legacy/servable.cpp | Migrates VLM legacy streaming/unary flows to OVMSTextStreamer + DeltaChannel. |
| src/llm/visual_language_model/legacy/legacy_executor.cpp | Signals streaming completion via deltaChannel.signalComplete(). |
| src/llm/servable.hpp | Introduces DeltaChannel and updates execution context fields for parsed-delta streaming. |
| src/llm/servable.cpp | Switches continuous-batching streaming to drain parsed deltas from DeltaChannel and serialize them. |
| src/llm/ovms_text_streamer.hpp | Adds new OVMSTextStreamer (TextStreamer-derived) that produces parsed delta Documents. |
| src/llm/ovms_text_streamer.cpp | Implements flush heuristics, token-slice computation, and callback emission for parsed deltas. |
| src/llm/language_model/legacy/servable.hpp | Removes legacy mutex/CV fields and switches disconnect handling to DeltaChannel. |
| src/llm/language_model/legacy/servable.cpp | Migrates legacy LM streaming flow to OVMSTextStreamer + DeltaChannel. |
| src/llm/language_model/legacy/legacy_executor.cpp | Signals streaming completion via deltaChannel.signalComplete(). |
| src/llm/io_processing/qwen3coder/qwen3coder_tool_parser.hpp | Updates tool parser streaming API to accept tokens. |
| src/llm/io_processing/qwen3coder/qwen3coder_tool_parser.cpp | Adapts streaming tool parsing implementation to new signature. |
| src/llm/io_processing/qwen3/reasoning_parser.hpp | Updates reasoning parser streaming API to accept tokens. |
| src/llm/io_processing/qwen3/reasoning_parser.cpp | Adapts reasoning parser to new signature. |
| src/llm/io_processing/phi4/tool_parser.hpp | Updates Phi4 tool parser streaming API to accept tokens. |
| src/llm/io_processing/phi4/tool_parser.cpp | Adapts Phi4 tool parser streaming implementation and recursive calls. |
| src/llm/io_processing/output_parser.hpp | Adds tokens parameter threading through tool/reasoning chunk parsing and top-level parseChunk(). |
| src/llm/io_processing/output_parser.cpp | Passes tokens through phase transitions and parser calls. |
| src/llm/io_processing/mistral/tool_parser.hpp | Updates Mistral tool parser streaming API to accept tokens. |
| src/llm/io_processing/mistral/tool_parser.cpp | Adapts Mistral streaming parsing implementation and recursive calls. |
| src/llm/io_processing/llama3/tool_parser.hpp | Updates Llama3 tool parser streaming API to accept tokens. |
| src/llm/io_processing/llama3/tool_parser.cpp | Adapts Llama3 streaming parsing implementation to new signature. |
| src/llm/io_processing/lfm2/lfm2_tool_parser.hpp | Updates LFM2 tool parser streaming API to accept tokens. |
| src/llm/io_processing/lfm2/lfm2_tool_parser.cpp | Adapts LFM2 streaming parsing implementation to new signature. |
| src/llm/io_processing/hermes3/tool_parser.hpp | Updates Hermes3 tool parser streaming API to accept tokens. |
| src/llm/io_processing/hermes3/tool_parser.cpp | Adapts Hermes3 streaming parsing signature to accept tokens. |
| src/llm/io_processing/gptoss/tool_parser.hpp | Updates GPT-OSS tool parser streaming API to accept tokens. |
| src/llm/io_processing/gptoss/tool_parser.cpp | Adapts GPT-OSS tool parsing signature to accept tokens. |
| src/llm/io_processing/gptoss/reasoning_parser.hpp | Updates GPT-OSS reasoning parser streaming API to accept tokens. |
| src/llm/io_processing/gptoss/reasoning_parser.cpp | Adapts GPT-OSS reasoning parsing signature to accept tokens. |
| src/llm/io_processing/gemma4/gemma4_tool_parser.hpp | Updates Gemma4 tool parser streaming API to accept tokens. |
| src/llm/io_processing/gemma4/gemma4_tool_parser.cpp | Adapts Gemma4 tool parsing signature to accept tokens. |
| src/llm/io_processing/gemma4/gemma4_reasoning_parser.hpp | Updates Gemma4 reasoning parser streaming API to accept tokens. |
| src/llm/io_processing/gemma4/gemma4_reasoning_parser.cpp | Adapts Gemma4 reasoning parsing signature to accept tokens. |
| src/llm/io_processing/devstral/tool_parser.hpp | Updates Devstral tool parser streaming API to accept tokens. |
| src/llm/io_processing/devstral/tool_parser.cpp | Adapts Devstral tool parsing signature to accept tokens. |
| src/llm/io_processing/base_output_parser.hpp | Updates base streaming parser interface to include token IDs for each chunk. |
| src/llm/BUILD | Adds ovms_text_streamer sources/headers to the genai_servables target. |
| src/llm/apis/openai_responses.hpp | Updates streaming serialization signature to accept parsed deltas. |
| src/llm/apis/openai_responses.cpp | Refactors streaming serialization to consume parsed delta Documents (no parsing in handler). |
| src/llm/apis/openai_completions.hpp | Updates streaming serialization signature to accept parsed deltas. |
| src/llm/apis/openai_completions.cpp | Refactors chat/completions streaming serialization to consume parsed delta Documents. |
| src/llm/apis/openai_api_handler.hpp | Updates abstract streaming serialization API to accept parsed delta Documents. |
Comments suppressed due to low confidence (1)
src/llm/io_processing/hermes3/tool_parser.cpp:209
- parseChunk() returns nullopt immediately on empty chunk, even when finishReason != NONE. With the new OVMSTextStreamer, a final STOP flush can legitimately call parseChunk("", STOP) to force end-of-tool-call cleanup (delay window / argument string closing). Consider handling empty chunk specially when finishReason != NONE (finalize/close pending arguments and emit any delayed delta as needed) instead of returning early.
std::optional<rapidjson::Document> Hermes3ToolParser::parseChunk(const std::string& chunk, const std::vector<int64_t>& /*tokens*/, ov::genai::GenerationFinishReason finishReason) {
/*
We first collect data until we have full function name - that's when we return the first delta.
Every next delta contains next parts of the arguments. Hermes3 generates arguments as JSON, but OpenAI API expects them in a string format.
That's why once we reach 'arguments' key, we add double quote to force string type and escape all double quotes that come in next parts.
To know when we reach the end of the arguments string, we return delta with a one-chunk delay. This way, when we reach end of tool call, we modify previous chunk to close
arguments string properly and return such modified chunk.
*/
/*
PHASE 0: Prepare data and state for processing
- If previous call finished tool call (received </tool_call> tag), we clear state.
- If current call finishes tool call (finishReason != NONE), we set flag to clear state in the next call.
- If chunk is empty, we return std::nullopt.
- We prepend unprocessedBuffer to the chunk and clear unprocessedBuffer.
*/
// Check if previous call finished tool call (received </tool_call> tag)
if (toolCallCompleted) {
clearState();
toolCallCompleted = false;
}
toolCallCompleted = (finishReason != ov::genai::GenerationFinishReason::NONE);
if (chunk.empty()) {
SPDLOG_LOGGER_DEBUG(llm_calculator_logger, "Received empty chunk for Hermes3ToolParser");
return std::nullopt;
}
| std::move(deltas[i]), | ||
| ov::genai::GenerationFinishReason::NONE); | ||
| if (!serialized.empty()) { | ||
| executionContext->response += wrapTextInServerSideEventMessage(serialized); |
| std::string serialized = executionContext->apiHandler->serializeStreamingChunk( | ||
| std::move(delta), ov::genai::GenerationFinishReason::NONE); | ||
| if (!serialized.empty()) { | ||
| executionContext->response += wrapTextInServerSideEventMessage(serialized); |
| std::string serialized = executionContext->apiHandler->serializeStreamingChunk( | ||
| std::move(delta), ov::genai::GenerationFinishReason::NONE); | ||
| if (!serialized.empty()) { | ||
| executionContext->response += wrapTextInServerSideEventMessage(serialized); |
| void OVMSTextStreamer::end() { | ||
| if (!m_tokens_cache.empty()) { |
| if (!legacyExecutionContext->apiHandler->getRequest().skipSpecialTokens) { | ||
| streamerConfig.insert(ov::genai::skip_special_tokens(false)); |
| // parsedDelta is a pre-parsed Document produced by OVMSTextStreamer::flushChunk. | ||
| // Shape: {"delta":{...}} for content/reasoning/tool_calls, or an empty Document{} | ||
| // for finish-only chunks. Inspect the delta directly — no parsing needed here. | ||
| if (parsedDelta.HasMember("delta") && parsedDelta["delta"].IsObject()) { |
There was a problem hiding this comment.
I think you might be loosing some events here -> in responses API we not always stream data when delta is ready. Sometimes delta is not ready and we want to stream something anyway (TTFT indicator, prefill end mark)
The same goes for chat/completions API.
I cant see it here, is it supported?
| legacyExecutionContext->apiHandler->getOutputParser()->requiresStreamingWithSpecialTokens()) || | ||
| !legacyExecutionContext->apiHandler->getRequest().skipSpecialTokens)) { | ||
| streamerConfig.insert(ov::genai::skip_special_tokens(false)); | ||
| auto ovmsCallback = [& ctx = *legacyExecutionContext](rapidjson::Document delta) -> ov::genai::StreamingStatus { |
There was a problem hiding this comment.
are you sure that lifetime of legacyExecutionContext is longer than existence of this lambda? if not, it will crash
There was a problem hiding this comment.
The callback must fire in the scope of one generation. The text streamer, which would hold the lambda is a member of execution context and cannot outlive it. The streamer pointer is passed only to generate call which cannot outlive the context and generate does not save the object anywhere in the pipeline.
| } | ||
|
|
||
| // Returns true after signalComplete() has been called. | ||
| bool complete() const { |
There was a problem hiding this comment.
| bool complete() const { | |
| bool isComplete() const { |
There was a problem hiding this comment.
I think we might have TOCTOU with this function
There was a problem hiding this comment.
The state change is one directional - uncompleted -> completed.
If there is a miss, we will capture the completion signal in the next iteration.
If we already finished generating and next iteration does not have any remaining deltas we send event with empty content and finish reason.
| if (count > 0 || executionContext->apiHandler->getEndpoint() == Endpoint::RESPONSES) { | ||
| // Emit each delta. All are mid-stream so finishReason is NONE. | ||
| for (size_t i = 0; i < count; ++i) { | ||
| std::string serialized = executionContext->apiHandler->serializeStreamingChunk( |
There was a problem hiding this comment.
4 duplicates of exact same code in llm/vllm x unary/streaming, can we unify at some point in one of the refactor phases?
There was a problem hiding this comment.
Yes, hopefully next phase will clean a lot of stuff up.
No description provided.