
AI Agent NATS Drain vs Close

`sub.Drain()` is helpful. It is not the same as draining the whole connection before shutdown.

Deep Dive · 10 min read · Mar 2026
TL;DR
- NATS docs separate `Drain` and `Close`, but many platform runbooks still treat shutdown as one step.
- Cordum currently drains tracked subscriptions, then calls `nc.Close()` on the connection.
- Core NATS publish paths can still hold in-memory messages during shutdown if you close immediately.
- A safer pattern: stop intake, drain subscriptions, drain/flush the connection, then exit.
Real failure shape

Shutdown races are short and ugly. One lost state event can create expensive repair work.

Current baseline

Cordum drains subscriptions before connection close. That is better than raw close-only behavior.

Gap to close

Connection-level drain semantics are still different from subscription-only drain.

Scope

This guide focuses on shutdown correctness at the NATS client layer for Cordum-style control planes, not full broker maintenance strategy.

The production problem

A pod receives SIGTERM while it still has pending publish work. Shutdown code closes fast. Minutes later, someone asks why a progress or alert event vanished.

This class of bug is intermittent, expensive, and usually found during incident review instead of tests.

Reconnect logic helps after disconnects. It does not rescue messages that never left process memory before close.
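To make that distinction concrete, here is a toy, dependency-free model of a buffered client. This is not the nats.go API; it models only the one property under discussion: publishes sit in process memory until a flush, and an immediate close discards whatever is still buffered.

```go
package main

import "fmt"

// toyConn models client-side publish buffering. It is NOT the real
// nats.Conn; it exists only to show why close-without-flush loses data.
type toyConn struct {
	buffer    []string // publishes waiting in process memory
	delivered []string // publishes that reached the "server"
	closed    bool
}

func (c *toyConn) Publish(msg string) {
	if !c.closed {
		c.buffer = append(c.buffer, msg)
	}
}

// Flush moves buffered messages out of process memory.
func (c *toyConn) Flush() {
	c.delivered = append(c.delivered, c.buffer...)
	c.buffer = nil
}

// Close without a prior flush drops whatever is still buffered.
func (c *toyConn) Close() {
	c.buffer = nil
	c.closed = true
}

func main() {
	fast := &toyConn{}
	fast.Publish("progress-event")
	fast.Close() // immediate close: buffered publish is lost
	fmt.Println("close-only delivered:", len(fast.delivered))

	safe := &toyConn{}
	safe.Publish("progress-event")
	safe.Flush() // flush first: publish leaves process memory
	safe.Close()
	fmt.Println("flush-then-close delivered:", len(safe.delivered))
}
```

Reconnect settings never enter this picture: both connections above stayed "connected" the whole time, and the loss happens entirely on the client side.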

What top results cover and miss

| Source | Strong coverage | Missing piece |
| --- | --- | --- |
| NATS docs: Draining Messages Before Disconnect | Clear connection-drain sequence: drain subs, stop publish, flush remaining publishes, then close. | No platform-specific guidance for mixed JetStream + core NATS subject sets in agent control planes. |
| NATS docs: Automatic Reconnections | Reconnect behavior, callback events, and reconnection buffers/timeouts. | Reconnect guidance does not replace deterministic process-shutdown sequencing. |
| nats.go package docs | `Conn.Close()` and `Conn.Drain()` semantics are documented separately. | No end-to-end migration checklist for existing services that already implemented partial drain logic. |

Cordum runtime behavior

Cordum already does one important thing right: draining tracked subscriptions before close.

The remaining edge is connection-level publish teardown. SIGTERM does not care about your pending buffer.

| Area | Current behavior | Operational impact |
| --- | --- | --- |
| Subscription shutdown | Cordum tracks subscriptions and calls `sub.Drain()` before closing bus resources. | In-flight subscribed work can finish before teardown. |
| Connection shutdown | After subscription drain, `Close()` calls `nc.Close()` directly. | Connection closes fast. Pending client-side publish buffers are not explicitly drained in this step. |
| Publish path split | Durable subjects can use JetStream `js.Publish()`; other subjects use core `nc.Publish()`. | Risk profile differs by subject durability class during shutdown. |
| Reconnect defaults | Cordum sets `MaxReconnects(-1)` and `ReconnectWait(2 * time.Second)`. | Reconnect helps availability after disconnect. It does not retroactively flush messages already dropped during immediate close. |
Current close path

```go
// Current shutdown path in core/infra/bus/nats.go
func (b *NatsBus) Drain() {
  subs := b.subs // simplified excerpt from tracked subscription drain

  for _, sub := range subs {
    if sub == nil || !sub.IsValid() {
      continue
    }
    _ = sub.Drain()
  }
}

func (b *NatsBus) Close() {
  b.Drain()
  if b.nc != nil {
    b.nc.Close()
  }
}
```
Publish path split

```go
func (b *NatsBus) Publish(subject string, packet *pb.BusPacket) error {
  data, err := proto.Marshal(packet) // serialize once, before path selection
  if err != nil {
    return err
  }

  if b.jsEnabled && isDurableSubject(subject) {
    _, err := b.js.Publish(subject, data)
    return err
  }

  // Core NATS path: buffered client-side, flushed asynchronously
  return b.nc.Publish(subject, data)
}
```

Safer shutdown pattern

Use a four-step sequence:

1. Stop intake of new work.
2. Drain subscriptions.
3. Drain or flush pending connection publishes.
4. Exit.

Keep a bounded shutdown budget and surface drain timeout metrics. Unbounded drain can hang rollout jobs.

Operator validation runbook
```bash
# 1) Send controlled load on both durable and non-durable subjects
# 2) Trigger rolling restart of scheduler / gateway pods
kubectl -n cordum rollout restart deploy/cordum-scheduler

# 3) Verify no unexpected publish-path errors during shutdown
kubectl -n cordum logs deploy/cordum-scheduler | rg "drain|close|publish|disconnected|reconnected"

# 4) Compare message continuity metrics before/after patch
#    - submitted jobs
#    - result events
#    - progress/heartbeat visibility
```

Implementation options

If you want stricter alignment with NATS connection-drain guidance, add connection drain semantics in `Close()` and keep force-close fallback for timeout cases.

Connection-drain close variant

```go
// One possible hardening direction.
func (b *NatsBus) Close() {
  b.Drain() // subscription-level drain first

  if b.nc == nil {
    return
  }

  // Connection-level drain: stop new publishes, flush queued messages, close.
  // Note: nats.go's Conn.Drain() returns immediately; completion is signaled
  // through the closed callback, so pair this with a bounded wait if callers
  // must block until teardown actually finishes.
  if err := b.nc.Drain(); err != nil {
    slog.Warn("bus: connection drain failed, forcing close", "err", err)
    b.nc.Close()
  }
}
```

Roll this behind a feature flag first. Drain behavior under long-running handlers can change shutdown timing in visible ways.

Limitations and tradeoffs

| Approach | Upside | Downside |
| --- | --- | --- |
| Keep subscription drain + immediate close | Short shutdown latency and simple control flow. | Higher chance of losing buffered core-NATS publishes in unlucky timing windows. |
| Add connection drain | Closer to documented NATS graceful-close sequence. | Longer shutdown windows and possible timeout-handling complexity. |
| Rely only on reconnect | No shutdown-code changes. | Reconnect solves disconnect survival, not deterministic pre-exit buffer handling. |

Next step

Run one staged restart test that intentionally kills pods under load, then compare pre/post shutdown-event continuity for core NATS subjects.

If your metrics improve, keep the change. If shutdown latency breaks rollout windows, tune drain timeout and fallback close strategy before wider rollout.

Need production-safe agent governance?

Cordum helps teams enforce pre-dispatch policy, run dependable agent workflows, and keep evidence trails auditable.