diff --git a/src/BUILD b/src/BUILD index c2d9aec3b1..608317ae2b 100644 --- a/src/BUILD +++ b/src/BUILD @@ -354,7 +354,6 @@ ovms_cc_library( "libovms_cliparser", "libovms_systeminfo", "ovms_exit_codes", - "//src/utils:env_guard", ], visibility = ["//visibility:public",], additional_copts = COPTS_DROGON, @@ -1007,6 +1006,7 @@ ovms_cc_library( hdrs = ["logging.hpp"], srcs = ["logging.cpp"], deps = [ + "//src/utils:env_guard", "@com_github_gabime_spdlog//:spdlog", "@com_github_glog_glog//:glog", # used to manage Mediapipe logging ], diff --git a/src/config.cpp b/src/config.cpp index cd37c30a06..c6f68d1766 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -36,7 +36,6 @@ #include "modelconfig.hpp" #include "stringutils.hpp" #include "systeminfo.hpp" -#include "utils/env_guard.hpp" namespace ovms { @@ -91,15 +90,6 @@ Config& Config::parse(int argc, char** argv) { bool Config::parse(ServerSettingsImpl* serverSettings, ModelsSettingsImpl* modelsSettings) { this->serverSettings = *serverSettings; this->modelsSettings = *modelsSettings; - static EnvGuard envGuard; -#if defined(__linux__) || defined(_WIN32) - if (this->serverSettings.logLevel == "DEBUG" || this->serverSettings.logLevel == "TRACE") { - envGuard.set("OPENVINO_LOG_LEVEL", "4"); - } -#endif - if (GetEnvVar("OVMS_GRAPH_QUEUE_OFF").empty()) { - envGuard.set("OVMS_GRAPH_QUEUE_OFF", "1"); - } return validate(); } diff --git a/src/logging.cpp b/src/logging.cpp index c0974c3a4e..b58cab2e31 100644 --- a/src/logging.cpp +++ b/src/logging.cpp @@ -23,6 +23,8 @@ #endif #include +#include "src/utils/env_guard.hpp" + namespace ovms { std::shared_ptr gcs_logger = std::make_shared("gcs"); @@ -163,6 +165,9 @@ void configure_logger(const std::string& log_level, const std::string& log_path) FLAGS_minloglevel = google::GLOG_ERROR; #endif #endif + if (log_level == "DEBUG" || log_level == "TRACE") { + SetEnvironmentVar("OPENVINO_LOG_LEVEL", "4"); + } } } // namespace ovms diff --git a/src/mediapipe_internal/graphqueue.cpp b/src/mediapipe_internal/graphqueue.cpp index f4688bd546..f72066fe15 100644 --- a/src/mediapipe_internal/graphqueue.cpp +++ b/src/mediapipe_internal/graphqueue.cpp @@ -51,59 +51,100 @@ GraphQueue::GraphQueue(const ::mediapipe::CalculatorGraphConfig& config, std::sh std::unordered_map> observers; for (auto& name : config.output_stream()) { std::string streamName = getStreamName(name); - auto holder = std::make_shared(); - holder->current = std::make_shared(); - observers[streamName] = holder; + auto observerHolder = std::make_shared(); + observerHolder->current = std::make_shared(); + observers[streamName] = observerHolder; } auto graphHelper = std::make_shared(std::move(observers)); - graphHelper->graph = std::make_unique<::mediapipe::CalculatorGraph>(); - graphHelper->currentTimestamp = ::mediapipe::Timestamp(0); - - auto absStatus = graphHelper->graph->Initialize(config); - if (!absStatus.ok()) { - SPDLOG_ERROR("Graph queue initialization failed: {}", absStatus.ToString()); - throw std::runtime_error(absStatus.ToString()); - } - for (const auto& [streamName, holder] : graphHelper->outStreamObservers) { - // Lambda captures holder (shared_ptr) by value — safe regardless of map layout - absStatus = graphHelper->graph->ObserveOutputStream(streamName, [holder](const ::mediapipe::Packet& packet) -> absl::Status { return holder->current->handlePacket(packet); }); - if (!absStatus.ok()) { - SPDLOG_ERROR("Graph queue ObserveOutputStream failed: {}", absStatus.ToString()); - throw std::runtime_error(absStatus.ToString()); - } - } for (const auto& [nodeName, _] : sidePacketMaps->genAiServableMap) { graphHelper->genAiExecutionContextMap[nodeName] = std::make_shared(); } - std::map inputSidePackets; - buildInputSidePackets(inputSidePackets, *sidePacketMaps); - // Override execution context with per-graph instance - inputSidePackets[LLM_EXECUTION_CONTEXT_SESSION_SIDE_PACKET_TAG] = mediapipe::MakePacket(graphHelper->genAiExecutionContextMap).At(::mediapipe::Timestamp(STARTING_TIMESTAMP_VALUE)); - absStatus = graphHelper->graph->StartRun(inputSidePackets); + auto absStatus = graphHelper->initialize(config, *sidePacketMaps); if (!absStatus.ok()) { - SPDLOG_ERROR("Graph queue StartRun failed: {}", absStatus.ToString()); + SPDLOG_ERROR("Graph queue initialization failed: {}", absStatus.ToString()); throw std::runtime_error(absStatus.ToString()); } - inferRequests.emplace_back(std::move(graphHelper)); + this->inferRequests.emplace_back(std::move(graphHelper)); } } -GraphQueue::~GraphQueue() { - for (auto& graphHelper : inferRequests) { - auto absStatus = graphHelper->graph->WaitUntilIdle(); +GraphHelper::~GraphHelper() { + if (!graph) { + return; + } + auto absStatus = graph->WaitUntilIdle(); + if (!absStatus.ok()) { + SPDLOG_DEBUG("GraphHelper WaitUntilIdle error: {}", absStatus.ToString()); + } + absStatus = graph->CloseAllPacketSources(); + if (!absStatus.ok()) { + SPDLOG_DEBUG("GraphHelper CloseAllPacketSources error: {}", absStatus.ToString()); + } + absStatus = graph->WaitUntilDone(); + if (!absStatus.ok()) { + SPDLOG_DEBUG("GraphHelper WaitUntilDone error: {}", absStatus.ToString()); + } + graph->Cancel(); +} + +absl::Status GraphHelper::initialize(const ::mediapipe::CalculatorGraphConfig& config, const GraphSidePackets& sidePacketMaps) { + this->graph = std::make_unique<::mediapipe::CalculatorGraph>(); + this->currentTimestamp = ::mediapipe::Timestamp(STARTING_TIMESTAMP_VALUE); + auto absStatus = this->graph->Initialize(config); + if (!absStatus.ok()) { + SPDLOG_ERROR("Graph initialize failed: {}", absStatus.ToString()); + return absStatus; + } + for (const auto& [streamName, observerHolder] : this->outStreamObservers) { + absStatus = this->graph->ObserveOutputStream(streamName, [observerHolder](const ::mediapipe::Packet& packet) -> absl::Status { + return observerHolder->current->handlePacket(packet); + }); if (!absStatus.ok()) { - SPDLOG_DEBUG("Graph queue WaitUntilIdle error: {}", absStatus.ToString()); + SPDLOG_ERROR("Graph ObserveOutputStream failed: {}", absStatus.ToString()); + return absStatus; } - absStatus = graphHelper->graph->CloseAllPacketSources(); + } + std::map inputSidePackets; + buildInputSidePackets(inputSidePackets, sidePacketMaps); + inputSidePackets[LLM_EXECUTION_CONTEXT_SESSION_SIDE_PACKET_TAG] = + mediapipe::MakePacket(this->genAiExecutionContextMap) + .At(::mediapipe::Timestamp(STARTING_TIMESTAMP_VALUE)); + absStatus = this->graph->StartRun(inputSidePackets); + if (!absStatus.ok()) { + SPDLOG_ERROR("Graph StartRun failed: {}", absStatus.ToString()); + return absStatus; + } + return absl::OkStatus(); +} + +void GraphHelper::reinitialize(const ::mediapipe::CalculatorGraphConfig& config, const GraphSidePackets& sidePacketMaps) { + SPDLOG_DEBUG("Reinitializing graph after error"); + // Tear down the old graph (best-effort, errors expected since graph is in bad state) + if (this->graph) { + auto absStatus = this->graph->CloseAllPacketSources(); if (!absStatus.ok()) { - SPDLOG_DEBUG("Graph queue CloseAllPacketSources error: {}", absStatus.ToString()); + SPDLOG_DEBUG("reinitialize: CloseAllPacketSources: {}", absStatus.ToString()); } - absStatus = graphHelper->graph->WaitUntilDone(); + absStatus = this->graph->WaitUntilDone(); if (!absStatus.ok()) { - SPDLOG_DEBUG("Graph queue WaitUntilDone error: {}", absStatus.ToString()); + SPDLOG_DEBUG("reinitialize: WaitUntilDone: {}", absStatus.ToString()); } - graphHelper->graph->Cancel(); - graphHelper->graph.reset(); + this->graph->Cancel(); + } + // Reset observers to null sentinel + for (const auto& [streamName, observerHolder] : this->outStreamObservers) { + observerHolder->current = std::make_shared(); + } + // Reset execution contexts + for (auto& [nodeName, ctx] : this->genAiExecutionContextMap) { + ctx->reset(); + } + auto absStatus = initialize(config, sidePacketMaps); + if (!absStatus.ok()) { + SPDLOG_ERROR("Graph reinitialize failed: {}", absStatus.ToString()); + return; } + SPDLOG_DEBUG("Graph reinitialized successfully"); } +GraphQueue::~GraphQueue() = default; } // namespace ovms diff --git a/src/mediapipe_internal/graphqueue.hpp b/src/mediapipe_internal/graphqueue.hpp index d97ee5d18f..9af74d5a08 100644 --- a/src/mediapipe_internal/graphqueue.hpp +++ b/src/mediapipe_internal/graphqueue.hpp @@ -17,6 +17,7 @@ #include #include +#include #include #include #include @@ -39,6 +40,8 @@ #pragma GCC diagnostic pop #pragma warning(pop) +#include "src/logging.hpp" + #include "graph_executor_constants.hpp" #include "graph_side_packets.hpp" #include "outputstreamobserver.hpp" @@ -65,7 +68,49 @@ struct GraphHelper { genAiExecutionContextMap(std::move(gh.genAiExecutionContextMap)), currentTimestamp(gh.currentTimestamp) {} GraphHelper& operator=(GraphHelper&&) = delete; + ~GraphHelper(); + // Creates a fresh CalculatorGraph, initializes it with the config, + // wires up output stream observers, builds side packets and starts the run. + absl::Status initialize(const ::mediapipe::CalculatorGraphConfig& config, const GraphSidePackets& sidePacketMaps); + // Tears down the current (errored) graph and rebuilds a fresh one + // with the same observers and side packets. Called when inference + // encounters a graph error to avoid returning a poisoned graph to the pool. + void reinitialize(const ::mediapipe::CalculatorGraphConfig& config, const GraphSidePackets& sidePacketMaps); }; + +// RAII guard that reinitializes the graph if inference exits with an error. +// Construct before the first graph interaction (packet push). Call dismiss() +// on the success path. If not dismissed, the destructor rebuilds the graph +// so the next request from the pool gets a clean graph. +class GraphReinitGuard { + GraphHelper& helper; + const ::mediapipe::CalculatorGraphConfig& config; + const GraphSidePackets& sidePacketMaps; + bool dismissed = false; + +public: + GraphReinitGuard(GraphHelper& helper, + const ::mediapipe::CalculatorGraphConfig& config, + const GraphSidePackets& sidePacketMaps) : + helper(helper), + config(config), + sidePacketMaps(sidePacketMaps) {} + void dismiss() { dismissed = true; } + ~GraphReinitGuard() { + if (!dismissed) { + try { + helper.reinitialize(config, sidePacketMaps); + } catch (const std::exception& e) { + SPDLOG_ERROR("GraphReinitGuard: reinitialize threw: {}", e.what()); + } catch (...) { + SPDLOG_ERROR("GraphReinitGuard: reinitialize threw unknown exception"); + } + } + } + GraphReinitGuard(const GraphReinitGuard&) = delete; + GraphReinitGuard& operator=(const GraphReinitGuard&) = delete; +}; + // we need to keep Graph alive during MP reload hence shared_ptr class GraphQueue : public Queue> { std::shared_ptr sidePacketMaps; diff --git a/src/mediapipe_internal/mediapipegraphexecutor.hpp b/src/mediapipe_internal/mediapipegraphexecutor.hpp index f0f0428740..8dae471191 100644 --- a/src/mediapipe_internal/mediapipegraphexecutor.hpp +++ b/src/mediapipe_internal/mediapipegraphexecutor.hpp @@ -186,7 +186,6 @@ class MediapipeGraphExecutor { return Status(StatusCode::MEDIAPIPE_GRAPH_INITIALIZATION_ERROR, "Input side packets are not supported for graphs with queue enabled"); } - ::mediapipe::CalculatorGraph& graph = this->guard->graph; auto llmContextStatus = initializeLlmExecutionContexts(this->sidePacketMaps.genAiServableMap, this->guard->graphHelper->genAiExecutionContextMap); if (!llmContextStatus.ok()) { return llmContextStatus; @@ -199,7 +198,10 @@ class MediapipeGraphExecutor { guard->graphHelper->outStreamObservers.at(name)->current = std::make_shared>(name, this->outputTypes.at(name), *this, *request, *response); } + GraphReinitGuard reinitOnFailureGuard(*this->guard->graphHelper, this->config, this->sidePacketMaps); + size_t numberOfPacketsCreated = 0; + ::mediapipe::CalculatorGraph& graph = this->guard->graph; auto ovms_status = createAndPushPacketsImpl( std::shared_ptr(request, [](const RequestType*) {}), this->inputTypes, @@ -227,6 +229,7 @@ class MediapipeGraphExecutor { } resetLlmExecutionContexts(this->guard->graphHelper->genAiExecutionContextMap); MP_RETURN_ON_FAIL(status, "graph wait until idle", mediapipeAbslToOvmsStatus(status.code())); + reinitOnFailureGuard.dismiss(); // Increment timestamp for next request reusing this graph from the queue this->guard->graphHelper->currentTimestamp = ::mediapipe::Timestamp(this->guard->graphHelper->currentTimestamp.Value() + 1); SPDLOG_DEBUG("Received all output stream packets for graph: {}", this->name); @@ -363,7 +366,6 @@ class MediapipeGraphExecutor { "Input side packets are not supported for graphs with queue enabled"); } MetricGaugeGuard currentGraphs(this->mediapipeServableMetricReporter->currentGraphs.get()); - ::mediapipe::CalculatorGraph& graph = this->guard->graph; auto llmContextStatus = initializeLlmExecutionContexts(this->sidePacketMaps.genAiServableMap, this->guard->graphHelper->genAiExecutionContextMap); if (!llmContextStatus.ok()) { return llmContextStatus; @@ -393,7 +395,10 @@ class MediapipeGraphExecutor { executionContext, this->mediapipeServableMetricReporter); } + GraphReinitGuard reinitOnFailureGuard(*this->guard->graphHelper, this->config, this->sidePacketMaps); + size_t numberOfPacketsCreated = 0; + ::mediapipe::CalculatorGraph& graph = this->guard->graph; { OVMS_PROFILE_SCOPE("Mediapipe graph deserializing first request"); bool isSuccess = true; @@ -450,6 +455,7 @@ class MediapipeGraphExecutor { } resetLlmExecutionContexts(this->guard->graphHelper->genAiExecutionContextMap); MP_RETURN_ON_FAIL(status, "graph wait until idle", mediapipeAbslToOvmsStatus(status.code())); + reinitOnFailureGuard.dismiss(); // Increment timestamp for next request reusing this graph from the queue this->guard->graphHelper->currentTimestamp = ::mediapipe::Timestamp(this->guard->graphHelper->currentTimestamp.Value() + 1); SPDLOG_DEBUG("Graph {}: Done streaming execution (queue path)", this->name); diff --git a/src/test/mediapipe/calculators/BUILD b/src/test/mediapipe/calculators/BUILD index a62497082b..bc4d3eed6d 100644 --- a/src/test/mediapipe/calculators/BUILD +++ b/src/test/mediapipe/calculators/BUILD @@ -78,6 +78,7 @@ cc_library( "ovms_calculator.cc", "ovms_image_input_calculator.cc", "ovms_kfs_calculator.cc", + "error_on_negative_test_calculator.cpp", "streaming_test_calculator.cpp", "two_input_calculator.cpp", ], diff --git a/src/test/mediapipe/calculators/error_on_negative_test_calculator.cpp b/src/test/mediapipe/calculators/error_on_negative_test_calculator.cpp new file mode 100644 index 0000000000..b84d99ca65 --- /dev/null +++ b/src/test/mediapipe/calculators/error_on_negative_test_calculator.cpp @@ -0,0 +1,49 @@ +//***************************************************************************** +// Copyright 2026 Intel Corporation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//***************************************************************************** +#include + +#include + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wdeprecated-declarations" +#include "mediapipe/framework/calculator_framework.h" +#pragma GCC diagnostic pop + +namespace mediapipe { + +class ErrorOnNegativeTestCalculator : public CalculatorBase { +public: + static absl::Status GetContract(CalculatorContract* cc) { + cc->Inputs().Index(0).Set(); + cc->Outputs().Index(0).Set(); + return absl::OkStatus(); + } + absl::Status Open(CalculatorContext* cc) final { return absl::OkStatus(); } + absl::Status Close(CalculatorContext* cc) final { return absl::OkStatus(); } + absl::Status Process(CalculatorContext* cc) final { + ov::Tensor input = cc->Inputs().Index(0).Get(); + if (static_cast(input.data())[0] < 0.0f) { + return absl::InvalidArgumentError("Negative input value"); + } + ov::Tensor output(input.get_element_type(), input.get_shape()); + std::memcpy(output.data(), input.data(), input.get_byte_size()); + cc->Outputs().Index(0).Add(new ov::Tensor(output), cc->InputTimestamp()); + return absl::OkStatus(); + } +}; + +REGISTER_CALCULATOR(ErrorOnNegativeTestCalculator); +} // namespace mediapipe diff --git a/src/test/mediapipeflow_test.cpp b/src/test/mediapipeflow_test.cpp index 4be6666009..7cbefc0051 100644 --- a/src/test/mediapipeflow_test.cpp +++ b/src/test/mediapipeflow_test.cpp @@ -3855,6 +3855,7 @@ TEST(WhitelistRegistered, MediapipeCalculatorsList) { "EndLoopTensorCalculator", "EndLoopTfLiteTensorCalculator", "ErrorInProcessTestCalculator", + "ErrorOnNegativeTestCalculator", "ExceptionDuringCloseCalculator", "ExceptionDuringGetContractCalculator", "ExceptionDuringOpenCalculator", @@ -4307,3 +4308,73 @@ TEST(MediapipeGraphQueueSizeDirective, EnvVarOVMS_GRAPH_QUEUE_OFF_NotSetDoesNotD ASSERT_EQ(status, ovms::StatusCode::OK); EXPECT_GT(def.getMediapipeGraphConfig().getInitialQueueSize(), 0); } + +// --- Graph queue reinit guard tests --- + +class UnaryQueueReinitTest : public ::testing::Test { +protected: + const std::string name{"reinit_test_graph"}; + const std::string version{"1"}; + ExecutionContext executionContext{ExecutionContext::Interface::GRPC, ExecutionContext::Method::ModelInfer}; + std::unique_ptr reporter; + std::shared_ptr sidePackets; + std::shared_ptr queue; + ::mediapipe::CalculatorGraphConfig config; + + void SetUp() override { + reporter = std::make_unique(nullptr, nullptr, ""); + sidePackets = std::make_shared(); + const std::string pbTxt{R"( +input_stream: "in" +output_stream: "out" +node { + calculator: "ErrorOnNegativeTestCalculator" + input_stream: "in" + output_stream: "out" +} + )"}; + ASSERT_TRUE(::google::protobuf::TextFormat::ParseFromString(pbTxt, &config)); + queue = std::make_shared(config, sidePackets, 1); + } + + void prepareInferRequest(KFSRequest& request, float value) { + request.Clear(); + *request.mutable_model_name() = "my_graph"; + *request.mutable_model_version() = "1"; + prepareKFSInferInputTensor(request, "in", std::tuple{{1}, ovms::Precision::FP32}, std::vector{value}, false); + request.mutable_parameters()->operator[]("OVMS_MP_TIMESTAMP").set_int64_param(0); + } +}; + +TEST_F(UnaryQueueReinitTest, GraphIsReinitializedAfterCalculatorError) { + KFSRequest request; + KFSResponse response; + { + GraphIdGuard guard(queue); + MediapipeGraphExecutor executor{ + name, version, config, + {{"in", mediapipe_packet_type_enum::OVTENSOR}}, + {{"out", mediapipe_packet_type_enum::OVTENSOR}}, + {"in"}, {"out"}, *sidePackets, nullptr, reporter.get(), + std::move(guard)}; + prepareInferRequest(request, -1.0f); + auto status = executor.infer(&request, &response, executionContext); + ASSERT_FALSE(status.ok()); + EXPECT_EQ(status.getCode(), StatusCode::MEDIAPIPE_EXECUTION_ERROR); + } + // Executor destroyed → GraphIdGuard returns graph to pool. + // The reinit guard rebuilt the graph before returning the error. + // Second request with valid (positive) input should succeed. + { + GraphIdGuard guard(queue); + MediapipeGraphExecutor executor{ + name, version, config, + {{"in", mediapipe_packet_type_enum::OVTENSOR}}, + {{"out", mediapipe_packet_type_enum::OVTENSOR}}, + {"in"}, {"out"}, *sidePackets, nullptr, reporter.get(), + std::move(guard)}; + prepareInferRequest(request, 2.0f); + auto status = executor.infer(&request, &response, executionContext); + ASSERT_TRUE(status.ok()); + } +} diff --git a/src/utils/BUILD b/src/utils/BUILD index 90bc499b81..752ea60773 100644 --- a/src/utils/BUILD +++ b/src/utils/BUILD @@ -34,8 +34,7 @@ ovms_cc_library( hdrs = ["env_guard.hpp",], srcs = ["env_guard.cpp",], deps = [ - "@ovms//src:libovmslogging", - "@com_google_googletest//:gtest", + "@com_github_gabime_spdlog//:spdlog", ], visibility = ["//visibility:public"], ) \ No newline at end of file diff --git a/src/utils/env_guard.cpp b/src/utils/env_guard.cpp index 4a5ee3d114..645d9233b2 100644 --- a/src/utils/env_guard.cpp +++ b/src/utils/env_guard.cpp @@ -15,7 +15,7 @@ //***************************************************************************** #include "env_guard.hpp" -#include "../logging.hpp" +#include #include