Workflows are Python files that orchestrate multi-step tasks using a built-in async DSL. The workflow tool runs them as background tasks and delivers results back to the calling session.
operator_use/workflow/
    types.py    ← WorkflowMeta, WorkflowRunRecord, WorkflowStatus, WorkflowContext,
                  WorkflowInvocation, WorkflowJournal, Workflow (ABC)
    manager.py  ← WorkflowManager: discovery, invocation, status tracking
    load.py     ← WorkflowLoader: file discovery, meta parsing
    execute.py  ← DSL globals injection
    context.py  ← WorkflowExecuteContext, Budget, phase context manager
A workflow is a .py file with a top-level meta dict and an async def run() entry point:
# ~/.operator/profiles/<name>/workflows/research.py
meta = {
    "name": "research",
    "description": "Research a topic and produce a structured report.",
    "when_to_use": "User asks for research or background on a subject.",
    "phases": [
        {"name": "gather", "description": "Find sources"},
        {"name": "write", "description": "Write report"},
    ],
}

async def run():
    topic = args.get("topic", "AI safety")

    async with phase("gather"):
        log(f"Gathering sources on: {topic}")
        sources = await agent(f"Find 5 authoritative sources on {topic}.")

    async with phase("write"):
        log("Writing report…")
        report = await agent(
            f"Write a structured markdown report on {topic} using:\n{sources}"
        )

    return report

These names are injected at runtime; do not import them:
| Global | Signature | Description |
|---|---|---|
| `agent` | `await agent(prompt, schema=None, system=None, tools=None, resume=False, stall_ms=180000, max_retries=5, model=None, provider=None)` | Run an LLM agent call. Returns `str` (no schema) or a Pydantic model instance. A call that doesn't finish within `stall_ms` is cancelled and retried up to `max_retries` times, then raises `TimeoutError`. `model`/`provider` override the session default for this single call. |
| `classify` | `await classify(prompt, *, options=None, schema=None, system=None, model=None, provider=None)` | Single direct LLM call: no subagent loop, no tool execution. Use `options=[...]` for string enum classification (returns `str`); use `schema=MyModel` for structured output (returns a model instance). Cheaper than `agent()` for routing/labelling tasks. |
| `parallel` | `await parallel(*thunks, concurrency=5, return_exceptions=False)` | Run zero-argument async callables concurrently; returns a list of results. Fail-fast by default (cancels siblings, re-raises); with `return_exceptions=True` it never raises and each failed slot holds its exception. |
| `pipeline` | `await pipeline(items, *stages, concurrency=5)` | Process items through a sequence of sync or async transform functions. |
| `workflow` | `await workflow(name, args=None)` | Run another workflow inline and return its result. One level deep only; shares the caller's run record (unified log + shared agent-call cap). |
| `phase` | `async with phase("name"):` | Label the current phase in the run status. |
| `log` | `log("message")` | Append a timestamped line to the run log. |
| `budget` | `budget.remaining()` / `budget.spent()` / `budget.exhausted()` / `budget.tokens_spent()` | Soft, advisory call-count budget for loop guards (not enforced; see Limits). `tokens_spent()` returns `{input, output, cache_read, cache_write, total}` accumulated from all structured `agent()` and `classify()` calls. |
| `args` | `dict` | Key-value arguments passed at invocation. |
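To make the `parallel` row concrete, here is a minimal sketch of the semantics it describes, written in plain asyncio. It is not the DSL's implementation: `parallel_sketch`, `ok` and `boom` are hypothetical names, and the real function may differ in detail.

```python
import asyncio


async def parallel_sketch(*thunks, concurrency=5, return_exceptions=False):
    """Illustrative stand-in for parallel(); not the real implementation."""
    gate = asyncio.Semaphore(concurrency)  # caps how many thunks run at once

    async def guarded(thunk):
        async with gate:
            return await thunk()

    tasks = [asyncio.ensure_future(guarded(t)) for t in thunks]
    if return_exceptions:
        # Never raises: a failed slot holds its exception object.
        return await asyncio.gather(*tasks, return_exceptions=True)
    try:
        return await asyncio.gather(*tasks)
    except BaseException:
        # Fail-fast: cancel the siblings, let them settle, then re-raise.
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
        raise


async def ok(value):
    await asyncio.sleep(0)
    return value


async def boom():
    raise ValueError("failed")


async def demo():
    results = await parallel_sketch(lambda: ok(1), lambda: ok(2), concurrency=2)
    mixed = await parallel_sketch(lambda: ok(1), boom, return_exceptions=True)
    return results, mixed


results, mixed = asyncio.run(demo())
print(results)  # [1, 2]
print(mixed)    # [1, ValueError('failed')]
```

Results come back in the order the thunks were passed, regardless of which finishes first.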
When schema is a Pydantic model class, agent() instructs the LLM to respond in that shape:
from pydantic import BaseModel

class Summary(BaseModel):
    title: str
    bullet_points: list[str]

result: Summary = await agent("Summarise this document.", schema=Summary)
print(result.title)

# ~/.operator/profiles/researcher/workflows/deep_research.py
from pydantic import BaseModel

class ResearchResult(BaseModel):
    topic: str
    sources: list[str]
    summary: str
    questions: list[str]

meta = {
    "name": "deep_research",
    "description": "Comprehensive research on a topic.",
}

async def run():
    topic = args.get("topic", "quantum computing")

    async with phase("search"):
        log(f"Searching for sources on: {topic}")
        sources = await agent(
            f"Find 5 authoritative sources on {topic}. "
            f"Return them as a numbered list with URLs."
        )

    async with phase("summarize"):
        log("Summarizing each source...")
        source_lines = [s for s in sources.split("\n") if s.strip()][:5]
        # Unpack the list: parallel() takes thunks as positional arguments.
        # s=s binds the current line to each thunk.
        summary_parts = await parallel(
            *[lambda s=s: agent(f"Summarize: {s[:200]}") for s in source_lines]
        )
        summary = "\n".join(summary_parts)

    async with phase("analyze"):
        log("Analyzing findings...")
        result: ResearchResult = await agent(
            f"Analyze this research on {topic}:\n{summary}\n\n"
            f"Sources:\n{sources}\n\n"
            f"Return the topic, the sources, a summary, and 2-3 open questions.",
            schema=ResearchResult,
        )

    return result

Invocation:

{ "action": "run", "name": "deep_research", "args": {"topic": "AI safety"} }

Output:
topic: "AI safety"
sources: ["arxiv.org/...", "openai.com/...", ...]
summary: "Key findings discuss alignment..."
questions: ["How can we measure alignment?", "What are failure modes?"]
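The examples in this document build the zero-argument callables for `parallel()` inside a loop. A general Python caveat applies there: a plain lambda looks up its loop variable when it is called, not when it is created, so every thunk can end up seeing the last item. Binding the value as a default argument avoids this. The sketch below uses only plain Python; `make_thunks_late` and `make_thunks_bound` are hypothetical names chosen for illustration.

```python
def make_thunks_late(items):
    # Each lambda reads `item` when called, after the loop has finished.
    return [lambda: item for item in items]


def make_thunks_bound(items):
    # The default argument captures the current value of `item`.
    return [lambda item=item: item for item in items]


late = [thunk() for thunk in make_thunks_late(["a", "b", "c"])]
bound = [thunk() for thunk in make_thunks_bound(["a", "b", "c"])]
print(late)   # ['c', 'c', 'c']
print(bound)  # ['a', 'b', 'c']
```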
# ~/.operator/profiles/processor/workflows/batch_process.py
meta = {
    "name": "batch_process",
    "description": "Process multiple items in parallel.",
}

async def process_item(item):
    """Process a single item."""
    return await agent(f"Analyze this: {item}")

async def run():
    items = args.get("items", [])

    async with phase("validate"):
        log(f"Validating {len(items)} items...")
        if not items:
            return {"error": "No items provided"}

    async with phase("process"):
        log(f"Processing {len(items)} items in parallel...")
        # Unpack the thunks: a bare generator before a keyword argument
        # is a syntax error.
        results = await parallel(
            *[lambda i=item: process_item(i) for item in items],
            concurrency=3,  # Max 3 parallel calls
        )

    async with phase("aggregate"):
        log("Aggregating results...")
        summary = await agent(
            "Summarize these results:\n" +
            "\n".join(f"- {r}" for r in results)
        )

    return {
        "item_count": len(items),
        "results": results,
        "summary": summary
    }

Invocation:

{
    "action": "run",
    "name": "batch_process",
    "args": {
        "items": ["Document 1", "Document 2", "Document 3"]
    }
}

# ~/.operator/profiles/processor/workflows/document_pipeline.py
meta = {
    "name": "doc_pipeline",
    "description": "Process documents through extraction → analysis → reporting.",
}

async def run():
    files = args.get("files", [])

    async with phase("extract"):
        log(f"Extracting data from {len(files)} files...")
        extracted = await pipeline(
            files,
            lambda f: agent(f"Extract key information from: {f}"),
            lambda e: agent(f"Validate extracted data: {e}"),
        )

    async with phase("analyze"):
        log("Analyzing extracted data...")
        analysis = await agent(
            "Analyze these items:\n" +
            "\n".join(extracted)
        )

    async with phase("report"):
        log("Generating report...")
        report = await agent(
            f"Write a markdown report based on:\n{analysis}"
        )

    return {
        "extracted_count": len(extracted),
        "analysis": analysis,
        "report": report
    }

# ~/.operator/profiles/researcher/workflows/careful_research.py
meta = {
    "name": "careful_research",
    "description": "Research with budget constraints.",
}

async def run():
    max_calls = args.get("max_calls", 10)

    async with phase("research"):
        sources = []
        attempt = 0
        while attempt < max_calls and not budget.exhausted():
            log(f"Call {attempt + 1}/{max_calls}")
            source = await agent(
                f"Find one unique source not in: {sources}"
            )
            sources.append(source)
            attempt += 1

        log(f"Completed {attempt} calls, "
            f"remaining budget: {budget.remaining()}")

    return {
        "sources": sources,
        "calls_used": attempt,
        "budget_remaining": budget.remaining()
    }

Check running workflows:

{ "action": "list" }                     # List active and recent runs
{ "action": "status", "run_id": "..." }  # Check progress
{ "action": "cancel", "run_id": "..." }  # Cancel a run

View run logs:
~/.operator/profiles/<name>/workflows/runs/<run_id>/log.txt
A workflow can run another inline via workflow() and use its result:
async def run():
    cleaned = await workflow("normalize", {"text": args["text"]})
    return await agent(f"Summarise:\n{cleaned}")

Nesting is one level deep only: a nested workflow that calls workflow() again raises RuntimeError. The nested run shares the caller's run record, so its log lines and agent() calls are unified with the parent (and count against the same cap). Both file-based and class-based workflows can be invoked.
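One way to picture the one-level rule is a depth counter that travels with the running task. The sketch below is only an illustration of that idea using `contextvars`; `run_nested`, `child`, `parent_ok` and `parent_bad` are hypothetical names, and the actual guard inside `workflow()` may be implemented differently.

```python
import asyncio
import contextvars

# Tracks how many nested workflow calls are active in the current task.
_depth = contextvars.ContextVar("workflow_depth", default=0)


async def run_nested(inner):
    """Hypothetical stand-in for workflow(): allow one level of nesting only."""
    if _depth.get() >= 1:
        raise RuntimeError("nested workflows may not call workflow() again")
    token = _depth.set(_depth.get() + 1)
    try:
        return await inner()
    finally:
        _depth.reset(token)  # restore the caller's depth


async def child():
    return "clean"


async def parent_ok():
    # One level: the parent calls a child that does not nest further.
    return await run_nested(child)


async def parent_bad():
    async def middle():
        # Second level: this call is rejected.
        return await run_nested(child)

    return await run_nested(middle)


print(asyncio.run(parent_ok()))  # clean
```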
| Limit | Default | Behavior |
|---|---|---|
| `max_agent_calls` | 1000 | Hard runaway-loop guard. Once a run (including its nested workflows) has made this many `agent()` calls, the next one raises `WorkflowAgentCapError`. Override via `args["max_agent_calls"]`. |
| `stall_ms` (per `agent()`) | 180000 ms | A call that doesn't finish in this window is cancelled and retried. |
| `max_retries` (per `agent()`) | 5 | Stall retries before `agent()` raises `TimeoutError`. |
| run wall-clock | 1800s | Whole-run timeout enforced by `WorkflowManager`. |
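The `stall_ms` and `max_retries` rows describe a cancel-and-retry loop. A minimal sketch of that pattern with `asyncio.wait_for` follows. It assumes one initial attempt plus `max_retries` retries, which the table does not spell out, and `call_with_stall_retry` and `flaky` are hypothetical names rather than part of the DSL.

```python
import asyncio


async def call_with_stall_retry(make_call, stall_ms=180000, max_retries=5):
    """Illustrative only: cancel a stalled call and retry it."""
    # Assumption: one initial attempt plus max_retries retries.
    for _ in range(max_retries + 1):
        try:
            # wait_for cancels the inner call when the window elapses.
            return await asyncio.wait_for(make_call(), timeout=stall_ms / 1000)
        except asyncio.TimeoutError:
            continue
    raise TimeoutError(f"call stalled {max_retries + 1} times")


attempts = 0


async def flaky():
    """Stalls on the first two attempts, then answers."""
    global attempts
    attempts += 1
    if attempts < 3:
        await asyncio.sleep(1)  # longer than the stall window used below
    return "done"


result = asyncio.run(call_with_stall_retry(flaky, stall_ms=20, max_retries=5))
print(result, attempts)  # done 3
```

A retry restarts the call from scratch, so this pattern only suits calls that are safe to repeat.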
budget is advisory only — it is incremented per agent() call for use in loop conditions (while budget.remaining() > 10: …) but never auto-enforces. The hard stop is max_agent_calls.
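The difference between the advisory budget and the hard cap can be sketched with two counters over the same call count. This is an illustration under stated assumptions, not the library's code: `RunCounters` and `AgentCapError` are hypothetical stand-ins, and the small numbers are chosen only to keep the demo short.

```python
class AgentCapError(RuntimeError):
    """Hypothetical stand-in for WorkflowAgentCapError."""


class RunCounters:
    """Illustrative only: one advisory budget and one enforced cap."""

    def __init__(self, budget=100, max_agent_calls=1000):
        self.budget = budget
        self.max_agent_calls = max_agent_calls
        self.calls = 0

    def record_call(self):
        # The hard cap is checked before the call is counted.
        if self.calls >= self.max_agent_calls:
            raise AgentCapError("max_agent_calls reached")
        self.calls += 1

    def remaining(self):
        # Advisory: reaching zero does not stop anything by itself.
        return max(self.budget - self.calls, 0)

    def exhausted(self):
        return self.calls >= self.budget


counters = RunCounters(budget=2, max_agent_calls=3)
for _ in range(3):
    counters.record_call()  # the third call passes although the budget is spent

print(counters.exhausted(), counters.remaining())  # True 0

try:
    counters.record_call()  # the fourth call hits the hard cap
    capped = False
except AgentCapError:
    capped = True
print(capped)  # True
```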
# run a workflow in the background
{ "action": "run", "name": "research", "args": { "topic": "LLM context windows" } }
# → returns run_id immediately
# generate a new workflow from a description
{ "action": "create", "name": "summarize",
"description": "Summarize a set of documents and produce key takeaways." }
# generate a workflow that does NOT inject its result back into the session
{ "action": "create", "name": "export-log",
"description": "Export session log to file.", "deliver": false }
# list available workflows
{ "action": "discover" }
# list active and recent runs
{ "action": "list" }
# check run status
{ "action": "status", "run_id": "<id>" }
# cancel a running workflow
{ "action": "cancel", "run_id": "<id>" }
# permanently delete a workflow file
{ "action": "delete", "name": "old-workflow" }

Workflow files are stored in the active profile's workflows/ directory (e.g. ~/.operator/profiles/<name>/workflows/). When no profile is active, they fall back to a temporary directory.
@dataclass
class WorkflowRunRecord:
    run_id: str
    workflow_name: str
    status: WorkflowStatus        # running | completed | failed | cancelled
    started_at: datetime
    finished_at: datetime | None
    result: Any                   # return value of run()
    error: str | None
    log_lines: list[str]
    agent_calls: int
    current_phase: str | None
    channel: str | None           # reply channel (same pattern as SubagentManager)
    chat_id: str | None
    deliver: bool = True          # inject result back into the calling session when done

WorkflowMeta.deliver (default True) controls whether the workflow result is injected back into the calling session when the run completes. Set to False in meta for fire-and-forget workflows that write to files or external systems and don't need to report back:
meta = {
    "name": "export-log",
    "description": "Export session log to a file.",
    "deliver": False,
}

When creating with the workflow tool, pass "deliver": false in the schema to embed this in the generated file.
{ "action": "create" } uses the LLM to write a new workflow file from a description, then saves it to the profile's workflows/ directory. The generated file follows all DSL rules and is immediately available to run.
For programmatic workflows (not user-authored files), subclass Workflow from operator_use.workflow.types:
from operator_use.workflow.types import Workflow, WorkflowContext, WorkflowInvocation

class MyWorkflow(Workflow):
    name = 'my-workflow'
    description = 'Does something useful.'
    when_to_use = 'User asks for something useful.'
    phases = [
        {'name': 'step1', 'description': 'First step'},
        {'name': 'step2', 'description': 'Second step'},
    ]

    async def execute(self, invocation: WorkflowInvocation, workflow_context: WorkflowContext) -> str:
        ctx = await self.build_context(invocation, workflow_context)
        async with ctx.phase('step1'):
            result = await ctx.agent('Do step 1.')
        async with ctx.phase('step2'):
            final = await ctx.agent(f'Do step 2 given: {result}')
        return final

build_context() constructs a WorkflowExecuteContext (from context.py) with all DSL globals (agent, phase, log, args, etc.) plus a WorkflowRunRecord and WorkflowJournal. The underlying Subagent is filtered to exclude subagent and workflow tools to prevent recursive nesting.
WorkflowContext (the lightweight dependency carrier):

@dataclass
class WorkflowContext:
    llm: Any
    tools: list

WorkflowInvocation (analogous to ToolInvocation):

@dataclass
class WorkflowInvocation:
    workflow_name: str
    args: dict[str, Any]  # keyword args for the workflow
    run_id: str           # auto-generated if not provided

WorkflowJournal (write-through SHA-256-keyed cache, moved from journal.py into types.py):

journal.get(prompt, opts)          # None if not cached
journal.set(prompt, opts, result)  # write to memory and disk

WorkflowLoader scans the configured workflows_dir for .py files. A file is a valid workflow if it contains a top-level meta dict with at least name and description, and an async def run() function.
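The validity rule for workflow files can be checked statically. The following sketch applies it with the standard `ast` module, without importing or executing the file. `looks_like_workflow` is a hypothetical helper written for this illustration; the real WorkflowLoader may parse files differently.

```python
import ast


def looks_like_workflow(source: str) -> bool:
    """Illustrative check: top-level meta dict plus async def run()."""
    tree = ast.parse(source)
    meta_keys = set()
    has_run = False
    for node in tree.body:  # top-level statements only
        if isinstance(node, ast.AsyncFunctionDef) and node.name == "run":
            has_run = True
        elif isinstance(node, ast.Assign) and isinstance(node.value, ast.Dict):
            targets = [t.id for t in node.targets if isinstance(t, ast.Name)]
            if "meta" in targets:
                # Collect only literal string keys of the dict.
                meta_keys = {
                    k.value
                    for k in node.value.keys
                    if isinstance(k, ast.Constant) and isinstance(k.value, str)
                }
    return has_run and {"name", "description"} <= meta_keys


valid = (
    'meta = {"name": "demo", "description": "A demo workflow."}\n'
    "async def run():\n"
    "    return 1\n"
)
missing_description = 'meta = {"name": "demo"}\nasync def run():\n    return 1\n'
sync_run = (
    'meta = {"name": "demo", "description": "A demo workflow."}\n'
    "def run():\n"
    "    return 1\n"
)

print(looks_like_workflow(valid))                # True
print(looks_like_workflow(missing_description))  # False
print(looks_like_workflow(sync_run))             # False
```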
Workflow search paths (highest priority last):
- Builtins (operator_use/builtins/workflows/, if present)
- Profile: ~/.operator/profiles/<name>/workflows/
Each workflow run gets its own WorkflowExecuteContext. The agent() call runs a Subagent instance with the same tools as the parent agent (minus subagent and workflow to prevent recursive nesting). Per-run state (log, journal) is stored under <tmpdir>/.operator-workflow-runs/<run_id>/. Results are delivered back via the message bus (unless deliver=False).
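The journal kept in the per-run state is described in this document as a write-through SHA-256-keyed cache with `get` and `set`. The sketch below shows what such a cache could look like. The key derivation (a hash over the prompt and JSON-serialised options), the one-file-per-key layout, and the `JournalSketch` name are all assumptions made for illustration, not the actual WorkflowJournal.

```python
import hashlib
import json
import tempfile
from pathlib import Path


class JournalSketch:
    """Illustrative write-through cache; not the real WorkflowJournal."""

    def __init__(self, directory):
        self.directory = Path(directory)
        self.directory.mkdir(parents=True, exist_ok=True)
        self.memory = {}

    @staticmethod
    def key(prompt, opts):
        # Assumption: the key hashes the prompt plus canonically ordered options.
        payload = json.dumps({"prompt": prompt, "opts": opts}, sort_keys=True)
        return hashlib.sha256(payload.encode("utf-8")).hexdigest()

    def get(self, prompt, opts):
        k = self.key(prompt, opts)
        if k in self.memory:
            return self.memory[k]
        path = self.directory / f"{k}.json"
        if path.exists():
            # Fall back to disk and warm the in-memory copy.
            self.memory[k] = json.loads(path.read_text(encoding="utf-8"))
            return self.memory[k]
        return None  # not cached

    def set(self, prompt, opts, result):
        k = self.key(prompt, opts)
        self.memory[k] = result  # write to memory ...
        path = self.directory / f"{k}.json"
        path.write_text(json.dumps(result), encoding="utf-8")  # ... and to disk


with tempfile.TemporaryDirectory() as tmp:
    journal = JournalSketch(tmp)
    before = journal.get("Summarise X", {"model": None})
    journal.set("Summarise X", {"model": None}, "cached answer")
    # A fresh instance sees the entry because it was written through to disk.
    after = JournalSketch(tmp).get("Summarise X", {"model": None})

print(before, after)  # None cached answer
```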
A structured workflow block sets per-run knob defaults (global and per-profile, via the settings merge). Explicit invocation args and per-call agent(...) params still override these.
{
    "workflow": {
        "enabled": true,
        "max_agent_calls": 1000,
        "budget": 100,
        "concurrency": 5,
        "stall_ms": 180000,
        "max_retries": 5
    }
}

When enabled is false, the workflow tool is hidden from the LLM. workflow.enabled supersedes the legacy flat workflows_enabled flag (still read as a fallback when no workflow block is present). Toggle on/off via control_center:

{ "action": "set", "key": "workflows_enabled", "value": true }

Precedence for the knobs: per-call agent()/parallel() argument → invocation args → workflow settings block → built-in default.
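The precedence chain can be read as a lookup that stops at the first layer defining a knob. A minimal sketch follows, assuming the settings arrive as a dict shaped like the JSON block in this section. `resolve_knob`, `_UNSET` and `BUILTIN_DEFAULTS` are hypothetical names, and the default values are the ones this document lists for the signatures and limits.

```python
_UNSET = object()  # distinguishes "not passed" from an explicit None

# Defaults as documented here; the real built-in table may contain more knobs.
BUILTIN_DEFAULTS = {
    "max_agent_calls": 1000,
    "concurrency": 5,
    "stall_ms": 180000,
    "max_retries": 5,
}


def resolve_knob(name, per_call=_UNSET, invocation_args=None, settings=None):
    """Illustrative resolver: per-call, then invocation args, then settings, then default."""
    if per_call is not _UNSET:
        return per_call
    layers = (invocation_args or {}, (settings or {}).get("workflow", {}))
    for layer in layers:
        if name in layer:
            return layer[name]
    return BUILTIN_DEFAULTS[name]


settings = {"workflow": {"stall_ms": 60000, "concurrency": 2}}
invocation_args = {"stall_ms": 30000}

print(resolve_knob("stall_ms", 5000, invocation_args, settings))                # 5000
print(resolve_knob("stall_ms", invocation_args=invocation_args, settings=settings))  # 30000
print(resolve_knob("concurrency", invocation_args=invocation_args, settings=settings))  # 2
print(resolve_knob("max_retries", invocation_args=invocation_args, settings=settings))  # 5
```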
- docs/profiles.md — Profile workflow directory
- docs/team.md — Teams as an alternative coordination primitive
- docs/acp.md — ACP agents as external workflow executors