
Orchestrate runs with clarity.

The Workflow Engine stores definitions and runs in Redis, executes steps as a DAG, and emits an append-only timeline for every status transition. Each step is dispatched as a job via the bus, and the Safety Kernel evaluates every job before execution.

Capabilities
  • DAG execution with depends_on
  • 16 step types across 5 categories
  • for_each field for fan-out iteration
  • Retries with exponential backoff
  • Rerun from step & dry-run mode
  • Schema validation (input & output)
16 Step Types
Execution
  • llm

    Invokes an LLM provider for inference

  • worker

    Dispatches a job to the bus for external execution

  • http

    Makes an HTTP request to an external endpoint

  • container

    Runs a container image as a step

  • script

    Executes an inline script

Control Flow
  • condition

    Evaluates a boolean expression to branch execution

  • switch

    Multi-way branching based on expression matching

  • parallel

    Runs multiple sub-steps concurrently

  • loop

    Repeats a step until a condition is met

Gates
  • approval

    Pauses execution until a human approves or denies

  • input

    Pauses execution awaiting external input

  • delay

    Waits for a duration or until a timestamp

Data
  • transform

    Transforms data using expressions or mappings

  • storage

    Reads or writes data to a storage backend

  • notify

    Emits a SystemAlert event to the bus

Composition
  • subworkflow

    Invokes another workflow as a nested execution

for_each is a field on the Step struct, not a step type. Any step type can use for_each to enable fan-out iteration over an array.

DAG Execution

Steps form a directed acyclic graph via depends_on. Independent steps run in parallel. A step only executes when all dependencies have succeeded.

Multi-branch DAG workflow
id: deploy.pipeline
name: "Deploy Pipeline"
input_schema: DeployRequest
steps:
  lint:
    type: worker
    topic: job.ci.lint

  test:
    type: worker
    topic: job.ci.test

  security-scan:
    type: worker
    topic: job.ci.security

  # Runs after lint + test + security-scan all succeed
  build:
    type: worker
    topic: job.ci.build
    depends_on: [lint, test, security-scan]

  approve:
    type: approval
    reason: "Production deployment requires sign-off"
    depends_on: [build]

  deploy:
    type: worker
    topic: job.infra.deploy
    depends_on: [approve]
    retry:
      max_retries: 2
      initial_backoff_sec: 5

  notify:
    type: notify
    depends_on: [deploy]
    input:
      level: INFO
      message: "Deployment complete"
Parallel by default

Steps without depends_on are dispatched simultaneously in a single scheduling pass.

Failure blocks downstream

Failed, cancelled, or timed-out dependencies permanently block all downstream steps. No implicit continue-on-error.

All deps must succeed

A step waits for every listed dependency to reach succeeded before dispatch.
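The gating rule can be sketched in a few lines. This is an illustration of the semantics described above, not the engine's internals (function and status names are assumptions): a step is dispatchable only while pending, and only once every listed dependency has succeeded.

```python
def ready_steps(steps, statuses):
    """Return the step IDs dispatchable in one scheduling pass.

    steps:    {step_id: {"depends_on": [dep_id, ...]}}
    statuses: {step_id: "pending" | "running" | "succeeded" | "failed" | ...}
    Missing status entries are treated as pending.
    """
    ready = []
    for step_id, spec in steps.items():
        if statuses.get(step_id, "pending") != "pending":
            continue  # already dispatched or terminal
        # ALL dependencies must have succeeded; a failed, cancelled, or
        # timed-out dependency blocks this step permanently.
        if all(statuses.get(dep) == "succeeded" for dep in spec.get("depends_on", [])):
            ready.append(step_id)
    return ready
```

With no statuses recorded yet, `lint` and `test` are both ready in the first pass, which is the parallel-by-default behaviour; `build` only becomes ready once both have succeeded.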

Execution Controls

  • depends_on: Step IDs that must ALL succeed before this step runs
  • condition: Expression that must be truthy for this step to execute
  • for_each: Field on any step; an expression returning an array to fan out per element
  • max_parallel: Limit on concurrent for_each children (0 = unlimited)

Workflow Definition

Workflows are stored in Redis as JSON. Each step supports expressions via ${...} for dynamic input, conditions, and for-each iteration.

Complete definition structure
id: incident.triage
name: "Incident Triage"
org_id: default
version: "1.0"
timeout_sec: 3600            # Overall workflow timeout
input_schema:                # JSON Schema for input validation
  type: object
  required: [alert_id, severity]
  properties:
    alert_id: { type: string }
    severity: { type: string, enum: [low, medium, high, critical] }

steps:
  enrich:
    type: worker
    topic: job.incident.enricher
    input:
      alert_id: "${input.alert_id}"
    output_path: enrichment    # Store output at ctx.enrichment
    output_schema:             # Validate step output
      type: object
      required: [summary]
    timeout_sec: 120
    retry:
      max_retries: 3
      initial_backoff_sec: 2
      max_backoff_sec: 30
      multiplier: 2

  check-severity:
    type: condition
    depends_on: [enrich]
    condition: "ctx.enrichment.severity == 'critical'"
    output_path: needs_approval

  approve:
    type: approval
    depends_on: [check-severity]
    condition: "ctx.needs_approval"
    reason: "Critical incident requires human sign-off"

  remediate:
    type: worker
    topic: job.incident.remediate
    depends_on: [approve]
    input:
      summary: "${ctx.enrichment.summary}"
    meta:
      risk_tags: [write, prod]
      capability: incident.remediate

Expression Language

Step inputs, conditions, and for_each fields support a simple expression language with dot-path resolution.

  • input.key: Workflow input value
  • ctx.steps.step_id.output: Previous step output
  • length(arr): Array/string/map length
  • item: Current for_each element

Operators: == != > < >= <= !
Template strings: "Value: ${input.x}"
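A rough sketch of how dot-path lookup and `${...}` interpolation could behave, as an illustration of the semantics above rather than the engine's actual parser (operators and `length()` are omitted):

```python
import re

def resolve(path, scope):
    """Walk a dot path like 'ctx.enrichment.summary' through nested dicts."""
    value = scope
    for part in path.split("."):
        value = value[part]
    return value

def render(template, scope):
    """Substitute every ${...} placeholder with its resolved value."""
    return re.sub(r"\$\{([^}]+)\}",
                  lambda m: str(resolve(m.group(1), scope)),
                  template)
```

So `"Value: ${input.x}"` renders by resolving `input.x` against the run scope and splicing the result into the string.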

Schema Validation

Validate workflow input and step I/O with JSON Schema. Schemas can be defined inline or referenced from the schema registry.

Inline schema
steps:
  process:
    type: worker
    topic: job.data.process
    input_schema:           # Validate step input
      type: object
      required: [file_url]
      properties:
        file_url:
          type: string
          format: uri
    output_schema:          # Validate step output
      type: object
      required: [row_count]
      properties:
        row_count:
          type: integer
          minimum: 0
Registry reference
steps:
  transform:
    type: worker
    topic: job.etl.transform
    input_schema_id: etl/transform-input
    output_schema_id: etl/transform-output

# Register schemas with:
# cordumctl schema register \
#   --id etl/transform-input \
#   --file schema.json
  • Workflow input: Validated on run creation against the workflow input_schema
  • Step input: Validated before job dispatch against input_schema or input_schema_id
  • Step output: Validated after job result received. Failure marks the step as failed (no retry)
  • Pack-scoped: Schema IDs follow pack_id/schema_name convention
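The engine validates with full JSON Schema; the toy checker below covers only `required` and `type` to illustrate the rule that a mismatch produces errors rather than silently passing (names and error strings are illustrative):

```python
# Map of the JSON Schema types used here to Python runtime types.
TYPES = {"object": dict, "array": list, "string": str,
         "integer": int, "number": (int, float), "boolean": bool}

def check(schema, data):
    """Minimal illustration: required keys present, declared types match.
    Real JSON Schema also covers formats, enums, minimum, nesting, ..."""
    expected = schema.get("type", "object")
    if not isinstance(data, TYPES[expected]):
        return [f"expected {expected}, got {type(data).__name__}"]
    errors = [f"missing required field: {k}"
              for k in schema.get("required", []) if k not in data]
    for key, sub in schema.get("properties", {}).items():
        if key in data and not isinstance(data[key], TYPES[sub["type"]]):
            errors.append(f"{key}: expected {sub['type']}")
    return errors
```

Applied to the inline example above, an output of `{"row_count": "10"}` would fail the `integer` constraint and, per the rules listed, mark the step failed with no retry.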

For-Each Fan-Out

Any step with a for_each expression fans out into parallel child jobs. Children are aggregated back: if any child fails, the parent step fails.

for_each workflow
id: batch.process
steps:
  scan:
    type: worker
    topic: job.scan.files
    output_path: file_list

  process:
    type: worker
    topic: job.transform
    depends_on: [scan]
    for_each: "ctx.file_list"     # Array from scan output
    max_parallel: 10              # Limit concurrent children
    input:
      file: "${item}"            # Current array element
      index: "${foreach_index}"  # 0-based index

  # Runs after ALL children complete
  aggregate:
    type: worker
    topic: job.aggregate
    depends_on: [process]
    input:
      count: "${length(ctx.file_list)}"

Fan-Out Mechanics

  • Children created with IDs: stepID[0], stepID[1], etc.
  • Each child receives item and foreach_index in scope
  • max_parallel throttles concurrent children per scheduling pass

Fan-In Aggregation

  • Parent completes when all children reach a terminal state
  • If any child fails, cancels, or times out — parent fails
  • All children must succeed for parent to succeed
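The fan-out and fan-in rules above can be sketched directly; this is an illustration of the documented behaviour, not the engine's code:

```python
TERMINAL = {"succeeded", "failed", "cancelled", "timed_out"}

def child_ids(step_id, items):
    """Child IDs follow the stepID[index] convention."""
    return [f"{step_id}[{i}]" for i in range(len(items))]

def aggregate(child_statuses):
    """Fan-in: parent stays running until all children are terminal,
    then succeeds only if every child succeeded."""
    if any(s not in TERMINAL for s in child_statuses):
        return "running"
    return "succeeded" if all(s == "succeeded" for s in child_statuses) else "failed"
```

A single timed-out child is enough to fail the parent, even if every sibling succeeded.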

Rerun & Dry-Run

Rerun a workflow from a specific step, preserving completed work. Or validate a run without side effects using dry-run mode.

Rerun from Step
cordumctl
# Rerun from a failed step
cordumctl run rerun <run_id> \
  --from deploy

# Output:
# new_run_id: run-abc-456
# rerun_of: run-abc-123
# rerun_step: deploy
# status: pending
# preserved: [lint, test, build]
Dry-Run Mode
cordumctl
# Validate without side effects
cordumctl run start deploy.pipeline \
  --input '{"env": "prod"}' \
  --dry-run

# Output:
# run_id: run-dry-789
# dry_run: true
# status: running
# Workers receive dry_run=true and
# should skip mutations.

Rerun Mechanics

  • Creates a new run linked via rerun_of
  • Original input is cloned exactly
  • Dependency outputs are copied into the new run context
  • All dependencies must have succeeded in the original run

Dry-Run Semantics

  • dry_run=true propagated to workers via env and labels
  • Workers should skip mutations (read-only operations)
  • Schema validation, policy checks, and dependencies still apply
  • Retries, failures, and timeouts behave normally
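On the worker side, honoring dry-run might look like the sketch below. The handler shape and the `DRY_RUN` env variable name are assumptions; the doc only guarantees that `dry_run=true` arrives via env and labels:

```python
import os

def handle_job(job, apply_change):
    """Skip mutations when the run is flagged dry-run.
    `apply_change` is a hypothetical side-effecting callback."""
    dry_run = (os.environ.get("DRY_RUN") == "true"
               or job.get("labels", {}).get("dry_run") == "true")
    if dry_run:
        # Validate and report what would happen, but mutate nothing.
        return {"status": "succeeded", "dry_run": True, "would_apply": job["input"]}
    return apply_change(job["input"])
```

Because schema validation and policy checks still run, a dry-run exercises the whole control path except the mutation itself.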

Run Timeline

Every run maintains an append-only timeline of events in Redis. Each event records a timestamp, step ID, status, and optional data payload.

Event Types

  • run_status: Run status transitioned
  • step_dispatched: Job published to bus
  • step_completed: Step reached terminal state
  • step_waiting: Approval step waiting
  • step_approved: Approval granted
  • step_rejected: Approval denied
  • step_condition_evaluated: Condition result recorded
  • step_delay_started: Delay timer started
  • step_event_emitted: Notify alert sent
  • step_output_invalid: Schema validation failed
cordumctl run timeline
cordumctl run get <run_id> --timeline

# TIME      EVENT            STEP     STATUS
# 10:00:01  run_status       -        running
# 10:00:01  step_dispatched  lint     running
# 10:00:01  step_dispatched  test     running
# 10:00:03  step_completed   lint     succeeded
# 10:00:05  step_completed   test     succeeded
# 10:00:05  step_dispatched  build    running
# 10:00:12  step_completed   build    succeeded
# 10:00:12  step_waiting     approve  waiting
# 10:05:30  step_approved    approve  succeeded
# 10:05:30  step_dispatched  deploy   running
# 10:05:45  step_completed   deploy   succeeded
# 10:05:45  run_status       -        succeeded
Timeline is stored at wf:run:timeline:<run_id> in Redis. Last 1000 events are retained per run.
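Append-plus-retention maps naturally onto a capped Redis list (append, then trim to the newest 1000 entries). A sketch with an in-memory list standing in for Redis:

```python
MAX_EVENTS = 1000

def append_event(timeline, event):
    """Append an event and keep only the most recent MAX_EVENTS,
    mirroring an RPUSH followed by LTRIM -MAX_EVENTS -1 in Redis."""
    timeline.append(event)
    del timeline[:-MAX_EVENTS]   # no-op until the cap is exceeded
    return timeline
```

Once the cap is reached, each new event evicts the oldest one, so the timeline always holds the latest 1000 transitions.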

Retry & Backoff

Steps can be configured with exponential backoff retries. The engine distinguishes between retryable and fatal failures.

Retry configuration
steps:
  deploy:
    type: worker
    topic: job.infra.deploy
    retry:
      max_retries: 3            # Up to 3 retry attempts
      initial_backoff_sec: 2    # First backoff: 2s
      max_backoff_sec: 60       # Cap at 60s
      multiplier: 2             # 2x exponential

# Backoff sequence:
# Attempt 1 → immediate
# Failure   → wait 2s
# Attempt 2 → after 2s
# Failure   → wait 4s
# Attempt 3 → after 4s
# Failure   → wait 8s
# Attempt 4 → after 8s (capped at 60s)
# Failure   → step marked FAILED
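The sequence above follows a standard capped exponential formula. As a sketch (the function itself is illustrative; parameter names mirror the retry block):

```python
def backoff_sec(attempt, initial=2, multiplier=2, cap=60):
    """Delay before retry `attempt` (1-based):
    initial * multiplier^(attempt - 1), capped at `cap` seconds."""
    return min(initial * multiplier ** (attempt - 1), cap)
```

With the configuration shown, the first three retries wait 2s, 4s, and 8s; by the seventh retry the raw value (128s) would exceed the cap and is clamped to 60s.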

Failure Semantics

  • FAILED_RETRYABLE

    Transient failure. Step resets to pending with backoff delay. New job dispatched with incremented attempt counter.

  • FAILED_FATAL

    Permanent failure. Step marked failed immediately. Run transitions to failed. Saga rollback triggered for completed steps with compensation templates.

  • TIMEOUT

    Step or run timeout exceeded. Reconciler marks stale jobs. Not retried unless explicitly configured.

Job IDs include the attempt number: run_id:step_id@2 for the second attempt.

Cancel Propagation

Cancelling a run stops all executing steps and publishes cancellation signals for in-flight jobs.

Cancel a run
# Cancel via CLI
cordumctl run cancel <run_id>

# Cancel via API
curl -X POST \
  http://localhost:8081/api/v1/workflow-runs/<id>/cancel \
  -H "X-API-Key: <token>"

Propagation Flow

  • Run transitions to cancelled immediately
  • All non-terminal steps (pending, running, waiting) marked cancelled
  • For-each children recursively cancelled
  • JobCancel published to sys.job.cancel for each running job
  • No further steps are dispatched
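The propagation flow can be sketched as a recursive walk over step statuses; names and the `children` map are illustrative, not the engine's data model:

```python
TERMINAL = {"succeeded", "failed", "cancelled", "timed_out"}

def cancel_run(statuses, children):
    """Mark every non-terminal step cancelled and collect the running
    steps whose in-flight jobs need a JobCancel signal.
    `children` maps a for_each parent to its child step IDs."""
    to_signal = []

    def cancel(step_id):
        if statuses.get(step_id) == "running":
            to_signal.append(step_id)            # publish JobCancel for this job
        if statuses.get(step_id) not in TERMINAL:
            statuses[step_id] = "cancelled"      # pending/running/waiting -> cancelled
        for child in children.get(step_id, []):  # recurse into for_each children
            cancel(child)

    for step_id in list(statuses):
        cancel(step_id)
    return to_signal
```

Steps that already reached a terminal state (like a succeeded `build`) are left untouched; only non-terminal steps flip to cancelled.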

Reconciliation

A reconciler runs every 5 seconds, scanning active runs for stale jobs and missed results. It uses distributed locking to avoid conflicts across engine replicas.

  • Stale detection: Queries scheduler for job state of running steps
  • Lost results: Synthesizes job results for jobs that completed but result message was lost
  • Retry triggers: Retriggers scheduling for steps past their backoff delay
  • Distributed lock: Only one engine instance reconciles at a time
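A common way to implement that lock is Redis SET with NX and EX: the first replica to set the key wins the pass, and the TTL releases it if that replica dies. A sketch with an in-memory stand-in for Redis (the lock key name is an assumption):

```python
import time

class FakeRedis:
    """In-memory stand-in for the one Redis op the lock needs: SET NX EX."""
    def __init__(self):
        self.store = {}
    def set(self, key, value, nx=False, ex=None):
        if nx and key in self.store and self.store[key][1] > time.time():
            return None                                  # lock already held
        self.store[key] = (value, time.time() + (ex or 1e9))
        return True

def try_reconcile(r, instance_id):
    """Only the replica that wins SET NX EX runs this reconcile pass."""
    if r.set("wf:reconcile:lock", instance_id, nx=True, ex=10):
        return f"{instance_id} reconciling"
    return f"{instance_id} skipped"
```

The TTL (10s here, longer than the 5s reconcile interval) bounds how long a crashed holder can block its peers.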

Storage

Workflows, runs, and timelines are persisted in Redis with indexed lookups.

Redis key patterns
# Workflow definitions
wf:def:<workflow_id>           # Full definition (JSON)
wf:index:org:<org_id>          # Sorted set by timestamp

# Runs
wf:run:<run_id>                # Full run document (JSON)
wf:runs:<workflow_id>          # Sorted set of runs per workflow
wf:runs:status:<status>        # Sorted set by status
wf:runs:active:<org_id>        # Active (non-terminal) runs

# Timeline & idempotency
wf:run:timeline:<run_id>       # Append-only event list (max 1000)
wf:run:idempotency:<key>       # Maps key → run_id (SETNX)
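The SETNX idempotency key makes run creation safe to retry: the first request stores its run_id, and any replay gets that same run back. A sketch with a dict-backed stand-in for Redis (illustrative, not the engine's code):

```python
class FakeRedis:
    """Stand-in exposing the two Redis ops the pattern needs: SETNX and GET."""
    def __init__(self):
        self.store = {}
    def setnx(self, key, value):
        if key in self.store:
            return False
        self.store[key] = value
        return True
    def get(self, key):
        return self.store.get(key)

def start_run(r, idempotency_key, new_run_id):
    """SETNX maps the key to the first run_id; replays return the original."""
    key = f"wf:run:idempotency:{idempotency_key}"
    if r.setnx(key, new_run_id):
        return new_run_id      # first request wins and creates the run
    return r.get(key)          # duplicate request: reuse the existing run
```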