Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions .github/workflows/build-and-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ on:
jobs:
taurus:
runs-on: ubuntu-latest
services:
nats:
image: nats:2
ports:
- 4222:4222

steps:
- uses: actions/checkout@v6
Expand All @@ -15,8 +20,11 @@ jobs:
run: PATH=${{ runner.temp }}/proto/bin:$PATH cargo build
env:
RUST_BACKTRACE: 'full'
- name: Run Tests
run: cargo test
- name: Run Test Suite
- name: Run workspace tests
run: cargo test --workspace --all-targets
- name: Run NATS test execution request tests
run: cargo test -p taurus test_execution_request -- --ignored --nocapture
env:
NATS_URL: nats://127.0.0.1:4222
- name: Run tests package flow suite
run: cargo run --package tests

2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/taurus-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@

mod handler;
pub mod runtime;
pub mod time;
pub mod types;
pub mod value;
12 changes: 7 additions & 5 deletions crates/taurus-core/src/runtime/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,8 @@ mod tests {
use crate::types::exit_reason::ExitReason;
use std::cell::RefCell;
use tucana::shared::{
InputType, ListValue, NodeParameter, NodeValue, ReferenceValue, Struct, Value, node_value,
reference_value, value::Kind,
InputType, ListValue, NodeParameter, NodeValue, ReferenceValue, Struct, SubFlow, Value,
node_value, reference_value, sub_flow::ExecutionReference, value::Kind,
};

fn literal_param(database_id: i64, runtime_parameter_id: &str, value: Value) -> NodeParameter {
Expand All @@ -190,9 +190,11 @@ mod tests {
database_id,
runtime_parameter_id: runtime_parameter_id.to_string(),
value: Some(NodeValue {
value: Some(node_value::Value::SubFlow(unimplemented!(
"Taurus needs to handle SubFlows (issue nr #184)"
))),
value: Some(node_value::Value::SubFlow(SubFlow {
signature: String::new(),
settings: Vec::new(),
execution_reference: Some(ExecutionReference::StartingNodeId(node_id)),
})),
}),
cast: None,
}
Expand Down
55 changes: 53 additions & 2 deletions crates/taurus-core/src/runtime/engine/compiler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use std::collections::HashMap;

use tucana::shared::{NodeFunction, node_value};
use tucana::shared::{NodeFunction, node_value, sub_flow};

use crate::{
runtime::engine::model::{
Expand All @@ -27,6 +27,15 @@ pub enum CompileError {
node_id: i64,
parameter_index: usize,
},
SubFlowExecutionReferenceMissing {
node_id: i64,
parameter_index: usize,
},
SubFlowFunctionIdentifierUnsupported {
node_id: i64,
parameter_index: usize,
function_identifier: String,
},
}

impl CompileError {
Expand Down Expand Up @@ -64,6 +73,29 @@ impl CompileError {
node_id, parameter_index
),
),
CompileError::SubFlowExecutionReferenceMissing {
node_id,
parameter_index,
} => RuntimeError::new(
"T-CORE-000105",
"FlowCompileError",
format!(
"Node {} parameter {} sub_flow is missing execution reference",
node_id, parameter_index
),
),
CompileError::SubFlowFunctionIdentifierUnsupported {
node_id,
parameter_index,
function_identifier,
} => RuntimeError::new(
"T-CORE-000106",
"FlowCompileError",
format!(
"Node {} parameter {} uses unsupported sub_flow function identifier {}",
node_id, parameter_index, function_identifier
),
),
Comment thread
raphael-goetz marked this conversation as resolved.
}
}
}
Expand Down Expand Up @@ -125,7 +157,26 @@ pub fn compile_flow(
let arg = match value {
node_value::Value::LiteralValue(v) => CompiledArg::Literal(v.clone()),
node_value::Value::ReferenceValue(r) => CompiledArg::Reference(r.clone()),
node_value::Value::SubFlow(_sub_flow) => unimplemented!("Taurus needs to handle SubFlows (issue nr #184)"),
node_value::Value::SubFlow(sub_flow) => {
match sub_flow.execution_reference.as_ref() {
Some(sub_flow::ExecutionReference::StartingNodeId(node_id)) => {
CompiledArg::DeferredNode(*node_id)
}
Some(sub_flow::ExecutionReference::FunctionIdentifier(identifier)) => {
return Err(CompileError::SubFlowFunctionIdentifierUnsupported {
node_id: node.database_id,
parameter_index,
function_identifier: identifier.clone(),
});
}
None => {
return Err(CompileError::SubFlowExecutionReferenceMissing {
node_id: node.database_id,
parameter_index,
});
}
}
}
};

parameters.push(CompiledParameter {
Expand Down
82 changes: 47 additions & 35 deletions crates/taurus-core/src/runtime/engine/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ use std::collections::HashMap;

use futures_lite::future::block_on;
use tucana::aquila::ActionExecutionRequest;
use tucana::shared::node_execution_result::Result as TucanaNodeResult;
use tucana::shared::reference_value::Target;
use tucana::shared::value::Kind;
use tucana::shared::{Struct, Value};
use tucana::shared::{NodeExecutionResult as TucanaNodeExecutionResult, Struct, Value};
use uuid::Uuid;

use crate::handler::argument::{Argument, ParameterNode};
Expand Down Expand Up @@ -56,7 +57,7 @@ struct ExecutionResult {
}

/// Result of executing exactly one compiled node.
struct NodeExecutionResult {
struct NodeResult {
signal: Signal,
frame_id: Option<u64>,
}
Expand Down Expand Up @@ -150,25 +151,24 @@ impl<'a> EngineExecutor<'a> {
}
}

fn execute_single_node(
&self,
node_idx: usize,
value_store: &mut ValueStore,
) -> NodeExecutionResult {
fn execute_single_node(&self, node_idx: usize, value_store: &mut ValueStore) -> NodeResult {
let node = &self.flow.nodes[node_idx];
// InputType references resolve against the currently running node.
value_store.set_current_node_id(node.id);

let frame_id = self.trace_enter(node, value_store);
let signal = match &node.execution_target {
NodeExecutionTarget::Local => self.execute_local_node(node, value_store, frame_id),
NodeExecutionTarget::Local => {
let signal = self.execute_local_node(node, value_store, frame_id);
self.commit_result(node.id, signal, value_store)
}
NodeExecutionTarget::Remote { service } => {
self.execute_remote_node(node, service, value_store, frame_id)
}
};
self.trace_exit(frame_id, &signal, value_store);

NodeExecutionResult { signal, frame_id }
NodeResult { signal, frame_id }
}

fn execute_local_node(
Expand All @@ -190,14 +190,11 @@ impl<'a> EngineExecutor<'a> {

let mut args = match self.build_args(node, value_store, frame_id) {
Ok(args) => args,
Err(err) => {
value_store.insert_error(node.id, err.clone());
return Signal::Failure(err);
}
Err(err) => return Signal::Failure(err),
};

if let Some(signal) = self.force_eager_args(entry, &mut args, value_store, frame_id) {
return self.commit_result(node.id, signal, value_store);
return signal;
}

// Handler-owned runtime calls (for lazy args / callbacks) re-enter the same executor.
Expand All @@ -211,8 +208,7 @@ impl<'a> EngineExecutor<'a> {
child_result.signal
};

let signal = (entry.handler)(&args, value_store, &mut run);
self.commit_result(node.id, signal, value_store)
(entry.handler)(&args, value_store, &mut run)
}

fn execute_remote_node(
Expand All @@ -225,20 +221,21 @@ impl<'a> EngineExecutor<'a> {
let remote_runtime = match self.remote {
Some(remote) => remote,
None => {
return Signal::Failure(RuntimeError::new(
"T-CORE-000003",
"RemoteRuntimeNotConfigured",
"Remote runtime not configured",
));
return self.commit_result(
node.id,
Signal::Failure(RuntimeError::new(
"T-CORE-000003",
"RemoteRuntimeNotConfigured",
"Remote runtime not configured",
)),
value_store,
);
}
};

let mut args = match self.build_args(node, value_store, frame_id) {
Ok(args) => args,
Err(err) => {
value_store.insert_error(node.id, err.clone());
return Signal::Failure(err);
}
Err(err) => return self.commit_result(node.id, Signal::Failure(err), value_store),
};

let values = match self.resolve_remote_args(&mut args, value_store, frame_id) {
Expand All @@ -248,21 +245,16 @@ impl<'a> EngineExecutor<'a> {

let request = match self.build_remote_request(node, values) {
Ok(request) => request,
Err(err) => {
value_store.insert_error(node.id, err.clone());
return Signal::Failure(err);
}
Err(err) => return self.commit_result(node.id, Signal::Failure(err), value_store),
};

let signal = match block_on(remote_runtime.execute_remote(RemoteExecution {
match block_on(remote_runtime.execute_remote(RemoteExecution {
target_service: service.to_string(),
request,
})) {
Ok(value) => Signal::Success(value),
Err(err) => Signal::Failure(err),
};

self.commit_result(node.id, signal, value_store)
Ok(result) => self.commit_remote_result(node.id, result, value_store),
Err(err) => self.commit_result(node.id, Signal::Failure(err), value_store),
}
}

fn build_args(
Expand Down Expand Up @@ -522,6 +514,26 @@ impl<'a> EngineExecutor<'a> {
}
}

fn commit_remote_result(
&self,
node_id: i64,
result: TucanaNodeExecutionResult,
value_store: &mut ValueStore,
) -> Signal {
value_store.insert_node_result(node_id, result.clone());
match result.result {
Some(TucanaNodeResult::Success(value)) => Signal::Success(value),
Some(TucanaNodeResult::Error(error)) => {
Signal::Failure(RuntimeError::from_tucana_error(&error))
}
None => Signal::Failure(RuntimeError::new(
"T-CORE-000006",
"NodeExecutionResultMissingOutcome",
"Remote node execution result is missing success/error outcome",
)),
Comment thread
raphael-goetz marked this conversation as resolved.
}
}

fn trace_enter(&self, node: &CompiledNode, value_store: &ValueStore) -> Option<u64> {
self.tracer.map(|tracer| {
tracer.borrow_mut().enter_node(
Expand Down
15 changes: 10 additions & 5 deletions crates/taurus-core/src/runtime/execution/render.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

use std::collections::HashMap;

use tucana::shared::node_execution_result::Result as TucanaNodeResult;

use crate::runtime::execution::trace::{
ArgKind, EdgeKind, Outcome, StoreDiff, StoreResultStatus, TraceFrame, TraceRun,
ArgKind, EdgeKind, Outcome, StoreDiff, TraceFrame, TraceRun,
};

struct TraceTheme;
Expand Down Expand Up @@ -266,14 +268,17 @@ fn render_store_diff(
}

for set in &diff.result_sets {
let status = match set.status {
StoreResultStatus::Success => "success",
StoreResultStatus::Error => "error",
let status = match &set.result.result {
Some(TucanaNodeResult::Success(_)) => "success",
Some(TucanaNodeResult::Error(_)) => "error",
None => "empty",
};
out.push_str(&format!(
"{step:04} {prefix}{continuation}{store:<5} result.set node={} [{}] {}\n",
"{step:04} {prefix}{continuation}{store:<5} result.set node={} [{}] started_at={} finished_at={} {}\n",
set.node_id,
status,
set.result.started_at,
set.result.finished_at,
set.preview,
step = *step,
prefix = prefix,
Expand Down
12 changes: 2 additions & 10 deletions crates/taurus-core/src/runtime/execution/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,10 @@

use std::collections::HashMap;

use tucana::shared::Value;
use tucana::shared::{NodeExecutionResult, Value};

use crate::types::errors::runtime_error::RuntimeError;
use crate::types::execution::ids::{FrameId, NodeId};

/// Runtime outcome persisted per node.
#[derive(Debug, Clone)]
pub enum NodeOutcome {
Success(Value),
Failure(RuntimeError),
}

/// Input slot key for runtime-provided temporary inputs (for iterators/predicates).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct InputSlotKey {
Expand All @@ -25,7 +17,7 @@ pub struct InputSlotKey {
/// Store that captures mutable runtime execution state.
#[derive(Debug, Clone, Default)]
pub struct ExecutionStore {
pub node_outcomes: HashMap<NodeId, NodeOutcome>,
pub node_results: HashMap<NodeId, NodeExecutionResult>,
pub input_slots: HashMap<InputSlotKey, Value>,
pub flow_input: Option<Value>,
pub current_node: Option<NodeId>,
Expand Down
Loading