fix(kafka): eliminate Resume timeout race in batch consumer pause/heartbeat#1313
fix(kafka): eliminate Resume timeout race in batch consumer pause/heartbeat#1313absorbb wants to merge 1 commit into
Conversation
…rtbeat resume() and the pause-heartbeat goroutine coordinate via an unbuffered resumeChannel, so a resume signal can only be delivered while the heartbeat is parked in its select. The heartbeat loop runs restartConsumer synchronously on non-retriable ReadMessage errors; restartConsumer blocks for KafkaSessionTimeoutMs+15s per init attempt (default 60s baseline) and loops if init keeps failing. With unhealthy brokers this stalls the heartbeat past KafkaMaxPollIntervalMs (5 min), triggering `failed to resume kafka consumer: Resume timeout` in resume() and the analogous failure in _unpause(). Two defense-in-depth fixes: - Buffer resumeChannel to size 1. resume() can deposit the signal even while the heartbeat is mid-iteration; the heartbeat picks it up on its next pass. pause() drains any stale signal on entry so a new cycle doesn't inherit one. The signal handler also re-resumes the current consumer in case it was replaced between resume()'s call and the heartbeat picking up the signal. - Run restartConsumer asynchronously when invoked from the heartbeat via new restartConsumerAsync, guarded by an atomic restarting flag to prevent overlapping restart attempts. The synchronous entrypoint is preserved for other call sites that must block until the new consumer is up. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
| //buffered size 1: resume() can deposit the signal even if the | ||
| //heartbeat goroutine is mid-iteration (e.g. blocked in | ||
| //restartConsumer); the heartbeat picks it up on the next pass. | ||
| resumeChannel: make(chan struct{}, 1), |
There was a problem hiding this comment.
Making resumeChannel buffered removes the rendezvous semantics that _unpause() currently relies on in pauseOrSuspend() (_unpause(); consumer.Close(); bc.consumer.Store(nil)). With an unbuffered channel, _unpause() blocks until the pause heartbeat loop receives the signal; now it can return immediately while the loop is still inside ReadMessage(). Closing the consumer in that window can produce a non-timeout error in the pause loop and trigger restartConsumerAsync(), which can recreate a consumer right after we intended to suspend it. This looks like a user-visible regression (suspend may turn into restart churn).
Can we keep non-blocking resume() behavior without weakening _unpause() synchronization (e.g. separate channels/paths for resume vs suspend-ack, or an explicit ack/wait primitive for suspend)?
Summary
Two defense-in-depth fixes for the
failed to resume kafka consumer: Resume timeouterror seen on the retry consumer paths in bulkerapp (e.g.in.id.<workspace>.m.retry.t._all_).resumeChannelto size 1 soresume()(and_unpause()) can deposit the signal even when the pause heartbeat goroutine is mid-iteration.pause()drains any stale signal on entry, and the signal handler defensively re-resumes the current consumer in case it was replaced betweenresume()'sconsumer.Resume(...)call and the heartbeat actually picking up the signal.restartConsumerasynchronously when invoked from the heartbeat via a newrestartConsumerAsynchelper, guarded by anatomic.Boolto prevent overlapping restart attempts. The synchronousrestartConsumerentrypoint is preserved for the other call sites (pauseKafkaConsumer, etc.) that need to block until the new consumer is up.Why this fixes the symptom
resume()and the pause-heartbeat goroutine coordinate viaresumeChannel. Before this PR the channel was unbuffered, so a send only succeeds when the heartbeat is parked in itsselect. The heartbeat loop callsrestartConsumersynchronously on non-retriableReadMessageerrors;restartConsumerblocks forKafkaSessionTimeoutMs + 15sper init attempt (default 60 s baseline) and loops if init keeps failing. With unhealthy brokers / network the heartbeat is starved pastKafkaMaxPollIntervalMs(default 5 min), andresume()times out.The same starvation also explains the sibling
failed to unpause kafka consumer.from_unpause().With the buffered channel,
resume()deposits the signal immediately, callsconsumer.Resume(partitions)on the current consumer, and returns. The heartbeat picks up the signal on its next pass (wheneverrestartConsumerfinishes, or on its next ticker tick). With the async restart, the heartbeat no longer parks itself inrestartConsumerat all — restart proceeds in the background, the heartbeat keeps cycling, and the new consumer is brought up paused byrebalanceCallback(which checksbc.paused) as before.Op notes
resume()is no longer reachable.restartConsumeritself is unchanged; only the call site in the heartbeat loop is rerouted throughrestartConsumerAsync. Other callers (e.g.pauseKafkaConsumerfailure path) still run it synchronously.restarting atomic.Boolfield is unexported and only consulted byrestartConsumerAsync; piling-up overlapping restarts is suppressed (subsequent async calls are no-ops while one is in flight).Test plan
go vet ./bulkerapp/...passes (verified locally)Resume timeoutand nofailed to unpausein logs after recoverym.retry.t._all_) resumes cleanly after a forced restartrestartConsumerin flight per consumer)🤖 Generated with Claude Code