Skip to content

Commit 5a0a8fa

Browse files
feat: add Prometheus metrics for DA submission failures (#2756)
Add comprehensive Prometheus metrics to track DA submission failures and retry behavior in the sequencer. Closes #2755 ## Changes - Added `da_submitter_failures_total{reason}` counter - Added `da_submitter_last_failure_timestamp{reason}` gauge - Added `da_submitter_pending_blobs` gauge - Added `da_submitter_resends_total` counter 🤖 Generated with [Claude Code](https://claude.ai/code) --------- Co-authored-by: claude[bot] <41898282+claude[bot]@users.noreply.github.com> Co-authored-by: Marko <tac0turtle@users.noreply.github.com>
1 parent c315175 commit 5a0a8fa

File tree

7 files changed

+193
-39
lines changed

7 files changed

+193
-39
lines changed

block/components.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ func NewSyncComponents(
161161
)
162162

163163
// Create DA submitter for sync nodes (no signer, only DA inclusion processing)
164-
daSubmitter := submitting.NewDASubmitter(da, config, genesis, blockOpts, logger)
164+
daSubmitter := submitting.NewDASubmitter(da, config, genesis, blockOpts, metrics, logger)
165165
submitter := submitting.NewSubmitter(
166166
store,
167167
exec,
@@ -240,7 +240,7 @@ func NewAggregatorComponents(
240240
}
241241

242242
// Create DA submitter for aggregator nodes (with signer for submission)
243-
daSubmitter := submitting.NewDASubmitter(da, config, genesis, blockOpts, logger)
243+
daSubmitter := submitting.NewDASubmitter(da, config, genesis, blockOpts, metrics, logger)
244244
submitter := submitting.NewSubmitter(
245245
store,
246246
exec,

block/internal/common/metrics.go

Lines changed: 116 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,34 @@ const (
1313
MetricsSubsystem = "sequencer"
1414
)
1515

16+
// DASubmitterFailureReason represents a typed failure reason for DA submission failures
17+
type DASubmitterFailureReason string
18+
19+
const (
20+
DASubmitterFailureReasonAlreadyRejected DASubmitterFailureReason = "already_rejected"
21+
DASubmitterFailureReasonInsufficientFee DASubmitterFailureReason = "insufficient_fee"
22+
DASubmitterFailureReasonTimeout DASubmitterFailureReason = "timeout"
23+
DASubmitterFailureReasonAlreadyInMempool DASubmitterFailureReason = "already_in_mempool"
24+
DASubmitterFailureReasonNotIncludedInBlock DASubmitterFailureReason = "not_included_in_block"
25+
DASubmitterFailureReasonTooBig DASubmitterFailureReason = "too_big"
26+
DASubmitterFailureReasonContextCanceled DASubmitterFailureReason = "context_canceled"
27+
DASubmitterFailureReasonUnknown DASubmitterFailureReason = "unknown"
28+
)
29+
30+
// AllDASubmitterFailureReasons returns all possible failure reasons
31+
func AllDASubmitterFailureReasons() []DASubmitterFailureReason {
32+
return []DASubmitterFailureReason{
33+
DASubmitterFailureReasonAlreadyRejected,
34+
DASubmitterFailureReasonInsufficientFee,
35+
DASubmitterFailureReasonTimeout,
36+
DASubmitterFailureReasonAlreadyInMempool,
37+
DASubmitterFailureReasonNotIncludedInBlock,
38+
DASubmitterFailureReasonTooBig,
39+
DASubmitterFailureReasonContextCanceled,
40+
DASubmitterFailureReasonUnknown,
41+
}
42+
}
43+
1644
// Metrics contains all metrics exposed by this package.
1745
type Metrics struct {
1846
// Original metrics
@@ -63,6 +91,12 @@ type Metrics struct {
6391
// State transition metrics
6492
StateTransitions map[string]metrics.Counter
6593
InvalidTransitions metrics.Counter
94+
95+
// DA Submitter metrics
96+
DASubmitterFailures map[DASubmitterFailureReason]metrics.Counter // Counter with reason label
97+
DASubmitterLastFailure map[DASubmitterFailureReason]metrics.Gauge // Timestamp gauge with reason label
98+
DASubmitterPendingBlobs metrics.Gauge // Total number of blobs awaiting submission (backlog)
99+
DASubmitterResends metrics.Counter // Number of resend attempts
66100
}
67101

68102
// PrometheusMetrics returns Metrics built using Prometheus client library
@@ -73,10 +107,12 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
73107
}
74108

75109
m := &Metrics{
76-
ChannelBufferUsage: make(map[string]metrics.Gauge),
77-
ErrorsByType: make(map[string]metrics.Counter),
78-
OperationDuration: make(map[string]metrics.Histogram),
79-
StateTransitions: make(map[string]metrics.Counter),
110+
ChannelBufferUsage: make(map[string]metrics.Gauge),
111+
ErrorsByType: make(map[string]metrics.Counter),
112+
OperationDuration: make(map[string]metrics.Histogram),
113+
StateTransitions: make(map[string]metrics.Counter),
114+
DASubmitterFailures: make(map[DASubmitterFailureReason]metrics.Counter),
115+
DASubmitterLastFailure: make(map[DASubmitterFailureReason]metrics.Gauge),
80116
}
81117

82118
// Original metrics
@@ -349,6 +385,44 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
349385
}, labels).With(labelsAndValues...)
350386
}
351387

388+
// DA Submitter metrics
389+
m.DASubmitterPendingBlobs = prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
390+
Namespace: namespace,
391+
Subsystem: MetricsSubsystem,
392+
Name: "da_submitter_pending_blobs",
393+
Help: "Total number of blobs awaiting DA submission (backlog)",
394+
}, labels).With(labelsAndValues...)
395+
396+
m.DASubmitterResends = prometheus.NewCounterFrom(stdprometheus.CounterOpts{
397+
Namespace: namespace,
398+
Subsystem: MetricsSubsystem,
399+
Name: "da_submitter_resends_total",
400+
Help: "Total number of DA submission retry attempts",
401+
}, labels).With(labelsAndValues...)
402+
403+
// Initialize DA submitter failure counters and timestamps for various reasons
404+
for _, reason := range AllDASubmitterFailureReasons() {
405+
m.DASubmitterFailures[reason] = prometheus.NewCounterFrom(stdprometheus.CounterOpts{
406+
Namespace: namespace,
407+
Subsystem: MetricsSubsystem,
408+
Name: "da_submitter_failures_total",
409+
Help: "Total number of DA submission failures by reason",
410+
ConstLabels: map[string]string{
411+
"reason": string(reason),
412+
},
413+
}, labels).With(labelsAndValues...)
414+
415+
m.DASubmitterLastFailure[reason] = prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
416+
Namespace: namespace,
417+
Subsystem: MetricsSubsystem,
418+
Name: "da_submitter_last_failure_timestamp",
419+
Help: "Unix timestamp of the last DA submission failure by reason",
420+
ConstLabels: map[string]string{
421+
"reason": string(reason),
422+
},
423+
}, labels).With(labelsAndValues...)
424+
}
425+
352426
return m
353427
}
354428

@@ -363,34 +437,38 @@ func NopMetrics() *Metrics {
363437
CommittedHeight: discard.NewGauge(),
364438

365439
// Extended metrics
366-
ChannelBufferUsage: make(map[string]metrics.Gauge),
367-
ErrorsByType: make(map[string]metrics.Counter),
368-
OperationDuration: make(map[string]metrics.Histogram),
369-
StateTransitions: make(map[string]metrics.Counter),
370-
DroppedSignals: discard.NewCounter(),
371-
RecoverableErrors: discard.NewCounter(),
372-
NonRecoverableErrors: discard.NewCounter(),
373-
GoroutineCount: discard.NewGauge(),
374-
DASubmissionAttempts: discard.NewCounter(),
375-
DASubmissionSuccesses: discard.NewCounter(),
376-
DASubmissionFailures: discard.NewCounter(),
377-
DARetrievalAttempts: discard.NewCounter(),
378-
DARetrievalSuccesses: discard.NewCounter(),
379-
DARetrievalFailures: discard.NewCounter(),
380-
DAInclusionHeight: discard.NewGauge(),
381-
PendingHeadersCount: discard.NewGauge(),
382-
PendingDataCount: discard.NewGauge(),
383-
SyncLag: discard.NewGauge(),
384-
HeadersSynced: discard.NewCounter(),
385-
DataSynced: discard.NewCounter(),
386-
BlocksApplied: discard.NewCounter(),
387-
InvalidHeadersCount: discard.NewCounter(),
388-
BlockProductionTime: discard.NewHistogram(),
389-
EmptyBlocksProduced: discard.NewCounter(),
390-
LazyBlocksProduced: discard.NewCounter(),
391-
NormalBlocksProduced: discard.NewCounter(),
392-
TxsPerBlock: discard.NewHistogram(),
393-
InvalidTransitions: discard.NewCounter(),
440+
ChannelBufferUsage: make(map[string]metrics.Gauge),
441+
ErrorsByType: make(map[string]metrics.Counter),
442+
OperationDuration: make(map[string]metrics.Histogram),
443+
StateTransitions: make(map[string]metrics.Counter),
444+
DroppedSignals: discard.NewCounter(),
445+
RecoverableErrors: discard.NewCounter(),
446+
NonRecoverableErrors: discard.NewCounter(),
447+
GoroutineCount: discard.NewGauge(),
448+
DASubmissionAttempts: discard.NewCounter(),
449+
DASubmissionSuccesses: discard.NewCounter(),
450+
DASubmissionFailures: discard.NewCounter(),
451+
DARetrievalAttempts: discard.NewCounter(),
452+
DARetrievalSuccesses: discard.NewCounter(),
453+
DARetrievalFailures: discard.NewCounter(),
454+
DAInclusionHeight: discard.NewGauge(),
455+
PendingHeadersCount: discard.NewGauge(),
456+
PendingDataCount: discard.NewGauge(),
457+
SyncLag: discard.NewGauge(),
458+
HeadersSynced: discard.NewCounter(),
459+
DataSynced: discard.NewCounter(),
460+
BlocksApplied: discard.NewCounter(),
461+
InvalidHeadersCount: discard.NewCounter(),
462+
BlockProductionTime: discard.NewHistogram(),
463+
EmptyBlocksProduced: discard.NewCounter(),
464+
LazyBlocksProduced: discard.NewCounter(),
465+
NormalBlocksProduced: discard.NewCounter(),
466+
TxsPerBlock: discard.NewHistogram(),
467+
InvalidTransitions: discard.NewCounter(),
468+
DASubmitterFailures: make(map[DASubmitterFailureReason]metrics.Counter),
469+
DASubmitterLastFailure: make(map[DASubmitterFailureReason]metrics.Gauge),
470+
DASubmitterPendingBlobs: discard.NewGauge(),
471+
DASubmitterResends: discard.NewCounter(),
394472
}
395473

396474
// Initialize maps with no-op metrics
@@ -414,5 +492,11 @@ func NopMetrics() *Metrics {
414492
m.StateTransitions[transition] = discard.NewCounter()
415493
}
416494

495+
// Initialize DA submitter failure maps with no-op metrics
496+
for _, reason := range AllDASubmitterFailureReasons() {
497+
m.DASubmitterFailures[reason] = discard.NewCounter()
498+
m.DASubmitterLastFailure[reason] = discard.NewGauge()
499+
}
500+
417501
return m
418502
}

block/internal/submitting/da_submitter.go

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ type DASubmitter struct {
120120
genesis genesis.Genesis
121121
options common.BlockOptions
122122
logger zerolog.Logger
123+
metrics *common.Metrics
123124

124125
// calculate namespaces bytes once and reuse them
125126
namespaceBz []byte
@@ -132,6 +133,7 @@ func NewDASubmitter(
132133
config config.Config,
133134
genesis genesis.Genesis,
134135
options common.BlockOptions,
136+
metrics *common.Metrics,
135137
logger zerolog.Logger,
136138
) *DASubmitter {
137139
daSubmitterLogger := logger.With().Str("component", "da_submitter").Logger()
@@ -141,17 +143,37 @@ func NewDASubmitter(
141143
server.SetDAVisualizationServer(server.NewDAVisualizationServer(da, visualizerLogger, config.Node.Aggregator))
142144
}
143145

146+
// Use NoOp metrics if nil to avoid nil checks throughout the code
147+
if metrics == nil {
148+
metrics = common.NopMetrics()
149+
}
150+
144151
return &DASubmitter{
145152
da: da,
146153
config: config,
147154
genesis: genesis,
148155
options: options,
156+
metrics: metrics,
149157
logger: daSubmitterLogger,
150158
namespaceBz: coreda.NamespaceFromString(config.DA.GetNamespace()).Bytes(),
151159
namespaceDataBz: coreda.NamespaceFromString(config.DA.GetDataNamespace()).Bytes(),
152160
}
153161
}
154162

163+
// recordFailure records a DA submission failure in metrics
164+
func (s *DASubmitter) recordFailure(reason common.DASubmitterFailureReason) {
165+
counter, ok := s.metrics.DASubmitterFailures[reason]
166+
if !ok {
167+
s.logger.Warn().Str("reason", string(reason)).Msg("unregistered failure reason, metric not recorded")
168+
return
169+
}
170+
counter.Add(1)
171+
172+
if gauge, ok := s.metrics.DASubmitterLastFailure[reason]; ok {
173+
gauge.Set(float64(time.Now().Unix()))
174+
}
175+
}
176+
155177
// getGasMultiplier fetches the gas multiplier from DA layer with fallback and clamping
156178
func (s *DASubmitter) getGasMultiplier(ctx context.Context, pol retryPolicy) float64 {
157179
gasMultiplier, err := s.da.GasMultiplier(ctx)
@@ -215,6 +237,7 @@ func (s *DASubmitter) SubmitHeaders(ctx context.Context, cache cache.Manager) er
215237
s.namespaceBz,
216238
[]byte(s.config.DA.SubmitOptions),
217239
cache,
240+
func() uint64 { return cache.NumPendingHeaders() },
218241
)
219242
}
220243

@@ -258,6 +281,7 @@ func (s *DASubmitter) SubmitData(ctx context.Context, cache cache.Manager, signe
258281
s.namespaceDataBz,
259282
[]byte(s.config.DA.SubmitOptions),
260283
cache,
284+
func() uint64 { return cache.NumPendingData() },
261285
)
262286
}
263287

@@ -328,6 +352,7 @@ func submitToDA[T any](
328352
namespace []byte,
329353
options []byte,
330354
cache cache.Manager,
355+
getTotalPendingFn func() uint64,
331356
) error {
332357
marshaled, err := marshalItems(ctx, items, marshalFn, itemType)
333358
if err != nil {
@@ -352,8 +377,18 @@ func submitToDA[T any](
352377
marshaled = batchMarshaled
353378
}
354379

380+
// Update pending blobs metric to track total backlog
381+
if getTotalPendingFn != nil {
382+
s.metrics.DASubmitterPendingBlobs.Set(float64(getTotalPendingFn()))
383+
}
384+
355385
// Start the retry loop
356386
for rs.Attempt < pol.MaxAttempts {
387+
// Record resend metric for retry attempts (not the first attempt)
388+
if rs.Attempt > 0 {
389+
s.metrics.DASubmitterResends.Add(1)
390+
}
391+
357392
if err := waitForBackoffOrContext(ctx, rs.Backoff); err != nil {
358393
return err
359394
}
@@ -375,14 +410,24 @@ func submitToDA[T any](
375410
s.logger.Info().Str("itemType", itemType).Float64("gasPrice", rs.GasPrice).Uint64("count", res.SubmittedCount).Msg("successfully submitted items to DA layer")
376411
if int(res.SubmittedCount) == len(items) {
377412
rs.Next(reasonSuccess, pol, gm, sentinelNoGas)
413+
// Update pending blobs metric to reflect total backlog
414+
if getTotalPendingFn != nil {
415+
s.metrics.DASubmitterPendingBlobs.Set(float64(getTotalPendingFn()))
416+
}
378417
return nil
379418
}
380419
// partial success: advance window
381420
items = items[res.SubmittedCount:]
382421
marshaled = marshaled[res.SubmittedCount:]
383422
rs.Next(reasonSuccess, pol, gm, sentinelNoGas)
423+
// Update pending blobs count to reflect total backlog
424+
if getTotalPendingFn != nil {
425+
s.metrics.DASubmitterPendingBlobs.Set(float64(getTotalPendingFn()))
426+
}
384427

385428
case coreda.StatusTooBig:
429+
// Record failure metric
430+
s.recordFailure(common.DASubmitterFailureReasonTooBig)
386431
// Iteratively halve until it fits or single-item too big
387432
if len(items) == 1 {
388433
s.logger.Error().Str("itemType", itemType).Msg("single item exceeds DA blob size limit")
@@ -397,21 +442,39 @@ func submitToDA[T any](
397442
marshaled = marshaled[:half]
398443
s.logger.Debug().Int("newBatchSize", half).Msg("batch too big; halving and retrying")
399444
rs.Next(reasonTooBig, pol, gm, sentinelNoGas)
445+
// Update pending blobs count to reflect total backlog
446+
if getTotalPendingFn != nil {
447+
s.metrics.DASubmitterPendingBlobs.Set(float64(getTotalPendingFn()))
448+
}
449+
450+
case coreda.StatusNotIncludedInBlock:
451+
// Record failure metric
452+
s.recordFailure(common.DASubmitterFailureReasonNotIncludedInBlock)
453+
s.logger.Info().Dur("backoff", pol.MaxBackoff).Float64("gasPrice", rs.GasPrice).Msg("retrying due to mempool state")
454+
rs.Next(reasonMempool, pol, gm, sentinelNoGas)
400455

401-
case coreda.StatusNotIncludedInBlock, coreda.StatusAlreadyInMempool:
456+
case coreda.StatusAlreadyInMempool:
457+
// Record failure metric
458+
s.recordFailure(common.DASubmitterFailureReasonAlreadyInMempool)
402459
s.logger.Info().Dur("backoff", pol.MaxBackoff).Float64("gasPrice", rs.GasPrice).Msg("retrying due to mempool state")
403460
rs.Next(reasonMempool, pol, gm, sentinelNoGas)
404461

405462
case coreda.StatusContextCanceled:
463+
// Record failure metric
464+
s.recordFailure(common.DASubmitterFailureReasonContextCanceled)
406465
s.logger.Info().Msg("DA layer submission canceled due to context cancellation")
407466
return context.Canceled
408467

409468
default:
469+
// Record failure metric
470+
s.recordFailure(common.DASubmitterFailureReasonUnknown)
410471
s.logger.Error().Str("error", res.Message).Int("attempt", rs.Attempt+1).Msg("DA layer submission failed")
411472
rs.Next(reasonFailure, pol, gm, sentinelNoGas)
412473
}
413474
}
414475

476+
// Final failure after max attempts
477+
s.recordFailure(common.DASubmitterFailureReasonTimeout)
415478
return fmt.Errorf("failed to submit all %s(s) to DA layer after %d attempts", itemType, rs.Attempt)
416479
}
417480

block/internal/submitting/da_submitter_integration_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ func TestDASubmitter_SubmitHeadersAndData_MarksInclusionAndUpdatesLastSubmitted(
8686
dummyDA := coreda.NewDummyDA(10_000_000, 0, 0, 10*time.Millisecond)
8787

8888
// Create DA submitter
89-
daSubmitter := NewDASubmitter(dummyDA, cfg, gen, common.DefaultBlockOptions(), zerolog.Nop())
89+
daSubmitter := NewDASubmitter(dummyDA, cfg, gen, common.DefaultBlockOptions(), common.NopMetrics(), zerolog.Nop())
9090

9191
// Submit headers and data
9292
require.NoError(t, daSubmitter.SubmitHeaders(context.Background(), cm))

0 commit comments

Comments
 (0)