@@ -176,6 +176,119 @@ func TestProcessNextDAHeader_Success_SingleHeaderAndData(t *testing.T) {
176176 mockStore .AssertExpectations (t )
177177}
178178
179+ // TestProcessNextDAHeader_MultipleHeadersAndBatches verifies that multiple headers and batches in a single DA block are all processed and corresponding events are emitted.
180+ func TestProcessNextDAHeader_MultipleHeadersAndBatches (t * testing.T ) {
181+ daHeight := uint64 (50 )
182+ startBlockHeight := uint64 (130 )
183+ nHeaders := 50
184+ manager , mockDAClient , _ , _ , _ , _ , cancel := setupManagerForRetrieverTest (t , daHeight )
185+ defer cancel ()
186+
187+ proposerAddr := manager .genesis .ProposerAddress
188+
189+ var blobs [][]byte
190+ var blockHeights []uint64
191+ var txLens []int
192+
193+ invalidBlob := []byte ("not a valid protobuf message" )
194+
195+ for i := 0 ; i < nHeaders ; i ++ {
196+ // Sprinkle an empty blob every 5th position
197+ if i % 5 == 0 {
198+ blobs = append (blobs , []byte {})
199+ }
200+ // Sprinkle an invalid blob every 7th position
201+ if i % 7 == 0 {
202+ blobs = append (blobs , invalidBlob )
203+ }
204+
205+ height := startBlockHeight + uint64 (i )
206+ blockHeights = append (blockHeights , height )
207+
208+ hc := types.HeaderConfig {Height : height , Signer : manager .signer }
209+ header , err := types .GetRandomSignedHeaderCustom (& hc , manager .genesis .ChainID )
210+ require .NoError (t , err )
211+ header .ProposerAddress = proposerAddr
212+ headerProto , err := header .ToProto ()
213+ require .NoError (t , err )
214+ headerBytes , err := proto .Marshal (headerProto )
215+ require .NoError (t , err )
216+ blobs = append (blobs , headerBytes )
217+
218+ ntxs := i + 1 // unique number of txs for each batch
219+ blockConfig := types.BlockConfig {Height : height , NTxs : ntxs , ProposerAddr : proposerAddr }
220+ _ , blockData , _ := types .GenerateRandomBlockCustom (& blockConfig , manager .genesis .ChainID )
221+ txLens = append (txLens , len (blockData .Txs ))
222+ batchProto := & v1.Batch {Txs : make ([][]byte , len (blockData .Txs ))}
223+ for j , tx := range blockData .Txs {
224+ batchProto .Txs [j ] = tx
225+ }
226+ blockDataBytes , err := proto .Marshal (batchProto )
227+ require .NoError (t , err )
228+ blobs = append (blobs , blockDataBytes )
229+ // Sprinkle an empty blob after each batch
230+ if i % 4 == 0 {
231+ blobs = append (blobs , []byte {})
232+ }
233+ }
234+
235+ // Add a few more invalid blobs at the end
236+ blobs = append (blobs , invalidBlob , []byte {})
237+
238+ // DA returns all blobs (headers, batches, and sprinkled invalid/empty blobs)
239+ mockDAClient .On ("Retrieve" , mock .Anything , daHeight ).Return (coreda.ResultRetrieve {
240+ BaseResult : coreda.BaseResult {Code : coreda .StatusSuccess },
241+ Data : blobs ,
242+ }, nil ).Once ()
243+
244+ ctx := context .Background ()
245+ err := manager .processNextDAHeaderAndData (ctx )
246+ require .NoError (t , err )
247+
248+ // Validate all header events
249+ headerEvents := make ([]NewHeaderEvent , 0 , nHeaders )
250+ for i := 0 ; i < nHeaders ; i ++ {
251+ select {
252+ case event := <- manager .headerInCh :
253+ headerEvents = append (headerEvents , event )
254+ case <- time .After (300 * time .Millisecond ):
255+ t .Fatalf ("Expected header event %d not received" , i + 1 )
256+ }
257+ }
258+ // Check all expected heights are present
259+ receivedHeights := make (map [uint64 ]bool )
260+ for _ , event := range headerEvents {
261+ receivedHeights [event .Header .Height ()] = true
262+ assert .Equal (t , daHeight , event .DAHeight )
263+ assert .Equal (t , proposerAddr , event .Header .ProposerAddress )
264+ }
265+ for _ , h := range blockHeights {
266+ assert .True (t , receivedHeights [h ], "Header event for height %d not received" , h )
267+ }
268+
269+ // Validate all data events
270+ dataEvents := make ([]NewDataEvent , 0 , nHeaders )
271+ for i := 0 ; i < nHeaders ; i ++ {
272+ select {
273+ case event := <- manager .dataInCh :
274+ dataEvents = append (dataEvents , event )
275+ case <- time .After (300 * time .Millisecond ):
276+ t .Fatalf ("Expected data event %d not received" , i + 1 )
277+ }
278+ }
279+ // Check all expected tx lens are present
280+ receivedLens := make (map [int ]bool )
281+ for _ , event := range dataEvents {
282+ receivedLens [len (event .Data .Txs )] = true
283+ assert .Equal (t , daHeight , event .DAHeight )
284+ }
285+ for _ , l := range txLens {
286+ assert .True (t , receivedLens [l ], "Data event for tx count %d not received" , l )
287+ }
288+
289+ mockDAClient .AssertExpectations (t )
290+ }
291+
179292// TestProcessNextDAHeaderAndData_NotFound verifies that no events are emitted when DA returns NotFound.
180293func TestProcessNextDAHeaderAndData_NotFound (t * testing.T ) {
181294 daHeight := uint64 (25 )
@@ -493,95 +606,6 @@ func TestRetrieveLoop_ProcessError_Other(t *testing.T) {
493606 }
494607}
495608
496- // TestProcessNextDAHeader_MultipleHeadersAndBatches verifies that multiple headers and batches in a single DA block are all processed and corresponding events are emitted.
497- func TestProcessNextDAHeader_MultipleHeadersAndBatches (t * testing.T ) {
498- daHeight := uint64 (50 )
499- blockHeight1 := uint64 (130 )
500- blockHeight2 := uint64 (131 )
501- manager , mockDAClient , _ , _ , _ , _ , cancel := setupManagerForRetrieverTest (t , daHeight )
502- defer cancel ()
503-
504- proposerAddr := manager .genesis .ProposerAddress
505-
506- // First header and batch
507- hc1 := types.HeaderConfig {Height : blockHeight1 , Signer : manager .signer }
508- header1 , err := types .GetRandomSignedHeaderCustom (& hc1 , manager .genesis .ChainID )
509- require .NoError (t , err )
510- header1 .ProposerAddress = proposerAddr
511- headerProto1 , err := header1 .ToProto ()
512- require .NoError (t , err )
513- headerBytes1 , err := proto .Marshal (headerProto1 )
514- require .NoError (t , err )
515-
516- blockConfig1 := types.BlockConfig {Height : blockHeight1 , NTxs : 1 , ProposerAddr : proposerAddr }
517- _ , blockData1 , _ := types .GenerateRandomBlockCustom (& blockConfig1 , manager .genesis .ChainID )
518- batchProto1 := & v1.Batch {Txs : make ([][]byte , len (blockData1 .Txs ))}
519- for i , tx := range blockData1 .Txs {
520- batchProto1 .Txs [i ] = tx
521- }
522- blockDataBytes1 , err := proto .Marshal (batchProto1 )
523- require .NoError (t , err )
524-
525- // Second header and batch
526- hc2 := types.HeaderConfig {Height : blockHeight2 , Signer : manager .signer }
527- header2 , err := types .GetRandomSignedHeaderCustom (& hc2 , manager .genesis .ChainID )
528- require .NoError (t , err )
529- header2 .ProposerAddress = proposerAddr
530- headerProto2 , err := header2 .ToProto ()
531- require .NoError (t , err )
532- headerBytes2 , err := proto .Marshal (headerProto2 )
533- require .NoError (t , err )
534-
535- blockConfig2 := types.BlockConfig {Height : blockHeight2 , NTxs : 2 , ProposerAddr : proposerAddr }
536- _ , blockData2 , _ := types .GenerateRandomBlockCustom (& blockConfig2 , manager .genesis .ChainID )
537- batchProto2 := & v1.Batch {Txs : make ([][]byte , len (blockData2 .Txs ))}
538- for i , tx := range blockData2 .Txs {
539- batchProto2 .Txs [i ] = tx
540- }
541- blockDataBytes2 , err := proto .Marshal (batchProto2 )
542- require .NoError (t , err )
543-
544- // DA returns all four blobs (2 headers, 2 batches)
545- mockDAClient .On ("Retrieve" , mock .Anything , daHeight ).Return (coreda.ResultRetrieve {
546- BaseResult : coreda.BaseResult {Code : coreda .StatusSuccess },
547- Data : [][]byte {headerBytes1 , blockDataBytes1 , headerBytes2 , blockDataBytes2 },
548- }, nil ).Once ()
549-
550- ctx := context .Background ()
551- err = manager .processNextDAHeaderAndData (ctx )
552- require .NoError (t , err )
553-
554- // Validate both header events
555- headerEvents := []NewHeaderEvent {}
556- for i := 0 ; i < 2 ; i ++ {
557- select {
558- case event := <- manager .headerInCh :
559- headerEvents = append (headerEvents , event )
560- case <- time .After (100 * time .Millisecond ):
561- t .Fatalf ("Expected header event %d not received" , i + 1 )
562- }
563- }
564- headerHeights := []uint64 {headerEvents [0 ].Header .Height (), headerEvents [1 ].Header .Height ()}
565- assert .Contains (t , headerHeights , blockHeight1 )
566- assert .Contains (t , headerHeights , blockHeight2 )
567-
568- // Validate both data events
569- dataEvents := []NewDataEvent {}
570- for i := 0 ; i < 2 ; i ++ {
571- select {
572- case event := <- manager .dataInCh :
573- dataEvents = append (dataEvents , event )
574- case <- time .After (100 * time .Millisecond ):
575- t .Fatalf ("Expected data event %d not received" , i + 1 )
576- }
577- }
578- dataTxLens := []int {len (dataEvents [0 ].Data .Txs ), len (dataEvents [1 ].Data .Txs )}
579- assert .Contains (t , dataTxLens , 1 )
580- assert .Contains (t , dataTxLens , 2 )
581-
582- mockDAClient .AssertExpectations (t )
583- }
584-
585609// TestProcessNextDAHeader_BatchWithNoTxs verifies that a batch with no transactions is ignored and does not emit events or mark as DA included.
586610func TestProcessNextDAHeader_BatchWithNoTxs (t * testing.T ) {
587611 daHeight := uint64 (55 )
0 commit comments