Skip to content

Servable framework refactor - phase 1#4219

Open
mzegla wants to merge 9 commits into
mainfrom
servables_refactor_phase1
Open

Servable framework refactor - phase 1#4219
mzegla wants to merge 9 commits into
mainfrom
servables_refactor_phase1

Conversation

@mzegla

@mzegla mzegla commented May 18, 2026

Copy link
Copy Markdown
Collaborator

No description provided.

@mzegla mzegla added the 2026.3 label May 18, 2026
mzegla added 2 commits May 19, 2026 13:51
fix style

windows compiler issue

fixes
@mzegla mzegla force-pushed the servables_refactor_phase1 branch from 4a40996 to 6b07f7f Compare May 19, 2026 12:01
@mzegla mzegla requested a review from Copilot May 19, 2026 12:04

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Phase 1 of a servable-framework refactor that moves streaming from “raw text chunks” to “pre-parsed delta documents”, introducing a shared parsed-delta channel and a new OVMSTextStreamer to unify streaming behavior across continuous-batching and legacy pipelines.

Changes:

  • Introduces OVMSTextStreamer to detokenize and invoke OutputParser::parseChunk() with both text and the corresponding token slice, producing rapidjson::Document deltas.
  • Replaces legacy mutex/CV “last text chunk” flow with a shared DeltaChannel and updates servables/executors to signal completion via the channel.
  • Refactors OpenAI streaming serialization APIs to accept already-parsed delta Documents and updates tests/parsers for the new parseChunk() signature.

Reviewed changes

Copilot reviewed 54 out of 54 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
src/test/llm/output_parsers/qwen3coder_output_parser_test.cpp Updates tests to call OutputParser::parseChunk() with the new tokens argument.
src/test/llm/output_parsers/qwen3_output_parser_test.cpp Updates streaming parser tests for the new parseChunk() signature and error paths.
src/test/llm/output_parsers/phi4_output_parser_test.cpp Updates streaming tests to pass tokens to parseChunk().
src/test/llm/output_parsers/mistral_output_parser_test.cpp Updates streaming tests to pass tokens to parseChunk().
src/test/llm/output_parsers/llama3_output_parser_test.cpp Updates streaming tests to pass tokens to parseChunk().
src/test/llm/output_parsers/lfm2_output_parser_test.cpp Updates helper/assert flows to pass tokens to parseChunk().
src/test/llm/output_parsers/hermes3_output_parser_test.cpp Updates streaming tests to pass tokens to parseChunk().
src/test/llm/output_parsers/gptoss_output_parser_test.cpp Updates streaming tests to pass tokens to parseChunk().
src/test/llm/output_parsers/gemma4_output_parser_test.cpp Updates streaming tests to pass tokens to parseChunk().
src/test/llm/output_parsers/devstral_output_parser_test.cpp Updates streaming tests to pass tokens to parseChunk().
src/test/http_openai_handler_test.cpp Adds migration helper to build parsed deltas and adapts tests to new serializeStreamingChunk(Document, reason) API.
src/llm/visual_language_model/legacy/servable.hpp Updates legacy VLM execution context for the new delta-channel streaming approach and unary accumulation.
src/llm/visual_language_model/legacy/servable.cpp Migrates VLM legacy streaming/unary flows to OVMSTextStreamer + DeltaChannel.
src/llm/visual_language_model/legacy/legacy_executor.cpp Signals streaming completion via deltaChannel.signalComplete().
src/llm/servable.hpp Introduces DeltaChannel and updates execution context fields for parsed-delta streaming.
src/llm/servable.cpp Switches continuous-batching streaming to drain parsed deltas from DeltaChannel and serialize them.
src/llm/ovms_text_streamer.hpp Adds new OVMSTextStreamer (TextStreamer-derived) that produces parsed delta Documents.
src/llm/ovms_text_streamer.cpp Implements flush heuristics, token-slice computation, and callback emission for parsed deltas.
src/llm/language_model/legacy/servable.hpp Removes legacy mutex/CV fields and switches disconnect handling to DeltaChannel.
src/llm/language_model/legacy/servable.cpp Migrates legacy LM streaming flow to OVMSTextStreamer + DeltaChannel.
src/llm/language_model/legacy/legacy_executor.cpp Signals streaming completion via deltaChannel.signalComplete().
src/llm/io_processing/qwen3coder/qwen3coder_tool_parser.hpp Updates tool parser streaming API to accept tokens.
src/llm/io_processing/qwen3coder/qwen3coder_tool_parser.cpp Adapts streaming tool parsing implementation to new signature.
src/llm/io_processing/qwen3/reasoning_parser.hpp Updates reasoning parser streaming API to accept tokens.
src/llm/io_processing/qwen3/reasoning_parser.cpp Adapts reasoning parser to new signature.
src/llm/io_processing/phi4/tool_parser.hpp Updates Phi4 tool parser streaming API to accept tokens.
src/llm/io_processing/phi4/tool_parser.cpp Adapts Phi4 tool parser streaming implementation and recursive calls.
src/llm/io_processing/output_parser.hpp Adds tokens parameter threading through tool/reasoning chunk parsing and top-level parseChunk().
src/llm/io_processing/output_parser.cpp Passes tokens through phase transitions and parser calls.
src/llm/io_processing/mistral/tool_parser.hpp Updates Mistral tool parser streaming API to accept tokens.
src/llm/io_processing/mistral/tool_parser.cpp Adapts Mistral streaming parsing implementation and recursive calls.
src/llm/io_processing/llama3/tool_parser.hpp Updates Llama3 tool parser streaming API to accept tokens.
src/llm/io_processing/llama3/tool_parser.cpp Adapts Llama3 streaming parsing implementation to new signature.
src/llm/io_processing/lfm2/lfm2_tool_parser.hpp Updates LFM2 tool parser streaming API to accept tokens.
src/llm/io_processing/lfm2/lfm2_tool_parser.cpp Adapts LFM2 streaming parsing implementation to new signature.
src/llm/io_processing/hermes3/tool_parser.hpp Updates Hermes3 tool parser streaming API to accept tokens.
src/llm/io_processing/hermes3/tool_parser.cpp Adapts Hermes3 streaming parsing signature to accept tokens.
src/llm/io_processing/gptoss/tool_parser.hpp Updates GPT-OSS tool parser streaming API to accept tokens.
src/llm/io_processing/gptoss/tool_parser.cpp Adapts GPT-OSS tool parsing signature to accept tokens.
src/llm/io_processing/gptoss/reasoning_parser.hpp Updates GPT-OSS reasoning parser streaming API to accept tokens.
src/llm/io_processing/gptoss/reasoning_parser.cpp Adapts GPT-OSS reasoning parsing signature to accept tokens.
src/llm/io_processing/gemma4/gemma4_tool_parser.hpp Updates Gemma4 tool parser streaming API to accept tokens.
src/llm/io_processing/gemma4/gemma4_tool_parser.cpp Adapts Gemma4 tool parsing signature to accept tokens.
src/llm/io_processing/gemma4/gemma4_reasoning_parser.hpp Updates Gemma4 reasoning parser streaming API to accept tokens.
src/llm/io_processing/gemma4/gemma4_reasoning_parser.cpp Adapts Gemma4 reasoning parsing signature to accept tokens.
src/llm/io_processing/devstral/tool_parser.hpp Updates Devstral tool parser streaming API to accept tokens.
src/llm/io_processing/devstral/tool_parser.cpp Adapts Devstral tool parsing signature to accept tokens.
src/llm/io_processing/base_output_parser.hpp Updates base streaming parser interface to include token IDs for each chunk.
src/llm/BUILD Adds ovms_text_streamer sources/headers to the genai_servables target.
src/llm/apis/openai_responses.hpp Updates streaming serialization signature to accept parsed deltas.
src/llm/apis/openai_responses.cpp Refactors streaming serialization to consume parsed delta Documents (no parsing in handler).
src/llm/apis/openai_completions.hpp Updates streaming serialization signature to accept parsed deltas.
src/llm/apis/openai_completions.cpp Refactors chat/completions streaming serialization to consume parsed delta Documents.
src/llm/apis/openai_api_handler.hpp Updates abstract streaming serialization API to accept parsed delta Documents.
Comments suppressed due to low confidence (1)

src/llm/io_processing/hermes3/tool_parser.cpp:209

  • parseChunk() returns nullopt immediately on empty chunk, even when finishReason != NONE. With the new OVMSTextStreamer, a final STOP flush can legitimately call parseChunk("", STOP) to force end-of-tool-call cleanup (delay window / argument string closing). Consider handling empty chunk specially when finishReason != NONE (finalize/close pending arguments and emit any delayed delta as needed) instead of returning early.
std::optional<rapidjson::Document> Hermes3ToolParser::parseChunk(const std::string& chunk, const std::vector<int64_t>& /*tokens*/, ov::genai::GenerationFinishReason finishReason) {
    /* 
    We first collect data until we have full function name - that's when we return the first delta.
    Every next delta contains next parts of the arguments. Hermes3 generates arguments as JSON, but OpenAI API expects them in a string format.
    That's why once we reach 'arguments' key, we add double quote to force string type and escape all double quotes that come in next parts.
    To know when we reach the end of the arguments string, we return delta with a one-chunk delay. This way, when we reach end of tool call, we modify previous chunk to close
    arguments string properly and return such modified chunk.
    */

    /*
    PHASE 0: Prepare data and state for processing
    - If previous call finished tool call (received </tool_call> tag), we clear state.
    - If current call finishes tool call (finishReason != NONE), we set flag to clear state in the next call.
    - If chunk is empty, we return std::nullopt.
    - We prepend unprocessedBuffer to the chunk and clear unprocessedBuffer.
    */

    // Check if previous call finished tool call (received </tool_call> tag)
    if (toolCallCompleted) {
        clearState();
        toolCallCompleted = false;
    }

    toolCallCompleted = (finishReason != ov::genai::GenerationFinishReason::NONE);

    if (chunk.empty()) {
        SPDLOG_LOGGER_DEBUG(llm_calculator_logger, "Received empty chunk for Hermes3ToolParser");
        return std::nullopt;
    }

Comment thread src/llm/servable.hpp
Comment thread src/llm/language_model/legacy/servable.cpp
Comment thread src/llm/visual_language_model/legacy/servable.cpp
Comment thread src/llm/visual_language_model/legacy/servable.cpp

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 56 out of 56 changed files in this pull request and generated 5 comments.

Comment thread src/llm/servable.cpp
std::move(deltas[i]),
ov::genai::GenerationFinishReason::NONE);
if (!serialized.empty()) {
executionContext->response += wrapTextInServerSideEventMessage(serialized);
std::string serialized = executionContext->apiHandler->serializeStreamingChunk(
std::move(delta), ov::genai::GenerationFinishReason::NONE);
if (!serialized.empty()) {
executionContext->response += wrapTextInServerSideEventMessage(serialized);
std::string serialized = executionContext->apiHandler->serializeStreamingChunk(
std::move(delta), ov::genai::GenerationFinishReason::NONE);
if (!serialized.empty()) {
executionContext->response += wrapTextInServerSideEventMessage(serialized);
Comment on lines +113 to +114
void OVMSTextStreamer::end() {
if (!m_tokens_cache.empty()) {
Comment on lines +144 to +145
if (!legacyExecutionContext->apiHandler->getRequest().skipSpecialTokens) {
streamerConfig.insert(ov::genai::skip_special_tokens(false));
@mzegla mzegla marked this pull request as ready for review June 2, 2026 10:01
// parsedDelta is a pre-parsed Document produced by OVMSTextStreamer::flushChunk.
// Shape: {"delta":{...}} for content/reasoning/tool_calls, or an empty Document{}
// for finish-only chunks. Inspect the delta directly — no parsing needed here.
if (parsedDelta.HasMember("delta") && parsedDelta["delta"].IsObject()) {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you might be loosing some events here -> in responses API we not always stream data when delta is ready. Sometimes delta is not ready and we want to stream something anyway (TTFT indicator, prefill end mark)
The same goes for chat/completions API.
I cant see it here, is it supported?

legacyExecutionContext->apiHandler->getOutputParser()->requiresStreamingWithSpecialTokens()) ||
!legacyExecutionContext->apiHandler->getRequest().skipSpecialTokens)) {
streamerConfig.insert(ov::genai::skip_special_tokens(false));
auto ovmsCallback = [& ctx = *legacyExecutionContext](rapidjson::Document delta) -> ov::genai::StreamingStatus {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are you sure that lifetime of legacyExecutionContext is longer than existence of this lambda? if not, it will crash

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The callback must fire in the scope of one generation. The text streamer, which would hold the lambda is a member of execution context and cannot outlive it. The streamer pointer is passed only to generate call which cannot outlive the context and generate does not save the object anywhere in the pipeline.

Comment thread src/llm/servable.hpp
}

// Returns true after signalComplete() has been called.
bool complete() const {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
bool complete() const {
bool isComplete() const {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we might have TOCTOU with this function

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The state change is one directional - uncompleted -> completed.
If there is a miss, we will capture the completion signal in the next iteration.
If we already finished generating and next iteration does not have any remaining deltas we send event with empty content and finish reason.

Comment thread src/llm/servable.cpp
if (count > 0 || executionContext->apiHandler->getEndpoint() == Endpoint::RESPONSES) {
// Emit each delta. All are mid-stream so finishReason is NONE.
for (size_t i = 0; i < count; ++i) {
std::string serialized = executionContext->apiHandler->serializeStreamingChunk(

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

4 duplicates of exact same code in llm/vllm x unary/streaming, can we unify at some point in one of the refactor phases?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, hopefully next phase will clean a lot of stuff up.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants