Conversation
Signed-off-by: tison <wander4096@gmail.com>
Signed-off-by: tison <wander4096@gmail.com>
Signed-off-by: tison <wander4096@gmail.com>
Signed-off-by: tison <wander4096@gmail.com>
| // FIXME: wrapping just hit the head, but this should be lagged | ||
| assert_eq!(rx2.recv().await, Ok(4)); |
There was a problem hiding this comment.
We need to figure out what to do with this case. May or may not be able to work it out.
Perhaps a better "version" strategy and a proper forward solution.
There was a problem hiding this comment.
See the new note. Always use u64 as the version field so it won't be overflow in a reasonable real-world use case.
There was a problem hiding this comment.
Pull request overview
This PR implements a multi-producer, multi-consumer broadcast channel for the mea library. The implementation follows a ring buffer design with atomic operations for lock-free sending and receiver lag detection.
Key Changes:
- Added
broadcastmodule withSenderandReceivertypes supporting multiple producers and consumers - Implemented lag detection that reports missed messages when receivers fall behind
- Added internal
RwLockwrapper to handle poison errors consistently
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| mea/src/broadcast/mod.rs | Core broadcast channel implementation with send/recv operations and error types |
| mea/src/broadcast/tests.rs | Comprehensive test suite covering basic operations, lag handling, overflow, and edge cases |
| mea/src/internal/rwlock.rs | New internal RwLock wrapper that unwraps poison errors for use in the broadcast buffer |
| mea/src/internal/mod.rs | Exports the new RwLock wrapper for internal use |
| mea/src/lib.rs | Integrates broadcast module into library public API and trait tests |
| README.md | Documents the new broadcast::channel feature in the features list |
| AGENTS.md | Adds project context documentation for AI agents working on the codebase |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Signed-off-by: tison <wander4096@gmail.com>
Signed-off-by: tison <wander4096@gmail.com>
Signed-off-by: tison <wander4096@gmail.com>
Signed-off-by: tison <wander4096@gmail.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 7 out of 7 changed files in this pull request and generated 7 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Signed-off-by: tison <wander4096@gmail.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 7 out of 7 changed files in this pull request and generated 1 comment.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
orthur2
left a comment
There was a problem hiding this comment.
I can submit a fix PR later.
| // wrapping). We treat this as a lag. | ||
| drop(slot); | ||
|
|
||
| let missed = tail.wrapping_sub(self.head).wrapping_sub(cap); |
There was a problem hiding this comment.
If slot.version != head (which means a new message was sent), we must reload the latest tail. Otherwise, missed can underflow and head will also be incorrect. I think we need to re-read tail, recompute the diff, and re-check the condition.
| /// tx.send(10); | ||
| /// assert_eq!(rx.try_recv(), Ok(10)); | ||
| /// ``` | ||
| pub fn send(&self, msg: T) { |
There was a problem hiding this comment.
In send function, we fetch_add(1) to bump tail_cnt and effectively publish a new position before writing the slot’s data/version. During that window a receiver can observe the new tail_cnt (diff > 0), read the slot, and find it hasn’t been written yet (old version or msg still None), which is currently treated as overwritten and can trigger errors. Would writing the slot first and only then publishing the new position be better?
There was a problem hiding this comment.
After thinking more carefully, I believe my previous idea might have been wrong. Writing the slot first and then updating tail_cnt is not a good improvement. Because the implementation becomes more complex, and it also introduce blocking.
A more elegant solution I can thinkof is to keep the sender logic unchanged, and instead handle the “data not ready yet” case on the receiver side. By comparing slot.version with head, we can distinguish three states: slot.version < head means the slot hasn’t been written yet, slot.version == head is the readable case, and slot.version > head is the real lagged case. To avoid overflow issues, the comparison should use a wrapping diff (e.g., version_diff = head.wrapping_sub(slot.version) ) rather than direct < and >.”
This closes #88
I'd first implement an overflow policy to overwrite the oldest elements and let receivers be lagged.
We may later add more strategies and shift down the current impl to
mea::broadcast::overflow::channel. But let's avoid to do that unless we find it really needed.Besides, we can change the
channelargumentcapacitytoNonZeroUsizewhen we are making breaking changes.