@@ -93,7 +93,8 @@ type implementation struct {
9393 dataCache * Cache
9494 txCache * Cache
9595 txTimestamps * sync.Map // map[string]time.Time
96- pendingEvents * pendingEventsMap [common.DAHeightEvent ]
96+ pendingEvents map [uint64 ]* common.DAHeightEvent
97+ pendingMu sync.Mutex
9798 pendingHeaders * PendingHeaders
9899 pendingData * PendingData
99100 store store.Store
@@ -106,7 +107,6 @@ func NewManager(cfg config.Config, st store.Store, logger zerolog.Logger) (Manag
106107 headerCache := NewCache (st , HeaderDAIncludedPrefix )
107108 dataCache := NewCache (st , DataDAIncludedPrefix )
108109 txCache := NewCache (nil , "" )
109- pendingEvents := newPendingEventsMap [common.DAHeightEvent ]()
110110
111111 pendingHeaders , err := NewPendingHeaders (st , logger )
112112 if err != nil {
@@ -123,7 +123,7 @@ func NewManager(cfg config.Config, st store.Store, logger zerolog.Logger) (Manag
123123 dataCache : dataCache ,
124124 txCache : txCache ,
125125 txTimestamps : new (sync.Map ),
126- pendingEvents : pendingEvents ,
126+ pendingEvents : make ( map [ uint64 ] * common. DAHeightEvent ) ,
127127 pendingHeaders : pendingHeaders ,
128128 pendingData : pendingData ,
129129 store : st ,
@@ -252,7 +252,9 @@ func (m *implementation) CleanupOldTxs(olderThan time.Duration) int {
252252func (m * implementation ) DeleteHeight (blockHeight uint64 ) {
253253 m .headerCache .deleteAllForHeight (blockHeight )
254254 m .dataCache .deleteAllForHeight (blockHeight )
255- m .pendingEvents .deleteAllForHeight (blockHeight )
255+ m .pendingMu .Lock ()
256+ delete (m .pendingEvents , blockHeight )
257+ m .pendingMu .Unlock ()
256258
257259 // Note: txCache is intentionally NOT deleted here because:
258260 // 1. Transactions are tracked by hash, not by block height (they use height 0)
@@ -319,17 +321,27 @@ func (m *implementation) NumPendingData() uint64 {
319321
320322// SetPendingEvent sets the event at the specified height.
321323func (m * implementation ) SetPendingEvent (height uint64 , event * common.DAHeightEvent ) {
322- m .pendingEvents .setItem (height , event )
324+ m .pendingMu .Lock ()
325+ m .pendingEvents [height ] = event
326+ m .pendingMu .Unlock ()
323327}
324328
325329func (m * implementation ) PendingEventsCount () int {
326- return m .pendingEvents .itemCount ()
330+ m .pendingMu .Lock ()
331+ defer m .pendingMu .Unlock ()
332+ return len (m .pendingEvents )
327333}
328334
329335// GetNextPendingEvent efficiently retrieves and removes the event at the specified height.
330336// Returns nil if no event exists at that height.
331337func (m * implementation ) GetNextPendingEvent (height uint64 ) * common.DAHeightEvent {
332- return m .pendingEvents .getNextItem (height )
338+ m .pendingMu .Lock ()
339+ item , ok := m .pendingEvents [height ]
340+ if ok {
341+ delete (m .pendingEvents , height )
342+ }
343+ m .pendingMu .Unlock ()
344+ return item
333345}
334346
335347// SaveToStore flushes the DA inclusion snapshot to the store.
@@ -387,7 +399,7 @@ func (m *implementation) ClearFromStore() error {
387399 m .headerCache = NewCache (m .store , HeaderDAIncludedPrefix )
388400 m .dataCache = NewCache (m .store , DataDAIncludedPrefix )
389401 m .txCache = NewCache (nil , "" )
390- m .pendingEvents = newPendingEventsMap [ common.DAHeightEvent ]( )
402+ m .pendingEvents = make ( map [ uint64 ] * common.DAHeightEvent )
391403
392404 // Initialize DA height from store metadata to ensure DaHeight() is never 0.
393405 m .initDAHeightFromStore (ctx )
0 commit comments