
Commit 5897ef3

auricom and claude committed
fix(raft): wait for block-store sync before abdicating on leader election
When all nodes restart simultaneously their block stores can lag behind the raft FSM height (block data arrives via p2p, not raft). With the previous code every elected node saw diff < -1 and immediately called leadershipTransfer(), creating an infinite hot-potato: no node ever stabilised as leader and block production stalled.

Instead of abdicating immediately, the new waitForBlockStoreSync helper polls IsSynced for up to ShutdownTimeout (default ~1s). The fastest-syncing peer proceeds as leader; nodes that cannot catch up in time still abdicate and yield to a better candidate. Leadership is also checked mid-wait, so a lost-leadership event aborts the wait early.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 84ec0d0 commit 5897ef3

2 files changed

Lines changed: 134 additions & 20 deletions
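The threshold the election loop applies is the same one waitForBlockStoreSync polls for: the block store may be at most one block behind the raft FSM, because RecoverFromRaft can only replay the single latest block. A minimal sketch of that criterion, assuming IsSynced reports store height minus raft height (the sign convention implied by the -diff "store_lag_blocks" log field in the diff below); storeCloseEnough is a hypothetical helper name used only for illustration:

package main

import "fmt"

// Hedged sketch of the sync criterion the fix relies on. Assumes IsSynced
// reports store height minus raft FSM height, so negative values mean the
// block store is behind.
func storeCloseEnough(storeHeight, raftHeight int) bool {
	diff := storeHeight - raftHeight
	// Within one block is acceptable: RecoverFromRaft can apply the single
	// latest block from the raft snapshot, so leadership can proceed.
	return diff >= -1
}

func main() {
	fmt.Println(storeCloseEnough(5, 10))  // false: 5 blocks behind, wait or abdicate
	fmt.Println(storeCloseEnough(9, 10))  // true: one block behind, recoverable
	fmt.Println(storeCloseEnough(10, 10)) // true: fully synced
}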


pkg/raft/election.go

Lines changed: 60 additions & 9 deletions
@@ -136,19 +136,36 @@ func (d *DynamicLeaderElection) Run(ctx context.Context) error {
 			// Store is more than 1 block behind raft state.
 			// RecoverFromRaft can only apply the single latest block
 			// from the raft snapshot; it cannot replay a larger gap.
-			// Starting leader operations in this state would stall block
-			// production until catch-up completes (potentially minutes or
-			// hours). Abdicate immediately so a better-synced peer can
-			// take leadership.
+			//
+			// Before abdicating, wait for p2p block-store sync to close
+			// the gap. If all nodes restart simultaneously with lagging
+			// block stores, immediate abdication causes a leadership
+			// hot-potato: every elected node abdicates at once and the
+			// cluster never stabilises. Waiting gives the fastest-syncing
+			// peer a chance to stay as leader.
 			d.logger.Warn().
 				Int("store_lag_blocks", -diff).
 				Uint64("raft_height", raftState.Height).
-				Msg("became leader but store is significantly behind raft state; abdicating to prevent stalled block production")
-			if tErr := d.node.leadershipTransfer(); tErr != nil {
-				d.logger.Error().Err(tErr).Msg("leadership transfer failed after store-lag abdication")
-				return fmt.Errorf("leadership transfer failed after store-lag abdication: %w", tErr)
+				Msg("became leader but store is significantly behind raft state; waiting for block-store sync")
+			if !d.waitForBlockStoreSync(ctx, runnable) {
+				d.logger.Warn().
+					Int("store_lag_blocks", -diff).
+					Uint64("raft_height", raftState.Height).
+					Msg("store still significantly behind raft state after wait; abdicating to prevent stalled block production")
+				if tErr := d.node.leadershipTransfer(); tErr != nil {
+					d.logger.Error().Err(tErr).Msg("leadership transfer failed after store-lag abdication")
+					return fmt.Errorf("leadership transfer failed after store-lag abdication: %w", tErr)
+				}
+				continue
+			}
+			// Block store caught up — refresh state so the recovery
+			// check below works with the latest values.
+			d.logger.Info().Msg("block store caught up after wait; proceeding as leader")
+			raftState = d.node.GetState()
+			diff, err = runnable.IsSynced(raftState)
+			if err != nil {
+				return err
 			}
-			continue
 		}
 		if diff != 0 {
 			d.logger.Info().Msg("became leader but not synced, attempting recovery")
@@ -271,3 +288,37 @@ func (d *DynamicLeaderElection) verifyState(ctx context.Context, runnable Runnable
 func (d *DynamicLeaderElection) IsRunning() bool {
 	return d.running.Load()
 }
+
+// waitForBlockStoreSync polls IsSynced until the block store is within 1 block
+// of the current raft FSM height, leadership is lost, or the context expires.
+// Returns true if sync was achieved in time.
+func (d *DynamicLeaderElection) waitForBlockStoreSync(ctx context.Context, r Runnable) bool {
+	cfg := d.node.Config()
+	timeout := cfg.ShutdownTimeout
+	if timeout <= 0 {
+		timeout = 5 * cfg.SendTimeout
+	}
+	deadline := time.NewTimer(timeout)
+	defer deadline.Stop()
+	pollInterval := min(100*time.Millisecond, timeout/10)
+	ticker := time.NewTicker(pollInterval)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			return false
+		case <-deadline.C:
+			// Final check before giving up.
+			diff, err := r.IsSynced(d.node.GetState())
+			return err == nil && diff >= -1
+		case <-ticker.C:
+			if d.node.leaderID() != d.node.NodeID() {
+				return false // lost leadership during wait
+			}
+			diff, err := r.IsSynced(d.node.GetState())
+			if err == nil && diff >= -1 {
+				return true
+			}
+		}
+	}
+}
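To get a feel for how long a newly elected node waits before abdicating: the ~1s default ShutdownTimeout noted in the commit message, combined with the pollInterval computation above, works out to roughly ten sync checks plus a final one at the deadline. A minimal standalone sketch of that arithmetic; the SendTimeout value here is an assumption for illustration only (it just feeds the fallback branch) and is not the repo's actual config:

package main

import (
	"fmt"
	"time"
)

func main() {
	shutdownTimeout := 1 * time.Second    // ~1s default, per the commit message
	sendTimeout := 200 * time.Millisecond // assumed value; only used by the fallback below

	// Mirrors the timeout fallback and poll cadence in waitForBlockStoreSync.
	timeout := shutdownTimeout
	if timeout <= 0 {
		timeout = 5 * sendTimeout
	}
	pollInterval := min(100*time.Millisecond, timeout/10)

	fmt.Printf("deadline=%v pollInterval=%v polls≈%d (plus one final check at the deadline)\n",
		timeout, pollInterval, int(timeout/pollInterval))
	// With these numbers: deadline=1s pollInterval=100ms polls≈10
}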

pkg/raft/election_test.go

Lines changed: 74 additions & 11 deletions
@@ -221,19 +221,21 @@ func TestDynamicLeaderElectionRun(t *testing.T) {
 			assert.ErrorIs(t, err, context.Canceled)
 		},
 	},
-	"abdicate when store significantly behind raft": {
+	"abdicate when store significantly behind raft and never catches up": {
 		setup: func(t *testing.T) (*DynamicLeaderElection, context.Context, context.CancelFunc) {
 			m := newMocksourceNode(t)
 			leaderCh := make(chan bool, 2)
 			m.EXPECT().leaderCh().Return((<-chan bool)(leaderCh))
-			// GetState called in verifyState (follower start) and in leader sync check
-			m.EXPECT().GetState().Return(&RaftBlockState{Height: 10})
-			m.EXPECT().GetState().Return(&RaftBlockState{Height: 10})
-			m.EXPECT().Config().Return(testCfg()).Times(2)
+			// GetState is called in verifyState, leader sync check, and the wait loop.
+			m.EXPECT().GetState().Return(&RaftBlockState{Height: 10}).Maybe()
+			// Config: once for follower waitForMsgsLanded, once for leader waitForMsgsLanded,
+			// once inside waitForBlockStoreSync.
+			m.EXPECT().Config().Return(testCfg()).Times(3)
 			m.EXPECT().waitForMsgsLanded(2 * time.Millisecond).Return(nil)
-			m.EXPECT().NodeID().Return("self")
-			m.EXPECT().leaderID().Return("self")
-			// Abdication must transfer leadership
+			// NodeID + leaderID: called in leader-sync check and polled inside the wait loop.
+			m.EXPECT().NodeID().Return("self").Maybe()
+			m.EXPECT().leaderID().Return("self").Maybe()
+			// Abdication must transfer leadership after wait times out.
 			m.EXPECT().leadershipTransfer().Return(nil)
 
 			fStarted := make(chan struct{})
@@ -262,12 +264,12 @@ func TestDynamicLeaderElectionRun(t *testing.T) {
 				leaderCh <- false
 				<-fStarted
 				leaderCh <- true
-				// Wait for abdication to complete (transfer + continue) then verify
-				// the leader was never started before cancelling.
+				// Wait long enough for the sync wait to time out and abdication to
+				// complete, then verify the leader was never started.
 				select {
 				case <-leaderStarted:
 					t.Error("leader should not start when store is significantly behind raft")
-				case <-time.After(50 * time.Millisecond):
+				case <-time.After(200 * time.Millisecond):
 					// leadership transferred without starting leader — expected
 				}
 				cancel()
@@ -279,6 +281,67 @@ func TestDynamicLeaderElectionRun(t *testing.T) {
 			assert.ErrorIs(t, err, context.Canceled)
 		},
 	},
+	"proceed as leader when store catches up during wait": {
+		// Simulates the hot-potato scenario: all nodes behind on election,
+		// but the winner's block store syncs up before the wait times out.
+		setup: func(t *testing.T) (*DynamicLeaderElection, context.Context, context.CancelFunc) {
+			m := newMocksourceNode(t)
+			leaderCh := make(chan bool, 2)
+			m.EXPECT().leaderCh().Return((<-chan bool)(leaderCh))
+			m.EXPECT().GetState().Return(&RaftBlockState{Height: 10}).Maybe()
+			m.EXPECT().Config().Return(testCfg()).Maybe()
+			m.EXPECT().waitForMsgsLanded(2 * time.Millisecond).Return(nil)
+			m.EXPECT().NodeID().Return("self").Maybe()
+			m.EXPECT().leaderID().Return("self").Maybe()
+			// No leadershipTransfer: the node should stay as leader.
+
+			fStarted := make(chan struct{})
+			var syncedCalls int
+			follower := &testRunnable{
+				startedCh: fStarted,
+				isSyncedFn: func(*RaftBlockState) (int, error) {
+					syncedCalls++
+					if syncedCalls < 3 {
+						return -5, nil // still catching up
+					}
+					return 0, nil // caught up
+				},
+			}
+			leaderStarted := make(chan struct{})
+			leader := &testRunnable{
+				startedCh: leaderStarted,
+				runFn: func(ctx context.Context) error {
+					<-ctx.Done()
+					return ctx.Err()
+				},
+			}
+
+			logger := zerolog.Nop()
+			d := &DynamicLeaderElection{logger: logger, node: m,
+				leaderFactory: func() (Runnable, error) { return leader, nil },
+				followerFactory: func() (Runnable, error) { return follower, nil },
+			}
+			ctx, cancel := context.WithCancel(t.Context())
+			go func() {
+				leaderCh <- false
+				<-fStarted
+				leaderCh <- true
+				// The leader must start once the store catches up.
+				select {
+				case <-leaderStarted:
+					// expected: leader started after store synced
+				case <-time.After(200 * time.Millisecond):
+					t.Error("leader should have started once store caught up")
+				}
+				cancel()
+			}()
+			return d, ctx, cancel
+		},
+		assertF: func(t *testing.T, err error) {
+			require.Error(t, err)
+			assert.ErrorIs(t, err, context.Canceled)
+		},
+	},
 	"lost leadership during sync wait": {
 		setup: func(t *testing.T) (*DynamicLeaderElection, context.Context, context.CancelFunc) {
 			m := newMocksourceNode(t)
