Worker Development Guide
Workers are external processes that subscribe to job topics on the NATS bus, execute domain logic, and publish results. Build workers in Go, Python, or Node.js using the CAP SDK.
What is a Worker?
A worker is any process that speaks the CAP v2 protocol. Workers connect to NATS, subscribe to job topics, receive BusPacket{JobRequest} messages, and publish BusPacket{JobResult} back. The Cordum control plane handles scheduling, safety evaluation, and lifecycle management; workers only need to execute domain logic.
Fetch & Execute
Load context from Redis, run logic, write results back
Heartbeat
Periodic health signals with capacity and resource metrics
Topic Subscription
Pool subjects (job.*) or direct delivery (worker.<id>.jobs)
Safety Evaluated
Every job is checked by the Safety Kernel before dispatch
Worker Lifecycle
Connect to NATS
Establish a connection to the NATS bus using the configured URL
Subscribe to topics
Subscribe to job topics (job.<pack>.<task>) or direct subject (worker.<id>.jobs)
Send initial heartbeat
Announce presence to the scheduler with capabilities, pool, and capacity
Receive JobRequest
Scheduler dispatches a BusPacket{JobRequest} with context_ptr and metadata
Fetch context
Load job input from Redis via the context_ptr (e.g. redis://ctx:<job_id>)
Execute work
Run your domain logic: call APIs, query databases, invoke LLMs, etc.
Write result
Store output JSON in Redis via the result_ptr (e.g. redis://res:<job_id>)
Publish JobResult
Send BusPacket{JobResult} to sys.job.result with status and execution time
Continue heartbeating
Keep sending heartbeats so the scheduler knows the worker is healthy
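The fetch and write steps in the lifecycle above revolve around the pointer strings in the job envelope (`redis://ctx:<job_id>` and `redis://res:<job_id>`). A minimal sketch of parsing that format, assuming the layout shown above; the helper name is illustrative and not part of the SDK, which hydrates pointers for you:

```go
package main

import (
	"fmt"
	"strings"
)

// parsePointer splits a pointer like "redis://ctx:job-123" into its
// scheme ("redis"), namespace ("ctx" for context, "res" for results),
// and job ID. Illustrative only; not an SDK function.
func parsePointer(ptr string) (scheme, ns, jobID string, err error) {
	scheme, rest, ok := strings.Cut(ptr, "://")
	if !ok {
		return "", "", "", fmt.Errorf("malformed pointer: %q", ptr)
	}
	ns, jobID, ok = strings.Cut(rest, ":")
	if !ok {
		return "", "", "", fmt.Errorf("malformed pointer: %q", ptr)
	}
	return scheme, ns, jobID, nil
}

func main() {
	scheme, ns, jobID, err := parsePointer("redis://ctx:job-123")
	if err != nil {
		panic(err)
	}
	fmt.Println(scheme, ns, jobID) // redis ctx job-123
}
```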
Bus Subjects
Workers interact with the NATS bus through these subjects. With JetStream enabled, durable subjects provide at-least-once delivery. Plain NATS subjects (heartbeats, cancellation) use at-most-once.
| Subject | Description |
|---|---|
| sys.job.submit | Inbound jobs to the scheduler |
| sys.job.result | Job completions from workers |
| sys.heartbeat | Worker heartbeats (fan-out, no queue group) |
| sys.job.cancel | Cancellation notifications |
| sys.job.progress | Optional progress updates during execution |
| job.* | Worker pool subjects (e.g. job.default, job.batch) |
| worker.<id>.jobs | Direct, worker-targeted delivery |
Go Worker Example
The Go SDK provides a typed runtime.Agent with automatic protobuf marshalling, context hydration, and result publishing. Register handlers for specific topics and the SDK handles the rest.
```go
package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"strings"
	"syscall"
	"time"

	"github.com/cordum/cordum/sdk/runtime"
	"github.com/nats-io/nats.go"
)

type echoInput struct {
	Message string `json:"message"`
	Author  string `json:"author,omitempty"`
}

type echoOutput struct {
	Message string `json:"message"`
	Author  string `json:"author,omitempty"`
}

func envOr(key, fallback string) string {
	if v := os.Getenv(key); v != "" {
		return v
	}
	return fallback
}

func main() {
	ctx, cancel := signal.NotifyContext(
		context.Background(), os.Interrupt, syscall.SIGTERM,
	)
	defer cancel()

	workerID := envOr("WORKER_ID", "hello-worker")
	natsURL := envOr("NATS_URL", "nats://127.0.0.1:4222")

	agent := &runtime.Agent{
		NATSURL:  natsURL,
		RedisURL: envOr("REDIS_URL", "redis://127.0.0.1:6379/0"),
		SenderID: workerID,
	}

	handler := func(ctx runtime.Context, input echoInput) (echoOutput, error) {
		message := strings.TrimSpace(input.Message)
		if message == "" {
			message = "hello from worker"
		}
		return echoOutput{Message: message, Author: input.Author}, nil
	}

	// Register handler for pool topic + direct subject
	runtime.Register(agent, "job.hello-pack.echo", handler)
	runtime.Register(agent, runtime.DirectSubject(workerID), handler)

	if err := agent.Start(); err != nil {
		log.Fatalf("runtime start: %v", err)
	}
	defer agent.Close()

	// Separate NATS connection for heartbeats
	nc, err := nats.Connect(natsURL,
		nats.Name(workerID), nats.Timeout(5*time.Second))
	if err != nil {
		log.Fatalf("nats connect: %v", err)
	}
	defer nc.Drain()

	heartbeatFn := func() ([]byte, error) {
		return runtime.HeartbeatPayload(
			workerID, "hello-pack", 0, 4, 0,
		) // workerID, pool, activeJobs, maxParallel, cpuLoad
	}
	if payload, err := heartbeatFn(); err == nil {
		_ = runtime.EmitHeartbeat(nc, payload)
	}
	go runtime.HeartbeatLoop(ctx, nc, heartbeatFn)

	log.Printf("worker ready (id=%s)", workerID)
	<-ctx.Done()
}
```

Typed Handlers
Define Go structs for input/output: the SDK handles JSON marshalling and Redis pointer hydration automatically
Topic Registration
runtime.Register() binds a handler to a NATS subject. Register the same handler for pool + direct subjects
Graceful Shutdown
signal.NotifyContext traps SIGINT/SIGTERM. agent.Close() drains the NATS connection and finishes in-flight jobs
Python Worker Example
The Python SDK uses a decorator pattern for handler registration with async support. The JobContext object provides access to job input, metadata, and result publishing.
```python
from cordum import Worker, JobContext

worker = Worker(
    pool="hello-pack",
    subjects=["job.hello-pack.echo"],
    capabilities=["hello-pack.echo"],
)

@worker.handler("job.hello-pack.echo")
async def handle_echo(ctx: JobContext):
    message = ctx.input.get("message", "hello from python")
    author = ctx.input.get("author", "")
    return {"message": message, "author": author}

if __name__ == "__main__":
    worker.run()
```

pip install cordum: the Python SDK handles NATS connection, heartbeats, context hydration, and result publishing automatically.
Node.js Worker Example
The Node.js SDK provides a fluent API with promise-based handlers. Configure the worker with a constructor, register handlers, and start.
```javascript
import { Worker } from "@cordum/sdk";

const worker = new Worker({
  pool: "hello-pack",
  subjects: ["job.hello-pack.echo"],
  capabilities: ["hello-pack.echo"],
  natsUrl: process.env.NATS_URL || "nats://localhost:4222",
});

worker.handle("job.hello-pack.echo", async (ctx) => {
  const message = ctx.input?.message ?? "hello from node";
  const author = ctx.input?.author ?? "";
  return { message, author };
});

worker.start();
```

npm install @cordum/sdk: includes TypeScript types for full type safety.
Worker Configuration
Go SDK Config Struct
```go
type Config struct {
	Pool            string            // Worker pool name
	Subjects        []string          // NATS subjects to subscribe
	Queue           string            // Queue group name
	NatsURL         string            // NATS connection URL
	MaxParallelJobs int32             // Concurrency limit (default: 1)
	Capabilities    []string          // Capability labels
	Labels          map[string]string // Custom routing labels
	Type            string            // Worker type -> job.<type>.*
	WorkerID        string            // Unique worker ID
	HeartbeatEvery  time.Duration     // Heartbeat interval
}
```

Environment Variables
```sh
# Required
NATS_URL=nats://localhost:4222
REDIS_URL=redis://localhost:6379/0

# Optional
WORKER_ID=my-worker-1          # Auto-generated if not set
WORKER_TYPE=batch-processor    # Derives subjects: job.<type>.*
CORDUM_TENANT_ID=my-tenant     # Default: "default"
```
Worker ID Resolution
The SDK resolves the worker ID in priority order:
1. Explicit WorkerID in config
2. WORKER_ID environment variable
3. Hostname + type (if available)
4. Fallback: cordum-worker
Subject Resolution
If Subjects are provided, the worker subscribes to those. If only Type is set, it derives job.<type>.*. The direct subject worker.<id>.jobs is always included automatically.
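Those rules can be expressed as a small sketch (illustrative helper, not the SDK's internal function):

```go
package main

import "fmt"

// resolveSubjects mirrors the rules above: explicit Subjects win,
// otherwise a Type derives job.<type>.*, and the direct subject
// worker.<id>.jobs is always appended.
func resolveSubjects(subjects []string, workerType, workerID string) []string {
	out := append([]string(nil), subjects...)
	if len(out) == 0 && workerType != "" {
		out = append(out, "job."+workerType+".*")
	}
	return append(out, "worker."+workerID+".jobs")
}

func main() {
	fmt.Println(resolveSubjects(nil, "batch", "w-1"))
	// [job.batch.* worker.w-1.jobs]
}
```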
Heartbeat Protocol
Workers publish heartbeats to sys.heartbeat at a regular interval (default: 30 seconds). The scheduler uses heartbeats for least-loaded routing and the reconciler uses them to detect stale workers.
| Field | Description |
|---|---|
| worker_id | Unique identifier for this worker instance |
| pool | Worker pool name (matches pools.yaml) |
| type | Worker type (used to derive default subjects) |
| active_jobs | Number of currently executing jobs |
| max_parallel_jobs | Concurrency limit advertised to scheduler |
| capabilities | Capability labels (must match pool requires) |
| labels | Custom key-value labels for routing and observability |
| cpu_load | Current CPU utilization (0.0–1.0) |
| memory_load | Current memory utilization (0.0–1.0) |
```go
// Build heartbeat payload
heartbeatFn := func() ([]byte, error) {
	return runtime.HeartbeatPayloadWithMemory(
		workerID,  // worker_id
		"my-pool", // pool name
		2,         // active_jobs
		8,         // max_parallel_jobs
		0.45,      // cpu_load (0.0-1.0)
		0.62,      // memory_load (0.0-1.0)
	)
}

// Emit once, then start loop
if payload, err := heartbeatFn(); err == nil {
	_ = runtime.EmitHeartbeat(nc, payload)
}
go runtime.HeartbeatLoop(ctx, nc, heartbeatFn)
```

Result Status Codes
Workers set the status on JobResult to tell the scheduler how the job completed. The status determines retry behavior, DLQ entry, and workflow progression.
| Status | Description |
|---|---|
| SUCCEEDED | Job completed successfully |
| FAILED_RETRYABLE | Transient failure: scheduler may retry (no DLQ entry) |
| FAILED_FATAL | Terminal failure: triggers compensation if in a workflow |
| FAILED | Generic terminal failure |
| CANCELLED | Job was cancelled mid-execution |
```go
handler := func(ctx runtime.Context, input myInput) (myOutput, error) {
	result, err := callExternalAPI(input)
	if err != nil {
		if isTransient(err) {
			// Scheduler will retry based on config
			return myOutput{}, runtime.RetryableError(err)
		}
		// Terminal failure: no retry, creates DLQ entry
		return myOutput{}, runtime.FatalError(err)
	}
	return myOutput{Data: result}, nil
}
```

Concurrency Control
The Go SDK uses a semaphore (buffered channel) to limit parallel job execution. Each incoming job acquires a token before dispatching to a goroutine. The active_jobs count is tracked atomically and reported in heartbeats.
MaxParallelJobs
Set to 1 for serial execution (default) or higher for concurrent processing. Reported in heartbeats.
Queue Groups
Workers on the same subject with a queue group receive round-robin delivery: only one worker in the group handles each message.
Direct Delivery
The scheduler can target a specific worker via worker.<id>.jobs, bypassing pool load balancing.
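The semaphore pattern described above can be sketched without the SDK. This is a self-contained illustration of the buffered-channel-plus-atomic-counter shape, not the SDK's actual implementation; the peak-tracking is added here just to make the limit observable:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// maxObservedActive runs n no-op jobs through a semaphore of size
// maxParallel and returns the highest concurrent count observed.
// The channel acquire/release is the concurrency gate; the atomic
// counter is what a heartbeat would report as active_jobs.
func maxObservedActive(maxParallel, n int) int32 {
	sem := make(chan struct{}, maxParallel)
	var active, peak atomic.Int32
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			sem <- struct{}{} // acquire a token before starting the job
			cur := active.Add(1)
			for { // record the peak concurrency seen
				p := peak.Load()
				if cur <= p || peak.CompareAndSwap(p, cur) {
					break
				}
			}
			active.Add(-1)
			<-sem // release the token
		}()
	}
	wg.Wait()
	return peak.Load()
}

func main() {
	fmt.Println("peak concurrency:", maxObservedActive(2, 20))
}
```

The peak never exceeds the semaphore capacity, which is exactly the guarantee MaxParallelJobs provides.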
Best Practices
Idempotency
With JetStream enabled, jobs are delivered at-least-once. Use the idempotency_key from JobRequest to detect duplicates. Store a processing record before starting work and check it on receipt.
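The check-then-record shape can be sketched as below. A real worker would use an atomic store such as Redis SETNX with a TTL keyed on the idempotency_key; this in-memory version only illustrates the pattern:

```go
package main

import (
	"fmt"
	"sync"
)

// dedup records idempotency keys before work starts. Illustrative
// in-memory stand-in for a shared store like Redis.
type dedup struct {
	mu   sync.Mutex
	seen map[string]bool
}

func newDedup() *dedup { return &dedup{seen: make(map[string]bool)} }

// claim returns true the first time a key is seen, false on duplicates.
func (d *dedup) claim(key string) bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	if d.seen[key] {
		return false
	}
	d.seen[key] = true
	return true
}

func main() {
	d := newDedup()
	fmt.Println(d.claim("job-1")) // true: first delivery, process it
	fmt.Println(d.claim("job-1")) // false: redelivery, skip the work
}
```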
Graceful Shutdown
Trap SIGINT/SIGTERM and finish the current job before exiting. The Go SDK drains the NATS connection on Close(), ensuring in-flight messages are processed.
Error Classification
Return FAILED_RETRYABLE for transient errors (network timeouts, rate limits) so the scheduler can retry. Return FAILED_FATAL for permanent errors (invalid input, missing permissions) to avoid infinite retries.
Cancellation Handling
Subscribe to sys.job.cancel and check your context for cancellation. When cancelled, clean up resources and publish a terminal result with CANCELLED status.
Heartbeat Accuracy
Report accurate active_jobs and resource metrics. The scheduler uses this for least-loaded routing. Stale heartbeats cause the reconciler to mark jobs as TIMEOUT.
Compensation Templates
For saga workflows, implement compensation logic that reverses your step's effects. Return FAILED_FATAL to trigger the workflow engine's rollback path.
Quick Start
```sh
# Start the worker
NATS_URL=nats://localhost:4222 REDIS_URL=redis://localhost:6379/0 \
  go run ./examples/hello-worker-go

# Submit a job via cordumctl
cordumctl job submit --topic job.hello-pack.echo \
  --input '{"message": "Hello, world!", "author": "Alice"}'

# Or via the API
curl -X POST http://localhost:8081/api/v1/jobs \
  -H "Authorization: Bearer $API_KEY" \
  -d '{
    "topic": "job.hello-pack.echo",
    "input": {"message": "Hello, world!", "author": "Alice"}
  }'
```