refactor: msg pool to make more structured part 3#7033
Conversation
…same message again
…elpers to methods, share via Arc
WalkthroughThis PR restructures the MessagePool internals by converting standalone helper functions into methods backed by MessagePool-owned caches, renaming pending_store to pending, introducing explicit message validation/insertion pipelines, implementing apply_head_change for chain reorg handling, and adding run_republish_cycle for centralized republish management. ChangesMessagePool Refactoring and Reorg/Republish Flow
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related issues
Possibly related PRs
Suggested reviewers
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
✨ Simplify code
Warning Review ran into problems🔥 ProblemsStopped waiting for pipeline failures after 30000ms. One of your pipelines takes longer than our 30000ms fetch window to run, so review may not consider pipeline-failure results for inline comments if any failures occurred after the fetch window. Increase the timeout if you want to wait longer or run a Tip 💬 Introducing Slack Agent: The best way for teams to turn conversations into code.Slack Agent is built on CodeRabbit's deep understanding of your code, so your team can collaborate across the entire SDLC without losing context.
Built for teams:
One agent for your entire SDLC. Right inside Slack. Comment |
48da6f8 to
d353923
Compare
There was a problem hiding this comment.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/message_pool/msgpool/selection.rs (1)
653-662:⚠️ Potential issue | 🟠 Major | ⚡ Quick winKeep
run_head_changeside-effect free.
run_head_changeis documented as a simulation, but this change now passes&self.pendinginto it and the fallback path still callspending_store.remove(...). Any miss inrmsgscan therefore mutate the live mpool during selection, which is especially risky when address resolution is needed.Suggested fix
run_head_change( self.api.as_ref(), &self.caches.bls_sig, - &self.pending, &self.caches.key, cur_ts.clone(), ts.clone(), &mut result, )?; @@ pub(in crate::message_pool) fn run_head_change<T>( api: &T, bls_sig_cache: &SizeTrackingLruCache<CidWrapper, Signature>, - pending_store: &PendingStore, key_cache: &IdToAddressCache, from: Tipset, to: Tipset, rmsgs: &mut HashMap<Address, HashMap<u64, SignedMessage>>, ) -> Result<(), Error> @@ fn remove_applied_from_pool<T: Provider>( api: &T, key_cache: &IdToAddressCache, - pending_store: &PendingStore, ts: &Tipset, from: &Address, sequence: u64, rmsgs: &mut HashMap<Address, HashMap<u64, SignedMessage>>, ) -> Result<(), Error> { @@ if rmsgs .get_mut(from) .and_then(|temp| temp.remove(&sequence)) .is_none() && let Ok(resolved) = resolve_to_key(api, key_cache, from, ts) .inspect_err(|e| tracing::debug!(%from, "remove: failed to resolve address: {e:#}")) { - let _ = pending_store.remove(&resolved, sequence, true); + let _ = rmsgs + .get_mut(&resolved) + .and_then(|temp| temp.remove(&sequence)); } Ok(()) }Also applies to: 884-905
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/message_pool/msgpool/selection.rs` around lines 653 - 662, run_head_change is meant to be a pure simulation but currently receives a reference to the live pending set (self.pending) and the fallback path still calls pending_store.remove(...) which can mutate the live mpool; change the call sites of run_head_change (the call passing self.pending and the analogous call around lines 884-905) to pass an isolated copy or read-only snapshot of the pending data (e.g., clone the PendingStore or clone the rmsgs collection) so the simulation only operates on the clone, and update the fallback logic to remove from the real pending store only after the simulation decides removals are required; ensure functions and variables referenced include run_head_change, self.pending, pending_store.remove, and rmsgs so you locate and replace direct uses with the cloned/snapshot variants.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/message_pool/msgpool/msg_pool.rs`:
- Around line 247-251: The add(...) method currently calls add_to_pool(msg,
false, TrustPolicy::Trusted) for gossip inserts; change it to use
TrustPolicy::Untrusted so externally received gossip messages are subject to
admission limits and untrusted caps/gap rules. Locate the add function and
replace the TrustPolicy argument passed to add_to_pool with
TrustPolicy::Untrusted, ensuring the call signature (add_to_pool(msg, false,
...)) remains unchanged.
In `@src/message_pool/msgpool/reorg.rs`:
- Around line 41-43: The code currently swallows failures from calls like
self.api.load_tipset(ts.parents()), block message fetches, and reinsertion by
logging and continuing, which leaves apply_head_change partially applied while
still returning Ok(()); change those branches to propagate errors (return
Err(...)) instead of just tracing::error + continue. Locate the calls to
load_tipset, fetch_block_messages (or similar block-msg fetch functions), and
any reinsert/reapply paths inside the reorg processing routine (e.g., where
apply_head_change is invoked) and replace the log+continue behavior with early
returns that convert the underlying failure into a suitable error value returned
by the surrounding function so the caller can detect partial failure. Ensure the
error returned includes context about which operation failed (loading parent
tipset, fetching block messages, or reinserting) and references the tipset or
block identifier for easier debugging.
In `@src/message_pool/msgpool/republish.rs`:
- Around line 177-184: The bubble-down loop in republish.rs uses j for
comparisons but erroneously swaps using i and i + 1, preventing proper bubbling;
update the swap call in the loop (the line with chains.key_vec.swap) to swap j
and j + 1 (i.e., chains.key_vec.swap(j, j + 1)) so the element being compared
actually moves down the list in the while loop that checks
chains[j].compare(&chains[j + 1]).
---
Outside diff comments:
In `@src/message_pool/msgpool/selection.rs`:
- Around line 653-662: run_head_change is meant to be a pure simulation but
currently receives a reference to the live pending set (self.pending) and the
fallback path still calls pending_store.remove(...) which can mutate the live
mpool; change the call sites of run_head_change (the call passing self.pending
and the analogous call around lines 884-905) to pass an isolated copy or
read-only snapshot of the pending data (e.g., clone the PendingStore or clone
the rmsgs collection) so the simulation only operates on the clone, and update
the fallback logic to remove from the real pending store only after the
simulation decides removals are required; ensure functions and variables
referenced include run_head_change, self.pending, pending_store.remove, and
rmsgs so you locate and replace direct uses with the cloned/snapshot variants.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: cc44e6a6-4cb1-4105-b805-df5f487a3866
📒 Files selected for processing (13)
src/message_pool/config.rssrc/message_pool/errors.rssrc/message_pool/msgpool/mod.rssrc/message_pool/msgpool/msg_pool.rssrc/message_pool/msgpool/msg_set.rssrc/message_pool/msgpool/reorg.rssrc/message_pool/msgpool/republish.rssrc/message_pool/msgpool/selection.rssrc/message_pool/msgpool/utils.rssrc/message_pool/nonce_tracker.rssrc/rpc/methods/eth.rssrc/rpc/methods/gas.rssrc/utils/cache/lru.rs
💤 Files with no reviewable changes (3)
- src/message_pool/config.rs
- src/message_pool/errors.rs
- src/utils/cache/lru.rs
Summary of changes
Changes introduced in this pull request:
This PR is part 3 of restructuring of msg pool, it contains:
republish_cyclepart of the MessagePool, instead of being a free function with unlimited paramshead_change->apply_head_changerepublish_pending_messages->run_republish_cycleadd_helper->add_to_pool_uncheckedadd_tipset->add_to_poolRepublishState: groups the republished CID set + the early-wake trigger channelvalidate_static/validate_signature/validate_with_state)Error::MessageValueTooHighvariant (the same check is already performed byvalid_for_block_inclusion).Reference issue to close (if applicable)
Closes
Other information and links
Change checklist
Outside contributions
Summary by CodeRabbit
Refactor
Bug Fixes