
AI Agent NATS Slow Consumer Guardrails

If core subscriptions have no explicit queue budget, burst traffic will pick the budget for you.

Deep Dive · 11 min read · Mar 2026
TL;DR
- NATS docs are explicit: slow consumers get cut off or drop messages unless applications react.
- Cordum already enforces `MaxAckPending` on JetStream subscriptions, but core NATS subscriptions are created without `SetPendingLimits`.
- Cordum currently wires disconnect/reconnect callbacks, but not `nats.ErrorHandler` for async slow-consumer notifications.
- Add explicit pending limits and async error handling on core paths before traffic growth turns silent drops into incident forensics.
Documented risk

NATS favors cluster protection over one lagging consumer. Drops or disconnects are expected behavior.

Current split

JetStream path has explicit backpressure control; core path is more permissive by default.

Low-risk hardening

Pending-limit policy plus error callback telemetry is small code with high operational payoff.

Scope

This guide focuses on slow-consumer protection in Cordum bus client code, not full broker hardware sizing strategy.

The production problem

Load increases. One subscriber falls behind. Nobody notices until downstream state looks inconsistent.

NATS behavior here is intentional. It protects the system first, and the lagging consumer second.

If your application does not expose local slow-consumer signals quickly, incident response starts late.

What top results cover and miss

| Source | Strong coverage | Missing piece |
|---|---|---|
| NATS docs: Slow Consumers (server/operator) | Server/client slow-consumer behavior, default pending limits (65,536 messages and 65,536 × 1,024 bytes), and tuning directions. | No control-plane-specific guidance for mixed JetStream and core NATS subject classes in one runtime. |
| NATS docs: Slow Consumers (developer events) | Client-side pending-limit APIs and slow-consumer detection pattern examples. | No migration checklist for teams with legacy subscriptions that were created without limits. |
| nats.go package docs | `ErrorHandler`, `Subscription.SetPendingLimits`, `Pending`, `Dropped`, and `StatusChanged` APIs. | No production defaults for AI control-plane workloads or guidance on per-subject budget partitioning. |

Cordum runtime behavior

Cordum has strong JetStream guardrails in place. Core NATS paths are less opinionated today.

That split is easy to miss because both code paths share the same high-level bus API.

| Boundary | Current behavior | Operational impact |
|---|---|---|
| JetStream subscribe path | Cordum sets `nats.MaxAckPending(natsMaxAckPending())` for durable subjects. | Backpressure ceiling is explicit and configurable in JetStream mode. |
| Core NATS subscribe path | Core subscriptions use `nc.Subscribe` / `nc.QueueSubscribe` with no immediate `SetPendingLimits` call. | Slow-consumer limits stay at library defaults unless changed elsewhere. |
| Connection callbacks | Cordum sets disconnect/reconnect/closed handlers but not `nats.ErrorHandler`. | Async local slow-consumer events are easier to miss in operator telemetry. |
| Operational observability | No built-in metric surface for per-subscription dropped-count snapshots on core NATS paths. | Teams usually discover this by reading logs after user-visible lag. |
Current core + connection setup excerpt
```go
opts := []nats.Option{
	nats.Name("cordum-bus"),
	nats.MaxReconnects(-1),
	nats.ReconnectWait(2 * time.Second),
	nats.DisconnectErrHandler(...),
	nats.ReconnectHandler(...),
	nats.ClosedHandler(...),
}

// Core path
if queue == "" {
	sub, err := b.nc.Subscribe(subject, cb)
	...
}
sub, err := b.nc.QueueSubscribe(subject, queue, cb)
```

Practical patch pattern

Add two pieces together:

1. `nats.ErrorHandler` for async local slow-consumer visibility.
2. Explicit `SetPendingLimits` for core subscriptions.

Start with conservative limits and tune by subject class. A single global number is usually wrong.

Guardrail patch sketch
```go
func withSlowConsumerGuardrails(opts *[]nats.Option) {
	*opts = append(*opts, nats.ErrorHandler(func(nc *nats.Conn, sub *nats.Subscription, err error) {
		// sub can be nil for connection-level async errors, so guard before use.
		if errors.Is(err, nats.ErrSlowConsumer) && sub != nil {
			pendingMsgs, pendingBytes, pErr := sub.Pending()
			if pErr == nil {
				slog.Warn("bus: slow consumer", "subject", sub.Subject, "pending_msgs", pendingMsgs, "pending_bytes", pendingBytes)
			} else {
				slog.Warn("bus: slow consumer", "subject", sub.Subject, "pending_err", pErr)
			}
			return
		}
		slog.Warn("bus: async nats error", "err", err)
	}))
}

func setCorePendingLimits(sub *nats.Subscription) {
	// Example starting point, tune per workload.
	const msgLimit = 20000
	const bytesLimit = 64 * 1024 * 1024
	if err := sub.SetPendingLimits(msgLimit, bytesLimit); err != nil {
		slog.Warn("bus: set pending limits failed", "subject", sub.Subject, "err", err)
	}
}
```

Rollout and validation

Do not merge this blind. Reproduce bursts in staging and compare pending and dropped counters before and after.

Validation runbook
```bash
# 1) Force burst traffic above normal steady-state rate
# 2) Compare before/after dropped and pending signals

kubectl -n cordum logs deploy/cordum-scheduler | rg "slow consumer|pending_msgs|pending_bytes|async nats error"

# Optional: inspect server-side count
curl -s http://nats:8222/varz | jq '.slow_consumers'

# 3) Tune per subject class
# - low-latency control subjects: tighter pending limits
# - bursty telemetry subjects: looser byte budget with queue scale-out
```

If dropped counts rise after tightening limits, either scale queue workers or split subject namespaces before relaxing limits.

Limitations and tradeoffs

| Approach | Upside | Downside |
|---|---|---|
| Keep default core pending limits | No behavior change and no tuning work. | Risk of delayed detection and surprise drops under bursts. |
| Add explicit limits + async error callback | Predictable queue budgets and faster operator signal. | Requires per-subject tuning to avoid over-aggressive drops. |
| Set unlimited pending queues | Fewer immediate drop events in short spikes. | Memory growth risk and worse failure modes under sustained mismatch. |

Next step

Patch one service first, run burst tests, and publish a small internal default matrix by subject type before rolling the change across all control-plane binaries.

