Skip to content

Commit e867b3f

Browse files
authored
feat(syncing): add grace period for missing force txs inclusion (#2915)
<!-- Please read and fill out this form before submitting your PR. Please make sure you have reviewed our contributors guide before submitting your first PR. NOTE: PR titles should follow semantic commits: https://www.conventionalcommits.org/en/v1.0.0/ --> ## Overview ref: #2906 Add grace period for missing force txs inclusion. The grace period of 1 epoch can increase depending of the chain congestion (size of blocks). The grace period becomes elastic if the chain congested, to give to the sequencer to process all txs before being considered malicious. <!-- Please provide an explanation of the PR, including the appropriate context, background, goal, and rationale. If there is an issue with this information, please provide a tl;dr and link the issue. Ex: Closes #<issue number> -->
1 parent 8b2b8d6 commit e867b3f

5 files changed

Lines changed: 804 additions & 186 deletions

File tree

block/internal/common/metrics.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,10 @@ type Metrics struct {
6565
DAInclusionHeight metrics.Gauge
6666
PendingHeadersCount metrics.Gauge
6767
PendingDataCount metrics.Gauge
68+
69+
// Forced inclusion metrics
70+
ForcedInclusionTxsInGracePeriod metrics.Gauge // Number of forced inclusion txs currently in grace period
71+
ForcedInclusionTxsMalicious metrics.Counter // Total number of forced inclusion txs marked as malicious
6872
}
6973

7074
// PrometheusMetrics returns Metrics built using Prometheus client library
@@ -182,6 +186,21 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
182186
Help: "Number of data blocks pending DA submission",
183187
}, labels).With(labelsAndValues...)
184188

189+
// Forced inclusion metrics
190+
m.ForcedInclusionTxsInGracePeriod = prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
191+
Namespace: namespace,
192+
Subsystem: MetricsSubsystem,
193+
Name: "forced_inclusion_txs_in_grace_period",
194+
Help: "Number of forced inclusion transactions currently in grace period (past epoch end but within grace boundary)",
195+
}, labels).With(labelsAndValues...)
196+
197+
m.ForcedInclusionTxsMalicious = prometheus.NewCounterFrom(stdprometheus.CounterOpts{
198+
Namespace: namespace,
199+
Subsystem: MetricsSubsystem,
200+
Name: "forced_inclusion_txs_malicious_total",
201+
Help: "Total number of forced inclusion transactions marked as malicious (past grace boundary)",
202+
}, labels).With(labelsAndValues...)
203+
185204
// DA Submitter metrics
186205
m.DASubmitterPendingBlobs = prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
187206
Namespace: namespace,
@@ -246,6 +265,10 @@ func NopMetrics() *Metrics {
246265
DASubmitterLastFailure: make(map[DASubmitterFailureReason]metrics.Gauge),
247266
DASubmitterPendingBlobs: discard.NewGauge(),
248267
DASubmitterResends: discard.NewCounter(),
268+
269+
// Forced inclusion metrics
270+
ForcedInclusionTxsInGracePeriod: discard.NewGauge(),
271+
ForcedInclusionTxsMalicious: discard.NewCounter(),
249272
}
250273

251274
// Initialize maps with no-op metrics

block/internal/syncing/syncer.go

Lines changed: 182 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"encoding/hex"
99
"errors"
1010
"fmt"
11+
"math"
1112
"sync"
1213
"sync/atomic"
1314
"time"
@@ -18,6 +19,7 @@ import (
1819

1920
coreda "github.com/evstack/ev-node/core/da"
2021
coreexecutor "github.com/evstack/ev-node/core/execution"
22+
seqcommon "github.com/evstack/ev-node/sequencers/common"
2123

2224
"github.com/evstack/ev-node/block/internal/cache"
2325
"github.com/evstack/ev-node/block/internal/common"
@@ -28,6 +30,47 @@ import (
2830
"github.com/evstack/ev-node/types"
2931
)
3032

33+
// forcedInclusionGracePeriodConfig contains internal configuration for forced inclusion grace periods.
34+
type forcedInclusionGracePeriodConfig struct {
35+
// basePeriod is the base number of additional epochs allowed for including forced inclusion transactions
36+
// before marking the sequencer as malicious. This provides tolerance for temporary chain congestion.
37+
// A value of 0 means strict enforcement (no grace period).
38+
// A value of 1 means transactions from epoch N can be included in epoch N+1 without being marked malicious.
39+
// Recommended: 1 epoch.
40+
basePeriod uint64
41+
42+
// dynamicMinMultiplier is the minimum multiplier for the base grace period.
43+
// The actual grace period will be at least: basePeriod * dynamicMinMultiplier.
44+
// Example: base=2, min=0.5 → minimum grace period is 1 epoch.
45+
dynamicMinMultiplier float64
46+
47+
// dynamicMaxMultiplier is the maximum multiplier for the base grace period.
48+
// The actual grace period will be at most: basePeriod * dynamicMaxMultiplier.
49+
// Example: base=2, max=3.0 → maximum grace period is 6 epochs.
50+
dynamicMaxMultiplier float64
51+
52+
// dynamicFullnessThreshold defines what percentage of block capacity is considered "full".
53+
// When EMA of block fullness is above this threshold, grace period increases.
54+
// When below, grace period decreases. Value should be between 0.0 and 1.0.
55+
dynamicFullnessThreshold float64
56+
57+
// dynamicAdjustmentRate controls how quickly the grace period multiplier adapts.
58+
// Higher values make it adapt faster to congestion changes. Value should be between 0.0 and 1.0.
59+
// Recommended: 0.05 for gradual adjustment, 0.1 for faster response.
60+
dynamicAdjustmentRate float64
61+
}
62+
63+
// newForcedInclusionGracePeriodConfig returns the internal grace period configuration.
64+
func newForcedInclusionGracePeriodConfig() forcedInclusionGracePeriodConfig {
65+
return forcedInclusionGracePeriodConfig{
66+
basePeriod: 1, // 1 epoch grace period
67+
dynamicMinMultiplier: 0.5, // Minimum 0.5x base grace period
68+
dynamicMaxMultiplier: 3.0, // Maximum 3x base grace period
69+
dynamicFullnessThreshold: 0.8, // 80% capacity considered full
70+
dynamicAdjustmentRate: 0.05, // 5% adjustment per block
71+
}
72+
}
73+
3174
// Syncer handles block synchronization from DA and P2P sources.
3275
type Syncer struct {
3376
// Core components
@@ -66,6 +109,9 @@ type Syncer struct {
66109

67110
// Forced inclusion tracking
68111
pendingForcedInclusionTxs sync.Map // map[string]pendingForcedInclusionTx
112+
gracePeriodMultiplier *atomic.Pointer[float64]
113+
blockFullnessEMA *atomic.Pointer[float64]
114+
gracePeriodConfig forcedInclusionGracePeriodConfig
69115

70116
// Lifecycle
71117
ctx context.Context
@@ -102,22 +148,34 @@ func NewSyncer(
102148
daRetrieverHeight := &atomic.Uint64{}
103149
daRetrieverHeight.Store(genesis.DAStartHeight)
104150

151+
// Initialize dynamic grace period state
152+
initialMultiplier := 1.0
153+
gracePeriodMultiplier := &atomic.Pointer[float64]{}
154+
gracePeriodMultiplier.Store(&initialMultiplier)
155+
156+
initialFullness := 0.0
157+
blockFullnessEMA := &atomic.Pointer[float64]{}
158+
blockFullnessEMA.Store(&initialFullness)
159+
105160
return &Syncer{
106-
store: store,
107-
exec: exec,
108-
cache: cache,
109-
metrics: metrics,
110-
config: config,
111-
genesis: genesis,
112-
options: options,
113-
lastState: &atomic.Pointer[types.State]{},
114-
daClient: daClient,
115-
daRetrieverHeight: daRetrieverHeight,
116-
headerStore: headerStore,
117-
dataStore: dataStore,
118-
heightInCh: make(chan common.DAHeightEvent, 100),
119-
errorCh: errorCh,
120-
logger: logger.With().Str("component", "syncer").Logger(),
161+
store: store,
162+
exec: exec,
163+
cache: cache,
164+
metrics: metrics,
165+
config: config,
166+
genesis: genesis,
167+
options: options,
168+
lastState: &atomic.Pointer[types.State]{},
169+
daClient: daClient,
170+
daRetrieverHeight: daRetrieverHeight,
171+
headerStore: headerStore,
172+
dataStore: dataStore,
173+
heightInCh: make(chan common.DAHeightEvent, 100),
174+
errorCh: errorCh,
175+
logger: logger.With().Str("component", "syncer").Logger(),
176+
gracePeriodMultiplier: gracePeriodMultiplier,
177+
blockFullnessEMA: blockFullnessEMA,
178+
gracePeriodConfig: newForcedInclusionGracePeriodConfig(),
121179
}
122180
}
123181

@@ -677,15 +735,92 @@ func hashTx(tx []byte) string {
677735
return hex.EncodeToString(hash[:])
678736
}
679737

738+
// calculateBlockFullness returns a value between 0.0 and 1.0 indicating how full the block is.
739+
// It estimates fullness based on total data size.
740+
// This is a heuristic - actual limits may vary by execution layer.
741+
func (s *Syncer) calculateBlockFullness(data *types.Data) float64 {
742+
const maxDataSize = seqcommon.AbsoluteMaxBlobSize
743+
744+
var fullness float64
745+
count := 0
746+
747+
// Check data size fullness
748+
dataSize := uint64(0)
749+
for _, tx := range data.Txs {
750+
dataSize += uint64(len(tx))
751+
}
752+
sizeFullness := float64(dataSize) / float64(maxDataSize)
753+
fullness += min(sizeFullness, 1.0)
754+
count++
755+
756+
// Return average fullness
757+
return fullness / float64(count)
758+
}
759+
760+
// updateDynamicGracePeriod updates the grace period multiplier based on block fullness.
761+
// When blocks are consistently full, the multiplier increases (more lenient).
762+
// When blocks have capacity, the multiplier decreases (stricter).
763+
func (s *Syncer) updateDynamicGracePeriod(blockFullness float64) {
764+
// Update exponential moving average of block fullness
765+
currentEMA := *s.blockFullnessEMA.Load()
766+
alpha := s.gracePeriodConfig.dynamicAdjustmentRate
767+
newEMA := alpha*blockFullness + (1-alpha)*currentEMA
768+
s.blockFullnessEMA.Store(&newEMA)
769+
770+
// Adjust grace period multiplier based on EMA
771+
currentMultiplier := *s.gracePeriodMultiplier.Load()
772+
threshold := s.gracePeriodConfig.dynamicFullnessThreshold
773+
774+
var newMultiplier float64
775+
if newEMA > threshold {
776+
// Blocks are full - increase grace period (more lenient)
777+
adjustment := alpha * (newEMA - threshold) / (1.0 - threshold)
778+
newMultiplier = currentMultiplier + adjustment
779+
} else {
780+
// Blocks have capacity - decrease grace period (stricter)
781+
adjustment := alpha * (threshold - newEMA) / threshold
782+
newMultiplier = currentMultiplier - adjustment
783+
}
784+
785+
// Clamp to min/max bounds
786+
newMultiplier = max(newMultiplier, s.gracePeriodConfig.dynamicMinMultiplier)
787+
newMultiplier = min(newMultiplier, s.gracePeriodConfig.dynamicMaxMultiplier)
788+
789+
s.gracePeriodMultiplier.Store(&newMultiplier)
790+
791+
// Log significant changes (more than 10% change)
792+
if math.Abs(newMultiplier-currentMultiplier) > 0.1 {
793+
s.logger.Debug().
794+
Float64("block_fullness", blockFullness).
795+
Float64("fullness_ema", newEMA).
796+
Float64("old_multiplier", currentMultiplier).
797+
Float64("new_multiplier", newMultiplier).
798+
Msg("dynamic grace period multiplier adjusted")
799+
}
800+
}
801+
802+
// getEffectiveGracePeriod returns the current effective grace period considering dynamic adjustment.
803+
func (s *Syncer) getEffectiveGracePeriod() uint64 {
804+
multiplier := *s.gracePeriodMultiplier.Load()
805+
effectivePeriod := math.Round(float64(s.gracePeriodConfig.basePeriod) * multiplier)
806+
minPeriod := float64(s.gracePeriodConfig.basePeriod) * s.gracePeriodConfig.dynamicMinMultiplier
807+
808+
return uint64(max(effectivePeriod, minPeriod))
809+
}
810+
680811
// verifyForcedInclusionTxs verifies that forced inclusion transactions from DA are properly handled.
681812
// Note: Due to block size constraints (MaxBytes), sequencers may defer forced inclusion transactions
682813
// to future blocks (smoothing). This is legitimate behavior within an epoch.
683-
// However, ALL forced inclusion txs from an epoch MUST be included before the next epoch begins.
814+
// However, ALL forced inclusion txs from an epoch MUST be included before the next epoch begins or grace boundary (whichever comes later).
684815
func (s *Syncer) verifyForcedInclusionTxs(currentState types.State, data *types.Data) error {
685816
if s.fiRetriever == nil {
686817
return nil
687818
}
688819

820+
// Update dynamic grace period based on block fullness
821+
blockFullness := s.calculateBlockFullness(data)
822+
s.updateDynamicGracePeriod(blockFullness)
823+
689824
// Retrieve forced inclusion transactions from DA for current epoch
690825
forcedIncludedTxsEvent, err := s.fiRetriever.RetrieveForcedIncludedTxs(s.ctx, currentState.DAHeight)
691826
if err != nil {
@@ -741,16 +876,36 @@ func (s *Syncer) verifyForcedInclusionTxs(currentState types.State, data *types.
741876
}
742877

743878
// Check if we've moved past any epoch boundaries with pending txs
879+
// Grace period: Allow forced inclusion txs from epoch N to be included in epoch N+1, N+2, etc.
880+
// Only flag as malicious if past grace boundary to prevent false positives during chain congestion.
744881
var maliciousTxs, remainingPending []pendingForcedInclusionTx
882+
var txsInGracePeriod int
745883
for _, pending := range stillPending {
746-
// If current DA height is past this epoch's end, these txs should have been included
747-
if currentState.DAHeight > pending.EpochEnd {
884+
// Calculate grace boundary: epoch end + (effective grace periods × epoch size)
885+
effectiveGracePeriod := s.getEffectiveGracePeriod()
886+
graceBoundary := pending.EpochEnd + (effectiveGracePeriod * s.genesis.DAEpochForcedInclusion)
887+
888+
if currentState.DAHeight > graceBoundary {
748889
maliciousTxs = append(maliciousTxs, pending)
890+
s.logger.Warn().
891+
Uint64("current_da_height", currentState.DAHeight).
892+
Uint64("epoch_end", pending.EpochEnd).
893+
Uint64("grace_boundary", graceBoundary).
894+
Uint64("base_grace_periods", s.gracePeriodConfig.basePeriod).
895+
Uint64("effective_grace_periods", effectiveGracePeriod).
896+
Float64("grace_multiplier", *s.gracePeriodMultiplier.Load()).
897+
Str("tx_hash", pending.TxHash[:16]).
898+
Msg("forced inclusion transaction past grace boundary - marking as malicious")
749899
} else {
750900
remainingPending = append(remainingPending, pending)
901+
if currentState.DAHeight > pending.EpochEnd {
902+
txsInGracePeriod++
903+
}
751904
}
752905
}
753906

907+
s.metrics.ForcedInclusionTxsInGracePeriod.Set(float64(txsInGracePeriod))
908+
754909
// Update pending map - clear old entries and store only remaining pending
755910
s.pendingForcedInclusionTxs.Range(func(key, value any) bool {
756911
s.pendingForcedInclusionTxs.Delete(key)
@@ -760,14 +915,20 @@ func (s *Syncer) verifyForcedInclusionTxs(currentState types.State, data *types.
760915
s.pendingForcedInclusionTxs.Store(pending.TxHash, pending)
761916
}
762917

763-
// If there are transactions from past epochs that weren't included, sequencer is malicious
918+
// If there are transactions past grace boundary that weren't included, sequencer is malicious
764919
if len(maliciousTxs) > 0 {
920+
s.metrics.ForcedInclusionTxsMalicious.Add(float64(len(maliciousTxs)))
921+
922+
effectiveGracePeriod := s.getEffectiveGracePeriod()
765923
s.logger.Error().
766924
Uint64("height", data.Height()).
767925
Uint64("current_da_height", currentState.DAHeight).
768926
Int("malicious_count", len(maliciousTxs)).
769-
Msg("SEQUENCER IS MALICIOUS: forced inclusion transactions from past epoch(s) not included")
770-
return errors.Join(errMaliciousProposer, fmt.Errorf("sequencer is malicious: %d forced inclusion transactions from past epoch(s) not included", len(maliciousTxs)))
927+
Uint64("base_grace_periods", s.gracePeriodConfig.basePeriod).
928+
Uint64("effective_grace_periods", effectiveGracePeriod).
929+
Float64("grace_multiplier", *s.gracePeriodMultiplier.Load()).
930+
Msg("SEQUENCER IS MALICIOUS: forced inclusion transactions past grace boundary not included")
931+
return errors.Join(errMaliciousProposer, fmt.Errorf("sequencer is malicious: %d forced inclusion transactions past grace boundary (base_grace_periods=%d, effective_grace_periods=%d) not included", len(maliciousTxs), s.gracePeriodConfig.basePeriod, effectiveGracePeriod))
771932
}
772933

773934
// Log current state

0 commit comments

Comments
 (0)