Worker Development Guide

Workers are external processes that subscribe to job topics on the NATS bus, execute domain logic, and publish results. Build workers in Go, Python, or Node.js using the CAP SDK.

What is a Worker?

A worker is any process that speaks the CAP v2 protocol. Workers connect to NATS, subscribe to job topics, receive BusPacket{JobRequest} messages, and publish BusPacket{JobResult} back. The Cordum control plane handles scheduling, safety evaluation, and lifecycle management; workers only need to execute domain logic.

Fetch & Execute

Load context from Redis, run logic, write results back

Heartbeat

Periodic health signals with capacity and resource metrics

Topic Subscription

Pool subjects (job.*) or direct delivery (worker.<id>.jobs)

Safety Evaluated

Every job is checked by the Safety Kernel before dispatch

Worker Lifecycle

1

Connect to NATS

Establish a connection to the NATS bus using the configured URL

2

Subscribe to topics

Subscribe to job topics (job.<pack>.<task>) or direct subject (worker.<id>.jobs)

3

Send initial heartbeat

Announce presence to the scheduler with capabilities, pool, and capacity

4

Receive JobRequest

Scheduler dispatches a BusPacket{JobRequest} with context_ptr and metadata

5

Fetch context

Load job input from Redis via the context_ptr (e.g. redis://ctx:<job_id>)

6

Execute work

Run your domain logic: call APIs, query databases, invoke LLMs, etc.

7

Write result

Store output JSON in Redis via the result_ptr (e.g. redis://res:<job_id>)

8

Publish JobResult

Send BusPacket{JobResult} to sys.job.result with status and execution time

9

Continue heartbeating

Keep sending heartbeats so the scheduler knows the worker is healthy
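Stripped of SDK plumbing, steps 4 through 8 reduce to a fetch/execute/write/publish sequence. The sketch below models that core loop in plain Go; the in-memory `store` stands in for Redis, and `jobRequest` is a simplified stand-in for the BusPacket{JobRequest} fields, not the real protobuf types.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// store is a stand-in for Redis: context_ptr/result_ptr resolve to keys here.
type store map[string]string

// jobRequest mirrors a few BusPacket{JobRequest} fields (simplified).
type jobRequest struct {
	JobID      string `json:"job_id"`
	ContextPtr string `json:"context_ptr"` // e.g. redis://ctx:<job_id>
	ResultPtr  string `json:"result_ptr"`  // e.g. redis://res:<job_id>
}

// handleJob runs lifecycle steps 5-8: fetch context, execute, write the
// result, and return the status that would be published on sys.job.result.
func handleJob(s store, req jobRequest) string {
	raw, ok := s[strings.TrimPrefix(req.ContextPtr, "redis://")]
	if !ok {
		return "FAILED_FATAL" // missing context is not retryable
	}
	var in map[string]any
	if err := json.Unmarshal([]byte(raw), &in); err != nil {
		return "FAILED_FATAL"
	}
	// Domain logic goes here; this sketch just echoes the message.
	out, _ := json.Marshal(map[string]any{"echo": in["message"]})
	s[strings.TrimPrefix(req.ResultPtr, "redis://")] = string(out)
	return "SUCCEEDED"
}

func main() {
	s := store{"ctx:42": `{"message":"hi"}`}
	req := jobRequest{JobID: "42", ContextPtr: "redis://ctx:42", ResultPtr: "redis://res:42"}
	fmt.Println(handleJob(s, req), s["res:42"])
}
```

The SDK performs the same sequence internally, with real NATS subscriptions and Redis pointer hydration in place of the in-memory map.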

Bus Subjects

Workers interact with the NATS bus through these subjects. With JetStream enabled, durable subjects provide at-least-once delivery. Plain NATS subjects (heartbeats, cancellation) use at-most-once.

Subject             Description
sys.job.submit      Inbound jobs to the scheduler
sys.job.result      Job completions from workers
sys.heartbeat       Worker heartbeats (fan-out, no queue group)
sys.job.cancel      Cancellation notifications
sys.job.progress    Optional progress updates during execution
job.*               Worker pool subjects (e.g. job.default, job.batch)
worker.<id>.jobs    Direct, worker-targeted delivery

Go Worker Example

The Go SDK provides a typed runtime.Agent with automatic protobuf marshalling, context hydration, and result publishing. Register handlers for specific topics and the SDK handles the rest.

examples/hello-worker-go/main.go
package main

import (
    "context"
    "log"
    "os"
    "os/signal"
    "strings"
    "syscall"
    "time"

    "github.com/cordum/cordum/sdk/runtime"
    "github.com/nats-io/nats.go"
)

type echoInput struct {
    Message string `json:"message"`
    Author  string `json:"author,omitempty"`
}

type echoOutput struct {
    Message string `json:"message"`
    Author  string `json:"author,omitempty"`
}

func main() {
    ctx, cancel := signal.NotifyContext(
        context.Background(), os.Interrupt, syscall.SIGTERM,
    )
    defer cancel()

    workerID := envOr("WORKER_ID", "hello-worker")
    natsURL := envOr("NATS_URL", "nats://127.0.0.1:4222")

    agent := &runtime.Agent{
        NATSURL:  natsURL,
        RedisURL: envOr("REDIS_URL", "redis://127.0.0.1:6379/0"),
        SenderID: workerID,
    }

    handler := func(ctx runtime.Context, input echoInput) (echoOutput, error) {
        message := strings.TrimSpace(input.Message)
        if message == "" {
            message = "hello from worker"
        }
        return echoOutput{Message: message, Author: input.Author}, nil
    }

    // Register handler for pool topic + direct subject
    runtime.Register(agent, "job.hello-pack.echo", handler)
    runtime.Register(agent, runtime.DirectSubject(workerID), handler)

    if err := agent.Start(); err != nil {
        log.Fatalf("runtime start: %v", err)
    }
    defer agent.Close()

    // Separate NATS connection for heartbeats
    nc, err := nats.Connect(natsURL,
        nats.Name(workerID), nats.Timeout(5*time.Second))
    if err != nil {
        log.Fatalf("nats connect: %v", err)
    }
    defer nc.Drain()

    heartbeatFn := func() ([]byte, error) {
        return runtime.HeartbeatPayload(
            workerID, "hello-pack", 0, 4, 0,
        ) // workerID, pool, activeJobs, maxParallel, cpuLoad
    }

    if payload, err := heartbeatFn(); err == nil {
        _ = runtime.EmitHeartbeat(nc, payload)
    }
    go runtime.HeartbeatLoop(ctx, nc, heartbeatFn)

    log.Printf("worker ready (id=%s)", workerID)
    <-ctx.Done()
}

// envOr returns the value of the named environment variable, or fallback
// if it is unset or empty.
func envOr(key, fallback string) string {
    if v := os.Getenv(key); v != "" {
        return v
    }
    return fallback
}

Typed Handlers

Define Go structs for input/output; the SDK handles JSON marshalling and Redis pointer hydration automatically

Topic Registration

runtime.Register() binds a handler to a NATS subject. Register the same handler for pool + direct subjects

Graceful Shutdown

signal.NotifyContext traps SIGINT/SIGTERM. agent.Close() drains the NATS connection and finishes in-flight jobs

Python Worker Example

The Python SDK uses a decorator pattern for handler registration with async support. The JobContext object provides access to job input, metadata, and result publishing.

examples/python-worker/worker.py
from cordum import Worker, JobContext

worker = Worker(
    pool="hello-pack",
    subjects=["job.hello-pack.echo"],
    capabilities=["hello-pack.echo"],
)

@worker.handler("job.hello-pack.echo")
async def handle_echo(ctx: JobContext):
    message = ctx.input.get("message", "hello from python")
    author = ctx.input.get("author", "")
    return {"message": message, "author": author}

if __name__ == "__main__":
    worker.run()
Install with pip install cordum. The Python SDK handles NATS connection, heartbeats, context hydration, and result publishing automatically.

Node.js Worker Example

The Node.js SDK provides a fluent API with promise-based handlers. Configure the worker with a constructor, register handlers, and start.

examples/node-worker/worker.js
import { Worker } from "@cordum/sdk";

const worker = new Worker({
  pool: "hello-pack",
  subjects: ["job.hello-pack.echo"],
  capabilities: ["hello-pack.echo"],
  natsUrl: process.env.NATS_URL || "nats://localhost:4222",
});

worker.handle("job.hello-pack.echo", async (ctx) => {
  const message = ctx.input?.message ?? "hello from node";
  const author = ctx.input?.author ?? "";
  return { message, author };
});

worker.start();
Install with npm install @cordum/sdk. TypeScript types are included for full type safety.

Worker Configuration

Go SDK Config Struct

runtime.Config
type Config struct {
    Pool            string            // Worker pool name
    Subjects        []string          // NATS subjects to subscribe
    Queue           string            // Queue group name
    NatsURL         string            // NATS connection URL
    MaxParallelJobs int32             // Concurrency limit (default: 1)
    Capabilities    []string          // Capability labels
    Labels          map[string]string // Custom routing labels
    Type            string            // Worker type -> job.<type>.*
    WorkerID        string            // Unique worker ID
    HeartbeatEvery  time.Duration     // Heartbeat interval
}

Environment Variables

Worker env vars
# Required
NATS_URL=nats://localhost:4222
REDIS_URL=redis://localhost:6379/0

# Optional
WORKER_ID=my-worker-1      # Auto-generated if not set
WORKER_TYPE=batch-processor # Derives subjects: job.<type>.*
CORDUM_TENANT_ID=my-tenant  # Default: "default"

Worker ID Resolution

The SDK resolves the worker ID in priority order:

  1. Explicit WorkerID in config
  2. Environment variable WORKER_ID
  3. Hostname + type (if available)
  4. Fallback: cordum-worker

Subject Resolution

If Subjects are provided, the worker subscribes to those. If only Type is set, it derives job.<type>.*. The direct subject worker.<id>.jobs is always included automatically.
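A sketch of that resolution rule (`resolveSubjects` is a hypothetical helper, not part of the SDK):

```go
package main

import "fmt"

// resolveSubjects follows the documented rules: explicit Subjects win,
// otherwise job.<type>.* is derived from Type, and the direct subject
// worker.<id>.jobs is always appended.
func resolveSubjects(subjects []string, workerType, workerID string) []string {
	out := append([]string{}, subjects...)
	if len(out) == 0 && workerType != "" {
		out = append(out, "job."+workerType+".*")
	}
	return append(out, "worker."+workerID+".jobs")
}

func main() {
	fmt.Println(resolveSubjects(nil, "batch-processor", "w1"))
}
```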

Heartbeat Protocol

Workers publish heartbeats to sys.heartbeat at a regular interval (default: 30 seconds). The scheduler uses heartbeats for least-loaded routing and the reconciler uses them to detect stale workers.

Field                Description
worker_id            Unique identifier for this worker instance
pool                 Worker pool name (matches pools.yaml)
type                 Worker type (used to derive default subjects)
active_jobs          Number of currently executing jobs
max_parallel_jobs    Concurrency limit advertised to the scheduler
capabilities         Capability labels (must match pool requires)
labels               Custom key-value labels for routing and observability
cpu_load             Current CPU utilization (0.0 to 1.0)
memory_load          Current memory utilization (0.0 to 1.0)
Heartbeat in Go
// Build heartbeat payload
heartbeatFn := func() ([]byte, error) {
    return runtime.HeartbeatPayloadWithMemory(
        workerID,   // worker_id
        "my-pool",  // pool name
        2,          // active_jobs
        8,          // max_parallel_jobs
        0.45,       // cpu_load (0.0-1.0)
        0.62,       // memory_load (0.0-1.0)
    )
}

// Emit once, then start loop
if payload, err := heartbeatFn(); err == nil {
    _ = runtime.EmitHeartbeat(nc, payload)
}
go runtime.HeartbeatLoop(ctx, nc, heartbeatFn)

Result Status Codes

Workers set the status on JobResult to tell the scheduler how the job completed. The status determines retry behavior, DLQ entry, and workflow progression.

SUCCEEDED

Job completed successfully

FAILED_RETRYABLE

Transient failure; the scheduler may retry (no DLQ entry)

FAILED_FATAL

Terminal failure; triggers compensation if in a workflow

FAILED

Generic terminal failure

CANCELLED

Job was cancelled mid-execution

Error handling in Go handler
handler := func(ctx runtime.Context, input myInput) (myOutput, error) {
    result, err := callExternalAPI(input)
    if err != nil {
        if isTransient(err) {
            // Scheduler will retry based on config
            return myOutput{}, runtime.RetryableError(err)
        }
        // Terminal failure: no retry, creates DLQ entry
        return myOutput{}, runtime.FatalError(err)
    }
    return myOutput{Data: result}, nil
}

Concurrency Control

The Go SDK uses a semaphore (buffered channel) to limit parallel job execution. Each incoming job acquires a token before dispatching to a goroutine. The active_jobs count is tracked atomically and reported in heartbeats.
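The same pattern can be demonstrated without the SDK: a buffered channel bounds concurrency while an atomic counter tracks the active-job count. `runJobs` is illustrative only; it returns the peak concurrency observed so the limit can be verified.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runJobs dispatches numJobs goroutines, acquiring a semaphore token before
// each dispatch, and reports the highest concurrent active-job count seen.
func runJobs(numJobs, maxParallel int) int64 {
	sem := make(chan struct{}, maxParallel) // buffered channel as semaphore
	var active, peak int64
	var mu sync.Mutex
	var wg sync.WaitGroup
	for i := 0; i < numJobs; i++ {
		wg.Add(1)
		sem <- struct{}{} // acquire a token before dispatching
		go func() {
			defer wg.Done()
			n := atomic.AddInt64(&active, 1) // count reported in heartbeats
			mu.Lock()
			if n > peak {
				peak = n
			}
			mu.Unlock()
			// ... execute domain logic here ...
			atomic.AddInt64(&active, -1)
			<-sem // release the token
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	fmt.Println(runJobs(8, 3))
}
```

With MaxParallelJobs set to 1 the semaphore enforces strictly serial execution; higher values allow that many jobs in flight at once.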

MaxParallelJobs

Set to 1 for serial execution (default) or higher for concurrent processing. Reported in heartbeats.

Queue Groups

Workers on the same subject with a queue group receive round-robin delivery: only one worker in the group handles each message.

Direct Delivery

The scheduler can target a specific worker via worker.<id>.jobs, bypassing pool load balancing.

Best Practices

Idempotency

With JetStream enabled, jobs are delivered at-least-once. Use the idempotency_key from JobRequest to detect duplicates. Store a processing record before starting work and check it on receipt.
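A minimal sketch of that pattern, with an in-memory map standing in for the persistent processing-record store:

```go
package main

import (
	"fmt"
	"sync"
)

// dedup records idempotency keys before work starts; a second delivery of
// the same key is detected and skipped. In production the record would live
// in a durable store, not process memory.
type dedup struct {
	mu   sync.Mutex
	seen map[string]bool
}

// begin returns false if the key was already recorded (duplicate delivery).
func (d *dedup) begin(idempotencyKey string) bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	if d.seen[idempotencyKey] {
		return false
	}
	d.seen[idempotencyKey] = true
	return true
}

func main() {
	d := &dedup{seen: map[string]bool{}}
	fmt.Println(d.begin("job-42"), d.begin("job-42")) // first true, redelivery false
}
```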

Graceful Shutdown

Trap SIGINT/SIGTERM and finish the current job before exiting. The Go SDK drains the NATS connection on Close(), ensuring in-flight messages are processed.

Error Classification

Return FAILED_RETRYABLE for transient errors (network timeouts, rate limits) so the scheduler can retry. Return FAILED_FATAL for permanent errors (invalid input, missing permissions) to avoid infinite retries.

Cancellation Handling

Subscribe to sys.job.cancel and check your context for cancellation. When cancelled, clean up resources and publish a terminal result with CANCELLED status.

Heartbeat Accuracy

Report accurate active_jobs and resource metrics. The scheduler uses this for least-loaded routing. Stale heartbeats cause the reconciler to mark jobs as TIMEOUT.

Compensation Templates

For saga workflows, implement compensation logic that reverses your step's effects. Return FAILED_FATAL to trigger the workflow engine's rollback path.

Quick Start

Submit a job to your worker
# Start the worker
NATS_URL=nats://localhost:4222 REDIS_URL=redis://localhost:6379/0 \
  go run ./examples/hello-worker-go

# Submit a job via cordumctl
cordumctl job submit --topic job.hello-pack.echo \
  --input '{"message": "Hello, world!", "author": "Alice"}'

# Or via the API
curl -X POST http://localhost:8081/api/v1/jobs \
  -H "Authorization: Bearer $API_KEY" \
  -d '{
    "topic": "job.hello-pack.echo",
    "input": {"message": "Hello, world!", "author": "Alice"}
  }'