refactor: msg pool to make more structured part 2#7006
Conversation
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: Repository UI Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (4)
🚧 Files skipped from review as they are similar to previous changes (3)
WalkthroughAdds a ChangesMessage Pool State Consolidation
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related issues
Possibly related PRs
Suggested reviewers
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 inconclusive)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
✨ Simplify code
Comment |
dd74a63 to
a86d8c4
Compare
There was a problem hiding this comment.
Actionable comments posted: 4
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/message_pool/msgpool/local_store.rs`:
- Around line 26-28: The add() method currently unconditionally appends
resolved_from to self.local_addrs causing duplicates; change add() to avoid
duplicates by inserting only if the address is not already present (e.g., check
self.local_addrs.read().contains(&resolved_from) or convert local_addrs to a
HashSet and insert), so known_local_addrs() no longer grows per-message and
republish_pending_messages() won't re-resolve the same sender repeatedly; update
any code that assumes a Vec to handle the new container if you switch to
HashSet.
In `@src/message_pool/msgpool/mod.rs`:
- Around line 275-284: The republish trigger is using
RepublishState::mark_republished (which inserts) causing the logic to wake on
new CIDs instead of on CIDs already republished; change the check to a read-only
membership test by calling a new or existing RepublishState::was_republished
(implement it to return republished.contains(cid) without mutating state) and
use that in both loops (the branches around mpool_ctx.remove_from_selected_msgs
and the repub flag) so you only set repub = true when the CID was already in the
republished set.
In `@src/message_pool/msgpool/msg_pool.rs`:
- Around line 485-493: The load_local() implementation iterates
LocalStore::snapshot_msgs() (a HashSet) in non-deterministic order which causes
add() to fail with sequencing errors (SequenceTooLow, NonceGap,
DuplicateSequence) and may silently drop messages; fix by collecting
snapshot_msgs() into a vector, sort it deterministically by sender and
message().sequence before iterating, then call self.add(...) for each; update
the add() error handling in the closure used in load_local() so SequenceTooLow
still triggers local.remove_msg(&k) but other errors are either logged/warned
(including error kind) and left in local_msgs (or retried) rather than silently
ignored, referencing load_local, LocalStore::snapshot_msgs, add,
local.remove_msg, and the Error variants
SequenceTooLow/NonceGap/DuplicateSequence.
In `@src/message_pool/msgpool/republish.rs`:
- Around line 39-44: The trigger() method currently uses
self.trigger.send_async(()).await which can await and block head_change() when
the 4-slot wakeup buffer is full; replace the await send with a non-blocking
self.trigger.try_send(()) and treat a Full error as a no-op (return Ok(()))
because a full buffer already indicates a pending wake, while mapping other
errors into Error::Other with the error details. Keep the function signature and
callers (head_change(), republish_pending_messages()) unchanged; only change
send_async() -> try_send() and handle TrySendError::Full by dropping the signal
and returning Ok(()) while converting other TrySendError variants into the
existing Error::Other format.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 36a9a2fb-5de4-4a0e-85d3-0e9d2a8e6759
📒 Files selected for processing (5)
src/message_pool/msgpool/local_store.rssrc/message_pool/msgpool/mod.rssrc/message_pool/msgpool/msg_pool.rssrc/message_pool/msgpool/republish.rssrc/message_pool/msgpool/selection.rs
Codecov Report❌ Patch coverage is Additional details and impacted files
... and 5 files with indirect coverage changes Continue to review full report in Codecov by Sentry.
🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
♻️ Duplicate comments (1)
src/message_pool/msgpool/republish.rs (1)
38-42:⚠️ Potential issue | 🟠 Major | ⚡ Quick win
trigger()should treat a full channel as success, not an error.The current implementation maps all
try_senderrors (includingFull) to an error. However, a full buffer simply means the republish task is already scheduled to wake—the signal should be dropped silently rather than failinghead_change().Proposed fix
pub(in crate::message_pool) fn trigger(&self) -> Result<(), Error> { - self.trigger - .try_send(()) - .map_err(|e| Error::Other(format!("Republish receiver dropped: {e}"))) + match self.trigger.try_send(()) { + Ok(()) | Err(flume::TrySendError::Full(())) => Ok(()), + Err(flume::TrySendError::Disconnected(())) => { + Err(Error::Other("Republish receiver dropped".to_owned())) + } + } }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/message_pool/msgpool/republish.rs` around lines 38 - 42, The trigger() method currently maps all try_send errors to Error::Other; change it to treat a Full error as success (drop the signal silently) and only return an Err for Disconnected (or other non-Full) failures. Locate the pub(in crate::message_pool) fn trigger(&self) and adjust the try_send error handling so that match/if distinguishes tokio::sync::mpsc::error::TrySendError::Full => Ok(()) and returns Error::Other(...) for the disconnected case, ensuring callers like head_change() no longer fail when the channel buffer is full.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Duplicate comments:
In `@src/message_pool/msgpool/republish.rs`:
- Around line 38-42: The trigger() method currently maps all try_send errors to
Error::Other; change it to treat a Full error as success (drop the signal
silently) and only return an Err for Disconnected (or other non-Full) failures.
Locate the pub(in crate::message_pool) fn trigger(&self) and adjust the try_send
error handling so that match/if distinguishes
tokio::sync::mpsc::error::TrySendError::Full => Ok(()) and returns
Error::Other(...) for the disconnected case, ensuring callers like head_change()
no longer fail when the channel buffer is full.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 6e5f7773-6a80-40ca-857e-c3ffe9e85a90
📒 Files selected for processing (3)
src/message_pool/msgpool/mod.rssrc/message_pool/msgpool/msg_pool.rssrc/message_pool/msgpool/republish.rs
…same message again
48da6f8 to
d353923
Compare
I believe this has been done. We now have |
Summary of changes
Changes introduced in this pull request:
This PR is part 2 of restructuring of msg pool, it contains:
This change should be applied on top of the refactor: msg pool to make more structured #6965
Next part will have major changes:
MessagePoolitself rather than each individual field, this will allow us to:headchangetrigger which will become part of theMessagePool, instead of being a free function with unlimited paramsMessagePool, instead of being a free function with unlimited paramsReference issue to close (if applicable)
Part of #7010
Other information and links
Change checklist
Outside contributions
Summary by CodeRabbit