Skip to content

Commit cff9581

Browse files
Move batchSubmissionLogic out of sequencer
1 parent 1c043b6 commit cff9581

11 files changed

Lines changed: 247 additions & 336 deletions

File tree

block/daIncluder.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ func (m *Manager) DAIncluderLoop(ctx context.Context) {
2929
headerHash := header.Hash()
3030
dataHash := data.DACommitment()
3131

32-
if m.headerCache.IsDAIncluded(headerHash.String()) && (bytes.Equal(dataHash, dataHashForEmptyTxs) || m.dataCache.IsDAIncluded(dataHash.String())) {
32+
if m.headerCache.IsDAIncluded(headerHash.String()) && (bytes.Equal(dataHash, DataHashForEmptyTxs) || m.dataCache.IsDAIncluded(dataHash.String())) {
3333
// Both header and data are DA-included, so we can advance the height
3434
if err := m.incrementDAIncludedHeight(ctx); err != nil {
3535
break

block/manager.go

Lines changed: 39 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ const (
6363

6464
var (
6565
// dataHashForEmptyTxs to be used while only syncing headers from DA and no p2p to get the Data for no txs scenarios, the syncing can proceed without getting stuck forever.
66-
dataHashForEmptyTxs = []byte{110, 52, 11, 156, 255, 179, 122, 152, 156, 165, 68, 230, 187, 120, 10, 44, 120, 144, 29, 63, 179, 55, 56, 118, 133, 17, 163, 6, 23, 175, 160, 29}
66+
DataHashForEmptyTxs = []byte{110, 52, 11, 156, 255, 179, 122, 152, 156, 165, 68, 230, 187, 120, 10, 44, 120, 144, 29, 63, 179, 55, 56, 118, 133, 17, 163, 6, 23, 175, 160, 29}
6767

6868
// initialBackoff defines initial value for block submission backoff
6969
initialBackoff = 100 * time.Millisecond
@@ -158,6 +158,9 @@ type Manager struct {
158158

159159
// txNotifyCh is used to signal when new transactions are available
160160
txNotifyCh chan struct{}
161+
162+
// batchSubmissionChan is used to submit batches to the sequencer
163+
batchSubmissionChan chan coresequencer.Batch
161164
}
162165

163166
// getInitialState tries to load lastState from Store, and if it's not available it reads genesis.
@@ -339,37 +342,38 @@ func NewManager(
339342
dalc: dalc,
340343
daHeight: &daH,
341344
// channels are buffered to avoid blocking on input/output operations, buffer sizes are arbitrary
342-
HeaderCh: make(chan *types.SignedHeader, channelLength),
343-
DataCh: make(chan *types.Data, channelLength),
344-
headerInCh: make(chan NewHeaderEvent, eventInChLength),
345-
dataInCh: make(chan NewDataEvent, eventInChLength),
346-
headerStoreCh: make(chan struct{}, 1),
347-
dataStoreCh: make(chan struct{}, 1),
348-
headerStore: headerStore,
349-
dataStore: dataStore,
350-
lastStateMtx: new(sync.RWMutex),
351-
lastBatchData: lastBatchData,
352-
headerCache: cache.NewCache[types.SignedHeader](),
353-
dataCache: cache.NewCache[types.Data](),
354-
retrieveCh: make(chan struct{}, 1),
355-
daIncluderCh: make(chan struct{}, 1),
356-
logger: logger,
357-
txsAvailable: false,
358-
pendingHeaders: pendingHeaders,
359-
metrics: seqMetrics,
360-
sequencer: sequencer,
361-
exec: exec,
362-
gasPrice: gasPrice,
363-
gasMultiplier: gasMultiplier,
364-
txNotifyCh: make(chan struct{}, 1), // Non-blocking channel
345+
HeaderCh: make(chan *types.SignedHeader, channelLength),
346+
DataCh: make(chan *types.Data, channelLength),
347+
headerInCh: make(chan NewHeaderEvent, eventInChLength),
348+
dataInCh: make(chan NewDataEvent, eventInChLength),
349+
headerStoreCh: make(chan struct{}, 1),
350+
dataStoreCh: make(chan struct{}, 1),
351+
headerStore: headerStore,
352+
dataStore: dataStore,
353+
lastStateMtx: new(sync.RWMutex),
354+
lastBatchData: lastBatchData,
355+
headerCache: cache.NewCache[types.SignedHeader](),
356+
dataCache: cache.NewCache[types.Data](),
357+
retrieveCh: make(chan struct{}, 1),
358+
daIncluderCh: make(chan struct{}, 1),
359+
logger: logger,
360+
txsAvailable: false,
361+
pendingHeaders: pendingHeaders,
362+
metrics: seqMetrics,
363+
sequencer: sequencer,
364+
exec: exec,
365+
gasPrice: gasPrice,
366+
gasMultiplier: gasMultiplier,
367+
txNotifyCh: make(chan struct{}, 1), // Non-blocking channel
368+
batchSubmissionChan: make(chan coresequencer.Batch, eventInChLength),
365369
}
366370
agg.init(ctx)
367371
// Set the default publishBlock implementation
368372
agg.publishBlock = agg.publishBlockInternal
369-
370-
// Set the manager pointer in the sequencer if it is a *single.Sequencer
371-
if s, ok := sequencer.(interface{ SetManager(*Manager) }); ok {
372-
s.SetManager(agg)
373+
if s, ok := agg.sequencer.(interface {
374+
SetBatchSubmissionChan(chan coresequencer.Batch)
375+
}); ok {
376+
s.SetBatchSubmissionChan(agg.batchSubmissionChan)
373377
}
374378

375379
return agg, nil
@@ -478,7 +482,7 @@ func (m *Manager) retrieveBatch(ctx context.Context) (*BatchData, error) {
478482
if len(res.Batch.Transactions) == 0 {
479483
errRetrieveBatch = ErrNoBatch
480484
}
481-
// Even if there are no transactions, update lastBatchData so we dont
485+
// Even if there are no transactions, update lastBatchData so we don't
482486
// repeatedly emit the same empty batch, and persist it to metadata.
483487
if err := m.store.SetMetadata(ctx, LastBatchDataKey, convertBatchDataToBytes(res.BatchData)); err != nil {
484488
m.logger.Error("error while setting last batch hash", "error", err)
@@ -780,7 +784,7 @@ func (m *Manager) execCreateBlock(_ context.Context, height uint64, lastSignatur
780784
Time: uint64(batchData.UnixNano()), //nolint:gosec // why is time unix? (tac0turtle)
781785
},
782786
LastHeaderHash: lastHeaderHash,
783-
DataHash: dataHashForEmptyTxs, // Use batchDataIDs when available
787+
DataHash: DataHashForEmptyTxs, // Use batchDataIDs when available
784788
ConsensusHash: make(types.Hash, 32),
785789
AppHash: m.lastState.AppHash,
786790
ProposerAddress: m.genesis.ProposerAddress,
@@ -923,6 +927,11 @@ func (m *Manager) NotifyNewTransactions() {
923927
}
924928
}
925929

930+
// HeaderCache returns the headerCache used by the manager.
931+
func (m *Manager) HeaderCache() *cache.Cache[types.SignedHeader] {
932+
return m.headerCache
933+
}
934+
926935
// DataCache returns the dataCache used by the manager.
927936
func (m *Manager) DataCache() *cache.Cache[types.Data] {
928937
return m.dataCache

block/submitter.go

Lines changed: 154 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@ import (
88
"google.golang.org/protobuf/proto"
99

1010
coreda "github.com/rollkit/rollkit/core/da"
11+
coresequencer "github.com/rollkit/rollkit/core/sequencer"
12+
"github.com/rollkit/rollkit/types"
13+
pb "github.com/rollkit/rollkit/types/pb/rollkit/v1"
1114
)
1215

1316
// HeaderSubmissionLoop is responsible for submitting headers to the DA layer.
@@ -103,7 +106,7 @@ daSubmitRetryLoop:
103106
}
104107
m.pendingHeaders.setLastSubmittedHeight(ctx, lastSubmittedHeight)
105108
headersToSubmit = notSubmittedHeaders
106-
m.daIncluderCh <- struct{}{}
109+
m.sendNonBlockingSignalToDAIncluderCh()
107110
// reset submission options when successful
108111
// scale back gasPrice gradually
109112
backoff = 0
@@ -142,3 +145,153 @@ daSubmitRetryLoop:
142145
}
143146
return nil
144147
}
148+
149+
// BatchSubmissionLoop is responsible for submitting batches to the DA layer.
150+
func (m *Manager) BatchSubmissionLoop(ctx context.Context) {
151+
for {
152+
select {
153+
case <-ctx.Done():
154+
m.logger.Info("Batch submission loop stopped")
155+
return
156+
case batch := <-m.batchSubmissionChan:
157+
err := m.submitBatchToDA(ctx, batch)
158+
if err != nil {
159+
m.logger.Error("failed to submit batch to DA", "error", err)
160+
}
161+
}
162+
}
163+
}
164+
165+
// submitBatchToDA submits a batch of transactions to the Data Availability (DA) layer.
166+
// It implements a retry mechanism with exponential backoff and gas price adjustments
167+
// to handle various failure scenarios.
168+
func (m *Manager) submitBatchToDA(ctx context.Context, batch coresequencer.Batch) error {
169+
currentBatch := batch
170+
submittedAllTxs := false
171+
var backoff time.Duration
172+
totalTxCount := len(batch.Transactions)
173+
submittedTxCount := 0
174+
attempt := 0
175+
176+
// Store initial values to be able to reset or compare later
177+
initialGasPrice, err := m.dalc.GasPrice(ctx)
178+
if err != nil {
179+
return fmt.Errorf("failed to get initial gas price: %w", err)
180+
}
181+
initialMaxBlobSize, err := m.dalc.MaxBlobSize(ctx)
182+
if err != nil {
183+
return fmt.Errorf("failed to get initial max blob size: %w", err)
184+
}
185+
maxBlobSize := initialMaxBlobSize
186+
gasPrice := initialGasPrice
187+
188+
for !submittedAllTxs && attempt < maxSubmitAttempts {
189+
// Wait for backoff duration or exit if context is done
190+
select {
191+
case <-ctx.Done():
192+
return ctx.Err()
193+
case <-time.After(backoff):
194+
}
195+
196+
// Convert batch to protobuf and marshal
197+
batchPb := &pb.Batch{
198+
Txs: currentBatch.Transactions,
199+
}
200+
batchBz, err := proto.Marshal(batchPb)
201+
if err != nil {
202+
return fmt.Errorf("failed to marshal batch: %w", err)
203+
}
204+
205+
// Attempt to submit the batch to the DA layer
206+
res := m.dalc.Submit(ctx, [][]byte{batchBz}, maxBlobSize, gasPrice)
207+
208+
gasMultiplier, err := m.dalc.GasMultiplier(ctx)
209+
if err != nil {
210+
return fmt.Errorf("failed to get gas multiplier: %w", err)
211+
}
212+
213+
switch res.Code {
214+
case coreda.StatusSuccess:
215+
// Count submitted transactions for this attempt
216+
submittedTxs := int(res.SubmittedCount)
217+
m.logger.Info("successfully submitted transactions to DA layer",
218+
"gasPrice", gasPrice,
219+
"height", res.Height,
220+
"submittedTxs", submittedTxs,
221+
"remainingTxs", len(currentBatch.Transactions)-submittedTxs)
222+
223+
// Update overall progress
224+
submittedTxCount += submittedTxs
225+
226+
// Check if all transactions in the current batch were submitted
227+
if submittedTxs == len(currentBatch.Transactions) {
228+
submittedAllTxs = true
229+
} else {
230+
// Update the current batch to contain only the remaining transactions
231+
currentBatch.Transactions = currentBatch.Transactions[submittedTxs:]
232+
}
233+
234+
// Reset submission parameters after success
235+
backoff = 0
236+
maxBlobSize = initialMaxBlobSize
237+
238+
// Gradually reduce gas price on success, but not below initial price
239+
if gasMultiplier > 0 && gasPrice != 0 {
240+
gasPrice = gasPrice / gasMultiplier
241+
if gasPrice < initialGasPrice {
242+
gasPrice = initialGasPrice
243+
}
244+
}
245+
m.logger.Debug("resetting DA layer submission options", "backoff", backoff, "gasPrice", gasPrice)
246+
247+
// Set DA included in manager's dataCache if all txs submitted and manager is set
248+
if submittedAllTxs {
249+
data := &types.Data{
250+
Txs: make(types.Txs, len(currentBatch.Transactions)),
251+
}
252+
for i, tx := range currentBatch.Transactions {
253+
data.Txs[i] = types.Tx(tx)
254+
}
255+
hash := data.DACommitment().String()
256+
if err == nil {
257+
m.DataCache().SetDAIncluded(hash)
258+
}
259+
m.sendNonBlockingSignalToDAIncluderCh()
260+
}
261+
262+
case coreda.StatusNotIncludedInBlock, coreda.StatusAlreadyInMempool:
263+
// For mempool-related issues, use a longer backoff and increase gas price
264+
m.logger.Error("DA layer submission failed", "error", res.Message, "attempt", attempt)
265+
backoff = m.config.DA.BlockTime.Duration * time.Duration(m.config.DA.MempoolTTL)
266+
267+
// Increase gas price to prioritize the transaction
268+
if gasMultiplier > 0 && gasPrice != 0 {
269+
gasPrice = gasPrice * gasMultiplier
270+
}
271+
m.logger.Info("retrying DA layer submission with", "backoff", backoff, "gasPrice", gasPrice)
272+
273+
case coreda.StatusTooBig:
274+
// If the blob is too big, reduce the max blob size
275+
maxBlobSize = maxBlobSize / 4
276+
fallthrough
277+
278+
default:
279+
// For other errors, use exponential backoff
280+
m.logger.Error("DA layer submission failed", "error", res.Message, "attempt", attempt)
281+
backoff = m.exponentialBackoff(backoff)
282+
}
283+
284+
attempt += 1
285+
}
286+
287+
// Return error if not all transactions were submitted after all attempts
288+
if !submittedAllTxs {
289+
return fmt.Errorf(
290+
"failed to submit all transactions to DA layer, submitted %d txs (%d left) after %d attempts",
291+
submittedTxCount,
292+
totalTxCount-submittedTxCount,
293+
attempt,
294+
)
295+
}
296+
return nil
297+
}

block/sync.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ func (m *Manager) trySyncNextBlock(ctx context.Context, daHeight uint64) error {
163163

164164
func (m *Manager) handleEmptyDataHash(ctx context.Context, header *types.Header) {
165165
headerHeight := header.Height()
166-
if bytes.Equal(header.DataHash, dataHashForEmptyTxs) {
166+
if bytes.Equal(header.DataHash, DataHashForEmptyTxs) {
167167
var lastDataHash types.Hash
168168
var err error
169169
var lastData *types.Data

block/sync_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -767,7 +767,7 @@ func TestHandleEmptyDataHash(t *testing.T) {
767767
// Define the test data
768768
headerHeight := 2
769769
header := &types.Header{
770-
DataHash: dataHashForEmptyTxs,
770+
DataHash: DataHashForEmptyTxs,
771771
BaseHeader: types.BaseHeader{
772772
Height: 2,
773773
Time: uint64(time.Now().UnixNano()),

core/sequencer/dummy.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,9 @@ import (
1212

1313
// dummySequencer is a dummy implementation of the Sequencer interface for testing
1414
type dummySequencer struct {
15-
mu sync.RWMutex
16-
batches map[string]*Batch
15+
mu sync.RWMutex
16+
batches map[string]*Batch
17+
batchSubmissionChan chan Batch
1718
}
1819

1920
// NewDummySequencer creates a new dummy Sequencer instance
@@ -29,6 +30,9 @@ func (s *dummySequencer) SubmitRollupBatchTxs(ctx context.Context, req SubmitRol
2930
defer s.mu.Unlock()
3031

3132
s.batches[string(req.RollupId)] = req.Batch
33+
if req.Batch != nil && len(req.Batch.Transactions) > 0 {
34+
s.batchSubmissionChan <- *req.Batch
35+
}
3236
return &SubmitRollupBatchTxsResponse{}, nil
3337
}
3438

@@ -54,3 +58,7 @@ func (s *dummySequencer) VerifyBatch(ctx context.Context, req VerifyBatchRequest
5458
Status: true,
5559
}, nil
5660
}
61+
62+
func (s *dummySequencer) SetBatchSubmissionChan(batchSubmissionChan chan Batch) {
63+
s.batchSubmissionChan = batchSubmissionChan
64+
}

node/full.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -400,6 +400,7 @@ func (n *FullNode) Run(ctx context.Context) error {
400400
go n.blockManager.AggregationLoop(ctx)
401401
go n.reaper.Start(ctx)
402402
go n.blockManager.HeaderSubmissionLoop(ctx)
403+
go n.blockManager.BatchSubmissionLoop(ctx)
403404
go n.headerPublishLoop(ctx)
404405
go n.dataPublishLoop(ctx)
405406
go n.blockManager.DAIncluderLoop(ctx)

0 commit comments

Comments (0)