diff --git a/.github/workflows/build-and-test.yml b/.github/workflows/build-and-test.yml index 326c472..30aff8f 100644 --- a/.github/workflows/build-and-test.yml +++ b/.github/workflows/build-and-test.yml @@ -6,6 +6,11 @@ on: jobs: taurus: runs-on: ubuntu-latest + services: + nats: + image: nats:2 + ports: + - 4222:4222 steps: - uses: actions/checkout@v6 @@ -15,8 +20,11 @@ jobs: run: PATH=${{ runner.temp }}/proto/bin:$PATH cargo build env: RUST_BACKTRACE: 'full' - - name: Run Tests - run: cargo test - - name: Run Test Suite + - name: Run workspace tests + run: cargo test --workspace --all-targets + - name: Run NATS test execution request tests + run: cargo test -p taurus test_execution_request -- --ignored --nocapture + env: + NATS_URL: nats://127.0.0.1:4222 + - name: Run tests package flow suite run: cargo run --package tests - diff --git a/Cargo.lock b/Cargo.lock index 6e7362f..51ef96e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -346,7 +346,7 @@ dependencies = [ "tokio", "tonic", "tonic-health", - "tucana 0.0.70", + "tucana", "walkdir", ] @@ -1072,7 +1072,7 @@ dependencies = [ "taurus-provider", "tokio", "tonic", - "tucana 0.0.68", + "tucana", ] [[package]] @@ -1848,12 +1848,14 @@ dependencies = [ "log", "prost", "rand 0.10.1", + "serde", + "serde_json", "taurus-core", "taurus-provider", "tokio", "tonic", "tonic-health", - "tucana 0.0.68", + "tucana", ] [[package]] @@ -1866,7 +1868,7 @@ dependencies = [ "log", "rand 0.10.1", "serde_json", - "tucana 0.0.68", + "tucana", "ureq", "uuid", ] @@ -1887,7 +1889,7 @@ dependencies = [ "tokio", "tonic", "tonic-health", - "tucana 0.0.68", + "tucana", ] [[package]] @@ -1912,7 +1914,7 @@ dependencies = [ "serde", "serde_json", "taurus-core", - "tucana 0.0.68", + "tucana", ] [[package]] @@ -2238,26 +2240,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "tucana" -version = "0.0.68" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "abae78f798d1203bbcce361ba4cb4c500f8fe64e56d16ba3a5a0854e285377eb" -dependencies = [ - "pbjson", - "pbjson-build", - "pbjson-types", - "prost", - "prost-build", - "prost-types", - "serde", - "serde_json", - "tonic", - "tonic-build", - "tonic-prost", - "tonic-prost-build", -] - [[package]] name = "tucana" version = "0.0.70" diff --git a/Cargo.toml b/Cargo.toml index 98e48fe..7bd6c25 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,7 @@ edition = "2024" [workspace.dependencies] async-trait = "0.1.89" code0-flow = { version = "0.0.33" } -tucana = { version = "0.0.68" } +tucana = { version = "0.0.70" } tokio = { version = "1.44.1", features = ["rt-multi-thread", "signal"] } log = "0.4.27" futures-lite = "2.6.0" diff --git a/crates/taurus-core/src/handler/argument.rs b/crates/taurus-core/src/handler/argument.rs index c661a15..2105138 100644 --- a/crates/taurus-core/src/handler/argument.rs +++ b/crates/taurus-core/src/handler/argument.rs @@ -7,12 +7,56 @@ use tucana::shared::value::Kind; use tucana::shared::{ListValue, NumberValue, Struct, Value}; use crate::value::{number_to_f64, number_to_i64_lossy}; +use std::fmt; +use tucana::shared::SubFlowSetting; + +#[derive(Clone)] +pub struct FunctionThunk { + pub identifier: String, + pub parameter_index: i64, + pub settings: Vec, +} + +impl fmt::Debug for FunctionThunk { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("FunctionThunk") + .field("identifier", &self.identifier) + .field("parameter_index", &self.parameter_index) + .field("settings_len", &self.settings.len()) + .finish() + } +} + +#[derive(Clone)] +pub enum Thunk { + Node(i64), + Function(FunctionThunk), +} + +impl fmt::Debug for Thunk { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Thunk::Node(node_id) => write!(f, "{}", node_id), + Thunk::Function(function) => function.fmt(f), + } + } +} + +impl Thunk { + pub fn trace_target(&self) -> String { + match self { + Thunk::Node(node_id) => format!("node={}", node_id), + Thunk::Function(function) => format!("function={}", function.identifier), + } + } +} + #[derive(Clone, Debug)] pub enum Argument { /// Eager value that can be consumed immediately by a handler. Eval(Value), - /// Deferred node execution handle, evaluated by calling `run(node_id)`. - Thunk(i64), + /// Deferred execution handle, evaluated by calling `run(thunk)`. + Thunk(Thunk), } #[derive(Clone, Copy, Debug)] diff --git a/crates/taurus-core/src/handler/registry.rs b/crates/taurus-core/src/handler/registry.rs index 388b566..993d9ae 100644 --- a/crates/taurus-core/src/handler/registry.rs +++ b/crates/taurus-core/src/handler/registry.rs @@ -1,6 +1,6 @@ //! Runtime handler registry and callable function signatures. -use crate::handler::argument::{Argument, ParameterNode}; +use crate::handler::argument::{Argument, ParameterNode, Thunk}; use crate::runtime::execution::value_store::ValueStore; use crate::runtime::functions::ALL_FUNCTION_SETS; use crate::types::signal::Signal; @@ -8,12 +8,14 @@ use std::collections::HashMap; /// Handler function type. /// - For eager params, the executor will already convert them to Argument::Eval(Value). -/// - For lazy params, the executor will pass Argument::Thunk(node_id). -/// - If a handler wants to execute a lazy arg, it calls run(node_id). -pub type HandlerFn = fn( +/// - For lazy params, the executor will pass Argument::Thunk(thunk). +/// - If a handler wants to execute a lazy arg, it calls run(thunk). +pub type ThunkRunner<'runner> = dyn FnMut(&Thunk, &mut ValueStore) -> Signal + 'runner; + +pub type HandlerFn = for<'runner> fn( args: &[Argument], ctx: &mut ValueStore, - run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + run: &mut ThunkRunner<'runner>, ) -> Signal; #[derive(Clone, Copy)] diff --git a/crates/taurus-core/src/lib.rs b/crates/taurus-core/src/lib.rs index d742c93..a31a888 100644 --- a/crates/taurus-core/src/lib.rs +++ b/crates/taurus-core/src/lib.rs @@ -5,5 +5,6 @@ mod handler; pub mod runtime; +pub mod time; pub mod types; pub mod value; diff --git a/crates/taurus-core/src/runtime/engine.rs b/crates/taurus-core/src/runtime/engine.rs index c7e4599..13cbc88 100644 --- a/crates/taurus-core/src/runtime/engine.rs +++ b/crates/taurus-core/src/runtime/engine.rs @@ -170,8 +170,9 @@ mod tests { use crate::types::exit_reason::ExitReason; use std::cell::RefCell; use tucana::shared::{ - InputType, ListValue, NodeParameter, NodeValue, ReferenceValue, Struct, Value, node_value, - reference_value, value::Kind, + InputType, ListValue, NodeParameter, NodeValue, ReferenceValue, Struct, SubFlow, + SubFlowSetting, Value, node_value, reference_value, sub_flow::ExecutionReference, + value::Kind, }; fn literal_param(database_id: i64, runtime_parameter_id: &str, value: Value) -> NodeParameter { @@ -181,6 +182,7 @@ mod tests { value: Some(NodeValue { value: Some(node_value::Value::LiteralValue(value)), }), + cast: None, } } @@ -189,8 +191,49 @@ mod tests { database_id, runtime_parameter_id: runtime_parameter_id.to_string(), value: Some(NodeValue { - value: Some(node_value::Value::NodeFunctionId(node_id)), + value: Some(node_value::Value::SubFlow(SubFlow { + signature: String::new(), + settings: Vec::new(), + execution_reference: Some(ExecutionReference::StartingNodeId(node_id)), + })), + }), + cast: None, + } + } + + fn function_thunk_param( + database_id: i64, + runtime_parameter_id: &str, + function_identifier: &str, + settings: Vec, + ) -> NodeParameter { + NodeParameter { + database_id, + runtime_parameter_id: runtime_parameter_id.to_string(), + value: Some(NodeValue { + value: Some(node_value::Value::SubFlow(SubFlow { + signature: String::new(), + settings, + execution_reference: Some(ExecutionReference::FunctionIdentifier( + function_identifier.to_string(), + )), + })), }), + cast: None, + } + } + + fn subflow_setting( + identifier: &str, + default_value: Option, + optional: bool, + hidden: bool, + ) -> SubFlowSetting { + SubFlowSetting { + identifier: identifier.to_string(), + default_value, + optional: Some(optional), + hidden: Some(hidden), } } @@ -208,6 +251,7 @@ mod tests { paths: Vec::new(), })), }), + cast: None, } } @@ -256,6 +300,13 @@ mod tests { } } + fn expect_success(signal: Signal) -> Value { + match signal { + Signal::Success(value) => value, + other => panic!("expected success, got {:?}", other), + } + } + fn input_type_ref_param( database_id: i64, runtime_parameter_id: &str, @@ -276,6 +327,7 @@ mod tests { paths: Vec::new(), })), }), + cast: None, } } @@ -426,6 +478,247 @@ mod tests { } } + #[test] + fn function_subflow_map_executes_function_identifier_with_iteration_input() { + let engine = ExecutionEngine::new(); + + let map_node = node( + 1, + "std::list::map", + vec![ + literal_param(100, "list", list_value(vec![int_value(1), int_value(2)])), + function_thunk_param( + 101, + "transform", + "std::number::add", + vec![ + subflow_setting("lhs", None, false, false), + subflow_setting("rhs", Some(int_value(2)), false, true), + ], + ), + ], + None, + ); + + let (signal, reason) = engine.execute_graph(1, vec![map_node], None, None, None, false); + + assert_eq!(reason, ExitReason::Success); + assert_eq!( + expect_success(signal), + list_value(vec![int_value(3), int_value(4)]) + ); + } + + #[test] + fn function_subflow_filter_executes_predicate_identifier() { + let engine = ExecutionEngine::new(); + + let filter_node = node( + 1, + "std::list::filter", + vec![ + literal_param( + 100, + "list", + list_value(vec![int_value(1), int_value(4), int_value(7)]), + ), + function_thunk_param( + 101, + "predicate", + "std::number::is_greater", + vec![ + subflow_setting("lhs", None, false, false), + subflow_setting("rhs", Some(int_value(3)), false, true), + ], + ), + ], + None, + ); + + let (signal, reason) = engine.execute_graph(1, vec![filter_node], None, None, None, false); + + assert_eq!(reason, ExitReason::Success); + assert_eq!( + expect_success(signal), + list_value(vec![int_value(4), int_value(7)]) + ); + } + + #[test] + fn function_subflow_default_replaces_null_callback_input() { + let engine = ExecutionEngine::new(); + + let map_node = node( + 1, + "std::list::map", + vec![ + literal_param(100, "list", list_value(vec![null_value(), int_value(5)])), + function_thunk_param( + 101, + "transform", + "std::control::value", + vec![subflow_setting("value", Some(int_value(9)), false, false)], + ), + ], + None, + ); + + let (signal, reason) = engine.execute_graph(1, vec![map_node], None, None, None, false); + + assert_eq!(reason, ExitReason::Success); + assert_eq!( + expect_success(signal), + list_value(vec![int_value(9), int_value(5)]) + ); + } + + #[test] + fn function_subflow_hidden_setting_always_uses_default() { + let engine = ExecutionEngine::new(); + + let map_node = node( + 1, + "std::list::map", + vec![ + literal_param(100, "list", list_value(vec![int_value(1), int_value(2)])), + function_thunk_param( + 101, + "transform", + "std::control::value", + vec![subflow_setting("value", Some(int_value(9)), false, true)], + ), + ], + None, + ); + + let (signal, reason) = engine.execute_graph(1, vec![map_node], None, None, None, false); + + assert_eq!(reason, ExitReason::Success); + assert_eq!( + expect_success(signal), + list_value(vec![int_value(9), int_value(9)]) + ); + } + + #[test] + fn function_subflow_optional_missing_setting_uses_null() { + let engine = ExecutionEngine::new(); + + let if_node = node( + 1, + "std::control::if", + vec![ + literal_param( + 100, + "condition", + Value { + kind: Some(Kind::BoolValue(true)), + }, + ), + function_thunk_param( + 101, + "runnable", + "std::control::value", + vec![subflow_setting("value", None, true, false)], + ), + ], + None, + ); + + let (signal, reason) = engine.execute_graph(1, vec![if_node], None, None, None, false); + + assert_eq!(reason, ExitReason::Success); + assert_eq!(expect_success(signal), null_value()); + } + + #[test] + fn function_subflow_required_missing_setting_fails() { + let engine = ExecutionEngine::new(); + + let if_node = node( + 1, + "std::control::if", + vec![ + literal_param( + 100, + "condition", + Value { + kind: Some(Kind::BoolValue(true)), + }, + ), + function_thunk_param( + 101, + "runnable", + "std::control::value", + vec![subflow_setting("value", None, false, false)], + ), + ], + None, + ); + + let (signal, reason) = engine.execute_graph(1, vec![if_node], None, None, None, false); + + assert_eq!(reason, ExitReason::Failure); + match signal { + Signal::Failure(err) => assert_eq!(err.code, "T-CORE-000107"), + other => panic!("expected missing setting failure, got {:?}", other), + } + } + + #[test] + fn function_subflow_unknown_function_identifier_fails_when_executed() { + let engine = ExecutionEngine::new(); + + let if_node = node( + 1, + "std::control::if", + vec![ + literal_param( + 100, + "condition", + Value { + kind: Some(Kind::BoolValue(true)), + }, + ), + function_thunk_param(101, "runnable", "std::missing::function", Vec::new()), + ], + None, + ); + + let (signal, reason) = engine.execute_graph(1, vec![if_node], None, None, None, false); + + assert_eq!(reason, ExitReason::Failure); + match signal { + Signal::Failure(err) => assert_eq!(err.code, "T-CORE-000002"), + other => panic!("expected function-not-found failure, got {:?}", other), + } + } + + #[test] + fn function_subflow_can_be_forced_as_eager_argument() { + let engine = ExecutionEngine::new(); + + let add_node = node( + 1, + "std::number::add", + vec![ + function_thunk_param( + 100, + "lhs", + "std::control::value", + vec![subflow_setting("value", Some(int_value(40)), false, true)], + ), + literal_param(101, "rhs", int_value(2)), + ], + None, + ); + + let (signal, reason) = engine.execute_graph(1, vec![add_node], None, None, None, false); + + assert_eq!(reason, ExitReason::Success); + assert_eq!(expect_success(signal), int_value(42)); + } + #[test] fn emitter_emits_start_and_finish_for_successful_execution() { let engine = ExecutionEngine::new(); diff --git a/crates/taurus-core/src/runtime/engine/compiler.rs b/crates/taurus-core/src/runtime/engine/compiler.rs index 3f4ae57..ef21b03 100644 --- a/crates/taurus-core/src/runtime/engine/compiler.rs +++ b/crates/taurus-core/src/runtime/engine/compiler.rs @@ -2,11 +2,12 @@ use std::collections::HashMap; -use tucana::shared::{NodeFunction, node_value}; +use tucana::shared::{NodeFunction, node_value, sub_flow}; use crate::{ runtime::engine::model::{ - CompiledArg, CompiledFlow, CompiledNode, CompiledParameter, NodeExecutionTarget, + CompiledArg, CompiledFlow, CompiledNode, CompiledParameter, CompiledThunk, + NodeExecutionTarget, }, types::errors::runtime_error::RuntimeError, }; @@ -27,6 +28,10 @@ pub enum CompileError { node_id: i64, parameter_index: usize, }, + SubFlowExecutionReferenceMissing { + node_id: i64, + parameter_index: usize, + }, } impl CompileError { @@ -64,6 +69,17 @@ impl CompileError { node_id, parameter_index ), ), + CompileError::SubFlowExecutionReferenceMissing { + node_id, + parameter_index, + } => RuntimeError::new( + "T-CORE-000105", + "FlowCompileError", + format!( + "Node {} parameter {} sub_flow is missing execution reference", + node_id, parameter_index + ), + ), } } } @@ -125,7 +141,26 @@ pub fn compile_flow( let arg = match value { node_value::Value::LiteralValue(v) => CompiledArg::Literal(v.clone()), node_value::Value::ReferenceValue(r) => CompiledArg::Reference(r.clone()), - node_value::Value::NodeFunctionId(id) => CompiledArg::DeferredNode(*id), + node_value::Value::SubFlow(sub_flow) => { + match sub_flow.execution_reference.as_ref() { + Some(sub_flow::ExecutionReference::StartingNodeId(node_id)) => { + CompiledArg::Deferred(CompiledThunk::Node(*node_id)) + } + Some(sub_flow::ExecutionReference::FunctionIdentifier(identifier)) => { + CompiledArg::Deferred(CompiledThunk::Function { + identifier: identifier.clone(), + parameter_index: parameter_index as i64, + settings: sub_flow.settings.clone(), + }) + } + None => { + return Err(CompileError::SubFlowExecutionReferenceMissing { + node_id: node.database_id, + parameter_index, + }); + } + } + } }; parameters.push(CompiledParameter { diff --git a/crates/taurus-core/src/runtime/engine/executor.rs b/crates/taurus-core/src/runtime/engine/executor.rs index c8aeb27..fc28e44 100644 --- a/crates/taurus-core/src/runtime/engine/executor.rs +++ b/crates/taurus-core/src/runtime/engine/executor.rs @@ -4,16 +4,22 @@ use std::cell::RefCell; use std::collections::HashMap; use futures_lite::future::block_on; -use tucana::aquila::ExecutionRequest; +use tucana::aquila::ActionExecutionRequest; +use tucana::shared::node_execution_result::Result as TucanaNodeResult; use tucana::shared::reference_value::Target; use tucana::shared::value::Kind; -use tucana::shared::{Struct, Value}; +use tucana::shared::{ + InputType, NodeExecutionResult as TucanaNodeExecutionResult, ReferenceValue, Struct, + SubFlowSetting, Value, +}; use uuid::Uuid; -use crate::handler::argument::{Argument, ParameterNode}; +use crate::handler::argument::{Argument, FunctionThunk, ParameterNode, Thunk}; use crate::handler::registry::{FunctionStore, HandlerFunctionEntry}; use crate::runtime::engine::emitter::{EmitType, ExecutionId, RespondEmitter}; -use crate::runtime::engine::model::{CompiledArg, CompiledFlow, CompiledNode, NodeExecutionTarget}; +use crate::runtime::engine::model::{ + CompiledArg, CompiledFlow, CompiledNode, CompiledThunk, NodeExecutionTarget, +}; use crate::runtime::execution::trace::{ ArgKind, ArgTrace, EdgeKind, Outcome, ReferenceKind, TraceRun, }; @@ -56,7 +62,7 @@ struct ExecutionResult { } /// Result of executing exactly one compiled node. -struct NodeExecutionResult { +struct NodeResult { signal: Signal, frame_id: Option, } @@ -150,25 +156,93 @@ impl<'a> EngineExecutor<'a> { } } - fn execute_single_node( + fn execute_thunk(&self, thunk: &Thunk, value_store: &mut ValueStore) -> ExecutionResult { + match thunk { + Thunk::Node(node_id) => self.execute_from_node_id(*node_id, value_store), + Thunk::Function(function) => self.execute_function_thunk(function, value_store), + } + } + + fn execute_function_thunk( &self, - node_idx: usize, + function: &FunctionThunk, value_store: &mut ValueStore, - ) -> NodeExecutionResult { + ) -> ExecutionResult { + let entry = match self.handlers.get(function.identifier.as_str()).copied() { + Some(entry) => entry, + None => { + return ExecutionResult { + signal: Signal::Failure(RuntimeError::new( + "T-CORE-000002", + "FunctionNotFound", + format!("Function {} not found", function.identifier), + )), + root_frame: None, + }; + } + }; + + let frame_id = self.trace_enter_function( + value_store.get_current_node_id(), + function.identifier.as_str(), + value_store, + ); + + let mut args = match self.build_function_thunk_args(function, value_store, frame_id) { + Ok(args) => args, + Err(err) => { + let signal = Signal::Failure(err); + self.trace_exit(frame_id, &signal, value_store); + return ExecutionResult { + signal, + root_frame: frame_id, + }; + } + }; + + let signal = + if let Some(signal) = self.force_eager_args(&entry, &mut args, value_store, frame_id) { + signal + } else { + let mut run = |thunk: &Thunk, store: &mut ValueStore| { + self.trace_mark_thunk_executed(frame_id, thunk); + let label = store.pop_runtime_trace_label(); + let child_result = self.execute_thunk(thunk, store); + if let (Some(parent), Some(child)) = (frame_id, child_result.root_frame) { + self.trace_link_child(parent, child, EdgeKind::RuntimeCall { label }); + } + child_result.signal + }; + + (entry.handler)(&args, value_store, &mut run) + }; + + self.trace_exit(frame_id, &signal, value_store); + + ExecutionResult { + signal, + root_frame: frame_id, + } + } + + fn execute_single_node(&self, node_idx: usize, value_store: &mut ValueStore) -> NodeResult { let node = &self.flow.nodes[node_idx]; // InputType references resolve against the currently running node. value_store.set_current_node_id(node.id); let frame_id = self.trace_enter(node, value_store); let signal = match &node.execution_target { - NodeExecutionTarget::Local => self.execute_local_node(node, value_store, frame_id), + NodeExecutionTarget::Local => { + let signal = self.execute_local_node(node, value_store, frame_id); + self.commit_result(node.id, signal, value_store) + } NodeExecutionTarget::Remote { service } => { self.execute_remote_node(node, service, value_store, frame_id) } }; self.trace_exit(frame_id, &signal, value_store); - NodeExecutionResult { signal, frame_id } + NodeResult { signal, frame_id } } fn execute_local_node( @@ -190,29 +264,25 @@ impl<'a> EngineExecutor<'a> { let mut args = match self.build_args(node, value_store, frame_id) { Ok(args) => args, - Err(err) => { - value_store.insert_error(node.id, err.clone()); - return Signal::Failure(err); - } + Err(err) => return Signal::Failure(err), }; if let Some(signal) = self.force_eager_args(entry, &mut args, value_store, frame_id) { - return self.commit_result(node.id, signal, value_store); + return signal; } // Handler-owned runtime calls (for lazy args / callbacks) re-enter the same executor. - let mut run = |node_id: i64, store: &mut ValueStore| { - self.trace_mark_thunk_executed_by_node(frame_id, node_id); + let mut run = |thunk: &Thunk, store: &mut ValueStore| { + self.trace_mark_thunk_executed(frame_id, thunk); let label = store.pop_runtime_trace_label(); - let child_result = self.execute_from_node_id(node_id, store); + let child_result = self.execute_thunk(thunk, store); if let (Some(parent), Some(child)) = (frame_id, child_result.root_frame) { self.trace_link_child(parent, child, EdgeKind::RuntimeCall { label }); } child_result.signal }; - let signal = (entry.handler)(&args, value_store, &mut run); - self.commit_result(node.id, signal, value_store) + (entry.handler)(&args, value_store, &mut run) } fn execute_remote_node( @@ -225,20 +295,21 @@ impl<'a> EngineExecutor<'a> { let remote_runtime = match self.remote { Some(remote) => remote, None => { - return Signal::Failure(RuntimeError::new( - "T-CORE-000003", - "RemoteRuntimeNotConfigured", - "Remote runtime not configured", - )); + return self.commit_result( + node.id, + Signal::Failure(RuntimeError::new( + "T-CORE-000003", + "RemoteRuntimeNotConfigured", + "Remote runtime not configured", + )), + value_store, + ); } }; let mut args = match self.build_args(node, value_store, frame_id) { Ok(args) => args, - Err(err) => { - value_store.insert_error(node.id, err.clone()); - return Signal::Failure(err); - } + Err(err) => return self.commit_result(node.id, Signal::Failure(err), value_store), }; let values = match self.resolve_remote_args(&mut args, value_store, frame_id) { @@ -248,21 +319,16 @@ impl<'a> EngineExecutor<'a> { let request = match self.build_remote_request(node, values) { Ok(request) => request, - Err(err) => { - value_store.insert_error(node.id, err.clone()); - return Signal::Failure(err); - } + Err(err) => return self.commit_result(node.id, Signal::Failure(err), value_store), }; - let signal = match block_on(remote_runtime.execute_remote(RemoteExecution { + match block_on(remote_runtime.execute_remote(RemoteExecution { target_service: service.to_string(), request, })) { - Ok(value) => Signal::Success(value), - Err(err) => Signal::Failure(err), - }; - - self.commit_result(node.id, signal, value_store) + Ok(result) => self.commit_remote_result(node.id, result, value_store), + Err(err) => self.commit_result(node.id, Signal::Failure(err), value_store), + } } fn build_args( @@ -385,20 +451,22 @@ impl<'a> EngineExecutor<'a> { )); } }, - CompiledArg::DeferredNode(node_id) => { + CompiledArg::Deferred(thunk) => { + let thunk = compiled_thunk_to_argument(thunk); + let target = thunk.trace_target(); self.trace_record_arg( frame_id, ArgTrace { index, kind: ArgKind::Thunk { - node_id: *node_id, + target: target.clone(), eager: false, executed: false, }, - preview: format!("thunk({})", node_id), + preview: format!("thunk({})", target), }, ); - args.push(Argument::Thunk(*node_id)); + args.push(Argument::Thunk(thunk)); } } } @@ -406,6 +474,40 @@ impl<'a> EngineExecutor<'a> { Ok(args) } + fn build_function_thunk_args( + &self, + function: &FunctionThunk, + value_store: &mut ValueStore, + frame_id: Option, + ) -> Result, RuntimeError> { + let mut args = Vec::with_capacity(function.settings.len()); + let current_node_id = value_store.get_current_node_id(); + + for (index, setting) in function.settings.iter().enumerate() { + let input_type = InputType { + node_id: current_node_id, + parameter_index: function.parameter_index, + input_index: index as i64, + }; + let value = resolve_function_setting(function, setting, input_type, value_store)?; + self.trace_record_arg( + frame_id, + ArgTrace { + index, + kind: ArgKind::Literal, + preview: format!( + "setting({}) -> {}", + setting.identifier, + preview_value(&value) + ), + }, + ); + args.push(Argument::Eval(value)); + } + + Ok(args) + } + fn force_eager_args( &self, entry: &HandlerFunctionEntry, @@ -417,10 +519,10 @@ impl<'a> EngineExecutor<'a> { let mode = entry.param_mode(index); if matches!(mode, ParameterNode::Eager) - && let Argument::Thunk(node_id) = *argument + && let Argument::Thunk(thunk) = argument { self.trace_mark_thunk(frame_id, index, true, true); - let child = self.execute_from_node_id(node_id, value_store); + let child = self.execute_thunk(thunk, value_store); if let (Some(parent), Some(child_root)) = (frame_id, child.root_frame) { self.trace_link_child( parent, @@ -454,10 +556,10 @@ impl<'a> EngineExecutor<'a> { for (index, argument) in args.iter_mut().enumerate() { match argument { Argument::Eval(value) => values.push(value.clone()), - Argument::Thunk(node_id) => { + Argument::Thunk(thunk) => { // Remote execution always receives materialized values, never thunks. self.trace_mark_thunk(frame_id, index, true, true); - let child = self.execute_from_node_id(*node_id, value_store); + let child = self.execute_thunk(thunk, value_store); if let (Some(parent), Some(child_root)) = (frame_id, child.root_frame) { self.trace_link_child( parent, @@ -485,7 +587,7 @@ impl<'a> EngineExecutor<'a> { &self, node: &CompiledNode, values: Vec, - ) -> Result { + ) -> Result { if node.parameters.len() != values.len() { return Err(RuntimeError::new( "T-CORE-000005", @@ -499,7 +601,7 @@ impl<'a> EngineExecutor<'a> { fields.insert(parameter.runtime_parameter_id.clone(), value); } - Ok(ExecutionRequest { + Ok(ActionExecutionRequest { execution_identifier: Uuid::new_v4().to_string(), function_identifier: node.handler_id.clone(), parameters: Some(Struct { fields }), @@ -522,13 +624,40 @@ impl<'a> EngineExecutor<'a> { } } + fn commit_remote_result( + &self, + node_id: i64, + result: TucanaNodeExecutionResult, + value_store: &mut ValueStore, + ) -> Signal { + value_store.insert_node_result(node_id, result.clone()); + match result.result { + Some(TucanaNodeResult::Success(value)) => Signal::Success(value), + Some(TucanaNodeResult::Error(error)) => { + Signal::Failure(RuntimeError::from_tucana_error(&error)) + } + None => Signal::Failure(RuntimeError::new( + "T-CORE-000006", + "NodeExecutionResultMissingOutcome", + "Remote node execution result is missing success/error outcome", + )), + } + } + fn trace_enter(&self, node: &CompiledNode, value_store: &ValueStore) -> Option { + self.trace_enter_function(node.id, node.handler_id.as_str(), value_store) + } + + fn trace_enter_function( + &self, + node_id: i64, + function_name: &str, + value_store: &ValueStore, + ) -> Option { self.tracer.map(|tracer| { - tracer.borrow_mut().enter_node( - node.id, - node.handler_id.as_str(), - value_store.trace_snapshot(), - ) + tracer + .borrow_mut() + .enter_node(node_id, function_name, value_store.trace_snapshot()) }) } @@ -586,15 +715,89 @@ impl<'a> EngineExecutor<'a> { } } - fn trace_mark_thunk_executed_by_node(&self, frame_id: Option, node_id: i64) { + fn trace_mark_thunk_executed(&self, frame_id: Option, thunk: &Thunk) { if let (Some(frame_id), Some(tracer)) = (frame_id, self.tracer) { tracer .borrow_mut() - .mark_thunk_executed_by_node(frame_id, node_id); + .mark_thunk_executed(frame_id, thunk.trace_target().as_str()); } } } +fn compiled_thunk_to_argument(thunk: &CompiledThunk) -> Thunk { + match thunk { + CompiledThunk::Node(node_id) => Thunk::Node(*node_id), + CompiledThunk::Function { + identifier, + parameter_index, + settings, + } => Thunk::Function(FunctionThunk { + identifier: identifier.clone(), + parameter_index: *parameter_index, + settings: settings.clone(), + }), + } +} + +fn resolve_function_setting( + function: &FunctionThunk, + setting: &SubFlowSetting, + input_type: InputType, + value_store: &mut ValueStore, +) -> Result { + if setting.hidden.unwrap_or(false) { + return Ok(setting_default_or_null(setting)); + } + + let reference = ReferenceValue { + target: Some(Target::InputType(input_type)), + paths: Vec::new(), + }; + + match value_store.get(reference) { + ValueStoreResult::Success(value) => { + if is_null_value(&value) + && let Some(default_value) = setting.default_value.clone() + { + Ok(default_value) + } else { + Ok(value) + } + } + ValueStoreResult::Error(err) => Err(err), + ValueStoreResult::NotFound => { + if let Some(default_value) = setting.default_value.clone() { + Ok(default_value) + } else if setting.optional.unwrap_or(false) { + Ok(null_value()) + } else { + Err(RuntimeError::new( + "T-CORE-000107", + "SubFlowSettingValueMissing", + format!( + "Required sub_flow setting {} for function {} is missing", + setting.identifier, function.identifier + ), + )) + } + } + } +} + +fn setting_default_or_null(setting: &SubFlowSetting) -> Value { + setting.default_value.clone().unwrap_or_else(null_value) +} + +fn is_null_value(value: &Value) -> bool { + matches!(value.kind.as_ref(), None | Some(Kind::NullValue(_))) +} + +fn null_value() -> Value { + Value { + kind: Some(Kind::NullValue(0)), + } +} + fn preview_value(value: &Value) -> String { // Trace previews are deterministic and human-readable for debugging snapshots. format_value_json(value) diff --git a/crates/taurus-core/src/runtime/engine/model.rs b/crates/taurus-core/src/runtime/engine/model.rs index 50ff1e4..97eeefa 100644 --- a/crates/taurus-core/src/runtime/engine/model.rs +++ b/crates/taurus-core/src/runtime/engine/model.rs @@ -5,7 +5,7 @@ use std::collections::HashMap; -use tucana::shared::{ReferenceValue, Value}; +use tucana::shared::{ReferenceValue, SubFlowSetting, Value}; #[derive(Debug, Clone)] pub enum NodeExecutionTarget { @@ -18,7 +18,17 @@ pub enum NodeExecutionTarget { pub enum CompiledArg { Literal(Value), Reference(ReferenceValue), - DeferredNode(i64), + Deferred(CompiledThunk), +} + +#[derive(Debug, Clone)] +pub enum CompiledThunk { + Node(i64), + Function { + identifier: String, + parameter_index: i64, + settings: Vec, + }, } /// Compiled parameter binding. diff --git a/crates/taurus-core/src/runtime/execution/render.rs b/crates/taurus-core/src/runtime/execution/render.rs index d34a130..ef7d89a 100644 --- a/crates/taurus-core/src/runtime/execution/render.rs +++ b/crates/taurus-core/src/runtime/execution/render.rs @@ -2,8 +2,10 @@ use std::collections::HashMap; +use tucana::shared::node_execution_result::Result as TucanaNodeResult; + use crate::runtime::execution::trace::{ - ArgKind, EdgeKind, Outcome, StoreDiff, StoreResultStatus, TraceFrame, TraceRun, + ArgKind, EdgeKind, Outcome, StoreDiff, TraceFrame, TraceRun, }; struct TraceTheme; @@ -158,13 +160,13 @@ fn render_frame( format!("reference {:?} ({})", reference, hit_state) } ArgKind::Thunk { - node_id, + target, eager, executed, } => { let mode = if *eager { "eager" } else { "lazy" }; let executed_state = if *executed { "executed" } else { "deferred" }; - format!("thunk node={} {} {}", node_id, mode, executed_state) + format!("thunk {} {} {}", target, mode, executed_state) } }; out.push_str(&format!( @@ -266,14 +268,17 @@ fn render_store_diff( } for set in &diff.result_sets { - let status = match set.status { - StoreResultStatus::Success => "success", - StoreResultStatus::Error => "error", + let status = match &set.result.result { + Some(TucanaNodeResult::Success(_)) => "success", + Some(TucanaNodeResult::Error(_)) => "error", + None => "empty", }; out.push_str(&format!( - "{step:04} {prefix}{continuation}{store:<5} result.set node={} [{}] {}\n", + "{step:04} {prefix}{continuation}{store:<5} result.set node={} [{}] started_at={} finished_at={} {}\n", set.node_id, status, + set.result.started_at, + set.result.finished_at, set.preview, step = *step, prefix = prefix, diff --git a/crates/taurus-core/src/runtime/execution/store.rs b/crates/taurus-core/src/runtime/execution/store.rs index 371647d..3cde097 100644 --- a/crates/taurus-core/src/runtime/execution/store.rs +++ b/crates/taurus-core/src/runtime/execution/store.rs @@ -2,18 +2,10 @@ use std::collections::HashMap; -use tucana::shared::Value; +use tucana::shared::{NodeExecutionResult, Value}; -use crate::types::errors::runtime_error::RuntimeError; use crate::types::execution::ids::{FrameId, NodeId}; -/// Runtime outcome persisted per node. -#[derive(Debug, Clone)] -pub enum NodeOutcome { - Success(Value), - Failure(RuntimeError), -} - /// Input slot key for runtime-provided temporary inputs (for iterators/predicates). #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub struct InputSlotKey { @@ -25,7 +17,7 @@ pub struct InputSlotKey { /// Store that captures mutable runtime execution state. #[derive(Debug, Clone, Default)] pub struct ExecutionStore { - pub node_outcomes: HashMap, + pub node_results: HashMap, pub input_slots: HashMap, pub flow_input: Option, pub current_node: Option, diff --git a/crates/taurus-core/src/runtime/execution/trace.rs b/crates/taurus-core/src/runtime/execution/trace.rs index 9733566..070726b 100644 --- a/crates/taurus-core/src/runtime/execution/trace.rs +++ b/crates/taurus-core/src/runtime/execution/trace.rs @@ -9,6 +9,8 @@ use std::collections::HashMap; use std::time::Instant; +use tucana::shared::NodeExecutionResult; + /// Relationship between two execution frames. #[derive(Debug, Clone)] pub enum EdgeKind { @@ -29,7 +31,7 @@ pub enum ArgKind { hit: bool, }, Thunk { - node_id: i64, + target: String, eager: bool, executed: bool, }, @@ -69,20 +71,13 @@ pub enum Outcome { } /// One stored node result entry at snapshot time. -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq)] pub struct StoreResultEntry { pub node_id: i64, - pub status: StoreResultStatus, + pub result: NodeExecutionResult, pub preview: String, } -/// Result status in the value store. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum StoreResultStatus { - Success, - Error, -} - /// One temporary input slot entry at snapshot time. #[derive(Debug, Clone, PartialEq, Eq)] pub struct StoreInputSlotEntry { @@ -93,7 +88,7 @@ pub struct StoreInputSlotEntry { } /// Value store snapshot attached to a frame boundary. -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq)] pub struct StoreSnapshot { pub current_node_id: i64, pub flow_input_preview: String, @@ -102,7 +97,7 @@ pub struct StoreSnapshot { } /// Per-frame store changes between `store_before` and `store_after`. -#[derive(Debug, Clone, Default, PartialEq, Eq)] +#[derive(Debug, Clone, Default, PartialEq)] pub struct StoreDiff { pub current_node_changed: Option<(i64, i64)>, pub result_sets: Vec, @@ -129,7 +124,7 @@ impl StoreDiff { for (node_id, after_entry) in &after_results { match before_results.get(node_id) { Some(before_entry) - if before_entry.status == after_entry.status + if before_entry.result == after_entry.result && before_entry.preview == after_entry.preview => {} _ => result_sets.push((*after_entry).clone()), } diff --git a/crates/taurus-core/src/runtime/execution/tracer.rs b/crates/taurus-core/src/runtime/execution/tracer.rs index 894060d..8d712b9 100644 --- a/crates/taurus-core/src/runtime/execution/tracer.rs +++ b/crates/taurus-core/src/runtime/execution/tracer.rs @@ -14,7 +14,7 @@ pub trait ExecutionTracer { fn record_arg(&mut self, frame_id: u64, arg: ArgTrace); fn link_child(&mut self, parent_frame: u64, child_frame: u64, edge: EdgeKind); fn mark_thunk(&mut self, frame_id: u64, arg_index: usize, eager: bool, executed: bool); - fn mark_thunk_executed_by_node(&mut self, frame_id: u64, node_id: i64); + fn mark_thunk_executed(&mut self, frame_id: u64, target: &str); fn exit_node(&mut self, frame_id: u64, outcome: Outcome, store_after: StoreSnapshot); } @@ -141,16 +141,16 @@ impl ExecutionTracer for Tracer { } } - fn mark_thunk_executed_by_node(&mut self, frame_id: u64, node_id: i64) { + fn mark_thunk_executed(&mut self, frame_id: u64, target: &str) { let frame = self.get_frame_mut(frame_id); if let Some(arg) = frame.args.iter_mut().find(|a| { matches!( - a.kind, + &a.kind, ArgKind::Thunk { - node_id: current_node, + target: current_target, executed: false, .. - } if current_node == node_id + } if current_target == target ) }) && let ArgTrace { kind: diff --git a/crates/taurus-core/src/runtime/execution/value_store.rs b/crates/taurus-core/src/runtime/execution/value_store.rs index 20d852d..1da8f68 100644 --- a/crates/taurus-core/src/runtime/execution/value_store.rs +++ b/crates/taurus-core/src/runtime/execution/value_store.rs @@ -2,11 +2,11 @@ use std::collections::HashMap; -use tucana::shared::{InputType, ReferenceValue, Value, value::Kind}; +use tucana::shared::node_execution_result::Result as TucanaNodeResult; +use tucana::shared::{InputType, NodeExecutionResult, ReferenceValue, Value, value::Kind}; -use crate::runtime::execution::trace::{ - StoreInputSlotEntry, StoreResultEntry, StoreResultStatus, StoreSnapshot, -}; +use crate::runtime::execution::trace::{StoreInputSlotEntry, StoreResultEntry, StoreSnapshot}; +use crate::time::now_unix_ms; use crate::types::errors::runtime_error::RuntimeError; #[derive(Clone)] @@ -18,7 +18,7 @@ pub enum ValueStoreResult { #[derive(Default)] pub struct ValueStore { - results: HashMap, + results: HashMap, input_types: HashMap, flow_input: Value, current_node_id: i64, @@ -101,7 +101,20 @@ impl ValueStore { fn get_result(&mut self, id: i64) -> ValueStoreResult { match self.results.get(&id) { - Some(result) => result.clone(), + Some(result) => match &result.result { + Some(TucanaNodeResult::Success(value)) => ValueStoreResult::Success(value.clone()), + Some(TucanaNodeResult::Error(error)) => { + ValueStoreResult::Error(RuntimeError::from_tucana_error(error)) + } + None => ValueStoreResult::Error(RuntimeError::new( + "T-CORE-000006", + "NodeExecutionResultMissingOutcome", + format!( + "Node {} execution result is missing success/error outcome", + id + ), + )), + }, None => ValueStoreResult::NotFound, } } @@ -130,12 +143,36 @@ impl ValueStore { } pub fn insert_success(&mut self, id: i64, value: Value) { - self.results.insert(id, ValueStoreResult::Success(value)); + let ts = now_unix_ms(); + self.insert_node_result( + id, + NodeExecutionResult { + node_id: id, + started_at: ts, + finished_at: ts, + parameter_results: Vec::new(), + result: Some(TucanaNodeResult::Success(value)), + }, + ); } pub fn insert_error(&mut self, id: i64, runtime_error: RuntimeError) { - self.results - .insert(id, ValueStoreResult::Error(runtime_error)); + let ts = now_unix_ms(); + self.insert_node_result( + id, + NodeExecutionResult { + node_id: id, + started_at: ts, + finished_at: ts, + parameter_results: Vec::new(), + result: Some(TucanaNodeResult::Error(runtime_error.as_tucana_error())), + }, + ); + } + + pub fn insert_node_result(&mut self, id: i64, mut result: NodeExecutionResult) { + result.node_id = id; + self.results.insert(id, result); } pub fn push_runtime_trace_label(&mut self, label: String) { @@ -149,19 +186,18 @@ impl ValueStore { pub fn trace_snapshot(&self) -> StoreSnapshot { let mut results = Vec::with_capacity(self.results.len()); for (node_id, result) in &self.results { - match result { - ValueStoreResult::Success(value) => results.push(StoreResultEntry { - node_id: *node_id, - status: StoreResultStatus::Success, - preview: preview_value(value), - }), - ValueStoreResult::Error(err) => results.push(StoreResultEntry { - node_id: *node_id, - status: StoreResultStatus::Error, - preview: format!("{}:{} {}", err.code, err.category, err.message), - }), - ValueStoreResult::NotFound => {} - } + let preview = match &result.result { + Some(TucanaNodeResult::Success(value)) => preview_value(value), + Some(TucanaNodeResult::Error(err)) => { + format!("{}:{} {}", err.code, err.category, err.message) + } + None => "empty-result".to_string(), + }; + results.push(StoreResultEntry { + node_id: *node_id, + result: result.clone(), + preview, + }); } results.sort_by_key(|entry| entry.node_id); diff --git a/crates/taurus-core/src/runtime/functions/array.rs b/crates/taurus-core/src/runtime/functions/array.rs index 7686c5e..41780c7 100644 --- a/crates/taurus-core/src/runtime/functions/array.rs +++ b/crates/taurus-core/src/runtime/functions/array.rs @@ -76,9 +76,9 @@ fn fail(category: &str, message: impl Into) -> Signal { fn parse_array_and_thunk<'a>( op_name: &str, args: &'a [Argument], -) -> Result<(&'a Value, i64), Signal> { +) -> Result<(&'a Value, &'a crate::handler::argument::Thunk), Signal> { match args { - [Argument::Eval(array_v), Argument::Thunk(thunk)] => Ok((array_v, *thunk)), + [Argument::Eval(array_v), Argument::Thunk(thunk)] => Ok((array_v, thunk)), _ => Err(fail( "InvalidArgumentRuntimeError", format!( @@ -103,8 +103,8 @@ fn run_with_unary_input( input_type: InputType, iter_index: usize, item: &Value, - run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, - thunk_node: i64, + run: &mut crate::handler::registry::ThunkRunner<'_>, + thunk_node: &crate::handler::argument::Thunk, ) -> Signal { ctx.insert_input_type(input_type, item.clone()); ctx.push_runtime_trace_label(format!("iter={} value={}", iter_index, preview_value(item))); @@ -120,8 +120,8 @@ fn run_with_binary_inputs( cmp_index: usize, left: &Value, right: &Value, - run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, - thunk_node: i64, + run: &mut crate::handler::registry::ThunkRunner<'_>, + thunk_node: &crate::handler::argument::Thunk, ) -> Signal { ctx.insert_input_type(left_input, left.clone()); ctx.insert_input_type(right_input, right.clone()); @@ -178,7 +178,7 @@ fn comparator_ordering(signal: Signal, reverse: bool) -> Result Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { // array, index args!(args => array: ListValue, index: f64); @@ -203,7 +203,7 @@ fn at( fn concat( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => lhs_v: Value, rhs_v: Value); @@ -237,7 +237,7 @@ fn concat( fn filter( args: &[Argument], ctx: &mut ValueStore, - run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { let (array_v, predicate_node) = match parse_array_and_thunk("filter", args) { Ok(data) => data, @@ -273,7 +273,7 @@ fn filter( fn find( args: &[Argument], ctx: &mut ValueStore, - run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { let (array_v, predicate_node) = match parse_array_and_thunk("find", args) { Ok(data) => data, @@ -308,7 +308,7 @@ fn find( fn find_last( args: &[Argument], ctx: &mut ValueStore, - run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { let (array_v, predicate_node) = match parse_array_and_thunk("find_last", args) { Ok(data) => data, @@ -344,7 +344,7 @@ fn find_last( fn find_index( args: &[Argument], ctx: &mut ValueStore, - run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { let (array_v, predicate_node) = match parse_array_and_thunk("find_index", args) { Ok(data) => data, @@ -380,7 +380,7 @@ fn find_index( fn first( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => array: ListValue); @@ -397,7 +397,7 @@ fn first( fn last( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => array: ListValue); match array.values.last() { @@ -413,7 +413,7 @@ fn last( fn for_each( args: &[Argument], ctx: &mut ValueStore, - run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { let (array_v, transform_node) = match parse_array_and_thunk("for_each", args) { Ok(data) => data, @@ -474,7 +474,7 @@ fn format_value_json(value: &Value) -> String { fn map( args: &[Argument], ctx: &mut ValueStore, - run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { let (array_v, transform_node) = match parse_array_and_thunk("map", args) { Ok(data) => data, @@ -505,7 +505,7 @@ fn map( fn push( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => array_v: Value, item: Value); let Kind::ListValue(mut array) = array_v.kind.ok_or(()).unwrap_or(Kind::NullValue(0)) else { @@ -524,7 +524,7 @@ fn push( fn pop( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => array_v: Value); let Kind::ListValue(mut array) = array_v.kind.ok_or(()).unwrap_or(Kind::NullValue(0)) else { @@ -543,7 +543,7 @@ fn pop( fn remove( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => array_v: Value, item: Value); let Kind::ListValue(mut array) = array_v.kind.ok_or(()).unwrap_or(Kind::NullValue(0)) else { @@ -570,7 +570,7 @@ fn remove( fn is_empty( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => array_v: Value); let Kind::ListValue(array) = array_v.kind.ok_or(()).unwrap_or(Kind::NullValue(0)) else { @@ -588,7 +588,7 @@ fn is_empty( fn size( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => array_v: Value); let Kind::ListValue(array) = array_v.kind.ok_or(()).unwrap_or(Kind::NullValue(0)) else { @@ -604,7 +604,7 @@ fn size( fn index_of( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => array_v: Value, item: Value); let Kind::ListValue(array) = array_v.kind.ok_or(()).unwrap_or(Kind::NullValue(0)) else { @@ -627,7 +627,7 @@ fn index_of( fn to_unique( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => array_v: Value); let Kind::ListValue(array) = array_v.kind.ok_or(()).unwrap_or(Kind::NullValue(0)) else { @@ -653,7 +653,7 @@ fn to_unique( fn sort( args: &[Argument], ctx: &mut ValueStore, - run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { let (array_v, transform_node) = match parse_array_and_thunk("sort", args) { Ok(data) => data, @@ -718,7 +718,7 @@ fn sort( fn sort_reverse( args: &[Argument], ctx: &mut ValueStore, - run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { let (array_v, transform_node) = match parse_array_and_thunk("sort_reverse", args) { Ok(data) => data, @@ -783,7 +783,7 @@ fn sort_reverse( fn reverse( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => array_v: Value); let Kind::ListValue(mut array) = array_v.kind.ok_or(()).unwrap_or(Kind::NullValue(0)) else { @@ -802,7 +802,7 @@ fn reverse( fn flat( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => array_v: Value); let Kind::ListValue(array) = array_v.kind.ok_or(()).unwrap_or(Kind::NullValue(0)) else { @@ -829,7 +829,7 @@ fn flat( fn min( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => array: ListValue); @@ -869,7 +869,7 @@ fn min( fn max( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => array: ListValue); @@ -909,7 +909,7 @@ fn max( fn sum( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => array: ListValue); @@ -949,7 +949,7 @@ fn sum( fn join( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => array: ListValue, separator: String); @@ -976,7 +976,7 @@ mod tests { Argument::Eval(v) } fn a_thunk(id: i64) -> Argument { - Argument::Thunk(id) + Argument::Thunk(crate::handler::argument::Thunk::Node(id)) } fn v_num(n: f64) -> Value { value_from_f64(n) @@ -1033,13 +1033,15 @@ mod tests { } } - fn dummy_run(_: i64, _: &mut ValueStore) -> Signal { + fn dummy_run(_: &crate::handler::argument::Thunk, _: &mut ValueStore) -> Signal { Signal::Success(Value { kind: Some(Kind::NullValue(0)), }) } - fn run_from_bools(seq: Vec) -> impl FnMut(i64, &mut ValueStore) -> Signal { + fn run_from_bools( + seq: Vec, + ) -> impl FnMut(&crate::handler::argument::Thunk, &mut ValueStore) -> Signal { let mut i = 0usize; move |_, _| { let b = *seq.get(i).unwrap_or(&false); @@ -1050,7 +1052,9 @@ mod tests { } } - fn run_from_values(seq: Vec) -> impl FnMut(i64, &mut ValueStore) -> Signal { + fn run_from_values( + seq: Vec, + ) -> impl FnMut(&crate::handler::argument::Thunk, &mut ValueStore) -> Signal { let mut i = 0usize; move |_, _| { let v = seq.get(i).cloned().unwrap_or(Value { @@ -1251,7 +1255,7 @@ mod tests { fn test_for_each_and_map() { let mut ctx = ValueStore::default(); let mut called = 0usize; - let mut run = |_, _ctx: &mut ValueStore| { + let mut run = |_: &crate::handler::argument::Thunk, _ctx: &mut ValueStore| { called += 1; Signal::Success(Value { kind: Some(Kind::NullValue(0)), @@ -1291,7 +1295,7 @@ mod tests { let return_value = v_str("early_return"); let mut for_each_calls = 0usize; - let mut for_each_run = |_, _ctx: &mut ValueStore| { + let mut for_each_run = |_: &crate::handler::argument::Thunk, _ctx: &mut ValueStore| { for_each_calls += 1; Signal::Return(return_value.clone()) }; @@ -1309,7 +1313,7 @@ mod tests { assert_eq!(for_each_calls, 2); let mut map_calls = 0usize; - let mut map_run = |_, _ctx: &mut ValueStore| { + let mut map_run = |_: &crate::handler::argument::Thunk, _ctx: &mut ValueStore| { map_calls += 1; Signal::Return(return_value.clone()) }; @@ -1338,7 +1342,7 @@ mod tests { let mut filter_index = 0usize; let filter_returns = [true, false, true]; - let mut filter_run = |_, _ctx: &mut ValueStore| { + let mut filter_run = |_: &crate::handler::argument::Thunk, _ctx: &mut ValueStore| { let out = filter_returns.get(filter_index).copied().unwrap_or(false); filter_index += 1; Signal::Return(v_bool(out)) @@ -1355,7 +1359,7 @@ mod tests { let mut find_index = 0usize; let find_returns = [false, true]; - let mut find_run = |_, _ctx: &mut ValueStore| { + let mut find_run = |_: &crate::handler::argument::Thunk, _ctx: &mut ValueStore| { let out = find_returns.get(find_index).copied().unwrap_or(false); find_index += 1; Signal::Return(v_bool(out)) diff --git a/crates/taurus-core/src/runtime/functions/boolean.rs b/crates/taurus-core/src/runtime/functions/boolean.rs index c097f54..7dffd93 100644 --- a/crates/taurus-core/src/runtime/functions/boolean.rs +++ b/crates/taurus-core/src/runtime/functions/boolean.rs @@ -24,7 +24,7 @@ pub(crate) const FUNCTIONS: &[FunctionRegistration] = &[ fn as_number( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => value: bool); Signal::Success(value_from_i64(if value { 1 } else { 0 })) @@ -33,7 +33,7 @@ fn as_number( fn as_text( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => value: bool); Signal::Success(Value { @@ -44,7 +44,7 @@ fn as_text( fn from_number( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => number: f64); let is_zero = number == 0.0; @@ -56,7 +56,7 @@ fn from_number( fn from_text( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => text: String); @@ -76,7 +76,7 @@ fn from_text( fn is_equal( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => lhs: bool, rhs: bool); Signal::Success(Value { @@ -87,7 +87,7 @@ fn is_equal( fn negate( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => value: bool); Signal::Success(Value { @@ -143,7 +143,7 @@ mod tests { } // dummy `run` closure (unused by these handlers) - fn dummy_run(_: i64, _: &mut ValueStore) -> Signal { + fn dummy_run(_: &crate::handler::argument::Thunk, _: &mut ValueStore) -> Signal { Signal::Success(Value { kind: Some(Kind::BoolValue(true)), }) diff --git a/crates/taurus-core/src/runtime/functions/control.rs b/crates/taurus-core/src/runtime/functions/control.rs index 6cb8a15..94954df 100644 --- a/crates/taurus-core/src/runtime/functions/control.rs +++ b/crates/taurus-core/src/runtime/functions/control.rs @@ -24,7 +24,7 @@ pub(crate) const FUNCTIONS: &[FunctionRegistration] = &[ fn stop( _args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { Signal::Stop } @@ -32,7 +32,7 @@ fn stop( fn value( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => value: Value); Signal::Success(value) @@ -41,7 +41,7 @@ fn value( fn r#return( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => value: Value); // The executor decides how far this return unwinds (one frame). @@ -51,7 +51,7 @@ fn r#return( fn r#if( args: &[Argument], ctx: &mut ValueStore, - run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { let [ Argument::Eval(Value { @@ -70,7 +70,7 @@ fn r#if( if *bool { // Branch execution is delegated to the executor through `run`. ctx.push_runtime_trace_label("branch=if".to_string()); - run(*if_pointer, ctx) + run(if_pointer, ctx) } else { Signal::Success(Value { kind: Some(Kind::NullValue(0)), @@ -81,7 +81,7 @@ fn r#if( fn if_else( args: &[Argument], ctx: &mut ValueStore, - run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { let [ Argument::Eval(Value { @@ -100,9 +100,9 @@ fn if_else( if *bool { ctx.push_runtime_trace_label("branch=if".to_string()); - run(*if_pointer, ctx) + run(if_pointer, ctx) } else { ctx.push_runtime_trace_label("branch=else".to_string()); - run(*else_pointer, ctx) + run(else_pointer, ctx) } } diff --git a/crates/taurus-core/src/runtime/functions/http.rs b/crates/taurus-core/src/runtime/functions/http.rs index 2ca6106..d88e76f 100644 --- a/crates/taurus-core/src/runtime/functions/http.rs +++ b/crates/taurus-core/src/runtime/functions/http.rs @@ -32,7 +32,7 @@ fn fail(category: &str, message: impl Into) -> Signal { fn respond( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => struct_val: Struct); @@ -93,7 +93,7 @@ fn respond( fn create_request( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => http_method: String, headers: Struct, http_url: String, payload: Value); let mut fields = std::collections::HashMap::new(); @@ -116,7 +116,7 @@ fn create_request( fn send_request( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => http_request: Struct); @@ -247,7 +247,7 @@ fn send_request( fn create_response( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => http_status_code: i64, headers: Struct, payload: Value); let mut fields = std::collections::HashMap::new(); @@ -731,7 +731,7 @@ mod tests { kind: Some(Kind::StructValue(request)), })]; let mut ctx = ValueStore::default(); - let mut run = |_: i64, _: &mut ValueStore| Signal::Stop; + let mut run = |_: &crate::handler::argument::Thunk, _: &mut ValueStore| Signal::Stop; let signal = send_request(&args, &mut ctx, &mut run); diff --git a/crates/taurus-core/src/runtime/functions/number.rs b/crates/taurus-core/src/runtime/functions/number.rs index 58c7346..05782a8 100644 --- a/crates/taurus-core/src/runtime/functions/number.rs +++ b/crates/taurus-core/src/runtime/functions/number.rs @@ -73,7 +73,7 @@ pub(crate) const FUNCTIONS: &[FunctionRegistration] = &[ fn has_digits( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => value: NumberValue); @@ -93,7 +93,7 @@ fn has_digits( fn remove_digits( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => value: NumberValue); match number_to_i64_lossy(&value) { @@ -109,7 +109,7 @@ fn remove_digits( fn add( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => lhs: NumberValue, rhs: NumberValue); // Preserve integer precision and overflow checks when both operands are integers. @@ -133,7 +133,7 @@ fn add( fn multiply( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => lhs: NumberValue, rhs: NumberValue); if let (Some(number_value::Number::Integer(a)), Some(number_value::Number::Integer(b))) = @@ -156,7 +156,7 @@ fn multiply( fn subtract( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => lhs: NumberValue, rhs: NumberValue); if let (Some(number_value::Number::Integer(a)), Some(number_value::Number::Integer(b))) = @@ -179,7 +179,7 @@ fn subtract( fn divide( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => lhs: NumberValue, rhs: NumberValue); @@ -214,7 +214,7 @@ fn divide( fn modulo( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => lhs: NumberValue, rhs: NumberValue); @@ -248,7 +248,7 @@ fn modulo( fn abs( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => value: NumberValue); if let Some(number_value::Number::Integer(i)) = value.number @@ -266,7 +266,7 @@ fn abs( fn is_positive( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => value: NumberValue); let value = match num_f64(&value) { @@ -281,7 +281,7 @@ fn is_positive( fn is_greater( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => lhs: NumberValue, rhs: NumberValue); let lhs = match num_f64(&lhs) { @@ -300,7 +300,7 @@ fn is_greater( fn is_less( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => lhs: NumberValue, rhs: NumberValue); let lhs = match num_f64(&lhs) { @@ -319,7 +319,7 @@ fn is_less( fn is_zero( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => value: NumberValue); let value = match num_f64(&value) { @@ -334,7 +334,7 @@ fn is_zero( fn square( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => value: NumberValue); if let Some(number_value::Number::Integer(i)) = value.number @@ -352,7 +352,7 @@ fn square( fn exponential( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => base: NumberValue, exponent: NumberValue); match (base.number, exponent.number) { @@ -381,7 +381,7 @@ fn exponential( fn pi( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { no_args!(args); Signal::Success(value_from_f64(f64::consts::PI)) @@ -390,7 +390,7 @@ fn pi( fn euler( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { no_args!(args); Signal::Success(value_from_f64(f64::consts::E)) @@ -399,7 +399,7 @@ fn euler( fn infinity( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { no_args!(args); Signal::Success(value_from_f64(f64::INFINITY)) @@ -408,7 +408,7 @@ fn infinity( fn round_up( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => value: NumberValue, decimal_places: NumberValue); let decimal_places = match num_f64(&decimal_places) { @@ -432,7 +432,7 @@ fn round_up( fn round_down( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => value: NumberValue, decimal_places: NumberValue); let decimal_places = match num_f64(&decimal_places) { @@ -456,7 +456,7 @@ fn round_down( fn round( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => value: NumberValue, decimal_places: NumberValue); let decimal_places = match num_f64(&decimal_places) { @@ -480,7 +480,7 @@ fn round( fn square_root( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => value: NumberValue); let value = match num_f64(&value) { @@ -493,7 +493,7 @@ fn square_root( fn root( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => value: NumberValue, root: NumberValue); let value = match num_f64(&value) { @@ -510,7 +510,7 @@ fn root( fn log( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => value: NumberValue, base: NumberValue); let value = match num_f64(&value) { @@ -556,7 +556,7 @@ fn log( fn ln( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => value: NumberValue); let value = match num_f64(&value) { @@ -569,7 +569,7 @@ fn ln( fn from_text( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => string_value: String); @@ -589,7 +589,7 @@ fn from_text( fn as_text( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => value: NumberValue); let value = match num_f64(&value) { @@ -604,7 +604,7 @@ fn as_text( fn min( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => lhs: NumberValue, rhs: NumberValue); if let (Some(number_value::Number::Integer(a)), Some(number_value::Number::Integer(b))) = @@ -626,7 +626,7 @@ fn min( fn max( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => lhs: NumberValue, rhs: NumberValue); if let (Some(number_value::Number::Integer(a)), Some(number_value::Number::Integer(b))) = @@ -648,7 +648,7 @@ fn max( fn negate( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => value: NumberValue); if let Some(number_value::Number::Integer(i)) = value.number @@ -666,7 +666,7 @@ fn negate( fn random( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => min: NumberValue, max: NumberValue); @@ -695,7 +695,7 @@ fn random( fn sin( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => value: NumberValue); let value = match num_f64(&value) { @@ -708,7 +708,7 @@ fn sin( fn cos( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => value: NumberValue); let value = match num_f64(&value) { @@ -721,7 +721,7 @@ fn cos( fn tan( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => value: NumberValue); let value = match num_f64(&value) { @@ -734,7 +734,7 @@ fn tan( fn arcsin( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => value: NumberValue); let value = match num_f64(&value) { @@ -747,7 +747,7 @@ fn arcsin( fn arccos( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => value: NumberValue); let value = match num_f64(&value) { @@ -760,7 +760,7 @@ fn arccos( fn arctan( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => value: NumberValue); let value = match num_f64(&value) { @@ -773,7 +773,7 @@ fn arctan( fn sinh( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => value: NumberValue); let value = match num_f64(&value) { @@ -786,7 +786,7 @@ fn sinh( fn cosh( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => value: NumberValue); let value = match num_f64(&value) { @@ -799,7 +799,7 @@ fn cosh( fn clamp( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => value: NumberValue, min: NumberValue, max: NumberValue); if let ( @@ -828,7 +828,7 @@ fn clamp( fn is_equal( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => lhs: NumberValue, rhs: NumberValue); let lhs = match num_f64(&lhs) { @@ -904,8 +904,8 @@ mod tests { } } - // dummy runner for handlers that accept `run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal` - fn dummy_run(_: i64, _: &mut ValueStore) -> Signal { + // dummy runner for handlers that accept `run: &mut crate::handler::registry::ThunkRunner<'_>` + fn dummy_run(_: &crate::handler::argument::Thunk, _: &mut ValueStore) -> Signal { Signal::Success(Value { kind: Some(Kind::NullValue(0)), }) diff --git a/crates/taurus-core/src/runtime/functions/object.rs b/crates/taurus-core/src/runtime/functions/object.rs index 0986ef4..283b629 100644 --- a/crates/taurus-core/src/runtime/functions/object.rs +++ b/crates/taurus-core/src/runtime/functions/object.rs @@ -22,7 +22,7 @@ pub(crate) const FUNCTIONS: &[FunctionRegistration] = &[ fn get( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => object: Struct, key: String); match object.fields.get(&key) { @@ -38,7 +38,7 @@ fn get( fn contains_key( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => object: Struct, key: String); let contains = object.fields.contains_key(&key); @@ -51,7 +51,7 @@ fn contains_key( fn size( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => object: Struct); Signal::Success(value_from_i64(object.fields.len() as i64)) @@ -60,7 +60,7 @@ fn size( fn keys( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => object: Struct); @@ -83,7 +83,7 @@ fn keys( fn set( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => object: Struct, key: String, value: Value); let mut new_object = object.clone(); @@ -159,8 +159,8 @@ mod tests { }) } - // dummy runner for handlers that accept `run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal` - fn dummy_run(_: i64, _: &mut ValueStore) -> Signal { + // dummy runner for handlers that accept `run: &mut crate::handler::registry::ThunkRunner<'_>` + fn dummy_run(_: &crate::handler::argument::Thunk, _: &mut ValueStore) -> Signal { Signal::Success(Value { kind: Some(Kind::NullValue(0)), }) diff --git a/crates/taurus-core/src/runtime/functions/text.rs b/crates/taurus-core/src/runtime/functions/text.rs index 383642b..8b44d98 100644 --- a/crates/taurus-core/src/runtime/functions/text.rs +++ b/crates/taurus-core/src/runtime/functions/text.rs @@ -59,7 +59,7 @@ fn arg_err>(msg: S) -> Signal { fn as_bytes( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => value: String); @@ -77,7 +77,7 @@ fn as_bytes( fn byte_size( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => value: String); Signal::Success(value_from_i64(value.len() as i64)) @@ -86,7 +86,7 @@ fn byte_size( fn capitalize( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => value: String); @@ -113,7 +113,7 @@ fn capitalize( fn uppercase( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => value: String); Signal::Success(Value { @@ -124,7 +124,7 @@ fn uppercase( fn lowercase( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => value: String); Signal::Success(Value { @@ -135,7 +135,7 @@ fn lowercase( fn swapcase( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => value: String); @@ -160,7 +160,7 @@ fn swapcase( fn trim( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => value: String); Signal::Success(Value { @@ -171,7 +171,7 @@ fn trim( fn chars( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => value: String); @@ -190,7 +190,7 @@ fn chars( fn at( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => value: String, index: tucana::shared::NumberValue); let index = match number_to_i64_lossy(&index) { @@ -222,7 +222,7 @@ fn at( fn append( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => value: String, suffix: String); Signal::Success(Value { @@ -233,7 +233,7 @@ fn append( fn prepend( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => value: String, prefix: String); Signal::Success(Value { @@ -244,7 +244,7 @@ fn prepend( fn insert( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => value: String, position: tucana::shared::NumberValue, text: String); let position = match number_to_i64_lossy(&position) { @@ -277,7 +277,7 @@ fn insert( fn length( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => value: String); Signal::Success(value_from_i64(value.chars().count() as i64)) @@ -286,7 +286,7 @@ fn length( fn remove( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => value: String, from: tucana::shared::NumberValue, to: tucana::shared::NumberValue); let from = match number_to_i64_lossy(&from) { @@ -334,7 +334,7 @@ fn remove( fn replace( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => value: String, old: String, new: String); let replaced = value.replace(&old, &new); @@ -346,7 +346,7 @@ fn replace( fn replace_first( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => value: String, old: String, new: String); let replaced = value.replacen(&old, &new, 1); @@ -358,7 +358,7 @@ fn replace_first( fn replace_last( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => value: String, old: String, new: String); @@ -384,7 +384,7 @@ fn replace_last( fn hex( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => value: String); @@ -402,7 +402,7 @@ fn hex( fn octal( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => value: String); @@ -420,7 +420,7 @@ fn octal( fn index_of( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => value: String, sub: String); @@ -433,7 +433,7 @@ fn index_of( fn contains( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => value: String, sub: String); Signal::Success(Value { @@ -444,7 +444,7 @@ fn contains( fn split( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => value: String, delimiter: String); @@ -463,7 +463,7 @@ fn split( fn reverse( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => value: String); @@ -476,7 +476,7 @@ fn reverse( fn starts_with( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => value: String, prefix: String); Signal::Success(Value { @@ -487,7 +487,7 @@ fn starts_with( fn ends_with( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => value: String, suffix: String); Signal::Success(Value { @@ -498,7 +498,7 @@ fn ends_with( fn to_ascii( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => value: String); @@ -515,7 +515,7 @@ fn to_ascii( fn from_ascii( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { // Requires a TryFromArg impl for ListValue in your macro system. args!(args => list: ListValue); @@ -546,7 +546,7 @@ fn from_ascii( fn encode( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => value: String, encoding: String); @@ -565,7 +565,7 @@ fn encode( fn decode( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => value: String, encoding: String); @@ -600,7 +600,7 @@ fn decode( fn is_equal( args: &[Argument], _ctx: &mut ValueStore, - _run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal, + _run: &mut crate::handler::registry::ThunkRunner<'_>, ) -> Signal { args!(args => lhs: String, rhs: String); Signal::Success(Value { @@ -674,8 +674,8 @@ mod tests { } } - // dummy runner for handlers that accept `run: &mut dyn FnMut(i64, &mut ValueStore) -> Signal` - fn dummy_run(_: i64, _: &mut ValueStore) -> Signal { + // dummy runner for handlers that accept `run: &mut crate::handler::registry::ThunkRunner<'_>` + fn dummy_run(_: &crate::handler::argument::Thunk, _: &mut ValueStore) -> Signal { Signal::Success(Value { kind: Some(Kind::NullValue(0)), }) diff --git a/crates/taurus-core/src/runtime/remote/mod.rs b/crates/taurus-core/src/runtime/remote/mod.rs index f5ab662..f94c3ec 100644 --- a/crates/taurus-core/src/runtime/remote/mod.rs +++ b/crates/taurus-core/src/runtime/remote/mod.rs @@ -4,7 +4,7 @@ //! trait without coupling the core engine to a specific transport. use async_trait::async_trait; -use tucana::{aquila::ExecutionRequest, shared::Value}; +use tucana::{aquila::ActionExecutionRequest, shared::NodeExecutionResult}; use crate::types::errors::runtime_error::RuntimeError; @@ -12,11 +12,14 @@ pub struct RemoteExecution { /// Remote service identifier to route the call. pub target_service: String, /// Execution request payload expected by the remote runtime. - pub request: ExecutionRequest, + pub request: ActionExecutionRequest, } #[async_trait] pub trait RemoteRuntime { /// Execute a remote node invocation and return its resulting value. - async fn execute_remote(&self, execution: RemoteExecution) -> Result; + async fn execute_remote( + &self, + execution: RemoteExecution, + ) -> Result; } diff --git a/crates/taurus-core/src/time.rs b/crates/taurus-core/src/time.rs new file mode 100644 index 0000000..e3a90e4 --- /dev/null +++ b/crates/taurus-core/src/time.rs @@ -0,0 +1,10 @@ +//! Shared time helpers for runtime metadata. + +use std::time::{SystemTime, UNIX_EPOCH}; + +pub fn now_unix_ms() -> i64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|it| it.as_millis() as i64) + .unwrap_or(0) +} diff --git a/crates/taurus-core/src/types/errors/runtime_error.rs b/crates/taurus-core/src/types/errors/runtime_error.rs index edafa7c..afb4e85 100644 --- a/crates/taurus-core/src/types/errors/runtime_error.rs +++ b/crates/taurus-core/src/types/errors/runtime_error.rs @@ -9,10 +9,13 @@ use std::collections::HashMap; use std::error::Error; use std::fmt::{Display, Formatter}; -use std::time::{SystemTime, UNIX_EPOCH}; use tucana::shared::value::Kind::{NumberValue, StringValue, StructValue}; -use tucana::shared::{NumberValue as ProtoNumberValue, Struct, Value, number_value}; +use tucana::shared::{ + Error as TucanaError, NumberValue as ProtoNumberValue, Struct, Value, number_value, +}; + +use crate::time::now_unix_ms; /// Runtime execution failure representation. #[derive(Debug, Clone, PartialEq)] @@ -44,7 +47,7 @@ impl RuntimeError { code: code.into(), category: category.into(), message: message.into(), - timestamp_unix_ms: now_unix_ms(), + timestamp_unix_ms: now_unix_ms() as u64, version: env!("CARGO_PKG_VERSION").to_string(), dependencies: HashMap::new(), details: HashMap::new(), @@ -140,6 +143,38 @@ impl RuntimeError { })), } } + + /// Convert to transport-level shared error object. + pub fn as_tucana_error(&self) -> TucanaError { + TucanaError { + code: self.code.clone(), + category: self.category.clone(), + message: self.message.clone(), + timestamp: self.timestamp_unix_ms as i64, + version: self.version.clone(), + dependencies: self.dependencies.clone(), + details: Some(Struct { + fields: self.details.clone(), + }), + } + } + + /// Build a runtime error from a transport-level shared error object. + pub fn from_tucana_error(error: &TucanaError) -> Self { + Self { + code: error.code.clone(), + category: error.category.clone(), + message: error.message.clone(), + timestamp_unix_ms: error.timestamp.max(0) as u64, + version: error.version.clone(), + dependencies: error.dependencies.clone(), + details: error + .details + .as_ref() + .map(|it| it.fields.clone()) + .unwrap_or_default(), + } + } } impl Default for RuntimeError { @@ -155,10 +190,3 @@ impl Display for RuntimeError { write!(f, "[{}] {}: {}", self.code, self.category, self.message) } } - -fn now_unix_ms() -> u64 { - SystemTime::now() - .duration_since(UNIX_EPOCH) - .map(|it| it.as_millis() as u64) - .unwrap_or(0) -} diff --git a/crates/taurus-provider/src/providers/remote/nats_remote_runtime.rs b/crates/taurus-provider/src/providers/remote/nats_remote_runtime.rs index 9ef694b..5861964 100644 --- a/crates/taurus-provider/src/providers/remote/nats_remote_runtime.rs +++ b/crates/taurus-provider/src/providers/remote/nats_remote_runtime.rs @@ -3,8 +3,8 @@ use prost::Message; use taurus_core::runtime::remote::{RemoteExecution, RemoteRuntime}; use taurus_core::types::errors::runtime_error::RuntimeError; use tonic::async_trait; -use tucana::aquila::ExecutionResult; -use tucana::shared::Value; +use tucana::aquila::ActionExecutionResponse; +use tucana::shared::NodeExecutionResult; pub struct NATSRemoteRuntime { client: Client, @@ -18,7 +18,10 @@ impl NATSRemoteRuntime { #[async_trait] impl RemoteRuntime for NATSRemoteRuntime { - async fn execute_remote(&self, execution: RemoteExecution) -> Result { + async fn execute_remote( + &self, + execution: RemoteExecution, + ) -> Result { let topic = format!( "action.{}.{}", execution.target_service, execution.request.execution_identifier @@ -31,51 +34,41 @@ impl RemoteRuntime for NATSRemoteRuntime { Ok(r) => r, Err(err) => { log::error!( - "RemoteRuntimeExeption: failed to handle NATS message: {}", + "RemoteRuntimeException: failed to handle NATS message: {}", err ); return Err(RuntimeError::new( "T-PROV-000001", - "RemoteRuntimeExeption", + "RemoteRuntimeException", "Failed to receive any response messages from a remote runtime.", )); } }; - let decode_result = ExecutionResult::decode(message.payload); - let execution_result = match decode_result { - Ok(r) => r, + let decode_result = ActionExecutionResponse::decode(message.payload); + match decode_result { + Ok(r) => match r.node_result { + Some(res) => Ok(res), + None => { + log::error!("RemoteRuntimeException: received execution result without a body"); + Err(RuntimeError::new( + "T-PROV-000003", + "RemoteRuntimeException", + "Received empty action execution response", + )) + } + }, Err(err) => { log::error!( - "RemoteRuntimeExeption: failed to decode NATS message: {}", + "RemoteRuntimeException: failed to decode NATS message: {}", err ); return Err(RuntimeError::new( "T-PROV-000002", - "RemoteRuntimeExeption", + "RemoteRuntimeException", "Failed to read Remote Response", )); } - }; - - match execution_result.result { - Some(result) => match result { - tucana::aquila::execution_result::Result::Success(value) => Ok(value), - tucana::aquila::execution_result::Result::Error(err) => { - let code = err.code.to_string(); - let description = match err.description { - Some(string) => string, - None => "Unknown Error".to_string(), - }; - let error = RuntimeError::new(code, "RemoteExecutionError", description); - Err(error) - } - }, - None => Err(RuntimeError::new( - "T-PROV-000003", - "RemoteRuntimeExeption", - "Result of Remote Response was empty.", - )), } } } diff --git a/crates/taurus-tests/src/main.rs b/crates/taurus-tests/src/main.rs index 2cf79d0..db6ffd2 100644 --- a/crates/taurus-tests/src/main.rs +++ b/crates/taurus-tests/src/main.rs @@ -132,7 +132,7 @@ impl Testable for Case { flow_input, None, None, - true, + false, ); match res { diff --git a/crates/taurus/Cargo.toml b/crates/taurus/Cargo.toml index b6cb26a..9d54c3a 100644 --- a/crates/taurus/Cargo.toml +++ b/crates/taurus/Cargo.toml @@ -18,3 +18,7 @@ tonic-health = { workspace = true } tonic = { workspace = true } taurus-core = { workspace = true } taurus-provider = { workspace = true } + +[dev-dependencies] +serde = { workspace = true } +serde_json = { workspace = true } diff --git a/crates/taurus/src/app/mod.rs b/crates/taurus/src/app/mod.rs index 93491ca..fe3712e 100644 --- a/crates/taurus/src/app/mod.rs +++ b/crates/taurus/src/app/mod.rs @@ -1,11 +1,10 @@ mod worker; -use std::sync::Arc; -use std::time::Duration; - use code0_flow::flow_config::load_env_file; use code0_flow::flow_config::mode::Mode::DYNAMIC; use code0_flow::flow_service::FlowUpdateService; +use std::sync::Arc; +use std::time::Duration; use taurus_core::runtime::engine::ExecutionEngine; use taurus_provider::providers::emitter::nats_emitter::NATSRespondEmitter; use taurus_provider::providers::remote::nats_remote_runtime::NATSRemoteRuntime; @@ -13,7 +12,6 @@ use tokio::signal; use tokio::task::JoinHandle; use tokio::time::sleep; use tonic_health::pb::health_server::HealthServer; -use tucana::shared::{RuntimeFeature, Translation}; use crate::client::runtime_status::TaurusRuntimeStatusService; use crate::client::runtime_usage::TaurusRuntimeUsageService; @@ -124,7 +122,6 @@ async fn setup_dynamic_services_if_needed( config.aquila_url.clone(), config.aquila_token.clone(), "taurus".into(), - runtime_features(), ) .await, )); @@ -152,7 +149,9 @@ async fn setup_dynamic_services_if_needed( loop { interval.tick().await; status_service - .update_runtime_status(tucana::shared::execution_runtime_status::Status::Running) + .update_runtime_status( + tucana::shared::execution_runtime_status::Status::Running, + ) .await; } }); @@ -198,19 +197,6 @@ async fn push_definitions_until_success(config: &Config) { } } -fn runtime_features() -> Vec { - vec![RuntimeFeature { - name: vec![Translation { - code: "en-US".to_string(), - content: "Runtime".to_string(), - }], - description: vec![Translation { - code: "en-US".to_string(), - content: "Will execute incoming flows.".to_string(), - }], - }] -} - async fn update_stopped_status(runtime_status_service: Option<&Arc>) { if let Some(status_service) = runtime_status_service { status_service diff --git a/crates/taurus/src/app/worker.rs b/crates/taurus/src/app/worker.rs index 7ee59b4..a953a86 100644 --- a/crates/taurus/src/app/worker.rs +++ b/crates/taurus/src/app/worker.rs @@ -3,10 +3,14 @@ use std::time::Instant; use futures_lite::StreamExt; use prost::Message; use taurus_core::runtime::engine::{EmitType, ExecutionEngine, ExecutionId, RespondEmitter}; +use taurus_core::time::now_unix_ms; +use taurus_core::types::errors::runtime_error::RuntimeError; +use taurus_core::types::signal::Signal; use taurus_provider::providers::emitter::nats_emitter::NATSRespondEmitter; use taurus_provider::providers::remote::nats_remote_runtime::NATSRemoteRuntime; use tokio::task::JoinHandle; -use tucana::shared::{ExecutionFlow, RuntimeUsage, Value}; +use tucana::shared::execution_result; +use tucana::shared::{ExecutionFlow, ExecutionResult, RuntimeUsage, Value}; use crate::client::runtime_usage::TaurusRuntimeUsageService; @@ -18,7 +22,7 @@ pub fn spawn_worker( runtime_usage_service: Option, ) -> JoinHandle<()> { tokio::spawn(async move { - let mut subscription = match client + let mut execution_subscription = match client .queue_subscribe(String::from("execution.*"), "taurus".into()) .await { @@ -32,30 +36,75 @@ pub fn spawn_worker( } }; - while let Some(message) = subscription.next().await { - process_message( - message, - &engine, - &nats_remote, - &runtime_emitter, - runtime_usage_service.as_ref(), - ) - .await; + let mut test_execution_subscription = match client + .queue_subscribe(String::from("test_execution.*"), "taurus".into()) + .await + { + Ok(subscription) => { + log::info!("Subscribed to 'test_execution.*'"); + subscription + } + Err(err) => { + log::error!("Failed to subscribe to 'test_execution.*': {:?}", err); + return; + } + }; + + let mut execution_closed = false; + let mut test_execution_closed = false; + + while !(execution_closed && test_execution_closed) { + tokio::select! { + message = execution_subscription.next(), if !execution_closed => { + match message { + Some(message) => { + process_execution_message( + message, + &engine, + &nats_remote, + &runtime_emitter, + runtime_usage_service.as_ref(), + ).await; + } + None => { + execution_closed = true; + log::warn!("Subscription 'execution.*' ended"); + } + } + } + message = test_execution_subscription.next(), if !test_execution_closed => { + match message { + Some(message) => { + process_test_execution_message( + &client, + message, + &engine, + &nats_remote, + runtime_usage_service.as_ref(), + ).await; + } + None => { + test_execution_closed = true; + log::warn!("Subscription 'test_execution.*' ended"); + } + } + } + } } log::info!("NATS worker loop ended"); }) } -async fn process_message( +async fn process_execution_message( message: async_nats::Message, engine: &ExecutionEngine, nats_remote: &NATSRemoteRuntime, runtime_emitter: &NATSRespondEmitter, runtime_usage_service: Option<&TaurusRuntimeUsageService>, ) { - let requested_execution_id = - parse_execution_id_from_subject(&message.subject).unwrap_or_else(|| { + let requested_execution_id = parse_execution_id_from_subject(&message.subject, "execution") + .unwrap_or_else(|| { let generated = ExecutionId::new_v4(); log::warn!( "Expected subject format 'execution.', got '{}'; generated execution id {}", @@ -83,7 +132,7 @@ async fn process_message( let respond_emitter = |execution_id, emit_type: EmitType, value: Value| { runtime_emitter.emit(execution_id, emit_type, value); }; - let runtime_usage = execute_flow( + let run_result = execute_flow( requested_execution_id, flow, engine, @@ -96,8 +145,78 @@ async fn process_message( ); if let Some(usage_service) = runtime_usage_service { - usage_service.update_runtime_usage(runtime_usage).await; + usage_service + .update_runtime_usage(run_result.runtime_usage) + .await; + } +} + +async fn process_test_execution_message( + client: &async_nats::Client, + message: async_nats::Message, + engine: &ExecutionEngine, + nats_remote: &NATSRemoteRuntime, + runtime_usage_service: Option<&TaurusRuntimeUsageService>, +) { + if message.reply.is_none() { + log::warn!( + "Received test execution request without reply subject on '{}'; ignoring request", + message.subject + ); + return; + } + + let requested_execution_id = + match parse_execution_id_from_subject(&message.subject, "test_execution") { + Some(res) => res, + None => { + log::error!("Failed to extract execution uuid from {}", &message.subject); + return; + } + }; + + let flow: ExecutionFlow = match ExecutionFlow::decode(&*message.payload) { + Ok(flow) => flow, + Err(err) => { + log::error!( + "Failed to deserialize test execution flow: {:?}, payload: {:?}", + err, + &message.payload + ); + let result = build_decode_error_result(requested_execution_id); + respond_to_test_execution_request(client, &message, result).await; + return; + } + }; + + let run_result = execute_flow(requested_execution_id, flow, engine, nats_remote, None); + + if let Some(usage_service) = runtime_usage_service { + usage_service + .update_runtime_usage(run_result.runtime_usage.clone()) + .await; } + + let execution_result = build_execution_result( + run_result.execution_id, + run_result.flow_id, + run_result.started_at, + run_result.finished_at, + run_result.input, + run_result.signal, + ); + respond_to_test_execution_request(client, &message, execution_result).await; +} + +#[derive(Clone)] +struct FlowRunResult { + execution_id: ExecutionId, + flow_id: i64, + started_at: i64, + finished_at: i64, + input: Option, + signal: Signal, + runtime_usage: RuntimeUsage, } fn execute_flow( @@ -106,29 +225,308 @@ fn execute_flow( engine: &ExecutionEngine, nats_remote: &NATSRemoteRuntime, respond_emitter: Option<&dyn RespondEmitter>, -) -> RuntimeUsage { +) -> FlowRunResult { + let started_at = now_unix_ms(); let start = Instant::now(); let flow_id = flow.flow_id; - let (_signal, _reason) = engine.execute_flow_with_execution_id( + let input = flow.input_value.clone(); + let (signal, _reason) = engine.execute_flow_with_execution_id( execution_id, flow, Some(nats_remote), respond_emitter, true, ); + let finished_at = now_unix_ms(); let duration_millis = start.elapsed().as_millis() as i64; - RuntimeUsage { + FlowRunResult { + execution_id, flow_id, - duration: duration_millis, + started_at, + finished_at, + input, + signal, + runtime_usage: RuntimeUsage { + flow_id, + duration: duration_millis, + }, } } -fn parse_execution_id_from_subject(subject: &async_nats::Subject) -> Option { +fn parse_execution_id_from_subject( + subject: &async_nats::Subject, + prefix: &str, +) -> Option { let raw = subject.as_str(); let mut parts = raw.split('.'); match (parts.next(), parts.next(), parts.next()) { - (Some("execution"), Some(uuid), None) => ExecutionId::parse_str(uuid).ok(), + (Some(found_prefix), Some(uuid), None) if found_prefix == prefix => { + ExecutionId::parse_str(uuid).ok() + } _ => None, } } + +fn build_execution_result( + execution_id: ExecutionId, + flow_id: i64, + started_at: i64, + finished_at: i64, + input: Option, + signal: Signal, +) -> ExecutionResult { + let result = match signal { + Signal::Success(value) | Signal::Return(value) | Signal::Respond(value) => { + Some(execution_result::Result::Success(value)) + } + Signal::Failure(err) => Some(execution_result::Result::Error(err.as_tucana_error())), + Signal::Stop => Some(execution_result::Result::Success(Value { + kind: Some(tucana::shared::value::Kind::NullValue(0)), + })), + }; + + ExecutionResult { + execution_identifier: execution_id.to_string(), + flow_id, + started_at, + finished_at, + input, + node_execution_results: Vec::new(), + result, + } +} + +fn build_decode_error_result(execution_id: ExecutionId) -> ExecutionResult { + let now = now_unix_ms(); + let runtime_error = RuntimeError::new( + "T-TAURUS-000001", + "ExecutionFlowDecodeError", + "Failed to decode test execution flow payload", + ); + + ExecutionResult { + execution_identifier: execution_id.to_string(), + flow_id: 0, + started_at: now, + finished_at: now, + input: None, + node_execution_results: Vec::new(), + result: Some(execution_result::Result::Error( + runtime_error.as_tucana_error(), + )), + } +} + +async fn respond_to_test_execution_request( + client: &async_nats::Client, + message: &async_nats::Message, + result: ExecutionResult, +) { + let Some(reply_subject) = message.reply.as_ref() else { + log::warn!( + "Received test execution request without reply subject on '{}'; cannot return ExecutionResult", + message.subject + ); + return; + }; + + if let Err(err) = client + .publish(reply_subject.clone(), result.encode_to_vec().into()) + .await + { + log::error!( + "Failed to publish test execution response on '{}': {:?}", + reply_subject, + err + ); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::Mutex; + use std::time::Duration; + + use prost::Message; + use serde::Deserialize; + use taurus_core::runtime::engine::ExecutionEngine; + use taurus_provider::providers::emitter::nats_emitter::NATSRespondEmitter; + use taurus_provider::providers::remote::nats_remote_runtime::NATSRemoteRuntime; + use tucana::shared::{ + ValidationFlow, execution_result, + helper::value::{from_json_value, to_json_value}, + }; + + #[derive(Deserialize)] + struct FixtureInput { + input: Option, + expected_result: serde_json::Value, + } + + #[derive(Deserialize)] + struct FlowFixture { + inputs: Vec, + flow: ValidationFlow, + } + + static NATS_TEST_LOCK: Mutex<()> = Mutex::new(()); + + #[test] + #[ignore = "requires a running NATS server at NATS_URL or nats://127.0.0.1:4222"] + fn test_execution_request_returns_execution_result_over_nats() { + let _lock = NATS_TEST_LOCK + .lock() + .expect("NATS test lock should not be poisoned"); + runtime().block_on(async { + let client = connect_test_nats().await; + let worker = spawn_test_worker(client.clone()); + + let execution_id = ExecutionId::new_v4(); + let fixture = load_fixture("flows/01_return_object.json"); + let expected_result = fixture.inputs[0].expected_result.clone(); + let flow = execution_flow_from_fixture(fixture); + let response = request_execution_result(&client, execution_id, flow.encode_to_vec()) + .await + .expect("test execution request should receive an ExecutionResult response"); + + worker.abort(); + + assert_eq!(response.execution_identifier, execution_id.to_string()); + assert_eq!(response.flow_id, flow.flow_id); + assert!(response.started_at > 0); + assert!(response.finished_at >= response.started_at); + assert_eq!(response.input, None); + assert!(response.node_execution_results.is_empty()); + + match response.result { + Some(execution_result::Result::Success(value)) => { + assert_eq!(to_json_value(value), expected_result); + } + other => panic!("expected successful test execution result, got {:?}", other), + } + }); + } + + #[test] + #[ignore = "requires a running NATS server at NATS_URL or nats://127.0.0.1:4222"] + fn test_execution_request_returns_decode_error_over_nats() { + let _lock = NATS_TEST_LOCK + .lock() + .expect("NATS test lock should not be poisoned"); + runtime().block_on(async { + let client = connect_test_nats().await; + let worker = spawn_test_worker(client.clone()); + + let execution_id = ExecutionId::new_v4(); + let response = + request_execution_result(&client, execution_id, b"not protobuf".to_vec()) + .await + .expect("malformed test execution request should receive an error response"); + + worker.abort(); + + assert_eq!(response.execution_identifier, execution_id.to_string()); + assert_eq!(response.flow_id, 0); + assert!(response.started_at > 0); + assert!(response.finished_at >= response.started_at); + assert_eq!(response.input, None); + assert!(response.node_execution_results.is_empty()); + + match response.result { + Some(execution_result::Result::Error(error)) => { + assert_eq!(error.code, "T-TAURUS-000001"); + assert_eq!(error.category, "ExecutionFlowDecodeError"); + assert_eq!( + error.message, + "Failed to decode test execution flow payload" + ); + } + other => panic!("expected decode error result, got {:?}", other), + } + }); + } + + fn runtime() -> tokio::runtime::Runtime { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .expect("test runtime should build") + } + + async fn connect_test_nats() -> async_nats::Client { + let nats_url = + std::env::var("NATS_URL").unwrap_or_else(|_| "nats://127.0.0.1:4222".to_string()); + async_nats::connect(&nats_url) + .await + .unwrap_or_else(|err| panic!("failed to connect to NATS at {nats_url}: {err}")) + } + + fn spawn_test_worker(client: async_nats::Client) -> tokio::task::JoinHandle<()> { + let engine = ExecutionEngine::new(); + let nats_remote = NATSRemoteRuntime::new(client.clone()); + let runtime_emitter = NATSRespondEmitter::new(client.clone()); + spawn_worker(client, engine, nats_remote, runtime_emitter, None) + } + + async fn request_execution_result( + client: &async_nats::Client, + execution_id: ExecutionId, + payload: Vec, + ) -> Result { + let subject = format!("test_execution.{execution_id}"); + + for attempt in 1..=10 { + match tokio::time::timeout( + Duration::from_secs(2), + client.request(subject.clone(), payload.clone().into()), + ) + .await + { + Ok(Ok(message)) => { + return ExecutionResult::decode(&*message.payload) + .map_err(|err| format!("failed to decode ExecutionResult: {err}")); + } + Ok(Err(err)) if attempt < 10 => { + log::debug!( + "test execution request attempt {} failed before subscription was ready: {:?}", + attempt, + err + ); + tokio::time::sleep(Duration::from_millis(100)).await; + } + Ok(Err(err)) => return Err(format!("NATS request failed: {err}")), + Err(_) if attempt < 10 => { + tokio::time::sleep(Duration::from_millis(100)).await; + } + Err(_) => return Err("timed out waiting for test execution response".to_string()), + } + } + + Err("test execution request did not complete".to_string()) + } + + fn load_fixture(path: &str) -> FlowFixture { + let path = std::path::Path::new(env!("CARGO_MANIFEST_DIR")) + .join("../..") + .join(path); + let content = std::fs::read_to_string(&path) + .unwrap_or_else(|err| panic!("failed to read fixture {}: {err}", path.display())); + serde_json::from_str(&content) + .unwrap_or_else(|err| panic!("failed to parse fixture {}: {err}", path.display())) + } + + fn execution_flow_from_fixture(fixture: FlowFixture) -> ExecutionFlow { + ExecutionFlow { + flow_id: fixture.flow.flow_id, + project_id: fixture.flow.project_id, + starting_node_id: fixture.flow.starting_node_id, + node_functions: fixture.flow.node_functions, + input_value: fixture + .inputs + .first() + .and_then(|input| input.input.clone().map(from_json_value)), + } + } +} diff --git a/crates/taurus/src/client/runtime_status.rs b/crates/taurus/src/client/runtime_status.rs index 12a64dc..f2f21bb 100644 --- a/crates/taurus/src/client/runtime_status.rs +++ b/crates/taurus/src/client/runtime_status.rs @@ -9,37 +9,25 @@ use tucana::{ RuntimeStatusUpdateRequest, runtime_status_service_client::RuntimeStatusServiceClient, runtime_status_update_request::Status, }, - shared::{ExecutionRuntimeStatus, RuntimeFeature}, + shared::ExecutionRuntimeStatus, }; pub struct TaurusRuntimeStatusService { channel: Channel, identifier: String, - features: Vec, aquila_token: String, } impl TaurusRuntimeStatusService { - pub async fn from_url( - aquila_url: String, - aquila_token: String, - identifier: String, - features: Vec, - ) -> Self { + pub async fn from_url(aquila_url: String, aquila_token: String, identifier: String) -> Self { let channel = create_channel_with_retry("Aquila", aquila_url).await; - Self::new(channel, aquila_token, identifier, features) + Self::new(channel, aquila_token, identifier) } - pub fn new( - channel: Channel, - aquila_token: String, - identifier: String, - features: Vec, - ) -> Self { + pub fn new(channel: Channel, aquila_token: String, identifier: String) -> Self { TaurusRuntimeStatusService { channel, identifier, - features, aquila_token, } } @@ -68,7 +56,6 @@ impl TaurusRuntimeStatusService { status: status.into(), timestamp: timestamp as i64, identifier: self.identifier.clone(), - features: self.features.clone(), })), }, ); diff --git a/docs/errors.md b/docs/errors.md index 6355d41..1b2a1ca 100644 --- a/docs/errors.md +++ b/docs/errors.md @@ -10,6 +10,7 @@ This document is the canonical catalog for runtime error codes emitted by Taurus - `T-STD-XXXXX`: Errors originating inside standard function implementations under `runtime/functions/*`. - `T-CORE-XXXXXX`: Errors originating from core runtime infrastructure. +- `T-TAURUS-XXXXXX`: Errors originating from the Taurus runtime app layer. - `T-PROV-XXXXXX`: Errors originating from provider integrations (transport adapters, remote runtime connectors). ## Code Table @@ -23,10 +24,13 @@ This document is the canonical catalog for runtime error codes emitted by Taurus | `T-CORE-000003` | Engine | Flow requires remote execution but no remote runtime adapter was configured. | Node execution target is remote while `RemoteRuntime` is `None`. | `runtime/engine/executor.rs` | | `T-CORE-000004` | Engine | Reference lookup failed in the execution value store. | Missing prior node result, missing flow input path, or unresolved input reference. | `runtime/engine/executor.rs` | | `T-CORE-000005` | Engine | Remote request cannot be assembled because parameter metadata and resolved values diverge. | Parameter count mismatch during remote request materialization. | `runtime/engine/executor.rs` | +| `T-CORE-000006` | Engine | Node execution result exists without a success/error outcome. | Provider or value store returned a `NodeExecutionResult` with no `result` field. | `runtime/engine/executor.rs`, `runtime/execution/value_store.rs` | | `T-CORE-000101` | Compiler | Flow compilation failed because a node id appears more than once. | Duplicate `database_id` in input nodes. | `runtime/engine/compiler.rs` | | `T-CORE-000102` | Compiler | Flow compilation failed because the declared start node is absent. | `start_node_id` not found in node list. | `runtime/engine/compiler.rs` | | `T-CORE-000103` | Compiler | Flow compilation failed because a `next` edge points to a missing node. | `next_node_id` references unknown node id. | `runtime/engine/compiler.rs` | | `T-CORE-000104` | Compiler | Flow compilation failed because a parameter is structurally incomplete. | Parameter has no value payload in IR. | `runtime/engine/compiler.rs` | +| `T-CORE-000105` | Compiler | Flow compilation failed because a sub-flow parameter is missing its execution reference. | `sub_flow.execution_reference` is absent. | `runtime/engine/compiler.rs` | +| `T-CORE-000107` | Engine | Function sub-flow execution failed because a required setting value is missing. | A non-optional sub-flow setting has no callback input value and no default value. | `runtime/engine/executor.rs` | | `T-CORE-000201` | Handler | Handler argument arity contract was violated before function execution began. | `args!`/`no_args!` macro expected different argument count. | `handler/macros.rs` | | `T-CORE-000202` | Handler | Handler argument type conversion failed during typed extraction. | `TryFromArgument` expected type does not match provided argument. | `handler/argument.rs` | | `T-CORE-000301` | App Error Mapping | Application configuration failure mapped into runtime error format. | Invalid/missing runtime config surfaced as `Error::Configuration`. | `types/errors/error.rs` | @@ -35,6 +39,7 @@ This document is the canonical catalog for runtime error codes emitted by Taurus | `T-CORE-000304` | App Error Mapping | Serialization/deserialization failure mapped into runtime error format. | Encoding/decoding/parsing failure surfaced as `Error::Serialization`. | `types/errors/error.rs` | | `T-CORE-000399` | App Error Mapping | Internal application failure mapped into runtime error format. | Catch-all non-domain internal failure surfaced as `Error::Internal`. | `types/errors/error.rs` | | `T-CORE-999999` | Runtime Error Fallback | Default fallback runtime error code when no explicit mapping is provided. | `RuntimeError::default()` used as defensive fallback. | `types/errors/runtime_error.rs` | +| `T-TAURUS-000001` | Taurus App | Test execution request payload could not be decoded as an execution flow. | Malformed or schema-incompatible payload published to the test execution NATS subject. | `taurus/src/app/worker.rs` | | `T-PROV-000001` | Provider Remote Runtime | Remote request to NATS did not yield a valid response message. | NATS request failed or timed out while waiting for remote runtime answer. | `taurus-provider/providers/remote/nats_remote_runtime.rs` | | `T-PROV-000002` | Provider Remote Runtime | Remote runtime response could not be decoded into expected protobuf structure. | Received payload is malformed, truncated, or schema-incompatible for `ExecutionResult`. | `taurus-provider/providers/remote/nats_remote_runtime.rs` | | `T-PROV-000003` | Provider Remote Runtime | Remote runtime response decoded, but contained no concrete result field. | `ExecutionResult` exists but `result` is `None` (protocol contract violation). | `taurus-provider/providers/remote/nats_remote_runtime.rs` | diff --git a/flows/03_for_each.json b/flows/03_for_each.json index 383688f..6851a59 100644 --- a/flows/03_for_each.json +++ b/flows/03_for_each.json @@ -61,7 +61,11 @@ "databaseId": "12", "runtimeParameterId": "consumer", "value": { - "nodeFunctionId": "6" + "sub_flow": { + "signature": "", + "settings": [], + "startingNodeId": "6" + } } } ] diff --git a/flows/05_if_control.json b/flows/05_if_control.json index 6aa4251..bc9847f 100644 --- a/flows/05_if_control.json +++ b/flows/05_if_control.json @@ -148,7 +148,11 @@ "databaseId": "50", "runtimeParameterId": "runnable", "value": { - "nodeFunctionId": "12" + "sub_flow": { + "signature": "", + "settings": [], + "startingNodeId": "12" + } } } ], diff --git a/flows/06_if_else_control.json b/flows/06_if_else_control.json index 04e9d12..4c05804 100644 --- a/flows/06_if_else_control.json +++ b/flows/06_if_else_control.json @@ -148,14 +148,22 @@ "databaseId": "63", "runtimeParameterId": "runnable", "value": { - "nodeFunctionId": "19" + "sub_flow": { + "signature": "", + "settings": [], + "startingNodeId": "19" + } } }, { "databaseId": "68", "runtimeParameterId": "else_runnable", "value": { - "nodeFunctionId": "17" + "sub_flow": { + "signature": "", + "settings": [], + "startingNodeId": "17" + } } } ] diff --git a/flows/07_simple_return.json b/flows/07_simple_return.json index 25111d8..eae7552 100644 --- a/flows/07_simple_return.json +++ b/flows/07_simple_return.json @@ -19,7 +19,11 @@ "expected_result": { "http_status_code": 200, "headers": {}, - "payload": [null, null, "username"] + "payload": [ + null, + null, + "username" + ] } } ], @@ -86,7 +90,11 @@ "databaseId": "31", "runtimeParameterId": "transform", "value": { - "nodeFunctionId": "10" + "sub_flow": { + "signature": "", + "settings": [], + "startingNodeId": "10" + } } } ], @@ -137,7 +145,11 @@ "databaseId": "35", "runtimeParameterId": "runnable", "value": { - "nodeFunctionId": "7" + "sub_flow": { + "signature": "", + "settings": [], + "startingNodeId": "7" + } } } ], diff --git a/flows/09_filter_return.json b/flows/09_filter_return.json index aa3cb79..accef8f 100644 --- a/flows/09_filter_return.json +++ b/flows/09_filter_return.json @@ -121,7 +121,11 @@ "databaseId": "15", "runtimeParameterId": "predicate", "value": { - "nodeFunctionId": "8" + "sub_flow": { + "signature": "", + "settings": [], + "startingNodeId": "8" + } } } ], @@ -179,14 +183,22 @@ "databaseId": "21", "runtimeParameterId": "runnable", "value": { - "nodeFunctionId": "1" + "sub_flow": { + "signature": "", + "settings": [], + "startingNodeId": "1" + } } }, { "databaseId": "22", "runtimeParameterId": "else_runnable", "value": { - "nodeFunctionId": "3" + "sub_flow": { + "signature": "", + "settings": [], + "startingNodeId": "3" + } } } ], @@ -293,7 +305,11 @@ "databaseId": "24", "runtimeParameterId": "transform", "value": { - "nodeFunctionId": "6" + "sub_flow": { + "signature": "", + "settings": [], + "startingNodeId": "6" + } } } ], diff --git a/flows/10_multiple_respond.json b/flows/10_multiple_respond.json index 6332d09..592b624 100644 --- a/flows/10_multiple_respond.json +++ b/flows/10_multiple_respond.json @@ -50,7 +50,7 @@ "headers": { "Content-Type": "text/plain" }, - "payload": null + "payload": [] }, "expected_result": { "headers": { @@ -108,7 +108,11 @@ "databaseId": "15", "runtimeParameterId": "predicate", "value": { - "nodeFunctionId": "8" + "sub_flow": { + "signature": "", + "settings": [], + "startingNodeId": "8" + } } } ], @@ -166,14 +170,22 @@ "databaseId": "21", "runtimeParameterId": "runnable", "value": { - "nodeFunctionId": "1" + "sub_flow": { + "signature": "", + "settings": [], + "startingNodeId": "1" + } } }, { "databaseId": "22", "runtimeParameterId": "else_runnable", "value": { - "nodeFunctionId": "3" + "sub_flow": { + "signature": "", + "settings": [], + "startingNodeId": "3" + } } } ], @@ -280,7 +292,11 @@ "databaseId": "24", "runtimeParameterId": "transform", "value": { - "nodeFunctionId": "6" + "sub_flow": { + "signature": "", + "settings": [], + "startingNodeId": "6" + } } } ], diff --git a/flows/11_function_subflow.json b/flows/11_function_subflow.json new file mode 100644 index 0000000..53d8e61 --- /dev/null +++ b/flows/11_function_subflow.json @@ -0,0 +1,74 @@ +{ + "name": "11_function_subflow", + "description": "This flow validates sub_flow execution by function identifier with settings and defaults", + "inputs": [ + { + "input": null, + "expected_result": [ + 3, + 4 + ] + } + ], + "flow": { + "startingNodeId": "1", + "nodeFunctions": [ + { + "definition_source": "taurus", + "databaseId": "1", + "runtimeFunctionId": "std::list::map", + "parameters": [ + { + "databaseId": "100", + "runtimeParameterId": "list", + "value": { + "literalValue": { + "listValue": { + "values": [ + { + "numberValue": { + "integer": "1" + } + }, + { + "numberValue": { + "integer": "2" + } + } + ] + } + } + } + }, + { + "databaseId": "101", + "runtimeParameterId": "transform", + "value": { + "sub_flow": { + "signature": "", + "settings": [ + { + "identifier": "lhs", + "optional": false, + "hidden": false + }, + { + "identifier": "rhs", + "defaultValue": { + "numberValue": { + "integer": "2" + } + }, + "optional": false, + "hidden": true + } + ], + "functionIdentifier": "std::number::add" + } + } + } + ] + } + ] + } +}