Orchestrate runs with clarity.
The Workflow Engine stores definitions and runs in Redis, executes steps as a DAG, and emits an append-only timeline for every status transition. Each step dispatches a job via the bus, and the Safety Kernel evaluates each job before execution.
- DAG execution with depends_on
- 16 step types across 5 categories
- for_each field for fan-out iteration
- Retries with exponential backoff
- Rerun from step & dry-run mode
- Schema validation (input & output)
Step Types
- llm: Invokes an LLM provider for inference
- worker: Dispatches a job to the bus for external execution
- http: Makes an HTTP request to an external endpoint
- container: Runs a container image as a step
- script: Executes an inline script
- condition: Evaluates a boolean expression to branch execution
- switch: Multi-way branching based on expression matching
- parallel: Runs multiple sub-steps concurrently
- loop: Repeats a step until a condition is met
- approval: Pauses execution until a human approves or denies
- input: Pauses execution awaiting external input
- delay: Waits for a duration or until a timestamp
- transform: Transforms data using expressions or mappings
- storage: Reads or writes data to a storage backend
- notify: Emits a SystemAlert event to the bus
- subworkflow: Invokes another workflow as a nested execution
Note: for_each is a field on the Step struct, not a step type. Any step type can use for_each to enable fan-out iteration over an array.
DAG Execution
Steps form a directed acyclic graph via depends_on. Independent steps run in parallel. A step only executes when all dependencies have succeeded.
id: deploy.pipeline
name: "Deploy Pipeline"
input_schema: DeployRequest
steps:
  lint:
    type: worker
    topic: job.ci.lint
  test:
    type: worker
    topic: job.ci.test
  security-scan:
    type: worker
    topic: job.ci.security
  # Runs after lint + test + security-scan all succeed
  build:
    type: worker
    topic: job.ci.build
    depends_on: [lint, test, security-scan]
  approve:
    type: approval
    reason: "Production deployment requires sign-off"
    depends_on: [build]
  deploy:
    type: worker
    topic: job.infra.deploy
    depends_on: [approve]
    retry:
      max_retries: 2
      initial_backoff_sec: 5
  notify:
    type: notify
    depends_on: [deploy]
    input:
      level: INFO
      message: "Deployment complete"

Steps without depends_on are dispatched simultaneously in a single scheduling pass.
Failed, cancelled, or timed-out dependencies permanently block all downstream steps. No implicit continue-on-error.
A step waits for every listed dependency to reach succeeded before dispatch.
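The dispatch rule can be modeled in a few lines. This is a simplified sketch, not the engine's actual types: step and status names here are illustrative.

```python
def ready_steps(steps, states):
    """Return IDs of pending steps whose dependencies have all succeeded.

    steps:  {step_id: [dependency_ids]}
    states: {step_id: "pending" | "running" | "succeeded" | "failed" | ...}
    """
    ready = []
    for step_id, deps in steps.items():
        if states.get(step_id) != "pending":
            continue  # already dispatched or terminal
        # A failed, cancelled, or timed-out dependency never satisfies
        # this check, so downstream steps stay blocked permanently.
        if all(states.get(d) == "succeeded" for d in deps):
            ready.append(step_id)
    return ready

# Independent steps (no depends_on) are all ready in the first pass:
steps = {"lint": [], "test": [], "build": ["lint", "test"]}
states = {"lint": "pending", "test": "pending", "build": "pending"}
print(ready_steps(steps, states))  # ['lint', 'test']
```

Each scheduling pass simply re-evaluates this predicate; `build` becomes ready only once both `lint` and `test` report succeeded.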
Execution Controls
- depends_on: Step IDs that must ALL succeed before this step runs
- condition: Expression that must be truthy for this step to execute
- for_each: Field on any step; an expression returning an array to fan out per element
- max_parallel: Limit on concurrent for_each children (0 = unlimited)
Workflow Definition
Workflows are stored in Redis as JSON. Each step supports expressions via ${...} for dynamic input, conditions, and for-each iteration.
id: incident.triage
name: "Incident Triage"
org_id: default
version: "1.0"
timeout_sec: 3600              # Overall workflow timeout
input_schema:                  # JSON Schema for input validation
  type: object
  required: [alert_id, severity]
  properties:
    alert_id: { type: string }
    severity: { type: string, enum: [low, medium, high, critical] }
steps:
  enrich:
    type: worker
    topic: job.incident.enricher
    input:
      alert_id: "${input.alert_id}"
    output_path: enrichment    # Store output at ctx.enrichment
    output_schema:             # Validate step output
      type: object
      required: [summary]
    timeout_sec: 120
    retry:
      max_retries: 3
      initial_backoff_sec: 2
      max_backoff_sec: 30
      multiplier: 2
  check-severity:
    type: condition
    depends_on: [enrich]
    condition: "ctx.enrichment.severity == 'critical'"
    output_path: needs_approval
  approve:
    type: approval
    depends_on: [check-severity]
    condition: "ctx.needs_approval"
    reason: "Critical incident requires human sign-off"
  remediate:
    type: worker
    topic: job.incident.remediate
    depends_on: [approve]
    input:
      summary: "${ctx.enrichment.summary}"
    meta:
      risk_tags: [write, prod]
      capability: incident.remediate

Expression Language
Step inputs, conditions, and for_each fields support a simple expression language with dot-path resolution.
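As an illustration, dot-path resolution and ${...} templating can be sketched like this. This is a simplified model; `resolve` and `render` are hypothetical helper names, not the engine's API.

```python
import re

def resolve(path, scope):
    """Resolve a dot-path like 'ctx.enrichment.summary' against nested dicts."""
    value = scope
    for part in path.split("."):
        value = value[part]
    return value

def render(template, scope):
    """Substitute every ${...} expression in a template string."""
    return re.sub(
        r"\$\{([^}]+)\}",
        lambda m: str(resolve(m.group(1), scope)),
        template,
    )

scope = {"input": {"x": 42}}
print(render("Value: ${input.x}", scope))  # Value: 42
```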
- input.key: Workflow input value
- ctx.steps.step_id.output: Previous step output
- length(arr): Array/string/map length
- item: Current for_each element
- Operators: == != > < >= <= !
- Template strings: "Value: ${input.x}"
Schema Validation
Validate workflow input and step I/O with JSON Schema. Schemas can be defined inline or referenced from the schema registry.
steps:
  process:
    type: worker
    topic: job.data.process
    input_schema:              # Validate step input
      type: object
      required: [file_url]
      properties:
        file_url:
          type: string
          format: uri
    output_schema:             # Validate step output
      type: object
      required: [row_count]
      properties:
        row_count:
          type: integer
          minimum: 0

steps:
  transform:
    type: worker
    topic: job.etl.transform
    input_schema_id: etl/transform-input
    output_schema_id: etl/transform-output
# Register schemas with:
# cordumctl schema register \
#   --id etl/transform-input \
#   --file schema.json

- Workflow input: Validated on run creation against the workflow input_schema
- Step input: Validated before job dispatch against input_schema or input_schema_id
- Step output: Validated after job result received. Failure marks the step as failed (no retry)
- Pack-scoped: Schema IDs follow the pack_id/schema_name convention
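To make the validation points concrete, here is a deliberately tiny sketch covering only the type and required keywords; the real engine uses full JSON Schema, and this checker is not its implementation.

```python
# Map JSON Schema type names to Python types (subset, for illustration).
TYPES = {"object": dict, "string": str, "integer": int, "array": list}

def check(instance, schema):
    """Validate 'type' and 'required' only; return an error string or None."""
    expected = TYPES.get(schema.get("type"))
    if expected is not None and not isinstance(instance, expected):
        return f"expected {schema['type']}"
    for key in schema.get("required", []):
        if key not in instance:
            return f"missing required field: {key}"
    return None  # valid

schema = {"type": "object", "required": ["row_count"]}
print(check({"row_count": 5}, schema))  # None
print(check({}, schema))                # missing required field: row_count
```

A failed output check at this point is what marks the step failed with no retry, since retrying would not change an already-malformed result.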
For-Each Fan-Out
Any step with a for_each expression fans out into parallel child jobs. Children are aggregated back: if any child fails, the parent step fails.
id: batch.process
steps:
  scan:
    type: worker
    topic: job.scan.files
    output_path: file_list
  process:
    type: worker
    topic: job.transform
    depends_on: [scan]
    for_each: "ctx.file_list"      # Array from scan output
    max_parallel: 10               # Limit concurrent children
    input:
      file: "${item}"              # Current array element
      index: "${foreach_index}"    # 0-based index
  # Runs after ALL children complete
  aggregate:
    type: worker
    topic: job.aggregate
    depends_on: [process]
    input:
      count: "${length(ctx.file_list)}"

Fan-Out Mechanics
- Children are created with IDs stepID[0], stepID[1], etc.
- Each child receives item and foreach_index in scope
- max_parallel throttles concurrent children per scheduling pass
Fan-In Aggregation
- Parent completes when all children reach a terminal state
- If any child fails, cancels, or times out — parent fails
- All children must succeed for parent to succeed
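The fan-out mechanics can be sketched as a pure function; `expand` is illustrative only, not an engine API, and real children are dispatched jobs rather than dicts.

```python
def expand(step_id, items, max_parallel=0):
    """Fan a step out into child specs, batched by max_parallel (0 = unlimited)."""
    children = [
        {"id": f"{step_id}[{i}]", "item": item, "foreach_index": i}
        for i, item in enumerate(items)
    ]
    if not children:
        return []
    # Each batch is one scheduling pass's worth of concurrent children.
    size = max_parallel or len(children)
    return [children[i:i + size] for i in range(0, len(children), size)]

batches = expand("process", ["a.csv", "b.csv", "c.csv"], max_parallel=2)
print([len(b) for b in batches])  # [2, 1]
print(batches[0][0]["id"])        # process[0]
```

Fan-in is then the `all children succeeded` check: the parent reaches succeeded only when every child does, and any child failure fails the parent.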
Rerun & Dry-Run
Rerun a workflow from a specific step, preserving completed work. Or validate a run without side effects using dry-run mode.
# Rerun from a failed step
cordumctl run rerun <run_id> \
  --from deploy
# Output:
# new_run_id: run-abc-456
# rerun_of: run-abc-123
# rerun_step: deploy
# status: pending
# preserved: [lint, test, build]
# Validate without side effects
cordumctl run start deploy.pipeline \
  --input '{"env": "prod"}' \
  --dry-run
# Output:
# run_id: run-dry-789
# dry_run: true
# status: running
# Workers receive dry_run=true and
# should skip mutations.

Rerun Mechanics
- Creates a new run linked via rerun_of
- Original input is cloned exactly
- Dependency outputs are copied into the new run context
- All dependencies must have succeeded in the original run
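The "dependency outputs are copied" rule amounts to a transitive walk over depends_on. A hedged sketch, assuming a run context keyed by step ID (names here are illustrative):

```python
def build_rerun_context(original_ctx, steps, rerun_step):
    """Clone only the outputs of steps the rerun step transitively depends on."""
    preserved = set()

    def visit(step_id):
        for dep in steps[step_id]["depends_on"]:
            if dep not in preserved:
                preserved.add(dep)
                visit(dep)

    visit(rerun_step)
    # Steps at or downstream of the rerun point are NOT preserved;
    # they will execute again in the new run.
    return {sid: original_ctx[sid] for sid in preserved}

steps = {
    "lint":   {"depends_on": []},
    "build":  {"depends_on": ["lint"]},
    "deploy": {"depends_on": ["build"]},
}
ctx = {"lint": {"ok": True}, "build": {"artifact": "app.tar"}, "deploy": None}
print(sorted(build_rerun_context(ctx, steps, "deploy")))  # ['build', 'lint']
```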
Dry-Run Semantics
- dry_run=true propagated to workers via env and labels
- Workers should skip mutations (read-only operations)
- Schema validation, policy checks, and dependencies still apply
- Retries, failures, and timeouts behave normally
Run Timeline
Every run maintains an append-only timeline of events in Redis. Each event records a timestamp, step ID, status, and optional data payload.
Event Types
- run_status: Run status transitioned
- step_dispatched: Job published to bus
- step_completed: Step reached terminal state
- step_waiting: Approval step waiting
- step_approved: Approval granted
- step_rejected: Approval denied
- step_condition_evaluated: Condition result recorded
- step_delay_started: Delay timer started
- step_event_emitted: Notify alert sent
- step_output_invalid: Schema validation failed

cordumctl run get <run_id> --timeline
# TIME      EVENT                      STEP     STATUS
# 10:00:01  run_status                 -        running
# 10:00:01  step_dispatched            lint     running
# 10:00:01  step_dispatched            test     running
# 10:00:03  step_completed             lint     succeeded
# 10:00:05  step_completed             test     succeeded
# 10:00:05  step_dispatched            build    running
# 10:00:12  step_completed             build    succeeded
# 10:00:12  step_waiting               approve  waiting
# 10:05:30  step_approved              approve  succeeded
# 10:05:30  step_dispatched            deploy   running
# 10:05:45  step_completed             deploy   succeeded
# 10:05:45  run_status                 -        succeeded
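The 1000-event retention can be modeled in memory; in Redis the equivalent pattern is an RPUSH followed by an LTRIM that keeps the newest entries. This is a sketch only, not the engine's code.

```python
MAX_EVENTS = 1000

def append_event(timeline, event):
    """Append to the run timeline, retaining only the newest MAX_EVENTS.

    In-memory stand-in for RPUSH + LTRIM on wf:run:timeline:<run_id>.
    """
    timeline.append(event)
    del timeline[:-MAX_EVENTS]  # drop oldest entries beyond the cap
    return timeline

timeline = []
for i in range(1005):
    append_event(timeline, {"seq": i, "event": "step_completed"})
print(len(timeline), timeline[0]["seq"])  # 1000 5
```

The oldest five events are discarded, so a long-running run's timeline always shows its most recent history.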
Timelines are stored at wf:run:timeline:<run_id> in Redis. The last 1000 events are retained per run.
Retry & Backoff
Steps can be configured with exponential backoff retries. The engine distinguishes between retryable and fatal failures.
steps:
  deploy:
    type: worker
    topic: job.infra.deploy
    retry:
      max_retries: 3           # Up to 3 retry attempts
      initial_backoff_sec: 2   # First backoff: 2s
      max_backoff_sec: 60      # Cap at 60s
      multiplier: 2            # 2x exponential

# Backoff sequence:
# Attempt 1 → immediate
# Failure   → wait 2s
# Attempt 2 → after 2s
# Failure   → wait 4s
# Attempt 3 → after 4s
# Failure   → wait 8s
# Attempt 4 → after 8s (capped at 60s)
# Failure   → step marked FAILED

Failure Semantics
- FAILED_RETRYABLE
Transient failure. Step resets to pending with backoff delay. New job dispatched with incremented attempt counter.
- FAILED_FATAL
Permanent failure. Step marked failed immediately. Run transitions to failed. Saga rollback triggered for completed steps with compensation templates.
- TIMEOUT
Step or run timeout exceeded. Reconciler marks stale jobs. Not retried unless explicitly configured.
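The backoff schedule follows initial_backoff_sec * multiplier^n, capped at max_backoff_sec. A small sketch to make the sequence concrete (illustrative helper, not an engine API):

```python
def backoff_schedule(max_retries, initial_backoff_sec, multiplier, max_backoff_sec):
    """Delay before the nth retry: initial * multiplier**n, capped at the max."""
    return [
        min(initial_backoff_sec * multiplier ** n, max_backoff_sec)
        for n in range(max_retries)
    ]

# The example config above (3 retries, 2s initial, 2x, 60s cap):
print(backoff_schedule(3, 2, 2, 60))  # [2, 4, 8]
# With more retries the cap kicks in:
print(backoff_schedule(6, 2, 2, 60))  # [2, 4, 8, 16, 32, 60]
```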
Retry attempts get distinct job IDs, e.g. run_id:step_id@2 for the second attempt.
Cancel Propagation
Cancelling a run stops all executing steps and publishes cancellation signals for in-flight jobs.
# Cancel via CLI
cordumctl run cancel <run_id>

# Cancel via API
curl -X POST \
  http://localhost:8081/api/v1/workflow-runs/<id>/cancel \
  -H "X-API-Key: <token>"
Propagation Flow
- Run transitions to cancelled immediately
- All non-terminal steps (pending, running, waiting) marked cancelled
- For-each children recursively cancelled
- JobCancel published to sys.job.cancel for each running job
- No further steps are dispatched
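The propagation flow can be sketched as a single pass over step states. This simplified model elides the recursive cancellation of for-each children and the bus publish itself.

```python
TERMINAL = {"succeeded", "failed", "cancelled", "timed_out"}

def cancel_run(run):
    """Mark the run and every non-terminal step cancelled.

    Returns the IDs of running steps whose jobs need a JobCancel signal.
    """
    run["status"] = "cancelled"
    to_cancel = []
    for step in run["steps"].values():
        if step["status"] == "running":
            to_cancel.append(step["id"])  # in-flight job: publish JobCancel
        if step["status"] not in TERMINAL:
            step["status"] = "cancelled"  # pending/running/waiting all cancel
    return to_cancel

run = {
    "status": "running",
    "steps": {
        "lint":   {"id": "lint",   "status": "succeeded"},
        "build":  {"id": "build",  "status": "running"},
        "deploy": {"id": "deploy", "status": "pending"},
    },
}
print(cancel_run(run))                   # ['build']
print(run["steps"]["deploy"]["status"])  # cancelled
```

Already-terminal steps keep their status, which is why a rerun can still preserve their outputs.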
Reconciliation
A reconciler runs every 5 seconds, scanning active runs for stale jobs and missed results. It uses distributed locking to avoid conflicts across engine replicas.
- Stale detection: Queries scheduler for job state of running steps
- Lost results: Synthesizes job results for jobs that completed but result message was lost
- Retry triggers: Retriggers scheduling for steps past their backoff delay
- Distributed lock: Only one engine instance reconciles at a time
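The distributed lock behaves like a lease that expires if the holder dies. Here is an in-memory model of that behavior; it is illustrative only (the engine's actual lock mechanism in Redis is not shown in this doc), and the class name is hypothetical.

```python
import time

class ReconcilerLock:
    """In-memory model of a lock lease with a TTL (illustrative only)."""

    def __init__(self, ttl_sec=5):
        self.ttl = ttl_sec
        self.holder = None
        self.expires = 0.0

    def acquire(self, instance_id, now=None):
        """Take the lock if free or expired; re-entry by the holder succeeds."""
        now = time.monotonic() if now is None else now
        if self.holder is None or now >= self.expires:
            self.holder, self.expires = instance_id, now + self.ttl
            return True
        return self.holder == instance_id

lock = ReconcilerLock(ttl_sec=5)
print(lock.acquire("engine-a", now=0.0))  # True  — engine-a reconciles
print(lock.acquire("engine-b", now=1.0))  # False — engine-b skips this pass
print(lock.acquire("engine-b", now=6.0))  # True  — lease expired, b takes over
```

The TTL matters: if the holding replica crashes, another replica can take over after the lease lapses instead of reconciliation stopping entirely.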
Storage
Workflows, runs, and timelines are persisted in Redis with indexed lookups.
# Workflow definitions
wf:def:<workflow_id>           # Full definition (JSON)
wf:index:org:<org_id>          # Sorted set by timestamp

# Runs
wf:run:<run_id>                # Full run document (JSON)
wf:runs:<workflow_id>          # Sorted set of runs per workflow
wf:runs:status:<status>        # Sorted set by status
wf:runs:active:<org_id>        # Active (non-terminal) runs

# Timeline & idempotency
wf:run:timeline:<run_id>       # Append-only event list (max 1000)
wf:run:idempotency:<key>       # Maps key → run_id (SETNX)
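The idempotency key follows SETNX semantics: the first writer wins, and later attempts with the same key get the original run back. A dict-based sketch of those semantics (`start_run` is a hypothetical name, not the engine's API):

```python
def start_run(store, idempotency_key, new_run_id):
    """First caller creates the mapping; later callers get the existing run ID."""
    key = f"wf:run:idempotency:{idempotency_key}"
    # dict.setdefault is the in-memory analogue of Redis SETNX:
    # it writes only if the key is absent, and returns the stored value.
    return store.setdefault(key, new_run_id)

store = {}
print(start_run(store, "deploy-2024", "run-001"))  # run-001 (created)
print(start_run(store, "deploy-2024", "run-002"))  # run-001 (deduplicated)
```

This is what makes retried "start run" requests safe: a client can resend the same request without spawning a duplicate run.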
