Refactor jobqueue to require initialized progress consumer - take 2#8404
Refactor jobqueue to require initialized progress consumer - take 2#8404peterargue merged 5 commits intomasterfrom
Conversation
Dependency Review✅ No vulnerabilities or license issues or OpenSSF Scorecard issues found.Scanned FilesNone |
📝 WalkthroughWalkthroughReplace ConsumerProgressInitializer usages with fully-initialized storage.ConsumerProgress across builders, engines, jobqueue, indexer, verification consumers, and tests; call Initialize(...) earlier, add error propagation, and update function signatures and call sites accordingly. Changes
Estimated code review effort🎯 4 (Complex) | ⏱️ ~50 minutes Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
🧪 Generate unit tests (beta)
No actionable comments were generated in the recent review. 🎉 Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Codecov Report❌ Patch coverage is 📢 Thoughts on this report? Let us know! |
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
module/state_synchronization/indexer/indexer.go (1)
73-84:⚠️ Potential issue | 🟡 MinorThe TODO on line 80 warrants attention:
initHeightvsprocessedHeight.ProcessedIndex().The
lastProcessedHeightis initialized with theinitHeightparameter, but after this refactor,processedHeightis an already-initializedConsumerProgresswhoseProcessedIndex()could differ frominitHeight. If they diverge,onBlockIndexed()(line 156–178) could either re-notify already-processed heights or skip some. The same concern applies toNewProcessedHeightRecorderManager(initHeight)on line 83.Since this is flagged by the author's own TODO, consider resolving it in this PR or creating a follow-up issue — especially now that
processedHeight.ProcessedIndex()is readily accessible at construction time.Would you like me to open an issue to track resolving this
initHeightvsprocessedHeight.ProcessedIndex()discrepancy?
🤖 Fix all issues with AI agents
In `@engine/verification/assigner/blockconsumer/consumer.go`:
- Around line 30-40: The doc comment for NewBlockConsumer is stale (it mentions
returning "the default processed index"); update the comment to accurately
describe that NewBlockConsumer constructs and returns a *BlockConsumer and an
error (not a processed index). Locate the comment immediately above the
NewBlockConsumer function and replace the two-line description with a short
summary like "NewBlockConsumer creates and initializes a BlockConsumer and
returns the consumer or an error" (keeping reference to NewBlockConsumer and its
actual return values).
🧹 Nitpick comments (3)
engine/access/access_test.go (1)
1286-1292: Minor style inconsistency: two-step vs chained initialization.In
TestGetSealedTransaction, the initialization is chained (e.g.,store.NewConsumerProgress(...).Initialize(...)), whereas here inTestExecuteScripta two-step approach is used (assign initializer first, then call.Initialize()). Both are correct, but you may want to pick one style for consistency across the test file.module/state_synchronization/requester/execution_data_requester.go (1)
178-183: Stale comment: initialization no longer happens inside this constructor.The comment on line 181 says "rootHeight will be used to init the last processed height," but initialization now occurs upstream before the progress is passed in. Consider updating this comment to reflect the new behavior.
📝 Suggested comment update
// blockConsumer ensures every sealed block's execution data is downloaded. // It listens to block finalization events from `finalizationNotifier`, then checks if there // are new sealed blocks with `sealedBlockReader`. If there are, it starts workers to process // them with `processingBlockJob`, which fetches execution data. At most `fetchWorkers` workers // will be created for concurrent processing. When a sealed block's execution data has been // downloaded, it updates and persists the highest consecutive downloaded height with - // `processedHeight`. That way, if the node crashes, it reads the `processedHeight` and resume - // from `processedHeight + 1`. If the database is empty, rootHeight will be used to init the - // last processed height. Once the execution data is fetched and stored, it notifies + // `processedHeight`. That way, if the node crashes, it reads the `processedHeight` and resumes + // from `processedHeight + 1`. The `processedHeight` must be initialized before being passed to + // this constructor. Once the execution data is fetched and stored, it notifies // `executionDataNotifier`.module/jobqueue/consumer_behavior_test.go (1)
562-573: DoubleInitializecall is safe but worth noting.In
runWithSeatchAhead,newTestConsumerinternally callscp.Initialize(defaultIndex)(line 584), and then line 569 callsprogressInitializer.Initialize(defaultIndex)again to obtain aConsumerProgresshandle for test assertions. This works becauseInitializeis idempotent, but it means the initializer is initialized twice — once to build the consumer, and once to get a reference for the test.A slightly cleaner approach would be to have
newTestConsumerreturn both the consumer and the progress, avoiding the redundant call. That said, this is a test-only concern and the current code is functionally correct.
| exeDataNotifier: engine.NewNotifier(), | ||
| blockIndexedNotifier: engine.NewNotifier(), | ||
| lastProcessedHeight: atomic.NewUint64(initHeight), | ||
| lastProcessedHeight: atomic.NewUint64(initHeight), // TODO(peter): I think this should be processedHeight.ProcessedIndex(), not initHeight |
The existing logic requires an
storage.ConsumerProgressInitializerand a default value, then callsInitialize()in the constructor. This means that all data must be available at construction. Since the data is already available, let's do the initialization within the bootstrap, and pass in the completed object. I find this to be a more logical flow, and allows access to theConsumerProgressearlier in the process so it can be queried to initialize other state within the components.first attempt: #7971
abandoned due to merge conflicts
Summary by CodeRabbit
Refactor
Tests