Skip to content
1 change: 1 addition & 0 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2281,6 +2281,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
notNil(builder.CollectionSyncer),
notNil(builder.CollectionIndexer),
notNil(builder.collectionExecutedMetric),
builder.AccessMetrics,
notNil(builder.TxResultErrorMessagesCore),
builder.FollowerDistributor,
)
Expand Down
3 changes: 3 additions & 0 deletions engine/access/access_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -787,6 +787,7 @@ func (suite *Suite) TestGetSealedTransaction() {
collectionSyncer,
collectionIndexer,
collectionExecutedMetric,
suite.metrics,
nil,
followerDistributor,
)
Expand Down Expand Up @@ -1052,6 +1053,7 @@ func (suite *Suite) TestGetTransactionResult() {
collectionSyncer,
collectionIndexer,
collectionExecutedMetric,
suite.metrics,
nil,
followerDistributor,
)
Expand Down Expand Up @@ -1324,6 +1326,7 @@ func (suite *Suite) TestExecuteScript() {
collectionSyncer,
collectionIndexer,
collectionExecutedMetric,
suite.metrics,
nil,
followerDistributor,
)
Expand Down
26 changes: 21 additions & 5 deletions engine/access/ingestion/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ingestion

import (
"context"
"errors"
"fmt"

"github.com/jordanschalm/lockctx"
Expand Down Expand Up @@ -73,6 +74,7 @@ type Engine struct {
// TODO: There's still a need for this metric to be in the ingestion engine rather than collection syncer.
// Maybe it is a good idea to split it up?
collectionExecutedMetric module.CollectionExecutedMetric
accessMetrics module.AccessMetrics

txErrorMessagesCore *tx_error_messages.TxErrorMessagesCore
}
Expand All @@ -96,6 +98,7 @@ func New(
collectionSyncer *collections.Syncer,
collectionIndexer *collections.Indexer,
collectionExecutedMetric module.CollectionExecutedMetric,
accessMetrics module.AccessMetrics,
txErrorMessagesCore *tx_error_messages.TxErrorMessagesCore,
registrar hotstuff.FinalizationRegistrar,
) (*Engine, error) {
Expand Down Expand Up @@ -130,6 +133,7 @@ func New(
executionReceipts: executionReceipts,
maxReceiptHeight: 0,
collectionExecutedMetric: collectionExecutedMetric,
accessMetrics: accessMetrics,
finalizedBlockNotifier: engine.NewNotifier(),

// queue / notifier for execution receipts
Expand Down Expand Up @@ -230,12 +234,15 @@ func (e *Engine) processFinalizedBlockJob(ctx irrecoverable.SignalerContext, job
}

err = e.processFinalizedBlock(block)
if err == nil {
done()
if err != nil {
ctx.Throw(
fmt.Errorf(
"fatal error when ingestion building col->block index for finalized block (job: %s, height: %v): %w",
job.ID(), block.Height, err))
return
}

e.log.Error().Err(err).Str("job_id", string(job.ID())).Msg("error during finalized block processing job")
done()
}

// processExecutionReceipts is responsible for processing the execution receipts.
Expand Down Expand Up @@ -353,10 +360,10 @@ func (e *Engine) onFinalizedBlock(*model.Block) {

// processFinalizedBlock handles an incoming finalized block.
// It processes the block, indexes it for further processing, and requests missing collections if necessary.
// If the block is already indexed (storage.ErrAlreadyExists), it logs a warning and continues processing.
//
// Expected errors during normal operation:
// - storage.ErrNotFound - if last full block height does not exist in the database.
// - storage.ErrAlreadyExists - if the collection within block or an execution result ID already exists in the database.
// - generic error in case of unexpected failure from the database layer, or failure
// to decode an existing database value.
func (e *Engine) processFinalizedBlock(block *flow.Block) error {
Expand Down Expand Up @@ -386,14 +393,23 @@ func (e *Engine) processFinalizedBlock(block *flow.Block) error {
})
})
if err != nil {
return fmt.Errorf("could not index block for collections: %w", err)
if !errors.Is(err, storage.ErrAlreadyExists) {
return fmt.Errorf("could not index block for collections: %w", err)
}
// the job queue processed index is updated in a separate db update, so it's possible that the above index
// has been built, but the jobqueue index has not been updated yet. In this case, we can safely skip processing.
e.log.Warn().
Uint64("height", block.Height).
Str("block_id", block.ID().String()).
Msg("block already indexed, skipping indexing")
}

err = e.collectionSyncer.RequestCollectionsForBlock(block.Height, block.Payload.Guarantees)
if err != nil {
return fmt.Errorf("could not request collections for block: %w", err)
}
e.collectionExecutedMetric.BlockFinalized(block)
e.accessMetrics.UpdateIngestionFinalizedBlockHeight(block.Height)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

# Find the ingestion package structure and New() function signature
fd -n "engine.go" engine/access/ingestion/ | head -5

Repository: onflow/flow-go

Length of output: 288


🏁 Script executed:

# Get the constructor signature for ingestion.New()
rg -n -A 30 "^func New\(" engine/access/ingestion/ --type=go | head -60

Repository: onflow/flow-go

Length of output: 5989


🏁 Script executed:

# Find all callers of ingestion.New()
rg -n "ingestion\.New\(" --type=go

Repository: onflow/flow-go

Length of output: 407


🏁 Script executed:

# Also check for pattern 'New(' in the ingestion package to find constructor
rg -n "func.*New.*(" engine/access/ingestion/engine.go --type=go

Repository: onflow/flow-go

Length of output: 132


🏁 Script executed:

# Read the ingestion.New() constructor to see if it validates accessMetrics
sed -n '87,200p' engine/access/ingestion/engine.go

Repository: onflow/flow-go

Length of output: 4208


🏁 Script executed:

# Check the first caller in access_test.go
sed -n '770,800p' engine/access/access_test.go

Repository: onflow/flow-go

Length of output: 800


🏁 Script executed:

# Check the second test caller
sed -n '1035,1065p' engine/access/access_test.go

Repository: onflow/flow-go

Length of output: 682


🏁 Script executed:

# Check the third test caller
sed -n '1308,1338p' engine/access/access_test.go

Repository: onflow/flow-go

Length of output: 755


🏁 Script executed:

# Check the cmd/consensus/main.go caller
sed -n '550,575p' cmd/consensus/main.go

Repository: onflow/flow-go

Length of output: 760


🏁 Script executed:

# Check the cmd/access/node_builder caller
sed -n '2260,2290p' cmd/access/node_builder/access_node_builder.go

Repository: onflow/flow-go

Length of output: 928


🏁 Script executed:

# Check what notNil() does in access_node_builder to understand defensive pattern
rg -B 5 -A 5 "func notNil" cmd/access/node_builder/access_node_builder.go | head -20

Repository: onflow/flow-go

Length of output: 526


🏁 Script executed:

# Look at module.AccessMetrics definition to see if it's interface or concrete
rg -n "type AccessMetrics" --type=go

Repository: onflow/flow-go

Length of output: 171


🏁 Script executed:

# Examine the interface definition
rg -B 2 -A 10 "type AccessMetrics interface" --type=go

Repository: onflow/flow-go

Length of output: 712


🏁 Script executed:

# Verify if builder.AccessMetrics could be nil by checking how it's initialized
rg -B 10 "builder.AccessMetrics =" cmd/access/node_builder/access_node_builder.go | head -30

Repository: onflow/flow-go

Length of output: 413


🏁 Script executed:

# Check if AccessMetrics is initialized in all code paths
rg -n "AccessMetrics" cmd/access/node_builder/access_node_builder.go | grep -E "(AccessMetrics\s*=|AccessMetrics:)" | head -20

Repository: onflow/flow-go

Length of output: 222


🏁 Script executed:

# Check the exact context around the ingestion.New() call to see if AccessMetrics is wrapped with notNil()
sed -n '2265,2290p' cmd/access/node_builder/access_node_builder.go

Repository: onflow/flow-go

Length of output: 689


🏁 Script executed:

# Also check line 2132 context to understand which component that's for
sed -n '2120,2145p' cmd/access/node_builder/access_node_builder.go

Repository: onflow/flow-go

Length of output: 1432


Add notNil() wrapper for builder.AccessMetrics in ingestion.New() call to ensure defensive validation.

At line 2280 in cmd/access/node_builder/access_node_builder.go, builder.AccessMetrics is passed to ingestion.New() without the notNil() validation, unlike other critical dependencies in the same call (CollectionSyncer, CollectionIndexer, collectionExecutedMetric, TxResultErrorMessagesCore). Since accessMetrics is an interface type that can be nil, and the code at line 412 in engine/access/ingestion/engine.go calls e.accessMetrics.UpdateIngestionFinalizedBlockHeight(block.Height) without a nil check, this creates a potential nil pointer dereference. Wrap builder.AccessMetrics with notNil() to align with the project's defensive pattern used consistently for other critical dependencies.

🤖 Prompt for AI Agents
In `@engine/access/ingestion/engine.go` at line 412, The call to ingestion.New()
should defensively validate builder.AccessMetrics to avoid nil pointer deref at
e.accessMetrics.UpdateIngestionFinalizedBlockHeight(block.Height); wrap the
argument with notNil(builder.AccessMetrics) in the ingestion.New(...) invocation
(same place other deps like CollectionSyncer/CollectionIndexer use notNil()) so
ingestion receives a non-nil validated AccessMetrics implementation.


return nil
}
Expand Down
72 changes: 72 additions & 0 deletions engine/access/ingestion/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ func (s *Suite) initEngineAndSyncer() (*Engine, *collections.Syncer, *collection
syncer,
indexer,
s.collectionExecutedMetric,
metrics.NewNoopCollector(),
nil,
s.distributor,
)
Expand Down Expand Up @@ -567,6 +568,77 @@ func (s *Suite) TestCollectionSyncing() {
}, 2*time.Second, 100*time.Millisecond, "last full block height never updated")
}

// TestOnFinalizedBlockAlreadyIndexed checks that when a block has already been indexed
// (storage.ErrAlreadyExists), the engine logs a warning and continues processing by
// requesting collections and updating metrics. This can happen when the job queue processed
// index hasn't been updated yet after a previous indexing operation.
func (s *Suite) TestOnFinalizedBlockAlreadyIndexed() {
cluster := new(protocolmock.Cluster)
epoch := new(protocolmock.CommittedEpoch)
epochs := new(protocolmock.EpochQuery)
snap := new(protocolmock.Snapshot)
Comment on lines +576 to +579
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it intentional that these use new(module.mockObject) rather than module.NewMockObject(s.T())? This way the expectations aren't checked by default, which has bitten me once before.


finalSnapshot := protocolmock.NewSnapshot(s.T())
finalSnapshot.On("Head").Return(s.finalizedBlock, nil).Twice()
s.proto.state.On("Final").Return(finalSnapshot, nil).Twice()

epoch.On("ClusterByChainID", mock.Anything).Return(cluster, nil)
epochs.On("Current").Return(epoch, nil)
snap.On("Epochs").Return(epochs)

// prepare cluster committee members
clusterCommittee := unittest.IdentityListFixture(32 * 4).Filter(filter.HasRole[flow.Identity](flow.RoleCollection)).ToSkeleton()
cluster.On("Members").Return(clusterCommittee, nil)

eng, _, _ := s.initEngineAndSyncer()

irrecoverableCtx, cancel := irrecoverable.NewMockSignalerContextWithCancel(s.T(), s.ctx)
eng.ComponentManager.Start(irrecoverableCtx)
unittest.RequireCloseBefore(s.T(), eng.Ready(), 100*time.Millisecond, "could not start worker")
defer func() {
cancel()
unittest.RequireCloseBefore(s.T(), eng.Done(), 100*time.Millisecond, "could not stop worker")
}()

block := s.generateBlock(clusterCommittee, snap)
block.Height = s.finalizedBlock.Height + 1
s.blockMap[block.Height] = block
s.mockCollectionsForBlock(block)
s.finalizedBlock = block.ToHeader()

hotstuffBlock := hotmodel.Block{
BlockID: block.ID(),
}

// simulate that the block has already been indexed (e.g., by a previous job queue run)
// by returning storage.ErrAlreadyExists
s.blocks.On("BatchIndexBlockContainingCollectionGuarantees", mock.Anything, mock.Anything, block.ID(), []flow.Identifier(flow.GetIDs(block.Payload.Guarantees))).
Return(storage.ErrAlreadyExists).Once()

missingCollectionCount := 4
wg := sync.WaitGroup{}
wg.Add(missingCollectionCount)

// even though block is already indexed, collections should still be requested
for _, cg := range block.Payload.Guarantees {
s.request.On("EntityByID", cg.CollectionID, mock.Anything).Return().Run(func(args mock.Arguments) {
wg.Done()
}).Once()
}

// force should be called once
s.request.On("Force").Return().Once()

// process the block through the finalized callback
s.distributor.OnFinalizedBlock(&hotstuffBlock)

unittest.RequireReturnsBefore(s.T(), wg.Wait, 100*time.Millisecond, "expect to process new block before timeout")

// assert that collections were still requested despite the block being already indexed
s.headers.AssertExpectations(s.T())
s.request.AssertNumberOfCalls(s.T(), "EntityByID", len(block.Payload.Guarantees))
}

func (s *Suite) mockGuarantorsForCollection(guarantee *flow.CollectionGuarantee, members flow.IdentitySkeletonList) {
cluster := protocolmock.NewCluster(s.T())
cluster.On("Members").Return(members, nil).Once()
Expand Down
3 changes: 3 additions & 0 deletions module/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -940,6 +940,9 @@ type AccessMetrics interface {

// UpdateLastFullBlockHeight tracks the height of the last block for which all collections were received
UpdateLastFullBlockHeight(height uint64)

// UpdateIngestionFinalizedBlockHeight tracks the latest finalized block height processed by ingestion
UpdateIngestionFinalizedBlockHeight(height uint64)
}

type ExecutionResultStats struct {
Expand Down
29 changes: 20 additions & 9 deletions module/metrics/access.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,16 @@ type AccessCollector struct {
module.TransactionValidationMetrics
module.BackendScriptsMetrics

connectionReused prometheus.Counter
connectionsInPool *prometheus.GaugeVec
connectionAdded prometheus.Counter
connectionEstablished prometheus.Counter
connectionInvalidated prometheus.Counter
connectionUpdated prometheus.Counter
connectionEvicted prometheus.Counter
lastFullBlockHeight prometheus.Gauge
maxReceiptHeight prometheus.Gauge
connectionReused prometheus.Counter
connectionsInPool *prometheus.GaugeVec
connectionAdded prometheus.Counter
connectionEstablished prometheus.Counter
connectionInvalidated prometheus.Counter
connectionUpdated prometheus.Counter
connectionEvicted prometheus.Counter
lastFullBlockHeight prometheus.Gauge
ingestionFinalizedBlockHeight prometheus.Gauge
maxReceiptHeight prometheus.Gauge

// used to skip heights that are lower than the current max height
maxReceiptHeightValue counters.StrictMonotonicCounter
Expand Down Expand Up @@ -106,6 +107,12 @@ func NewAccessCollector(opts ...AccessCollectorOpts) *AccessCollector {
Subsystem: subsystemIngestion,
Help: "gauge to track the highest consecutive finalized block height with all collections indexed",
}),
ingestionFinalizedBlockHeight: promauto.NewGauge(prometheus.GaugeOpts{
Name: "ingestion_finalized_block_height",
Namespace: namespaceAccess,
Subsystem: subsystemIngestion,
Help: "gauge to track the latest finalized block height processed by ingestion",
}),
maxReceiptHeight: promauto.NewGauge(prometheus.GaugeOpts{
Name: "max_receipt_height",
Namespace: namespaceAccess,
Expand Down Expand Up @@ -155,6 +162,10 @@ func (ac *AccessCollector) UpdateLastFullBlockHeight(height uint64) {
ac.lastFullBlockHeight.Set(float64(height))
}

// UpdateIngestionFinalizedBlockHeight records the latest finalized block height
// processed by the ingestion engine in the ingestion_finalized_block_height gauge.
func (ac *AccessCollector) UpdateIngestionFinalizedBlockHeight(height uint64) {
	ac.ingestionFinalizedBlockHeight.Set(float64(height))
}

func (ac *AccessCollector) UpdateExecutionReceiptMaxHeight(height uint64) {
if ac.maxReceiptHeightValue.Set(height) {
ac.maxReceiptHeight.Set(float64(height))
Expand Down
1 change: 1 addition & 0 deletions module/metrics/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ func (nc *NoopCollector) TransactionValidationSkipped()
// No-op implementations: NoopCollector satisfies the metrics interfaces while
// intentionally discarding every recorded value.
func (nc *NoopCollector) TransactionSubmissionFailed() {}
func (nc *NoopCollector) UpdateExecutionReceiptMaxHeight(height uint64) {}
func (nc *NoopCollector) UpdateLastFullBlockHeight(height uint64) {}
func (nc *NoopCollector) UpdateIngestionFinalizedBlockHeight(height uint64) {}
func (nc *NoopCollector) ChunkDataPackRequestProcessed() {}
func (nc *NoopCollector) ExecutionSync(syncing bool) {}
func (nc *NoopCollector) ExecutionBlockDataUploadStarted() {}
Expand Down
40 changes: 40 additions & 0 deletions module/mock/access_metrics.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading