Skip to content

Commit 705317c

Browse files
author
tac0turtle
committed
pull out submitter
1 parent c9e1a5d commit 705317c

2 files changed

Lines changed: 144 additions & 135 deletions

File tree

block/manager.go

Lines changed: 0 additions & 135 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import (
1515
goheaderstore "github.com/celestiaorg/go-header"
1616
ds "github.com/ipfs/go-datastore"
1717
"github.com/libp2p/go-libp2p/core/crypto"
18-
"google.golang.org/protobuf/proto"
1918

2019
"github.com/rollkit/go-sequencing"
2120

@@ -477,26 +476,6 @@ func (m *Manager) retrieveBatch(ctx context.Context) (*BatchData, error) {
477476
return nil, ErrNoBatch
478477
}
479478

480-
// HeaderSubmissionLoop is responsible for submitting blocks to the DA layer.
481-
func (m *Manager) HeaderSubmissionLoop(ctx context.Context) {
482-
timer := time.NewTicker(m.config.DA.BlockTime.Duration)
483-
defer timer.Stop()
484-
for {
485-
select {
486-
case <-ctx.Done():
487-
return
488-
case <-timer.C:
489-
}
490-
if m.pendingHeaders.isEmpty() {
491-
continue
492-
}
493-
err := m.submitHeadersToDA(ctx)
494-
if err != nil {
495-
m.logger.Error("error while submitting block to DA", "error", err)
496-
}
497-
}
498-
}
499-
500479
func (m *Manager) isUsingExpectedCentralizedSequencer(header *types.SignedHeader) bool {
501480
return bytes.Equal(header.ProposerAddress, m.genesis.ProposerAddress) && header.ValidateBasic() == nil
502481
}
@@ -722,120 +701,6 @@ func (m *Manager) recordMetrics(data *types.Data) {
722701
m.metrics.CommittedHeight.Set(float64(data.Metadata.Height))
723702
}
724703

725-
func (m *Manager) submitHeadersToDA(ctx context.Context) error {
726-
submittedAllHeaders := false
727-
var backoff time.Duration
728-
headersToSubmit, err := m.pendingHeaders.getPendingHeaders(ctx)
729-
if len(headersToSubmit) == 0 {
730-
// There are no pending headers; return because there's nothing to do, but:
731-
// - it might be caused by error, then err != nil
732-
// - all pending headers are processed, then err == nil
733-
// whatever the reason, error information is propagated correctly to the caller
734-
return err
735-
}
736-
if err != nil {
737-
// There are some pending blocks but also an error. It's very unlikely case - probably some error while reading
738-
// headers from the store.
739-
// The error is logged and normal processing of pending blocks continues.
740-
m.logger.Error("error while fetching blocks pending DA", "err", err)
741-
}
742-
numSubmittedHeaders := 0
743-
attempt := 0
744-
maxBlobSize, err := m.dalc.MaxBlobSize(ctx)
745-
if err != nil {
746-
return err
747-
}
748-
initialMaxBlobSize := maxBlobSize
749-
gasPrice := m.gasPrice
750-
initialGasPrice := gasPrice
751-
752-
daSubmitRetryLoop:
753-
for !submittedAllHeaders && attempt < maxSubmitAttempts {
754-
select {
755-
case <-ctx.Done():
756-
break daSubmitRetryLoop
757-
case <-time.After(backoff):
758-
}
759-
760-
headersBz := make([][]byte, len(headersToSubmit))
761-
for i, header := range headersToSubmit {
762-
headerPb, err := header.ToProto()
763-
if err != nil {
764-
// do we drop the header from attempting to be submitted?
765-
return fmt.Errorf("failed to transform header to proto: %w", err)
766-
}
767-
headersBz[i], err = proto.Marshal(headerPb)
768-
if err != nil {
769-
// do we drop the header from attempting to be submitted?
770-
return fmt.Errorf("failed to marshal header: %w", err)
771-
}
772-
}
773-
774-
ctx, cancel := context.WithTimeout(ctx, 60*time.Second) //TODO: make this configurable
775-
defer cancel()
776-
res := m.dalc.Submit(ctx, headersBz, maxBlobSize, gasPrice)
777-
switch res.Code {
778-
case coreda.StatusSuccess:
779-
m.logger.Info("successfully submitted Rollkit headers to DA layer", "gasPrice", gasPrice, "daHeight", res.Height, "headerCount", res.SubmittedCount)
780-
if res.SubmittedCount == uint64(len(headersToSubmit)) {
781-
submittedAllHeaders = true
782-
}
783-
submittedBlocks, notSubmittedBlocks := headersToSubmit[:res.SubmittedCount], headersToSubmit[res.SubmittedCount:]
784-
numSubmittedHeaders += len(submittedBlocks)
785-
for _, block := range submittedBlocks {
786-
m.headerCache.SetDAIncluded(block.Hash().String())
787-
err = m.setDAIncludedHeight(ctx, block.Height())
788-
if err != nil {
789-
return err
790-
}
791-
}
792-
lastSubmittedHeight := uint64(0)
793-
if l := len(submittedBlocks); l > 0 {
794-
lastSubmittedHeight = submittedBlocks[l-1].Height()
795-
}
796-
m.pendingHeaders.setLastSubmittedHeight(ctx, lastSubmittedHeight)
797-
headersToSubmit = notSubmittedBlocks
798-
// reset submission options when successful
799-
// scale back gasPrice gradually
800-
backoff = 0
801-
maxBlobSize = initialMaxBlobSize
802-
if m.gasMultiplier > 0 && gasPrice != -1 {
803-
gasPrice = gasPrice / m.gasMultiplier
804-
if gasPrice < initialGasPrice {
805-
gasPrice = initialGasPrice
806-
}
807-
}
808-
m.logger.Debug("resetting DA layer submission options", "backoff", backoff, "gasPrice", gasPrice, "maxBlobSize", maxBlobSize)
809-
case coreda.StatusNotIncludedInBlock, coreda.StatusAlreadyInMempool:
810-
m.logger.Error("DA layer submission failed", "error", res.Message, "attempt", attempt)
811-
backoff = m.config.DA.BlockTime.Duration * time.Duration(m.config.DA.MempoolTTL) //nolint:gosec
812-
if m.gasMultiplier > 0 && gasPrice != -1 {
813-
gasPrice = gasPrice * m.gasMultiplier
814-
}
815-
m.logger.Info("retrying DA layer submission with", "backoff", backoff, "gasPrice", gasPrice, "maxBlobSize", maxBlobSize)
816-
817-
case coreda.StatusTooBig:
818-
maxBlobSize = maxBlobSize / 4
819-
fallthrough
820-
default:
821-
m.logger.Error("DA layer submission failed", "error", res.Message, "attempt", attempt)
822-
backoff = m.exponentialBackoff(backoff)
823-
}
824-
825-
attempt += 1
826-
}
827-
828-
if !submittedAllHeaders {
829-
return fmt.Errorf(
830-
"failed to submit all blocks to DA layer, submitted %d blocks (%d left) after %d attempts",
831-
numSubmittedHeaders,
832-
len(headersToSubmit),
833-
attempt,
834-
)
835-
}
836-
return nil
837-
}
838-
839704
func (m *Manager) exponentialBackoff(backoff time.Duration) time.Duration {
840705
backoff *= 2
841706
if backoff == 0 {

block/submitter.go

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
package block
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
coreda "github.com/rollkit/rollkit/core/da"
9+
"google.golang.org/protobuf/proto"
10+
)
11+
12+
// HeaderSubmissionLoop is responsible for submitting blocks to the DA layer.
13+
func (m *Manager) HeaderSubmissionLoop(ctx context.Context) {
14+
timer := time.NewTicker(m.config.DA.BlockTime.Duration)
15+
defer timer.Stop()
16+
for {
17+
select {
18+
case <-ctx.Done():
19+
return
20+
case <-timer.C:
21+
}
22+
if m.pendingHeaders.isEmpty() {
23+
continue
24+
}
25+
err := m.submitHeadersToDA(ctx)
26+
if err != nil {
27+
m.logger.Error("error while submitting block to DA", "error", err)
28+
}
29+
}
30+
}
31+
32+
func (m *Manager) submitHeadersToDA(ctx context.Context) error {
33+
submittedAllHeaders := false
34+
var backoff time.Duration
35+
headersToSubmit, err := m.pendingHeaders.getPendingHeaders(ctx)
36+
if len(headersToSubmit) == 0 {
37+
// There are no pending headers; return because there's nothing to do, but:
38+
// - it might be caused by error, then err != nil
39+
// - all pending headers are processed, then err == nil
40+
// whatever the reason, error information is propagated correctly to the caller
41+
return err
42+
}
43+
if err != nil {
44+
// There are some pending blocks but also an error. It's very unlikely case - probably some error while reading
45+
// headers from the store.
46+
// The error is logged and normal processing of pending blocks continues.
47+
m.logger.Error("error while fetching blocks pending DA", "err", err)
48+
}
49+
numSubmittedHeaders := 0
50+
attempt := 0
51+
maxBlobSize, err := m.dalc.MaxBlobSize(ctx)
52+
if err != nil {
53+
return err
54+
}
55+
initialMaxBlobSize := maxBlobSize
56+
gasPrice := m.gasPrice
57+
initialGasPrice := gasPrice
58+
59+
daSubmitRetryLoop:
60+
for !submittedAllHeaders && attempt < maxSubmitAttempts {
61+
select {
62+
case <-ctx.Done():
63+
break daSubmitRetryLoop
64+
case <-time.After(backoff):
65+
}
66+
67+
headersBz := make([][]byte, len(headersToSubmit))
68+
for i, header := range headersToSubmit {
69+
headerPb, err := header.ToProto()
70+
if err != nil {
71+
// do we drop the header from attempting to be submitted?
72+
return fmt.Errorf("failed to transform header to proto: %w", err)
73+
}
74+
headersBz[i], err = proto.Marshal(headerPb)
75+
if err != nil {
76+
// do we drop the header from attempting to be submitted?
77+
return fmt.Errorf("failed to marshal header: %w", err)
78+
}
79+
}
80+
81+
ctx, cancel := context.WithTimeout(ctx, 60*time.Second) //TODO: make this configurable
82+
defer cancel()
83+
res := m.dalc.Submit(ctx, headersBz, maxBlobSize, gasPrice)
84+
switch res.Code {
85+
case coreda.StatusSuccess:
86+
m.logger.Info("successfully submitted Rollkit headers to DA layer", "gasPrice", gasPrice, "daHeight", res.Height, "headerCount", res.SubmittedCount)
87+
if res.SubmittedCount == uint64(len(headersToSubmit)) {
88+
submittedAllHeaders = true
89+
}
90+
submittedBlocks, notSubmittedBlocks := headersToSubmit[:res.SubmittedCount], headersToSubmit[res.SubmittedCount:]
91+
numSubmittedHeaders += len(submittedBlocks)
92+
for _, block := range submittedBlocks {
93+
m.headerCache.SetDAIncluded(block.Hash().String())
94+
err = m.setDAIncludedHeight(ctx, block.Height())
95+
if err != nil {
96+
return err
97+
}
98+
}
99+
lastSubmittedHeight := uint64(0)
100+
if l := len(submittedBlocks); l > 0 {
101+
lastSubmittedHeight = submittedBlocks[l-1].Height()
102+
}
103+
m.pendingHeaders.setLastSubmittedHeight(ctx, lastSubmittedHeight)
104+
headersToSubmit = notSubmittedBlocks
105+
// reset submission options when successful
106+
// scale back gasPrice gradually
107+
backoff = 0
108+
maxBlobSize = initialMaxBlobSize
109+
if m.gasMultiplier > 0 && gasPrice != -1 {
110+
gasPrice = gasPrice / m.gasMultiplier
111+
if gasPrice < initialGasPrice {
112+
gasPrice = initialGasPrice
113+
}
114+
}
115+
m.logger.Debug("resetting DA layer submission options", "backoff", backoff, "gasPrice", gasPrice, "maxBlobSize", maxBlobSize)
116+
case coreda.StatusNotIncludedInBlock, coreda.StatusAlreadyInMempool:
117+
m.logger.Error("DA layer submission failed", "error", res.Message, "attempt", attempt)
118+
backoff = m.config.DA.BlockTime.Duration * time.Duration(m.config.DA.MempoolTTL) //nolint:gosec
119+
if m.gasMultiplier > 0 && gasPrice != -1 {
120+
gasPrice = gasPrice * m.gasMultiplier
121+
}
122+
m.logger.Info("retrying DA layer submission with", "backoff", backoff, "gasPrice", gasPrice, "maxBlobSize", maxBlobSize)
123+
124+
case coreda.StatusTooBig:
125+
maxBlobSize = maxBlobSize / 4
126+
fallthrough
127+
default:
128+
m.logger.Error("DA layer submission failed", "error", res.Message, "attempt", attempt)
129+
backoff = m.exponentialBackoff(backoff)
130+
}
131+
132+
attempt += 1
133+
}
134+
135+
if !submittedAllHeaders {
136+
return fmt.Errorf(
137+
"failed to submit all blocks to DA layer, submitted %d blocks (%d left) after %d attempts",
138+
numSubmittedHeaders,
139+
len(headersToSubmit),
140+
attempt,
141+
)
142+
}
143+
return nil
144+
}

0 commit comments

Comments
 (0)