@@ -52,8 +52,8 @@ type Syncer struct {
5252 // State management
5353 lastState * atomic.Pointer [types.State ]
5454
55- // DA state
56- daHeight * atomic.Uint64
55+ // DA retriever height
56+ daRetrieverHeight * atomic.Uint64
5757
5858 // P2P stores
5959 headerStore common.Broadcaster [* types.SignedHeader ]
@@ -95,29 +95,28 @@ func NewSyncer(
9595 errorCh chan <- error ,
9696) * Syncer {
9797 return & Syncer {
98- store : store ,
99- exec : exec ,
100- da : da ,
101- cache : cache ,
102- metrics : metrics ,
103- config : config ,
104- genesis : genesis ,
105- options : options ,
106- headerStore : headerStore ,
107- dataStore : dataStore ,
108- lastState : & atomic.Pointer [types.State ]{},
109- daHeight : & atomic.Uint64 {},
110- heightInCh : make (chan common.DAHeightEvent , 1_000 ),
111- errorCh : errorCh ,
112- logger : logger .With ().Str ("component" , "syncer" ).Logger (),
98+ store : store ,
99+ exec : exec ,
100+ da : da ,
101+ cache : cache ,
102+ metrics : metrics ,
103+ config : config ,
104+ genesis : genesis ,
105+ options : options ,
106+ headerStore : headerStore ,
107+ dataStore : dataStore ,
108+ lastState : & atomic.Pointer [types.State ]{},
109+ daRetrieverHeight : & atomic.Uint64 {},
110+ heightInCh : make (chan common.DAHeightEvent , 1_000 ),
111+ errorCh : errorCh ,
112+ logger : logger .With ().Str ("component" , "syncer" ).Logger (),
113113 }
114114}
115115
116116// Start begins the syncing component
117117func (s * Syncer ) Start (ctx context.Context ) error {
118118 s .ctx , s .cancel = context .WithCancel (ctx )
119119
120- // Initialize state
121120 if err := s .initializeState (); err != nil {
122121 return fmt .Errorf ("failed to initialize syncer state: %w" , err )
123122 }
@@ -131,12 +130,12 @@ func (s *Syncer) Start(ctx context.Context) error {
131130 s .p2pHandler .SetProcessedHeight (currentHeight )
132131 }
133132
133+ if ! s .waitForGenesis () {
134+ return nil
135+ }
136+
134137 // Start main processing loop
135- s .wg .Add (1 )
136- go func () {
137- defer s .wg .Done ()
138- s .processLoop ()
139- }()
138+ go s .processLoop ()
140139
141140 // Start dedicated workers for DA, and pending processing
142141 s .startSyncWorkers ()
@@ -175,16 +174,6 @@ func (s *Syncer) SetLastState(state types.State) {
175174 s .lastState .Store (& state )
176175}
177176
178- // GetDAHeight returns the current DA height
179- func (s * Syncer ) GetDAHeight () uint64 {
180- return max (s .daHeight .Load (), s .cache .DaHeight ())
181- }
182-
183- // SetDAHeight updates the DA height
184- func (s * Syncer ) SetDAHeight (height uint64 ) {
185- s .daHeight .Store (height )
186- }
187-
188177// initializeState loads the current sync state
189178func (s * Syncer ) initializeState () error {
190179 // Load state from store
@@ -216,14 +205,13 @@ func (s *Syncer) initializeState() error {
216205 }
217206 s .SetLastState (state )
218207
219- // Set DA height
220- // we get the max from the genesis da height, the state da height and the cache (fetched) da height
221- // if a user has messed up and sync da too far ahead, on restart they can clear the cache (--clear-cache) and the retrieve will restart fetching from the last known block synced and executed from DA or the set genesis da height.
222- s .SetDAHeight (max (s .genesis .DAStartHeight , s .cache .DaHeight (), state .DAHeight ))
208+ // Set DA height to the maximum of the genesis start height, the state's DA height, and the cached DA height.
209+ // This ensures we resume from the highest known DA height, even if the cache is cleared on restart. If the DA height is too high because of a user error, reset it with --evnode.clear_cache. The DA height will be back to the last highest known executed DA height for a height.
210+ s .daRetrieverHeight .Store (max (s .genesis .DAStartHeight , s .cache .DaHeight (), state .DAHeight ))
223211
224212 s .logger .Info ().
225213 Uint64 ("height" , state .LastBlockHeight ).
226- Uint64 ("da_height" , s .GetDAHeight ()).
214+ Uint64 ("da_height" , s .daRetrieverHeight . Load ()).
227215 Str ("chain_id" , state .ChainID ).
228216 Msg ("initialized syncer state" )
229217
@@ -238,6 +226,9 @@ func (s *Syncer) initializeState() error {
238226
239227// processLoop is the main coordination loop for processing events
240228func (s * Syncer ) processLoop () {
229+ s .wg .Add (1 )
230+ defer s .wg .Done ()
231+
241232 s .logger .Info ().Msg ("starting process loop" )
242233 defer s .logger .Info ().Msg ("process loop stopped" )
243234
@@ -261,10 +252,6 @@ func (s *Syncer) startSyncWorkers() {
261252func (s * Syncer ) daWorkerLoop () {
262253 defer s .wg .Done ()
263254
264- if ! s .waitForGenesis () {
265- return
266- }
267-
268255 s .logger .Info ().Msg ("starting DA worker" )
269256 defer s .logger .Info ().Msg ("DA worker stopped" )
270257
@@ -299,13 +286,13 @@ func (s *Syncer) fetchDAUntilCaughtUp() error {
299286 default :
300287 }
301288
302- daHeight := s . GetDAHeight ( )
289+ daHeight := max ( s . daRetrieverHeight . Load (), s . cache . DaHeight () )
303290
304291 events , err := s .daRetriever .RetrieveFromDA (s .ctx , daHeight )
305292 if err != nil {
306293 switch {
307294 case errors .Is (err , coreda .ErrBlobNotFound ):
308- s .SetDAHeight (daHeight + 1 )
295+ s .daRetrieverHeight . Store (daHeight + 1 )
309296 continue // Fetch next height immediately
310297 case errors .Is (err , coreda .ErrHeightFromFuture ):
311298 s .logger .Debug ().Err (err ).Uint64 ("da_height" , daHeight ).Msg ("DA is ahead of local target; backing off future height requests" )
@@ -330,18 +317,14 @@ func (s *Syncer) fetchDAUntilCaughtUp() error {
330317 }
331318 }
332319
333- // increment DA height on successful retrieval
334- s .SetDAHeight (daHeight + 1 )
320+ // increment DA retrieval height on successful retrieval
321+ s .daRetrieverHeight . Store (daHeight + 1 )
335322 }
336323}
337324
338325func (s * Syncer ) pendingWorkerLoop () {
339326 defer s .wg .Done ()
340327
341- if ! s .waitForGenesis () {
342- return
343- }
344-
345328 s .logger .Info ().Msg ("starting pending worker" )
346329 defer s .logger .Info ().Msg ("pending worker stopped" )
347330
@@ -361,10 +344,6 @@ func (s *Syncer) pendingWorkerLoop() {
361344func (s * Syncer ) p2pWorkerLoop () {
362345 defer s .wg .Done ()
363346
364- if ! s .waitForGenesis () {
365- return
366- }
367-
368347 logger := s .logger .With ().Str ("worker" , "p2p" ).Logger ()
369348 logger .Info ().Msg ("starting P2P worker" )
370349 defer logger .Info ().Msg ("P2P worker stopped" )
@@ -545,13 +524,14 @@ func (s *Syncer) trySyncNextBlock(event *common.DAHeightEvent) error {
545524 return err
546525 }
547526
548- // Apply block
549527 newState , err := s .applyBlock (header .Header , data , currentState )
550528 if err != nil {
551529 return fmt .Errorf ("failed to apply block: %w" , err )
552530 }
553531
554532 // Update DA height if needed
533+ // This height is only updated when a height is processed from DA as P2P
534+ // events do not contain DA height information
555535 if event .DaHeight > newState .DAHeight {
556536 newState .DAHeight = event .DaHeight
557537 }
@@ -677,15 +657,6 @@ func (s *Syncer) sendCriticalError(err error) {
677657 }
678658}
679659
680- // sendNonBlockingSignal sends a signal without blocking
681- func (s * Syncer ) sendNonBlockingSignal (ch chan struct {}, name string ) {
682- select {
683- case ch <- struct {}{}:
684- default :
685- s .logger .Debug ().Str ("channel" , name ).Msg ("channel full, signal dropped" )
686- }
687- }
688-
689660// processPendingEvents fetches and processes pending events from cache
690661// optimistically fetches the next events from cache until no matching heights are found
691662func (s * Syncer ) processPendingEvents () {
0 commit comments