fix(kafka): eliminate Resume timeout race in batch consumer pause/heartbeat #1313

Open
absorbb wants to merge 1 commit into newjitsu from fix/kafka-consumer-resume-timeout

Contributor

absorbb commented May 21, 2026

Summary

Two defense-in-depth fixes for the `failed to resume kafka consumer: Resume timeout` error seen on the retry consumer paths in bulkerapp (e.g. in.id.<workspace>.m.retry.t._all_).

  • Buffer resumeChannel to size 1 so resume() (and _unpause()) can deposit the signal even when the pause heartbeat goroutine is mid-iteration. pause() drains any stale signal on entry, and the signal handler defensively re-resumes the current consumer in case it was replaced between resume()'s consumer.Resume(...) call and the heartbeat actually picking up the signal.
  • Run restartConsumer asynchronously when invoked from the heartbeat via a new restartConsumerAsync helper, guarded by an atomic.Bool to prevent overlapping restart attempts. The synchronous restartConsumer entrypoint is preserved for the other call sites (pauseKafkaConsumer, etc.) that need to block until the new consumer is up.
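A minimal sketch of the size-1 signal channel described in the first bullet. The names resumeSignal, deposit, and drain are hypothetical and are not taken from the diff. The sketch also assumes a non-blocking send, whereas the actual resume() may still wrap its send in a timeout.

```go
package main

// resumeSignal is a hypothetical stand-in for the consumer's resumeChannel.
type resumeSignal chan struct{}

// newResumeSignal creates the channel with capacity 1 so that one signal
// can be parked while the receiver is busy elsewhere.
func newResumeSignal() resumeSignal {
	return make(resumeSignal, 1)
}

// deposit leaves a signal without waiting for a receiver. It reports false
// when a signal is already pending, in which case nothing more is needed.
func (s resumeSignal) deposit() bool {
	select {
	case s <- struct{}{}:
		return true
	default:
		return false
	}
}

// drain discards a stale signal, as pause() is described to do on entry.
// It reports whether a signal was actually removed.
func (s resumeSignal) drain() bool {
	select {
	case <-s:
		return true
	default:
		return false
	}
}
```

With capacity 1, at most one signal is ever pending, so a drain on pause entry is enough to keep a new pause cycle from inheriting a signal left over from the previous one.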

Why this fixes the symptom

resume() and the pause-heartbeat goroutine coordinate via resumeChannel. Before this PR the channel was unbuffered, so a send only succeeds when the heartbeat is parked in its select. The heartbeat loop calls restartConsumer synchronously on non-retriable ReadMessage errors; restartConsumer blocks for KafkaSessionTimeoutMs + 15s per init attempt (default 60 s baseline) and loops if init keeps failing. With unhealthy brokers or an unhealthy network, the heartbeat can stay blocked in restartConsumer past KafkaMaxPollIntervalMs (default 5 min), so the send in resume() never finds a receiver and resume() times out.
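The starvation can be reproduced in isolation. The sketch below is illustrative only: trySend and sendWhileReceiverBusy are hypothetical helpers, and the 10 ms timeout stands in for the real resume timeout. It models a receiver that is busy (nobody is parked in a receive) and shows that only the buffered variant accepts the signal.

```go
package main

import "time"

// trySend attempts to deliver a signal and gives up after timeout,
// mirroring a send that is guarded by a resume timeout.
func trySend(ch chan struct{}, timeout time.Duration) bool {
	select {
	case ch <- struct{}{}:
		return true
	case <-time.After(timeout):
		return false
	}
}

// sendWhileReceiverBusy reports whether a send succeeds when no goroutine
// is currently receiving, i.e. the heartbeat is stuck somewhere else.
func sendWhileReceiverBusy(buffered bool) bool {
	size := 0
	if buffered {
		size = 1
	}
	ch := make(chan struct{}, size)
	return trySend(ch, 10*time.Millisecond)
}
```

sendWhileReceiverBusy(false) times out because an unbuffered send needs a receiver at the same moment, while sendWhileReceiverBusy(true) returns immediately because the buffer slot accepts the value.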

The same starvation also explains the sibling `failed to unpause kafka consumer.` error from _unpause().

With the buffered channel, resume() deposits the signal immediately, calls consumer.Resume(partitions) on the current consumer, and returns. The heartbeat picks up the signal on its next pass (whenever restartConsumer finishes, or on its next ticker tick). With the async restart, the heartbeat no longer parks itself in restartConsumer at all — restart proceeds in the background, the heartbeat keeps cycling, and the new consumer is brought up paused by rebalanceCallback (which checks bc.paused) as before.

Op notes

  • Behavior is unchanged on the happy path. Resume timing improves under broker degradation; the 5-minute stall in resume() is no longer reachable.
  • restartConsumer itself is unchanged; only the call site in the heartbeat loop is rerouted through restartConsumerAsync. Other callers (e.g. pauseKafkaConsumer failure path) still run it synchronously.
  • The new restarting atomic.Bool field is unexported and only consulted by restartConsumerAsync. It keeps overlapping restarts from piling up: subsequent async calls are no-ops while one is in flight.
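The single-flight guard described in these notes could look roughly like the sketch below. The restarter type, its restart field, and the WaitGroup are assumptions made for illustration; the real restartConsumerAsync lives on the consumer type and its body is not shown in this conversation.

```go
package main

import (
	"sync"
	"sync/atomic"
)

// restarter is a hypothetical holder for the restart guard.
type restarter struct {
	restarting atomic.Bool    // true while a background restart is running
	wg         sync.WaitGroup // sketch-only: lets callers wait for completion
	restart    func()         // stands in for the synchronous restartConsumer
}

// restartAsync starts restart in the background unless one is already in
// flight. It reports whether this call actually started a restart.
func (r *restarter) restartAsync() bool {
	// CompareAndSwap flips false -> true atomically, so exactly one
	// concurrent caller wins and the others return without doing work.
	if !r.restarting.CompareAndSwap(false, true) {
		return false
	}
	r.wg.Add(1)
	go func() {
		defer r.wg.Done()
		// Runs before wg.Done (deferred calls run last-in, first-out),
		// so the flag is clear by the time a waiter is released.
		defer r.restarting.Store(false)
		r.restart()
	}()
	return true
}
```

Clearing the flag in a defer means a panic inside restart would not leave the guard stuck in the restarting state, although whether the PR handles that case the same way is not stated.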

Test plan

  • go vet ./bulkerapp/... passes (verified locally)
  • Deploy to staging; induce a broker disconnect to force the heartbeat into the non-retriable-error branch; confirm that neither `Resume timeout` nor `failed to unpause` appears in the logs after recovery
  • Confirm the retry consumer (m.retry.t._all_) resumes cleanly after a forced restart
  • Verify no duplicate / orphan kafka consumers when restart is triggered from heartbeat (should be at most one concurrent restartConsumer in flight per consumer)

🤖 Generated with Claude Code

…rtbeat

resume() and the pause-heartbeat goroutine coordinate via an unbuffered
resumeChannel, so a resume signal can only be delivered while the
heartbeat is parked in its select. The heartbeat loop runs
restartConsumer synchronously on non-retriable ReadMessage errors;
restartConsumer blocks for KafkaSessionTimeoutMs+15s per init attempt
(default 60s baseline) and loops if init keeps failing. With unhealthy
brokers this stalls the heartbeat past KafkaMaxPollIntervalMs (5 min),
triggering `failed to resume kafka consumer: Resume timeout` in
resume() and the analogous failure in _unpause().

Two defense-in-depth fixes:

- Buffer resumeChannel to size 1. resume() can deposit the signal even
  while the heartbeat is mid-iteration; the heartbeat picks it up on
  its next pass. pause() drains any stale signal on entry so a new
  cycle doesn't inherit one. The signal handler also re-resumes the
  current consumer in case it was replaced between resume()'s call
  and the heartbeat picking up the signal.
- Run restartConsumer asynchronously when invoked from the heartbeat
  via new restartConsumerAsync, guarded by an atomic restarting flag
  to prevent overlapping restart attempts. The synchronous entrypoint
  is preserved for other call sites that must block until the new
  consumer is up.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

jitsu-code-review (bot) left a comment

I found one correctness issue in this change that can regress suspend behavior under the same race this PR is addressing. Please see the inline comment.

//buffered size 1: resume() can deposit the signal even if the
//heartbeat goroutine is mid-iteration (e.g. blocked in
//restartConsumer); the heartbeat picks it up on the next pass.
resumeChannel: make(chan struct{}, 1),

Making resumeChannel buffered removes the rendezvous semantics that _unpause() currently relies on in pauseOrSuspend() (_unpause(); consumer.Close(); bc.consumer.Store(nil)). With an unbuffered channel, _unpause() blocks until the pause heartbeat loop receives the signal; now it can return immediately while the loop is still inside ReadMessage(). Closing the consumer in that window can produce a non-timeout error in the pause loop and trigger restartConsumerAsync(), which can recreate a consumer right after we intended to suspend it. This looks like a user-visible regression (suspend may turn into restart churn).

Can we keep non-blocking resume() behavior without weakening _unpause() synchronization (e.g. separate channels/paths for resume vs suspend-ack, or an explicit ack/wait primitive for suspend)?
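One possible shape for the explicit ack suggested above, as a sketch only: the pauseLoop type, its stop and done channels, and the poll callback are all hypothetical and do not come from the PR. The resume signal stays buffered and non-blocking, while the suspend path waits on a separate done channel that the loop closes when it exits.

```go
package main

// pauseLoop is a hypothetical model of the pause heartbeat goroutine.
type pauseLoop struct {
	resume chan struct{} // buffered: resume can deposit without blocking
	stop   chan struct{} // closed by the suspend path to request exit
	done   chan struct{} // closed by the loop itself on exit (the ack)
}

// startPauseLoop runs poll repeatedly until a resume or stop request
// arrives. poll stands in for the ReadMessage call made while paused.
func startPauseLoop(poll func()) *pauseLoop {
	l := &pauseLoop{
		resume: make(chan struct{}, 1),
		stop:   make(chan struct{}),
		done:   make(chan struct{}),
	}
	go func() {
		defer close(l.done)
		for {
			select {
			case <-l.stop:
				return
			case <-l.resume:
				return
			default:
				poll()
			}
		}
	}()
	return l
}

// stopAndWait asks the loop to exit and blocks until it has done so.
// It must be called at most once, because it closes stop.
func (l *pauseLoop) stopAndWait() {
	close(l.stop)
	<-l.done
}
```

In this arrangement the suspend path would call stopAndWait before closing the consumer, so no poll can be in flight when the consumer goes away, and the resume path keeps its non-blocking deposit.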
