Skip to content

Refactor jobqueue to require initialized progress consumer - take 2#8404

Merged
peterargue merged 5 commits intomasterfrom
peter/refactor-component-consumers-v2
Feb 11, 2026
Merged

Refactor jobqueue to require initialized progress consumer - take 2#8404
peterargue merged 5 commits intomasterfrom
peter/refactor-component-consumers-v2

Conversation

@peterargue
Copy link
Contributor

@peterargue peterargue commented Feb 9, 2026

The existing logic requires an storage.ConsumerProgressInitializer and a default value, then calls Initialize() in the constructor. This means that all data must be available at construction. Since the data is already available, let's do the initialization within the bootstrap, and pass in the completed object. I find this to be a more logical flow, and allows access to the ConsumerProgress earlier in the process so it can be queried to initialize other state within the components.

first attempt: #7971
abandoned due to merge conflicts

Summary by CodeRabbit

  • Refactor

    • Progress trackers are now initialized eagerly and passed as ready objects into components instead of initializer placeholders, and initialization errors are propagated immediately to improve startup robustness.
    • Constructors and consumers now accept prepared progress objects, simplifying runtime wiring.
  • Tests

    • Tests updated to initialize progress explicitly and assert initialization success; mocks and expectations adjusted to reflect the new initialization flow.

@peterargue peterargue requested a review from a team as a code owner February 9, 2026 17:50
@github-actions
Copy link
Contributor

github-actions bot commented Feb 9, 2026

Dependency Review

✅ No vulnerabilities or license issues or OpenSSF Scorecard issues found.

Scanned Files

None

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Feb 9, 2026

📝 Walkthrough

Walkthrough

Replace ConsumerProgressInitializer usages with fully-initialized storage.ConsumerProgress across builders, engines, jobqueue, indexer, verification consumers, and tests; call Initialize(...) earlier, add error propagation, and update function signatures and call sites accordingly.

Changes

Cohort / File(s) Summary
Node builders
cmd/access/node_builder/access_node_builder.go, cmd/observer/node_builder/observer_builder.go, cmd/verification_builder.go
Rename progress variables to initializer names, call .Initialize(...) early using builder/sealed heights, propagate initialization errors, and pass initialized storage.ConsumerProgress into downstream components.
Access ingestion (engines & tests)
engine/access/ingestion/engine.go, engine/access/ingestion/engine_test.go, engine/access/ingestion/tx_error_messages/tx_error_messages_engine.go, engine/access/ingestion/tx_error_messages/tx_error_messages_engine_test.go
Constructor parameters changed to accept storage.ConsumerProgress; processed heights are initialized before engine construction; tests updated to call .Initialize(...) with error checks and some sealed-root init args removed.
Ingestion2 / Finalized block processing
engine/access/ingestion2/finalized_block_processor.go, engine/access/ingestion2/engine_test.go
NewFinalizedBlockProcessor now takes storage.ConsumerProgress (not initializer); removed local finalized block retrieval for init height; added error handling around jobqueue consumer creation; tests initialize progress first.
Verification consumers & tests
engine/verification/assigner/blockconsumer/consumer.go, engine/verification/assigner/blockconsumer/consumer_test.go, engine/verification/fetcher/chunkconsumer/consumer.go, engine/verification/fetcher/chunkconsumer/consumer_test.go
Consumer constructors accept prepared storage.ConsumerProgress (not initializer). NewBlockConsumer return signature simplified (removed returned default index). Call sites and tests updated to .Initialize(...) and adapt to new returns.
Jobqueue module & tests
module/jobqueue/consumer.go, module/jobqueue/component_consumer.go, module/jobqueue/*_test.go, module/jobqueue/consumer_behavior_test.go
NewConsumer/NewComponentConsumer signatures accept prepared storage.ConsumerProgress and drop defaultIndex; removed initializer-to-progress creation inside NewConsumer; tests updated to initialize progress before passing it.
State synchronization (indexer & requester)
module/state_synchronization/indexer/indexer.go, module/state_synchronization/indexer/indexer_test.go, module/state_synchronization/requester/execution_data_requester.go, module/state_synchronization/requester/execution_data_requester_test.go
NewIndexer and execution data requester accept storage.ConsumerProgress directly; constructors remove initialBlockHeight/default-index args; tests create and initialize progress objects up-front and check errors.
Tests, utilities, mocks
engine/testutil/mock/nodes.go, engine/testutil/nodes.go, engine/access/access_test.go, engine/access/ingestion2/*_test.go, engine/access/ingestion/*_test.go
Mock/struct fields updated to storage.ConsumerProgress; replaced initializer assignments with .Initialize(...) calls and error checks in test/setup code; adapted call sites to new signatures and return shapes.
Module API adjustments / Manifest
module/state_synchronization/requester/execution_data_requester.go, module/jobqueue/*, go.mod
Multiple public/internal function signatures changed from storage.ConsumerProgressInitializer to storage.ConsumerProgress; removed defaultIndex/init-height parameters across jobqueue, indexer, requester, and various consumer constructors; manifest/go.mod updated accordingly.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Suggested reviewers

  • janezpodhostnik
  • zhangchiqing
  • fxamacker

Poem

🐇
I hopped through code at break of day,
Initialized progress all the way.
No lazy starts, no hidden fuss—
Consumers ready, errors caught for us.
A tiny rabbit cheers: progress, hooray!

🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 68.75% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title clearly and specifically describes the main refactoring: changing jobqueue to require pre-initialized progress consumers instead of initializing them in constructors.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch peter/refactor-component-consumers-v2

No actionable comments were generated in the recent review. 🎉


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@codecov-commenter
Copy link

codecov-commenter commented Feb 9, 2026

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
module/state_synchronization/indexer/indexer.go (1)

73-84: ⚠️ Potential issue | 🟡 Minor

The TODO on line 80 warrants attention: initHeight vs processedHeight.ProcessedIndex().

The lastProcessedHeight is initialized with the initHeight parameter, but after this refactor, processedHeight is an already-initialized ConsumerProgress whose ProcessedIndex() could differ from initHeight. If they diverge, onBlockIndexed() (line 156–178) could either re-notify already-processed heights or skip some. The same concern applies to NewProcessedHeightRecorderManager(initHeight) on line 83.

Since this is flagged by the author's own TODO, consider resolving it in this PR or creating a follow-up issue — especially now that processedHeight.ProcessedIndex() is readily accessible at construction time.

Would you like me to open an issue to track resolving this initHeight vs processedHeight.ProcessedIndex() discrepancy?

🤖 Fix all issues with AI agents
In `@engine/verification/assigner/blockconsumer/consumer.go`:
- Around line 30-40: The doc comment for NewBlockConsumer is stale (it mentions
returning "the default processed index"); update the comment to accurately
describe that NewBlockConsumer constructs and returns a *BlockConsumer and an
error (not a processed index). Locate the comment immediately above the
NewBlockConsumer function and replace the two-line description with a short
summary like "NewBlockConsumer creates and initializes a BlockConsumer and
returns the consumer or an error" (keeping reference to NewBlockConsumer and its
actual return values).
🧹 Nitpick comments (3)
engine/access/access_test.go (1)

1286-1292: Minor style inconsistency: two-step vs chained initialization.

In TestGetSealedTransaction, the initialization is chained (e.g., store.NewConsumerProgress(...).Initialize(...)), whereas here in TestExecuteScript a two-step approach is used (assign initializer first, then call .Initialize()). Both are correct, but you may want to pick one style for consistency across the test file.

module/state_synchronization/requester/execution_data_requester.go (1)

178-183: Stale comment: initialization no longer happens inside this constructor.

The comment on line 181 says "rootHeight will be used to init the last processed height," but initialization now occurs upstream before the progress is passed in. Consider updating this comment to reflect the new behavior.

📝 Suggested comment update
 	// blockConsumer ensures every sealed block's execution data is downloaded.
 	// It listens to block finalization events from `finalizationNotifier`, then checks if there
 	// are new sealed blocks with `sealedBlockReader`. If there are, it starts workers to process
 	// them with `processingBlockJob`, which fetches execution data. At most `fetchWorkers` workers
 	// will be created for concurrent processing. When a sealed block's execution data has been
 	// downloaded, it updates and persists the highest consecutive downloaded height with
-	// `processedHeight`. That way, if the node crashes, it reads the `processedHeight` and resume
-	// from `processedHeight + 1`. If the database is empty, rootHeight will be used to init the
-	// last processed height. Once the execution data is fetched and stored, it notifies
+	// `processedHeight`. That way, if the node crashes, it reads the `processedHeight` and resumes
+	// from `processedHeight + 1`. The `processedHeight` must be initialized before being passed to
+	// this constructor. Once the execution data is fetched and stored, it notifies
 	// `executionDataNotifier`.
module/jobqueue/consumer_behavior_test.go (1)

562-573: Double Initialize call is safe but worth noting.

In runWithSeatchAhead, newTestConsumer internally calls cp.Initialize(defaultIndex) (line 584), and then line 569 calls progressInitializer.Initialize(defaultIndex) again to obtain a ConsumerProgress handle for test assertions. This works because Initialize is idempotent, but it means the initializer is initialized twice — once to build the consumer, and once to get a reference for the test.

A slightly cleaner approach would be to have newTestConsumer return both the consumer and the progress, avoiding the redundant call. That said, this is a test-only concern and the current code is functionally correct.

Copy link
Member

@zhangchiqing zhangchiqing left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LG

exeDataNotifier: engine.NewNotifier(),
blockIndexedNotifier: engine.NewNotifier(),
lastProcessedHeight: atomic.NewUint64(initHeight),
lastProcessedHeight: atomic.NewUint64(initHeight), // TODO(peter): I think this should be processedHeight.ProcessedIndex(), not initHeight
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, makes sense

@peterargue peterargue added this pull request to the merge queue Feb 11, 2026
Merged via the queue into master with commit fb82eb0 Feb 11, 2026
61 checks passed
@peterargue peterargue deleted the peter/refactor-component-consumers-v2 branch February 11, 2026 18:34
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants