-
Notifications
You must be signed in to change notification settings - Fork 209
[Access] Access ingestion error handle #8385
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
c260982
8cf0dee
5a8f072
b2fb9ac
f0a151d
8118078
5774665
d8cabea
20b60eb
3ba7452
e6f1bcc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -221,6 +221,7 @@ func (s *Suite) initEngineAndSyncer() (*Engine, *collections.Syncer, *collection | |
| syncer, | ||
| indexer, | ||
| s.collectionExecutedMetric, | ||
| metrics.NewNoopCollector(), | ||
| nil, | ||
| s.distributor, | ||
| ) | ||
|
|
@@ -567,6 +568,77 @@ func (s *Suite) TestCollectionSyncing() { | |
| }, 2*time.Second, 100*time.Millisecond, "last full block height never updated") | ||
| } | ||
|
|
||
| // TestOnFinalizedBlockAlreadyIndexed checks that when a block has already been indexed | ||
| // (storage.ErrAlreadyExists), the engine logs a warning and continues processing by | ||
| // requesting collections and updating metrics. This can happen when the job queue processed | ||
| // index hasn't been updated yet after a previous indexing operation. | ||
| func (s *Suite) TestOnFinalizedBlockAlreadyIndexed() { | ||
| cluster := new(protocolmock.Cluster) | ||
| epoch := new(protocolmock.CommittedEpoch) | ||
| epochs := new(protocolmock.EpochQuery) | ||
| snap := new(protocolmock.Snapshot) | ||
|
Comment on lines
+576
to
+579
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is it intentional that these use |
||
|
|
||
| finalSnapshot := protocolmock.NewSnapshot(s.T()) | ||
| finalSnapshot.On("Head").Return(s.finalizedBlock, nil).Twice() | ||
| s.proto.state.On("Final").Return(finalSnapshot, nil).Twice() | ||
|
|
||
| epoch.On("ClusterByChainID", mock.Anything).Return(cluster, nil) | ||
| epochs.On("Current").Return(epoch, nil) | ||
| snap.On("Epochs").Return(epochs) | ||
|
|
||
| // prepare cluster committee members | ||
| clusterCommittee := unittest.IdentityListFixture(32 * 4).Filter(filter.HasRole[flow.Identity](flow.RoleCollection)).ToSkeleton() | ||
| cluster.On("Members").Return(clusterCommittee, nil) | ||
|
|
||
| eng, _, _ := s.initEngineAndSyncer() | ||
|
|
||
| irrecoverableCtx, cancel := irrecoverable.NewMockSignalerContextWithCancel(s.T(), s.ctx) | ||
| eng.ComponentManager.Start(irrecoverableCtx) | ||
| unittest.RequireCloseBefore(s.T(), eng.Ready(), 100*time.Millisecond, "could not start worker") | ||
| defer func() { | ||
| cancel() | ||
| unittest.RequireCloseBefore(s.T(), eng.Done(), 100*time.Millisecond, "could not stop worker") | ||
| }() | ||
|
|
||
| block := s.generateBlock(clusterCommittee, snap) | ||
| block.Height = s.finalizedBlock.Height + 1 | ||
| s.blockMap[block.Height] = block | ||
| s.mockCollectionsForBlock(block) | ||
| s.finalizedBlock = block.ToHeader() | ||
|
|
||
| hotstuffBlock := hotmodel.Block{ | ||
| BlockID: block.ID(), | ||
| } | ||
|
|
||
| // simulate that the block has already been indexed (e.g., by a previous job queue run) | ||
| // by returning storage.ErrAlreadyExists | ||
| s.blocks.On("BatchIndexBlockContainingCollectionGuarantees", mock.Anything, mock.Anything, block.ID(), []flow.Identifier(flow.GetIDs(block.Payload.Guarantees))). | ||
| Return(storage.ErrAlreadyExists).Once() | ||
|
|
||
| missingCollectionCount := 4 | ||
| wg := sync.WaitGroup{} | ||
| wg.Add(missingCollectionCount) | ||
|
|
||
| // even though block is already indexed, collections should still be requested | ||
| for _, cg := range block.Payload.Guarantees { | ||
| s.request.On("EntityByID", cg.CollectionID, mock.Anything).Return().Run(func(args mock.Arguments) { | ||
| wg.Done() | ||
| }).Once() | ||
| } | ||
|
|
||
| // force should be called once | ||
| s.request.On("Force").Return().Once() | ||
|
|
||
| // process the block through the finalized callback | ||
| s.distributor.OnFinalizedBlock(&hotstuffBlock) | ||
|
|
||
| unittest.RequireReturnsBefore(s.T(), wg.Wait, 100*time.Millisecond, "expect to process new block before timeout") | ||
|
|
||
| // assert that collections were still requested despite the block being already indexed | ||
| s.headers.AssertExpectations(s.T()) | ||
| s.request.AssertNumberOfCalls(s.T(), "EntityByID", len(block.Payload.Guarantees)) | ||
| } | ||
|
|
||
| func (s *Suite) mockGuarantorsForCollection(guarantee *flow.CollectionGuarantee, members flow.IdentitySkeletonList) { | ||
| cluster := protocolmock.NewCluster(s.T()) | ||
| cluster.On("Members").Return(members, nil).Once() | ||
|
|
||
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
Repository: onflow/flow-go
Length of output: 288
🏁 Script executed:
Repository: onflow/flow-go
Length of output: 5989
🏁 Script executed:
Repository: onflow/flow-go
Length of output: 407
🏁 Script executed:
Repository: onflow/flow-go
Length of output: 132
🏁 Script executed:
Repository: onflow/flow-go
Length of output: 4208
🏁 Script executed:
Repository: onflow/flow-go
Length of output: 800
🏁 Script executed:
Repository: onflow/flow-go
Length of output: 682
🏁 Script executed:
Repository: onflow/flow-go
Length of output: 755
🏁 Script executed:
Repository: onflow/flow-go
Length of output: 760
🏁 Script executed:
Repository: onflow/flow-go
Length of output: 928
🏁 Script executed:
Repository: onflow/flow-go
Length of output: 526
🏁 Script executed:
Repository: onflow/flow-go
Length of output: 171
🏁 Script executed:
Repository: onflow/flow-go
Length of output: 712
🏁 Script executed:
Repository: onflow/flow-go
Length of output: 413
🏁 Script executed:
Repository: onflow/flow-go
Length of output: 222
🏁 Script executed:
Repository: onflow/flow-go
Length of output: 689
🏁 Script executed:
Repository: onflow/flow-go
Length of output: 1432
Add `notNil()` wrapper for `builder.AccessMetrics` in `ingestion.New()` call to ensure defensive validation.

At line 2280 in `cmd/access/node_builder/access_node_builder.go`, `builder.AccessMetrics` is passed to `ingestion.New()` without the `notNil()` validation, unlike other critical dependencies in the same call (`CollectionSyncer`, `CollectionIndexer`, `collectionExecutedMetric`, `TxResultErrorMessagesCore`). Since `accessMetrics` is an interface type that can be nil, and the code at line 412 in `engine/access/ingestion/engine.go` calls `e.accessMetrics.UpdateIngestionFinalizedBlockHeight(block.Height)` without a nil check, this creates a potential nil pointer dereference. Wrap `builder.AccessMetrics` with `notNil()` to align with the project's defensive pattern used consistently for other critical dependencies.

🤖 Prompt for AI Agents