From 9736310614486238c89df8562b386f19a3a00ed6 Mon Sep 17 00:00:00 2001 From: Raphael Date: Mon, 4 May 2026 16:59:45 +0200 Subject: [PATCH 1/8] feat: adjusted provider interface to match node execution result --- crates/taurus-core/src/runtime/remote/mod.rs | 4 ++-- .../providers/remote/nats_remote_runtime.rs | 22 +++++++++++++------ 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/crates/taurus-core/src/runtime/remote/mod.rs b/crates/taurus-core/src/runtime/remote/mod.rs index 5b3cd9f..40c92fe 100644 --- a/crates/taurus-core/src/runtime/remote/mod.rs +++ b/crates/taurus-core/src/runtime/remote/mod.rs @@ -4,7 +4,7 @@ //! trait without coupling the core engine to a specific transport. use async_trait::async_trait; -use tucana::{aquila::ActionExecutionRequest, shared::Value}; +use tucana::{aquila::ActionExecutionRequest, shared::{NodeExecutionResult}}; use crate::types::errors::runtime_error::RuntimeError; @@ -19,5 +19,5 @@ pub struct RemoteExecution { #[async_trait] pub trait RemoteRuntime { /// Execute a remote node invocation and return its resulting value. - async fn execute_remote(&self, execution: RemoteExecution) -> Result; + async fn execute_remote(&self, execution: RemoteExecution) -> Result; } diff --git a/crates/taurus-provider/src/providers/remote/nats_remote_runtime.rs b/crates/taurus-provider/src/providers/remote/nats_remote_runtime.rs index e7b09e7..43f757e 100644 --- a/crates/taurus-provider/src/providers/remote/nats_remote_runtime.rs +++ b/crates/taurus-provider/src/providers/remote/nats_remote_runtime.rs @@ -4,7 +4,7 @@ use taurus_core::runtime::remote::{RemoteExecution, RemoteRuntime}; use taurus_core::types::errors::runtime_error::RuntimeError; use tonic::async_trait; use tucana::aquila::ActionExecutionResponse; -use tucana::shared::Value; +use tucana::shared::{NodeExecutionResult}; pub struct NATSRemoteRuntime { client: Client, @@ -18,7 +18,7 @@ impl NATSRemoteRuntime { #[async_trait] impl RemoteRuntime for NATSRemoteRuntime { - async fn execute_remote(&self, execution: RemoteExecution) -> Result { + async fn execute_remote(&self, execution: RemoteExecution) -> Result { let topic = format!( "action.{}.{}", execution.target_service, execution.request.execution_identifier @@ -43,8 +43,18 @@ impl RemoteRuntime for NATSRemoteRuntime { }; let decode_result = ActionExecutionResponse::decode(message.payload); - let _execution_result = match decode_result { - Ok(r) => r, + match decode_result { + Ok(r) => match r.node_result { + Some(res) => Ok(res), + None => { + log::error!("RemoteRuntimeExeption: recieved execution result without an body"); + return Err(RuntimeError::new( + "T-PROV-000003", + "RemoteRuntimeExeption", + "Recieved empty action execution response", + )); + }, + }, Err(err) => { log::error!( "RemoteRuntimeExeption: failed to decode NATS message: {}", @@ -56,8 +66,6 @@ impl RemoteRuntime for NATSRemoteRuntime { "Failed to read Remote Response", )); } - }; - - unimplemented!("Taurus needs to handle text executions (issue nr #185)") + } } } From 346d33a2e71d543d094cab933dddb56f2691b198 Mon Sep 17 00:00:00 2001 From: Raphael Date: Mon, 4 May 2026 17:43:37 +0200 Subject: [PATCH 2/8] feat: updated test exec to latest tucana interface --- flows/03_for_each.json | 6 +++++- flows/05_if_control.json | 6 +++++- flows/06_if_else_control.json | 12 ++++++++++-- flows/07_simple_return.json | 18 +++++++++++++++--- flows/09_filter_return.json | 24 ++++++++++++++++++++---- flows/10_multiple_respond.json | 26 +++++++++++++++++++++----- 6 files changed, 76 insertions(+), 16 deletions(-) diff --git a/flows/03_for_each.json b/flows/03_for_each.json index 383688f..6851a59 100644 --- a/flows/03_for_each.json +++ b/flows/03_for_each.json @@ -61,7 +61,11 @@ "databaseId": "12", "runtimeParameterId": "consumer", "value": { - "nodeFunctionId": "6" + "sub_flow": { + "signature": "", + "settings": [], + "startingNodeId": "6" + } } } ] diff --git a/flows/05_if_control.json b/flows/05_if_control.json index 6aa4251..bc9847f 100644 --- a/flows/05_if_control.json +++ b/flows/05_if_control.json @@ -148,7 +148,11 @@ "databaseId": "50", "runtimeParameterId": "runnable", "value": { - "nodeFunctionId": "12" + "sub_flow": { + "signature": "", + "settings": [], + "startingNodeId": "12" + } } } ], diff --git a/flows/06_if_else_control.json b/flows/06_if_else_control.json index 04e9d12..4c05804 100644 --- a/flows/06_if_else_control.json +++ b/flows/06_if_else_control.json @@ -148,14 +148,22 @@ "databaseId": "63", "runtimeParameterId": "runnable", "value": { - "nodeFunctionId": "19" + "sub_flow": { + "signature": "", + "settings": [], + "startingNodeId": "19" + } } }, { "databaseId": "68", "runtimeParameterId": "else_runnable", "value": { - "nodeFunctionId": "17" + "sub_flow": { + "signature": "", + "settings": [], + "startingNodeId": "17" + } } } ] diff --git a/flows/07_simple_return.json b/flows/07_simple_return.json index 25111d8..eae7552 100644 --- a/flows/07_simple_return.json +++ b/flows/07_simple_return.json @@ -19,7 +19,11 @@ "expected_result": { "http_status_code": 200, "headers": {}, - "payload": [null, null, "username"] + "payload": [ + null, + null, + "username" + ] } } ], @@ -86,7 +90,11 @@ "databaseId": "31", "runtimeParameterId": "transform", "value": { - "nodeFunctionId": "10" + "sub_flow": { + "signature": "", + "settings": [], + "startingNodeId": "10" + } } } ], @@ -137,7 +145,11 @@ "databaseId": "35", "runtimeParameterId": "runnable", "value": { - "nodeFunctionId": "7" + "sub_flow": { + "signature": "", + "settings": [], + "startingNodeId": "7" + } } } ], diff --git a/flows/09_filter_return.json b/flows/09_filter_return.json index aa3cb79..accef8f 100644 --- a/flows/09_filter_return.json +++ b/flows/09_filter_return.json @@ -121,7 +121,11 @@ "databaseId": "15", "runtimeParameterId": "predicate", "value": { - "nodeFunctionId": "8" + "sub_flow": { + "signature": "", + "settings": [], + "startingNodeId": "8" + } } } ], @@ -179,14 +183,22 @@ "databaseId": "21", "runtimeParameterId": "runnable", "value": { - "nodeFunctionId": "1" + "sub_flow": { + "signature": "", + "settings": [], + "startingNodeId": "1" + } } }, { "databaseId": "22", "runtimeParameterId": "else_runnable", "value": { - "nodeFunctionId": "3" + "sub_flow": { + "signature": "", + "settings": [], + "startingNodeId": "3" + } } } ], @@ -293,7 +305,11 @@ "databaseId": "24", "runtimeParameterId": "transform", "value": { - "nodeFunctionId": "6" + "sub_flow": { + "signature": "", + "settings": [], + "startingNodeId": "6" + } } } ], diff --git a/flows/10_multiple_respond.json b/flows/10_multiple_respond.json index 6332d09..592b624 100644 --- a/flows/10_multiple_respond.json +++ b/flows/10_multiple_respond.json @@ -50,7 +50,7 @@ "headers": { "Content-Type": "text/plain" }, - "payload": null + "payload": [] }, "expected_result": { "headers": { @@ -108,7 +108,11 @@ "databaseId": "15", "runtimeParameterId": "predicate", "value": { - "nodeFunctionId": "8" + "sub_flow": { + "signature": "", + "settings": [], + "startingNodeId": "8" + } } } ], @@ -166,14 +170,22 @@ "databaseId": "21", "runtimeParameterId": "runnable", "value": { - "nodeFunctionId": "1" + "sub_flow": { + "signature": "", + "settings": [], + "startingNodeId": "1" + } } }, { "databaseId": "22", "runtimeParameterId": "else_runnable", "value": { - "nodeFunctionId": "3" + "sub_flow": { + "signature": "", + "settings": [], + "startingNodeId": "3" + } } } ], @@ -280,7 +292,11 @@ "databaseId": "24", "runtimeParameterId": "transform", "value": { - "nodeFunctionId": "6" + "sub_flow": { + "signature": "", + "settings": [], + "startingNodeId": "6" + } } } ], From 45cc1d1fba30584e1b8df401d6e0ce0c14ee9168 Mon Sep 17 00:00:00 2001 From: Raphael Date: Mon, 4 May 2026 17:49:07 +0200 Subject: [PATCH 3/8] feat: refactored internal store to use node execution results instead of node results --- .../src/runtime/engine/compiler.rs | 55 ++++++++++++- .../src/runtime/engine/executor.rs | 82 +++++++++++-------- .../src/runtime/execution/render.rs | 15 ++-- .../src/runtime/execution/store.rs | 12 +-- .../src/runtime/execution/trace.rs | 19 ++--- .../src/runtime/execution/value_store.rs | 80 +++++++++++++----- crates/taurus-core/src/runtime/remote/mod.rs | 8 +- .../src/types/errors/runtime_error.rs | 36 +++++++- .../providers/remote/nats_remote_runtime.rs | 21 +++-- 9 files changed, 229 insertions(+), 99 deletions(-) diff --git a/crates/taurus-core/src/runtime/engine/compiler.rs b/crates/taurus-core/src/runtime/engine/compiler.rs index 33a5d8f..f0894ae 100644 --- a/crates/taurus-core/src/runtime/engine/compiler.rs +++ b/crates/taurus-core/src/runtime/engine/compiler.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; -use tucana::shared::{NodeFunction, node_value}; +use tucana::shared::{NodeFunction, node_value, sub_flow}; use crate::{ runtime::engine::model::{ @@ -27,6 +27,15 @@ pub enum CompileError { node_id: i64, parameter_index: usize, }, + SubFlowExecutionReferenceMissing { + node_id: i64, + parameter_index: usize, + }, + SubFlowFunctionIdentifierUnsupported { + node_id: i64, + parameter_index: usize, + function_identifier: String, + }, } impl CompileError { @@ -64,6 +73,29 @@ impl CompileError { node_id, parameter_index ), ), + CompileError::SubFlowExecutionReferenceMissing { + node_id, + parameter_index, + } => RuntimeError::new( + "T-CORE-000105", + "FlowCompileError", + format!( + "Node {} parameter {} sub_flow is missing execution reference", + node_id, parameter_index + ), + ), + CompileError::SubFlowFunctionIdentifierUnsupported { + node_id, + parameter_index, + function_identifier, + } => RuntimeError::new( + "T-CORE-000106", + "FlowCompileError", + format!( + "Node {} parameter {} uses unsupported sub_flow function identifier {}", + node_id, parameter_index, function_identifier + ), + ), } } } @@ -125,7 +157,26 @@ pub fn compile_flow( let arg = match value { node_value::Value::LiteralValue(v) => CompiledArg::Literal(v.clone()), node_value::Value::ReferenceValue(r) => CompiledArg::Reference(r.clone()), - node_value::Value::SubFlow(_sub_flow) => unimplemented!("Taurus needs to handle SubFlows (issue nr #184)"), + node_value::Value::SubFlow(sub_flow) => { + match sub_flow.execution_reference.as_ref() { + Some(sub_flow::ExecutionReference::StartingNodeId(node_id)) => { + CompiledArg::DeferredNode(*node_id) + } + Some(sub_flow::ExecutionReference::FunctionIdentifier(identifier)) => { + return Err(CompileError::SubFlowFunctionIdentifierUnsupported { + node_id: node.database_id, + parameter_index, + function_identifier: identifier.clone(), + }); + } + None => { + return Err(CompileError::SubFlowExecutionReferenceMissing { + node_id: node.database_id, + parameter_index, + }); + } + } + } }; parameters.push(CompiledParameter { diff --git a/crates/taurus-core/src/runtime/engine/executor.rs b/crates/taurus-core/src/runtime/engine/executor.rs index ea2acc1..f6fec8d 100644 --- a/crates/taurus-core/src/runtime/engine/executor.rs +++ b/crates/taurus-core/src/runtime/engine/executor.rs @@ -5,9 +5,10 @@ use std::collections::HashMap; use futures_lite::future::block_on; use tucana::aquila::ActionExecutionRequest; +use tucana::shared::node_execution_result::Result as TucanaNodeResult; use tucana::shared::reference_value::Target; use tucana::shared::value::Kind; -use tucana::shared::{Struct, Value}; +use tucana::shared::{NodeExecutionResult as TucanaNodeExecutionResult, Struct, Value}; use uuid::Uuid; use crate::handler::argument::{Argument, ParameterNode}; @@ -56,7 +57,7 @@ struct ExecutionResult { } /// Result of executing exactly one compiled node. -struct NodeExecutionResult { +struct NodeResult { signal: Signal, frame_id: Option, } @@ -150,25 +151,24 @@ impl<'a> EngineExecutor<'a> { } } - fn execute_single_node( - &self, - node_idx: usize, - value_store: &mut ValueStore, - ) -> NodeExecutionResult { + fn execute_single_node(&self, node_idx: usize, value_store: &mut ValueStore) -> NodeResult { let node = &self.flow.nodes[node_idx]; // InputType references resolve against the currently running node. value_store.set_current_node_id(node.id); let frame_id = self.trace_enter(node, value_store); let signal = match &node.execution_target { - NodeExecutionTarget::Local => self.execute_local_node(node, value_store, frame_id), + NodeExecutionTarget::Local => { + let signal = self.execute_local_node(node, value_store, frame_id); + self.commit_result(node.id, signal, value_store) + } NodeExecutionTarget::Remote { service } => { self.execute_remote_node(node, service, value_store, frame_id) } }; self.trace_exit(frame_id, &signal, value_store); - NodeExecutionResult { signal, frame_id } + NodeResult { signal, frame_id } } fn execute_local_node( @@ -190,14 +190,11 @@ impl<'a> EngineExecutor<'a> { let mut args = match self.build_args(node, value_store, frame_id) { Ok(args) => args, - Err(err) => { - value_store.insert_error(node.id, err.clone()); - return Signal::Failure(err); - } + Err(err) => return Signal::Failure(err), }; if let Some(signal) = self.force_eager_args(entry, &mut args, value_store, frame_id) { - return self.commit_result(node.id, signal, value_store); + return signal; } // Handler-owned runtime calls (for lazy args / callbacks) re-enter the same executor. @@ -211,8 +208,7 @@ impl<'a> EngineExecutor<'a> { child_result.signal }; - let signal = (entry.handler)(&args, value_store, &mut run); - self.commit_result(node.id, signal, value_store) + (entry.handler)(&args, value_store, &mut run) } fn execute_remote_node( @@ -225,20 +221,21 @@ impl<'a> EngineExecutor<'a> { let remote_runtime = match self.remote { Some(remote) => remote, None => { - return Signal::Failure(RuntimeError::new( - "T-CORE-000003", - "RemoteRuntimeNotConfigured", - "Remote runtime not configured", - )); + return self.commit_result( + node.id, + Signal::Failure(RuntimeError::new( + "T-CORE-000003", + "RemoteRuntimeNotConfigured", + "Remote runtime not configured", + )), + value_store, + ); } }; let mut args = match self.build_args(node, value_store, frame_id) { Ok(args) => args, - Err(err) => { - value_store.insert_error(node.id, err.clone()); - return Signal::Failure(err); - } + Err(err) => return self.commit_result(node.id, Signal::Failure(err), value_store), }; let values = match self.resolve_remote_args(&mut args, value_store, frame_id) { @@ -248,21 +245,16 @@ impl<'a> EngineExecutor<'a> { let request = match self.build_remote_request(node, values) { Ok(request) => request, - Err(err) => { - value_store.insert_error(node.id, err.clone()); - return Signal::Failure(err); - } + Err(err) => return self.commit_result(node.id, Signal::Failure(err), value_store), }; - let signal = match block_on(remote_runtime.execute_remote(RemoteExecution { + match block_on(remote_runtime.execute_remote(RemoteExecution { target_service: service.to_string(), request, })) { - Ok(value) => Signal::Success(value), - Err(err) => Signal::Failure(err), - }; - - self.commit_result(node.id, signal, value_store) + Ok(result) => self.commit_remote_result(node.id, result, value_store), + Err(err) => self.commit_result(node.id, Signal::Failure(err), value_store), + } } fn build_args( @@ -522,6 +514,26 @@ impl<'a> EngineExecutor<'a> { } } + fn commit_remote_result( + &self, + node_id: i64, + result: TucanaNodeExecutionResult, + value_store: &mut ValueStore, + ) -> Signal { + value_store.insert_node_result(node_id, result.clone()); + match result.result { + Some(TucanaNodeResult::Success(value)) => Signal::Success(value), + Some(TucanaNodeResult::Error(error)) => { + Signal::Failure(RuntimeError::from_tucana_error(&error)) + } + None => Signal::Failure(RuntimeError::new( + "T-CORE-000006", + "NodeExecutionResultMissingOutcome", + "Remote node execution result is missing success/error outcome", + )), + } + } + fn trace_enter(&self, node: &CompiledNode, value_store: &ValueStore) -> Option { self.tracer.map(|tracer| { tracer.borrow_mut().enter_node( diff --git a/crates/taurus-core/src/runtime/execution/render.rs b/crates/taurus-core/src/runtime/execution/render.rs index d34a130..e0442c1 100644 --- a/crates/taurus-core/src/runtime/execution/render.rs +++ b/crates/taurus-core/src/runtime/execution/render.rs @@ -2,8 +2,10 @@ use std::collections::HashMap; +use tucana::shared::node_execution_result::Result as TucanaNodeResult; + use crate::runtime::execution::trace::{ - ArgKind, EdgeKind, Outcome, StoreDiff, StoreResultStatus, TraceFrame, TraceRun, + ArgKind, EdgeKind, Outcome, StoreDiff, TraceFrame, TraceRun, }; struct TraceTheme; @@ -266,14 +268,17 @@ fn render_store_diff( } for set in &diff.result_sets { - let status = match set.status { - StoreResultStatus::Success => "success", - StoreResultStatus::Error => "error", + let status = match &set.result.result { + Some(TucanaNodeResult::Success(_)) => "success", + Some(TucanaNodeResult::Error(_)) => "error", + None => "empty", }; out.push_str(&format!( - "{step:04} {prefix}{continuation}{store:<5} result.set node={} [{}] {}\n", + "{step:04} {prefix}{continuation}{store:<5} result.set node={} [{}] started_at={} finished_at={} {}\n", set.node_id, status, + set.result.started_at, + set.result.finished_at, set.preview, step = *step, prefix = prefix, diff --git a/crates/taurus-core/src/runtime/execution/store.rs b/crates/taurus-core/src/runtime/execution/store.rs index 371647d..3cde097 100644 --- a/crates/taurus-core/src/runtime/execution/store.rs +++ b/crates/taurus-core/src/runtime/execution/store.rs @@ -2,18 +2,10 @@ use std::collections::HashMap; -use tucana::shared::Value; +use tucana::shared::{NodeExecutionResult, Value}; -use crate::types::errors::runtime_error::RuntimeError; use crate::types::execution::ids::{FrameId, NodeId}; -/// Runtime outcome persisted per node. -#[derive(Debug, Clone)] -pub enum NodeOutcome { - Success(Value), - Failure(RuntimeError), -} - /// Input slot key for runtime-provided temporary inputs (for iterators/predicates). #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub struct InputSlotKey { @@ -25,7 +17,7 @@ pub struct InputSlotKey { /// Store that captures mutable runtime execution state. #[derive(Debug, Clone, Default)] pub struct ExecutionStore { - pub node_outcomes: HashMap, + pub node_results: HashMap, pub input_slots: HashMap, pub flow_input: Option, pub current_node: Option, diff --git a/crates/taurus-core/src/runtime/execution/trace.rs b/crates/taurus-core/src/runtime/execution/trace.rs index 9733566..940656f 100644 --- a/crates/taurus-core/src/runtime/execution/trace.rs +++ b/crates/taurus-core/src/runtime/execution/trace.rs @@ -9,6 +9,8 @@ use std::collections::HashMap; use std::time::Instant; +use tucana::shared::NodeExecutionResult; + /// Relationship between two execution frames. #[derive(Debug, Clone)] pub enum EdgeKind { @@ -69,20 +71,13 @@ pub enum Outcome { } /// One stored node result entry at snapshot time. -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq)] pub struct StoreResultEntry { pub node_id: i64, - pub status: StoreResultStatus, + pub result: NodeExecutionResult, pub preview: String, } -/// Result status in the value store. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum StoreResultStatus { - Success, - Error, -} - /// One temporary input slot entry at snapshot time. #[derive(Debug, Clone, PartialEq, Eq)] pub struct StoreInputSlotEntry { @@ -93,7 +88,7 @@ pub struct StoreInputSlotEntry { } /// Value store snapshot attached to a frame boundary. -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq)] pub struct StoreSnapshot { pub current_node_id: i64, pub flow_input_preview: String, @@ -102,7 +97,7 @@ pub struct StoreSnapshot { } /// Per-frame store changes between `store_before` and `store_after`. -#[derive(Debug, Clone, Default, PartialEq, Eq)] +#[derive(Debug, Clone, Default, PartialEq)] pub struct StoreDiff { pub current_node_changed: Option<(i64, i64)>, pub result_sets: Vec, @@ -129,7 +124,7 @@ impl StoreDiff { for (node_id, after_entry) in &after_results { match before_results.get(node_id) { Some(before_entry) - if before_entry.status == after_entry.status + if before_entry.result == after_entry.result && before_entry.preview == after_entry.preview => {} _ => result_sets.push((*after_entry).clone()), } diff --git a/crates/taurus-core/src/runtime/execution/value_store.rs b/crates/taurus-core/src/runtime/execution/value_store.rs index 20d852d..88af691 100644 --- a/crates/taurus-core/src/runtime/execution/value_store.rs +++ b/crates/taurus-core/src/runtime/execution/value_store.rs @@ -1,12 +1,12 @@ //! Mutable value store used by runtime execution to resolve references. use std::collections::HashMap; +use std::time::{SystemTime, UNIX_EPOCH}; -use tucana::shared::{InputType, ReferenceValue, Value, value::Kind}; +use tucana::shared::node_execution_result::Result as TucanaNodeResult; +use tucana::shared::{InputType, NodeExecutionResult, ReferenceValue, Value, value::Kind}; -use crate::runtime::execution::trace::{ - StoreInputSlotEntry, StoreResultEntry, StoreResultStatus, StoreSnapshot, -}; +use crate::runtime::execution::trace::{StoreInputSlotEntry, StoreResultEntry, StoreSnapshot}; use crate::types::errors::runtime_error::RuntimeError; #[derive(Clone)] @@ -18,7 +18,7 @@ pub enum ValueStoreResult { #[derive(Default)] pub struct ValueStore { - results: HashMap, + results: HashMap, input_types: HashMap, flow_input: Value, current_node_id: i64, @@ -101,7 +101,13 @@ impl ValueStore { fn get_result(&mut self, id: i64) -> ValueStoreResult { match self.results.get(&id) { - Some(result) => result.clone(), + Some(result) => match &result.result { + Some(TucanaNodeResult::Success(value)) => ValueStoreResult::Success(value.clone()), + Some(TucanaNodeResult::Error(error)) => { + ValueStoreResult::Error(RuntimeError::from_tucana_error(error)) + } + None => ValueStoreResult::NotFound, + }, None => ValueStoreResult::NotFound, } } @@ -130,12 +136,36 @@ impl ValueStore { } pub fn insert_success(&mut self, id: i64, value: Value) { - self.results.insert(id, ValueStoreResult::Success(value)); + let ts = now_unix_ms(); + self.insert_node_result( + id, + NodeExecutionResult { + node_id: id, + started_at: ts, + finished_at: ts, + parameter_results: Vec::new(), + result: Some(TucanaNodeResult::Success(value)), + }, + ); } pub fn insert_error(&mut self, id: i64, runtime_error: RuntimeError) { - self.results - .insert(id, ValueStoreResult::Error(runtime_error)); + let ts = now_unix_ms(); + self.insert_node_result( + id, + NodeExecutionResult { + node_id: id, + started_at: ts, + finished_at: ts, + parameter_results: Vec::new(), + result: Some(TucanaNodeResult::Error(runtime_error.as_tucana_error())), + }, + ); + } + + pub fn insert_node_result(&mut self, id: i64, mut result: NodeExecutionResult) { + result.node_id = id; + self.results.insert(id, result); } pub fn push_runtime_trace_label(&mut self, label: String) { @@ -149,19 +179,18 @@ impl ValueStore { pub fn trace_snapshot(&self) -> StoreSnapshot { let mut results = Vec::with_capacity(self.results.len()); for (node_id, result) in &self.results { - match result { - ValueStoreResult::Success(value) => results.push(StoreResultEntry { - node_id: *node_id, - status: StoreResultStatus::Success, - preview: preview_value(value), - }), - ValueStoreResult::Error(err) => results.push(StoreResultEntry { - node_id: *node_id, - status: StoreResultStatus::Error, - preview: format!("{}:{} {}", err.code, err.category, err.message), - }), - ValueStoreResult::NotFound => {} - } + let preview = match &result.result { + Some(TucanaNodeResult::Success(value)) => preview_value(value), + Some(TucanaNodeResult::Error(err)) => { + format!("{}:{} {}", err.code, err.category, err.message) + } + None => "empty-result".to_string(), + }; + results.push(StoreResultEntry { + node_id: *node_id, + result: result.clone(), + preview, + }); } results.sort_by_key(|entry| entry.node_id); @@ -185,6 +214,13 @@ impl ValueStore { } } +fn now_unix_ms() -> i64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|it| it.as_millis() as i64) + .unwrap_or(0) +} + fn preview_value(value: &Value) -> String { match value.kind.as_ref() { Some(Kind::NumberValue(v)) => crate::value::number_to_string(v), diff --git a/crates/taurus-core/src/runtime/remote/mod.rs b/crates/taurus-core/src/runtime/remote/mod.rs index 40c92fe..f94c3ec 100644 --- a/crates/taurus-core/src/runtime/remote/mod.rs +++ b/crates/taurus-core/src/runtime/remote/mod.rs @@ -4,7 +4,7 @@ //! trait without coupling the core engine to a specific transport. use async_trait::async_trait; -use tucana::{aquila::ActionExecutionRequest, shared::{NodeExecutionResult}}; +use tucana::{aquila::ActionExecutionRequest, shared::NodeExecutionResult}; use crate::types::errors::runtime_error::RuntimeError; @@ -13,11 +13,13 @@ pub struct RemoteExecution { pub target_service: String, /// Execution request payload expected by the remote runtime. pub request: ActionExecutionRequest, - } #[async_trait] pub trait RemoteRuntime { /// Execute a remote node invocation and return its resulting value. - async fn execute_remote(&self, execution: RemoteExecution) -> Result; + async fn execute_remote( + &self, + execution: RemoteExecution, + ) -> Result; } diff --git a/crates/taurus-core/src/types/errors/runtime_error.rs b/crates/taurus-core/src/types/errors/runtime_error.rs index edafa7c..3f91ea3 100644 --- a/crates/taurus-core/src/types/errors/runtime_error.rs +++ b/crates/taurus-core/src/types/errors/runtime_error.rs @@ -12,7 +12,9 @@ use std::fmt::{Display, Formatter}; use std::time::{SystemTime, UNIX_EPOCH}; use tucana::shared::value::Kind::{NumberValue, StringValue, StructValue}; -use tucana::shared::{NumberValue as ProtoNumberValue, Struct, Value, number_value}; +use tucana::shared::{ + Error as TucanaError, NumberValue as ProtoNumberValue, Struct, Value, number_value, +}; /// Runtime execution failure representation. #[derive(Debug, Clone, PartialEq)] @@ -140,6 +142,38 @@ impl RuntimeError { })), } } + + /// Convert to transport-level shared error object. + pub fn as_tucana_error(&self) -> TucanaError { + TucanaError { + code: self.code.clone(), + category: self.category.clone(), + message: self.message.clone(), + timestamp: self.timestamp_unix_ms as i64, + version: self.version.clone(), + dependencies: self.dependencies.clone(), + details: Some(Struct { + fields: self.details.clone(), + }), + } + } + + /// Build a runtime error from a transport-level shared error object. + pub fn from_tucana_error(error: &TucanaError) -> Self { + Self { + code: error.code.clone(), + category: error.category.clone(), + message: error.message.clone(), + timestamp_unix_ms: error.timestamp.max(0) as u64, + version: error.version.clone(), + dependencies: error.dependencies.clone(), + details: error + .details + .as_ref() + .map(|it| it.fields.clone()) + .unwrap_or_default(), + } + } } impl Default for RuntimeError { diff --git a/crates/taurus-provider/src/providers/remote/nats_remote_runtime.rs b/crates/taurus-provider/src/providers/remote/nats_remote_runtime.rs index 43f757e..cd472c8 100644 --- a/crates/taurus-provider/src/providers/remote/nats_remote_runtime.rs +++ b/crates/taurus-provider/src/providers/remote/nats_remote_runtime.rs @@ -4,7 +4,7 @@ use taurus_core::runtime::remote::{RemoteExecution, RemoteRuntime}; use taurus_core::types::errors::runtime_error::RuntimeError; use tonic::async_trait; use tucana::aquila::ActionExecutionResponse; -use tucana::shared::{NodeExecutionResult}; +use tucana::shared::NodeExecutionResult; pub struct NATSRemoteRuntime { client: Client, @@ -18,7 +18,10 @@ impl NATSRemoteRuntime { #[async_trait] impl RemoteRuntime for NATSRemoteRuntime { - async fn execute_remote(&self, execution: RemoteExecution) -> Result { + async fn execute_remote( + &self, + execution: RemoteExecution, + ) -> Result { let topic = format!( "action.{}.{}", execution.target_service, execution.request.execution_identifier @@ -47,13 +50,13 @@ impl RemoteRuntime for NATSRemoteRuntime { Ok(r) => match r.node_result { Some(res) => Ok(res), None => { - log::error!("RemoteRuntimeExeption: recieved execution result without an body"); - return Err(RuntimeError::new( - "T-PROV-000003", - "RemoteRuntimeExeption", - "Recieved empty action execution response", - )); - }, + log::error!("RemoteRuntimeExeption: recieved execution result without an body"); + return Err(RuntimeError::new( + "T-PROV-000003", + "RemoteRuntimeExeption", + "Recieved empty action execution response", + )); + } }, Err(err) => { log::error!( From e4e015cd7325b27c979b7d349eb5c71b161a865a Mon Sep 17 00:00:00 2001 From: Raphael Date: Mon, 4 May 2026 18:05:49 +0200 Subject: [PATCH 4/8] feat: taurus will react to nats subject test_execution --- crates/taurus/src/app/worker.rs | 254 +++++++++++++++++++++++++++++--- 1 file changed, 232 insertions(+), 22 deletions(-) diff --git a/crates/taurus/src/app/worker.rs b/crates/taurus/src/app/worker.rs index 7ee59b4..2a16768 100644 --- a/crates/taurus/src/app/worker.rs +++ b/crates/taurus/src/app/worker.rs @@ -1,12 +1,16 @@ use std::time::Instant; +use std::time::{SystemTime, UNIX_EPOCH}; use futures_lite::StreamExt; use prost::Message; use taurus_core::runtime::engine::{EmitType, ExecutionEngine, ExecutionId, RespondEmitter}; +use taurus_core::types::errors::runtime_error::RuntimeError; +use taurus_core::types::signal::Signal; use taurus_provider::providers::emitter::nats_emitter::NATSRespondEmitter; use taurus_provider::providers::remote::nats_remote_runtime::NATSRemoteRuntime; use tokio::task::JoinHandle; -use tucana::shared::{ExecutionFlow, RuntimeUsage, Value}; +use tucana::shared::execution_result; +use tucana::shared::{ExecutionFlow, ExecutionResult, RuntimeUsage, Value}; use crate::client::runtime_usage::TaurusRuntimeUsageService; @@ -18,7 +22,7 @@ pub fn spawn_worker( runtime_usage_service: Option, ) -> JoinHandle<()> { tokio::spawn(async move { - let mut subscription = match client + let mut execution_subscription = match client .queue_subscribe(String::from("execution.*"), "taurus".into()) .await { @@ -32,30 +36,75 @@ pub fn spawn_worker( } }; - while let Some(message) = subscription.next().await { - process_message( - message, - &engine, - &nats_remote, - &runtime_emitter, - runtime_usage_service.as_ref(), - ) - .await; + let mut test_execution_subscription = match client + .queue_subscribe(String::from("test_executions.*"), "taurus".into()) + .await + { + Ok(subscription) => { + log::info!("Subscribed to 'test_executions.*'"); + subscription + } + Err(err) => { + log::error!("Failed to subscribe to 'test_executions.*': {:?}", err); + return; + } + }; + + let mut execution_closed = false; + let mut test_execution_closed = false; + + while !(execution_closed && test_execution_closed) { + tokio::select! { + message = execution_subscription.next(), if !execution_closed => { + match message { + Some(message) => { + process_execution_message( + message, + &engine, + &nats_remote, + &runtime_emitter, + runtime_usage_service.as_ref(), + ).await; + } + None => { + execution_closed = true; + log::warn!("Subscription 'execution.*' ended"); + } + } + } + message = test_execution_subscription.next(), if !test_execution_closed => { + match message { + Some(message) => { + process_test_execution_message( + &client, + message, + &engine, + &nats_remote, + runtime_usage_service.as_ref(), + ).await; + } + None => { + test_execution_closed = true; + log::warn!("Subscription 'test_executions.*' ended"); + } + } + } + } } log::info!("NATS worker loop ended"); }) } -async fn process_message( +async fn process_execution_message( message: async_nats::Message, engine: &ExecutionEngine, nats_remote: &NATSRemoteRuntime, runtime_emitter: &NATSRespondEmitter, runtime_usage_service: Option<&TaurusRuntimeUsageService>, ) { - let requested_execution_id = - parse_execution_id_from_subject(&message.subject).unwrap_or_else(|| { + let requested_execution_id = parse_execution_id_from_subject(&message.subject, "execution") + .unwrap_or_else(|| { let generated = ExecutionId::new_v4(); log::warn!( "Expected subject format 'execution.', got '{}'; generated execution id {}", @@ -83,7 +132,7 @@ async fn process_message( let respond_emitter = |execution_id, emit_type: EmitType, value: Value| { runtime_emitter.emit(execution_id, emit_type, value); }; - let runtime_usage = execute_flow( + let run_result = execute_flow( requested_execution_id, flow, engine, @@ -96,39 +145,200 @@ async fn process_message( ); if let Some(usage_service) = runtime_usage_service { - usage_service.update_runtime_usage(runtime_usage).await; + usage_service + .update_runtime_usage(run_result.runtime_usage) + .await; } } +async fn process_test_execution_message( + client: &async_nats::Client, + message: async_nats::Message, + engine: &ExecutionEngine, + nats_remote: &NATSRemoteRuntime, + runtime_usage_service: Option<&TaurusRuntimeUsageService>, +) { + let requested_execution_id = + match parse_execution_id_from_subject(&message.subject, "test_executions") { + Some(res) => res, + None => { + log::error!("Failed to extract execution uuid from {}", &message.subject); + return; + } + }; + + let flow: ExecutionFlow = match ExecutionFlow::decode(&*message.payload) { + Ok(flow) => flow, + Err(err) => { + log::error!( + "Failed to deserialize test execution flow: {:?}, payload: {:?}", + err, + &message.payload + ); + let result = build_decode_error_result(requested_execution_id); + respond_to_test_execution_request(client, &message, result).await; + return; + } + }; + + let run_result = execute_flow(requested_execution_id, flow, engine, nats_remote, None); + + if let Some(usage_service) = runtime_usage_service { + usage_service + .update_runtime_usage(run_result.runtime_usage.clone()) + .await; + } + + let execution_result = build_execution_result( + run_result.execution_id, + run_result.flow_id, + run_result.started_at, + run_result.finished_at, + run_result.input, + run_result.signal, + ); + respond_to_test_execution_request(client, &message, execution_result).await; +} + +#[derive(Clone)] +struct FlowRunResult { + execution_id: ExecutionId, + flow_id: i64, + started_at: i64, + finished_at: i64, + input: Option, + signal: Signal, + runtime_usage: RuntimeUsage, +} + fn execute_flow( execution_id: ExecutionId, flow: ExecutionFlow, engine: &ExecutionEngine, nats_remote: &NATSRemoteRuntime, respond_emitter: Option<&dyn RespondEmitter>, -) -> RuntimeUsage { +) -> FlowRunResult { + let started_at = now_unix_ms(); let start = Instant::now(); let flow_id = flow.flow_id; - let (_signal, _reason) = engine.execute_flow_with_execution_id( + let input = flow.input_value.clone(); + let (signal, _reason) = engine.execute_flow_with_execution_id( execution_id, flow, Some(nats_remote), respond_emitter, true, ); + let finished_at = now_unix_ms(); let duration_millis = start.elapsed().as_millis() as i64; - RuntimeUsage { + FlowRunResult { + execution_id, flow_id, - duration: duration_millis, + started_at, + finished_at, + input, + signal, + runtime_usage: RuntimeUsage { + flow_id, + duration: duration_millis, + }, } } -fn parse_execution_id_from_subject(subject: &async_nats::Subject) -> Option { +fn parse_execution_id_from_subject( + subject: &async_nats::Subject, + prefix: &str, +) -> Option { let raw = subject.as_str(); let mut parts = raw.split('.'); match (parts.next(), parts.next(), parts.next()) { - (Some("execution"), Some(uuid), None) => ExecutionId::parse_str(uuid).ok(), + (Some(found_prefix), Some(uuid), None) if found_prefix == prefix => { + ExecutionId::parse_str(uuid).ok() + } _ => None, } } + +fn build_execution_result( + execution_id: ExecutionId, + flow_id: i64, + started_at: i64, + finished_at: i64, + input: Option, + signal: Signal, +) -> ExecutionResult { + let result = match signal { + Signal::Success(value) | Signal::Return(value) | Signal::Respond(value) => { + Some(execution_result::Result::Success(value)) + } + Signal::Failure(err) => Some(execution_result::Result::Error(err.as_tucana_error())), + Signal::Stop => Some(execution_result::Result::Success(Value { + kind: Some(tucana::shared::value::Kind::NullValue(0)), + })), + }; + + ExecutionResult { + execution_identifier: execution_id.to_string(), + flow_id, + started_at, + finished_at, + input, + node_execution_results: Vec::new(), + result, + } +} + +fn build_decode_error_result(execution_id: ExecutionId) -> ExecutionResult { + let now = now_unix_ms(); + let runtime_error = RuntimeError::new( + "T-TAURUS-000001", + "ExecutionFlowDecodeError", + "Failed to decode test execution flow payload", + ); + + ExecutionResult { + execution_identifier: execution_id.to_string(), + flow_id: 0, + started_at: now, + finished_at: now, + input: None, + node_execution_results: Vec::new(), + result: Some(execution_result::Result::Error( + runtime_error.as_tucana_error(), + )), + } +} + +async fn respond_to_test_execution_request( + client: &async_nats::Client, + message: &async_nats::Message, + result: ExecutionResult, +) { + let Some(reply_subject) = message.reply.as_ref() else { + log::warn!( + "Received test execution request without reply subject on '{}'; cannot return ExecutionResult", + message.subject + ); + return; + }; + + if let Err(err) = client + .publish(reply_subject.clone(), result.encode_to_vec().into()) + .await + { + log::error!( + "Failed to publish test execution response on '{}': {:?}", + reply_subject, + err + ); + } +} + +fn now_unix_ms() -> i64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|it| it.as_millis() as i64) + .unwrap_or(0) +} + From 3dd00d5a138174fd9d5f27e59c206c55f45ab014 Mon Sep 17 00:00:00 2001 From: Raphael Date: Sat, 30 May 2026 15:48:58 +0200 Subject: [PATCH 5/8] feat: added nats to workflow --- .github/workflows/build-and-test.yml | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/.github/workflows/build-and-test.yml b/.github/workflows/build-and-test.yml index 326c472..30aff8f 100644 --- a/.github/workflows/build-and-test.yml +++ b/.github/workflows/build-and-test.yml @@ -6,6 +6,11 @@ on: jobs: taurus: runs-on: ubuntu-latest + services: + nats: + image: nats:2 + ports: + - 4222:4222 steps: - uses: actions/checkout@v6 @@ -15,8 +20,11 @@ jobs: run: PATH=${{ runner.temp }}/proto/bin:$PATH cargo build env: RUST_BACKTRACE: 'full' - - name: Run Tests - run: cargo test - - name: Run Test Suite + - name: Run workspace tests + run: cargo test --workspace --all-targets + - name: Run NATS test execution request tests + run: cargo test -p taurus test_execution_request -- --ignored --nocapture + env: + NATS_URL: nats://127.0.0.1:4222 + - name: Run tests package flow suite run: cargo run --package tests - From 78ea0ce7316a006b6994df66740db30771c699ab Mon Sep 17 00:00:00 2001 From: Raphael Date: Sat, 30 May 2026 15:49:11 +0200 Subject: [PATCH 6/8] dependencies: added serde as dev dependency --- Cargo.lock | 2 ++ crates/taurus/Cargo.toml | 4 ++++ 2 files changed, 6 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index baa2632..833730b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1812,6 +1812,8 @@ dependencies = [ "log", "prost", "rand 0.10.1", + "serde", + "serde_json", "taurus-core", "taurus-provider", "tokio", diff --git a/crates/taurus/Cargo.toml b/crates/taurus/Cargo.toml index b6cb26a..9d54c3a 100644 --- a/crates/taurus/Cargo.toml +++ b/crates/taurus/Cargo.toml @@ -18,3 +18,7 @@ tonic-health = { workspace = true } tonic = { workspace = true } taurus-core = { workspace = true } taurus-provider = { workspace = true } + +[dev-dependencies] +serde = { workspace = true } +serde_json = { workspace = true } From 58b945d886a43f1d750356f67a4fb9d33fac33d4 Mon Sep 17 00:00:00 2001 From: Raphael Date: Sat, 30 May 2026 15:49:28 +0200 Subject: [PATCH 7/8] feat: added execution test --- crates/taurus/src/app/worker.rs | 193 ++++++++++++++++++++++++++++++++ 1 file changed, 193 insertions(+) diff --git a/crates/taurus/src/app/worker.rs b/crates/taurus/src/app/worker.rs index 2a16768..d53486f 100644 --- a/crates/taurus/src/app/worker.rs +++ b/crates/taurus/src/app/worker.rs @@ -342,3 +342,196 @@ fn now_unix_ms() -> i64 { .unwrap_or(0) } +#[cfg(test)] +mod tests { + use super::*; + use std::sync::Mutex; + use std::time::Duration; + + use prost::Message; + use serde::Deserialize; + use taurus_core::runtime::engine::ExecutionEngine; + use taurus_provider::providers::emitter::nats_emitter::NATSRespondEmitter; + use taurus_provider::providers::remote::nats_remote_runtime::NATSRemoteRuntime; + use tucana::shared::{ + ValidationFlow, execution_result, + helper::value::{from_json_value, to_json_value}, + }; + + #[derive(Deserialize)] + struct FixtureInput { + input: Option, + expected_result: serde_json::Value, + } + + #[derive(Deserialize)] + struct FlowFixture { + inputs: Vec, + flow: ValidationFlow, + } + + static NATS_TEST_LOCK: Mutex<()> = Mutex::new(()); + + #[test] + #[ignore = "requires a running NATS server at NATS_URL or nats://127.0.0.1:4222"] + fn test_execution_request_returns_execution_result_over_nats() { + let _lock = NATS_TEST_LOCK + .lock() + .expect("NATS test lock should not be poisoned"); + runtime().block_on(async { + let client = connect_test_nats().await; + let worker = spawn_test_worker(client.clone()); + wait_for_worker_subscription().await; + + let execution_id = ExecutionId::new_v4(); + let fixture = load_fixture("flows/01_return_object.json"); + let expected_result = fixture.inputs[0].expected_result.clone(); + let flow = execution_flow_from_fixture(fixture); + let response = request_execution_result(&client, execution_id, flow.encode_to_vec()) + .await + .expect("test execution request should receive an ExecutionResult response"); + + worker.abort(); + + assert_eq!(response.execution_identifier, execution_id.to_string()); + assert_eq!(response.flow_id, flow.flow_id); + assert!(response.started_at > 0); + assert!(response.finished_at >= response.started_at); + assert_eq!(response.input, None); + assert!(response.node_execution_results.is_empty()); + + match response.result { + Some(execution_result::Result::Success(value)) => { + assert_eq!(to_json_value(value), expected_result); + } + other => panic!("expected successful test execution result, got {:?}", other), + } + }); + } + + #[test] + #[ignore = "requires a running NATS server at NATS_URL or nats://127.0.0.1:4222"] + fn test_execution_request_returns_decode_error_over_nats() { + let _lock = NATS_TEST_LOCK + .lock() + .expect("NATS test lock should not be poisoned"); + runtime().block_on(async { + let client = connect_test_nats().await; + let worker = spawn_test_worker(client.clone()); + wait_for_worker_subscription().await; + + let execution_id = ExecutionId::new_v4(); + let response = + request_execution_result(&client, execution_id, b"not protobuf".to_vec()) + .await + .expect("malformed test execution request should receive an error response"); + + worker.abort(); + + assert_eq!(response.execution_identifier, execution_id.to_string()); + assert_eq!(response.flow_id, 0); + assert!(response.started_at > 0); + assert!(response.finished_at >= response.started_at); + assert_eq!(response.input, None); + assert!(response.node_execution_results.is_empty()); + + match response.result { + Some(execution_result::Result::Error(error)) => { + assert_eq!(error.code, "T-TAURUS-000001"); + assert_eq!(error.category, "ExecutionFlowDecodeError"); + assert_eq!( + error.message, + "Failed to decode test execution flow payload" + ); + } + other => panic!("expected decode error result, got {:?}", other), + } + }); + } + + fn runtime() -> tokio::runtime::Runtime { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .expect("test runtime should build") + } + + async fn connect_test_nats() -> async_nats::Client { + let nats_url = + std::env::var("NATS_URL").unwrap_or_else(|_| "nats://127.0.0.1:4222".to_string()); + async_nats::connect(&nats_url) + .await + .unwrap_or_else(|err| panic!("failed to connect to NATS at {nats_url}: {err}")) + } + + fn spawn_test_worker(client: async_nats::Client) -> tokio::task::JoinHandle<()> { + let engine = ExecutionEngine::new(); + let nats_remote = NATSRemoteRuntime::new(client.clone()); + let runtime_emitter = NATSRespondEmitter::new(client.clone()); + spawn_worker(client, engine, nats_remote, runtime_emitter, None) + } + + async fn wait_for_worker_subscription() { + tokio::time::sleep(Duration::from_millis(250)).await; + } + + async fn request_execution_result( + client: &async_nats::Client, + execution_id: ExecutionId, + payload: Vec, + ) -> Result { + let subject = format!("test_executions.{execution_id}"); + + for attempt in 1..=10 { + match tokio::time::timeout( + Duration::from_secs(2), + client.request(subject.clone(), payload.clone().into()), + ) + .await + { + Ok(Ok(message)) => { + return ExecutionResult::decode(&*message.payload) + .map_err(|err| format!("failed to decode ExecutionResult: {err}")); + } + Ok(Err(err)) if attempt < 10 => { + log::debug!( + "test execution request attempt {} failed before subscription was ready: {:?}", + attempt, + err + ); + tokio::time::sleep(Duration::from_millis(100)).await; + } + Ok(Err(err)) => return Err(format!("NATS request failed: {err}")), + Err(_) if attempt < 10 => { + tokio::time::sleep(Duration::from_millis(100)).await; + } + Err(_) => return Err("timed out waiting for test execution response".to_string()), + } + } + + Err("test execution request did not complete".to_string()) + } + + fn load_fixture(path: &str) -> FlowFixture { + let path = std::path::Path::new(env!("CARGO_MANIFEST_DIR")) + .join("../..") + .join(path); + let content = std::fs::read_to_string(&path) + .unwrap_or_else(|err| panic!("failed to read fixture {}: {err}", path.display())); + serde_json::from_str(&content) + .unwrap_or_else(|err| panic!("failed to parse fixture {}: {err}", path.display())) + } + + fn execution_flow_from_fixture(fixture: FlowFixture) -> ExecutionFlow { + ExecutionFlow { + flow_id: fixture.flow.flow_id, + project_id: fixture.flow.project_id, + starting_node_id: fixture.flow.starting_node_id, + node_functions: fixture.flow.node_functions, + input_value: fixture + .inputs + .first() + .and_then(|input| input.input.clone().map(from_json_value)), + } + } +} From 99c3af9e3a1fb95d812079e97f153cd7fa322d9a Mon Sep 17 00:00:00 2001 From: Raphael Date: Sat, 30 May 2026 16:05:47 +0200 Subject: [PATCH 8/8] feat: adjustments from code review --- crates/taurus-core/src/lib.rs | 1 + crates/taurus-core/src/runtime/engine.rs | 12 ++++--- .../src/runtime/execution/value_store.rs | 18 +++++----- crates/taurus-core/src/time.rs | 10 ++++++ .../src/types/errors/runtime_error.rs | 12 ++----- .../providers/remote/nats_remote_runtime.rs | 18 +++++----- crates/taurus/src/app/worker.rs | 35 ++++++++----------- docs/errors.md | 5 +++ 8 files changed, 59 insertions(+), 52 deletions(-) create mode 100644 crates/taurus-core/src/time.rs diff --git a/crates/taurus-core/src/lib.rs b/crates/taurus-core/src/lib.rs index d742c93..a31a888 100644 --- a/crates/taurus-core/src/lib.rs +++ b/crates/taurus-core/src/lib.rs @@ -5,5 +5,6 @@ mod handler; pub mod runtime; +pub mod time; pub mod types; pub mod value; diff --git a/crates/taurus-core/src/runtime/engine.rs b/crates/taurus-core/src/runtime/engine.rs index f2f1e53..220f07e 100644 --- a/crates/taurus-core/src/runtime/engine.rs +++ b/crates/taurus-core/src/runtime/engine.rs @@ -170,8 +170,8 @@ mod tests { use crate::types::exit_reason::ExitReason; use std::cell::RefCell; use tucana::shared::{ - InputType, ListValue, NodeParameter, NodeValue, ReferenceValue, Struct, Value, node_value, - reference_value, value::Kind, + InputType, ListValue, NodeParameter, NodeValue, ReferenceValue, Struct, SubFlow, Value, + node_value, reference_value, sub_flow::ExecutionReference, value::Kind, }; fn literal_param(database_id: i64, runtime_parameter_id: &str, value: Value) -> NodeParameter { @@ -190,9 +190,11 @@ mod tests { database_id, runtime_parameter_id: runtime_parameter_id.to_string(), value: Some(NodeValue { - value: Some(node_value::Value::SubFlow(unimplemented!( - "Taurus needs to handle SubFlows (issue nr #184)" - ))), + value: Some(node_value::Value::SubFlow(SubFlow { + signature: String::new(), + settings: Vec::new(), + execution_reference: Some(ExecutionReference::StartingNodeId(node_id)), + })), }), cast: None, } diff --git a/crates/taurus-core/src/runtime/execution/value_store.rs b/crates/taurus-core/src/runtime/execution/value_store.rs index 88af691..1da8f68 100644 --- a/crates/taurus-core/src/runtime/execution/value_store.rs +++ b/crates/taurus-core/src/runtime/execution/value_store.rs @@ -1,12 +1,12 @@ //! Mutable value store used by runtime execution to resolve references. use std::collections::HashMap; -use std::time::{SystemTime, UNIX_EPOCH}; use tucana::shared::node_execution_result::Result as TucanaNodeResult; use tucana::shared::{InputType, NodeExecutionResult, ReferenceValue, Value, value::Kind}; use crate::runtime::execution::trace::{StoreInputSlotEntry, StoreResultEntry, StoreSnapshot}; +use crate::time::now_unix_ms; use crate::types::errors::runtime_error::RuntimeError; #[derive(Clone)] @@ -106,7 +106,14 @@ impl ValueStore { Some(TucanaNodeResult::Error(error)) => { ValueStoreResult::Error(RuntimeError::from_tucana_error(error)) } - None => ValueStoreResult::NotFound, + None => ValueStoreResult::Error(RuntimeError::new( + "T-CORE-000006", + "NodeExecutionResultMissingOutcome", + format!( + "Node {} execution result is missing success/error outcome", + id + ), + )), }, None => ValueStoreResult::NotFound, } @@ -214,13 +221,6 @@ impl ValueStore { } } -fn now_unix_ms() -> i64 { - SystemTime::now() - .duration_since(UNIX_EPOCH) - .map(|it| it.as_millis() as i64) - .unwrap_or(0) -} - fn preview_value(value: &Value) -> String { match value.kind.as_ref() { Some(Kind::NumberValue(v)) => crate::value::number_to_string(v), diff --git a/crates/taurus-core/src/time.rs b/crates/taurus-core/src/time.rs new file mode 100644 index 0000000..e3a90e4 --- /dev/null +++ b/crates/taurus-core/src/time.rs @@ -0,0 +1,10 @@ +//! Shared time helpers for runtime metadata. + +use std::time::{SystemTime, UNIX_EPOCH}; + +pub fn now_unix_ms() -> i64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|it| it.as_millis() as i64) + .unwrap_or(0) +} diff --git a/crates/taurus-core/src/types/errors/runtime_error.rs b/crates/taurus-core/src/types/errors/runtime_error.rs index 3f91ea3..afb4e85 100644 --- a/crates/taurus-core/src/types/errors/runtime_error.rs +++ b/crates/taurus-core/src/types/errors/runtime_error.rs @@ -9,13 +9,14 @@ use std::collections::HashMap; use std::error::Error; use std::fmt::{Display, Formatter}; -use std::time::{SystemTime, UNIX_EPOCH}; use tucana::shared::value::Kind::{NumberValue, StringValue, StructValue}; use tucana::shared::{ Error as TucanaError, NumberValue as ProtoNumberValue, Struct, Value, number_value, }; +use crate::time::now_unix_ms; + /// Runtime execution failure representation. #[derive(Debug, Clone, PartialEq)] pub struct RuntimeError { @@ -46,7 +47,7 @@ impl RuntimeError { code: code.into(), category: category.into(), message: message.into(), - timestamp_unix_ms: now_unix_ms(), + timestamp_unix_ms: now_unix_ms() as u64, version: env!("CARGO_PKG_VERSION").to_string(), dependencies: HashMap::new(), details: HashMap::new(), @@ -189,10 +190,3 @@ impl Display for RuntimeError { write!(f, "[{}] {}: {}", self.code, self.category, self.message) } } - -fn now_unix_ms() -> u64 { - SystemTime::now() - .duration_since(UNIX_EPOCH) - .map(|it| it.as_millis() as u64) - .unwrap_or(0) -} diff --git a/crates/taurus-provider/src/providers/remote/nats_remote_runtime.rs b/crates/taurus-provider/src/providers/remote/nats_remote_runtime.rs index cd472c8..5861964 100644 --- a/crates/taurus-provider/src/providers/remote/nats_remote_runtime.rs +++ b/crates/taurus-provider/src/providers/remote/nats_remote_runtime.rs @@ -34,12 +34,12 @@ impl RemoteRuntime for NATSRemoteRuntime { Ok(r) => r, Err(err) => { log::error!( - "RemoteRuntimeExeption: failed to handle NATS message: {}", + "RemoteRuntimeException: failed to handle NATS message: {}", err ); return Err(RuntimeError::new( "T-PROV-000001", - "RemoteRuntimeExeption", + "RemoteRuntimeException", "Failed to receive any response messages from a remote runtime.", )); } @@ -50,22 +50,22 @@ impl RemoteRuntime for NATSRemoteRuntime { Ok(r) => match r.node_result { Some(res) => Ok(res), None => { - log::error!("RemoteRuntimeExeption: recieved execution result without an body"); - return Err(RuntimeError::new( + log::error!("RemoteRuntimeException: received execution result without a body"); + Err(RuntimeError::new( "T-PROV-000003", - "RemoteRuntimeExeption", - "Recieved empty action execution response", - )); + "RemoteRuntimeException", + "Received empty action execution response", + )) } }, Err(err) => { log::error!( - "RemoteRuntimeExeption: failed to decode NATS message: {}", + "RemoteRuntimeException: failed to decode NATS message: {}", err ); return Err(RuntimeError::new( "T-PROV-000002", - "RemoteRuntimeExeption", + "RemoteRuntimeException", "Failed to read Remote Response", )); } diff --git a/crates/taurus/src/app/worker.rs b/crates/taurus/src/app/worker.rs index d53486f..a953a86 100644 --- a/crates/taurus/src/app/worker.rs +++ b/crates/taurus/src/app/worker.rs @@ -1,9 +1,9 @@ use std::time::Instant; -use std::time::{SystemTime, UNIX_EPOCH}; use futures_lite::StreamExt; use prost::Message; use taurus_core::runtime::engine::{EmitType, ExecutionEngine, ExecutionId, RespondEmitter}; +use taurus_core::time::now_unix_ms; use taurus_core::types::errors::runtime_error::RuntimeError; use taurus_core::types::signal::Signal; use taurus_provider::providers::emitter::nats_emitter::NATSRespondEmitter; @@ -37,15 +37,15 @@ pub fn spawn_worker( }; let mut test_execution_subscription = match client - .queue_subscribe(String::from("test_executions.*"), "taurus".into()) + .queue_subscribe(String::from("test_execution.*"), "taurus".into()) .await { Ok(subscription) => { - log::info!("Subscribed to 'test_executions.*'"); + log::info!("Subscribed to 'test_execution.*'"); subscription } Err(err) => { - log::error!("Failed to subscribe to 'test_executions.*': {:?}", err); + log::error!("Failed to subscribe to 'test_execution.*': {:?}", err); return; } }; @@ -85,7 +85,7 @@ pub fn spawn_worker( } None => { test_execution_closed = true; - log::warn!("Subscription 'test_executions.*' ended"); + log::warn!("Subscription 'test_execution.*' ended"); } } } @@ -158,8 +158,16 @@ async fn process_test_execution_message( nats_remote: &NATSRemoteRuntime, runtime_usage_service: Option<&TaurusRuntimeUsageService>, ) { + if message.reply.is_none() { + log::warn!( + "Received test execution request without reply subject on '{}'; ignoring request", + message.subject + ); + return; + } + let requested_execution_id = - match parse_execution_id_from_subject(&message.subject, "test_executions") { + match parse_execution_id_from_subject(&message.subject, "test_execution") { Some(res) => res, None => { log::error!("Failed to extract execution uuid from {}", &message.subject); @@ -335,13 +343,6 @@ async fn respond_to_test_execution_request( } } -fn now_unix_ms() -> i64 { - SystemTime::now() - .duration_since(UNIX_EPOCH) - .map(|it| it.as_millis() as i64) - .unwrap_or(0) -} - #[cfg(test)] mod tests { use super::*; @@ -381,7 +382,6 @@ mod tests { runtime().block_on(async { let client = connect_test_nats().await; let worker = spawn_test_worker(client.clone()); - wait_for_worker_subscription().await; let execution_id = ExecutionId::new_v4(); let fixture = load_fixture("flows/01_return_object.json"); @@ -418,7 +418,6 @@ mod tests { runtime().block_on(async { let client = connect_test_nats().await; let worker = spawn_test_worker(client.clone()); - wait_for_worker_subscription().await; let execution_id = ExecutionId::new_v4(); let response = @@ -471,16 +470,12 @@ mod tests { spawn_worker(client, engine, nats_remote, runtime_emitter, None) } - async fn wait_for_worker_subscription() { - tokio::time::sleep(Duration::from_millis(250)).await; - } - async fn request_execution_result( client: &async_nats::Client, execution_id: ExecutionId, payload: Vec, ) -> Result { - let subject = format!("test_executions.{execution_id}"); + let subject = format!("test_execution.{execution_id}"); for attempt in 1..=10 { match tokio::time::timeout( diff --git a/docs/errors.md b/docs/errors.md index 6355d41..72aa2dc 100644 --- a/docs/errors.md +++ b/docs/errors.md @@ -10,6 +10,7 @@ This document is the canonical catalog for runtime error codes emitted by Taurus - `T-STD-XXXXX`: Errors originating inside standard function implementations under `runtime/functions/*`. - `T-CORE-XXXXXX`: Errors originating from core runtime infrastructure. +- `T-TAURUS-XXXXXX`: Errors originating from the Taurus runtime app layer. - `T-PROV-XXXXXX`: Errors originating from provider integrations (transport adapters, remote runtime connectors). ## Code Table @@ -23,10 +24,13 @@ This document is the canonical catalog for runtime error codes emitted by Taurus | `T-CORE-000003` | Engine | Flow requires remote execution but no remote runtime adapter was configured. | Node execution target is remote while `RemoteRuntime` is `None`. | `runtime/engine/executor.rs` | | `T-CORE-000004` | Engine | Reference lookup failed in the execution value store. | Missing prior node result, missing flow input path, or unresolved input reference. | `runtime/engine/executor.rs` | | `T-CORE-000005` | Engine | Remote request cannot be assembled because parameter metadata and resolved values diverge. | Parameter count mismatch during remote request materialization. | `runtime/engine/executor.rs` | +| `T-CORE-000006` | Engine | Node execution result exists without a success/error outcome. | Provider or value store returned a `NodeExecutionResult` with no `result` field. | `runtime/engine/executor.rs`, `runtime/execution/value_store.rs` | | `T-CORE-000101` | Compiler | Flow compilation failed because a node id appears more than once. | Duplicate `database_id` in input nodes. | `runtime/engine/compiler.rs` | | `T-CORE-000102` | Compiler | Flow compilation failed because the declared start node is absent. | `start_node_id` not found in node list. | `runtime/engine/compiler.rs` | | `T-CORE-000103` | Compiler | Flow compilation failed because a `next` edge points to a missing node. | `next_node_id` references unknown node id. | `runtime/engine/compiler.rs` | | `T-CORE-000104` | Compiler | Flow compilation failed because a parameter is structurally incomplete. | Parameter has no value payload in IR. | `runtime/engine/compiler.rs` | +| `T-CORE-000105` | Compiler | Flow compilation failed because a sub-flow parameter is missing its execution reference. | `sub_flow.execution_reference` is absent. | `runtime/engine/compiler.rs` | +| `T-CORE-000106` | Compiler | Flow compilation failed because a sub-flow parameter uses an unsupported execution reference. | `sub_flow.execution_reference` is not a starting node id. | `runtime/engine/compiler.rs` | | `T-CORE-000201` | Handler | Handler argument arity contract was violated before function execution began. | `args!`/`no_args!` macro expected different argument count. | `handler/macros.rs` | | `T-CORE-000202` | Handler | Handler argument type conversion failed during typed extraction. | `TryFromArgument` expected type does not match provided argument. | `handler/argument.rs` | | `T-CORE-000301` | App Error Mapping | Application configuration failure mapped into runtime error format. | Invalid/missing runtime config surfaced as `Error::Configuration`. | `types/errors/error.rs` | @@ -35,6 +39,7 @@ This document is the canonical catalog for runtime error codes emitted by Taurus | `T-CORE-000304` | App Error Mapping | Serialization/deserialization failure mapped into runtime error format. | Encoding/decoding/parsing failure surfaced as `Error::Serialization`. | `types/errors/error.rs` | | `T-CORE-000399` | App Error Mapping | Internal application failure mapped into runtime error format. | Catch-all non-domain internal failure surfaced as `Error::Internal`. | `types/errors/error.rs` | | `T-CORE-999999` | Runtime Error Fallback | Default fallback runtime error code when no explicit mapping is provided. | `RuntimeError::default()` used as defensive fallback. | `types/errors/runtime_error.rs` | +| `T-TAURUS-000001` | Taurus App | Test execution request payload could not be decoded as an execution flow. | Malformed or schema-incompatible payload published to the test execution NATS subject. | `taurus/src/app/worker.rs` | | `T-PROV-000001` | Provider Remote Runtime | Remote request to NATS did not yield a valid response message. | NATS request failed or timed out while waiting for remote runtime answer. | `taurus-provider/providers/remote/nats_remote_runtime.rs` | | `T-PROV-000002` | Provider Remote Runtime | Remote runtime response could not be decoded into expected protobuf structure. | Received payload is malformed, truncated, or schema-incompatible for `ExecutionResult`. | `taurus-provider/providers/remote/nats_remote_runtime.rs` | | `T-PROV-000003` | Provider Remote Runtime | Remote runtime response decoded, but contained no concrete result field. | `ExecutionResult` exists but `result` is `None` (protocol contract violation). | `taurus-provider/providers/remote/nats_remote_runtime.rs` |