Skip to content

feat: impl broadcast channel#90

Merged
tisonkun merged 16 commits intomainfrom
broadcast-channel
Dec 8, 2025
Merged

feat: impl broadcast channel#90
tisonkun merged 16 commits intomainfrom
broadcast-channel

Conversation

@tisonkun
Copy link
Copy Markdown
Collaborator

@tisonkun tisonkun commented Dec 7, 2025

This closes #88

I'd first implement an overflow policy to overwrite the oldest elements and let receivers be lagged.

We may later add more strategies and shift down the current impl to mea::broadcast::overflow::channel. But let's avoid to do that unless we find it really needed.

Besides, we can change the channel argument capacity to NonZeroUsize when we are making breaking changes.

Signed-off-by: tison <wander4096@gmail.com>
Signed-off-by: tison <wander4096@gmail.com>
Signed-off-by: tison <wander4096@gmail.com>
Signed-off-by: tison <wander4096@gmail.com>
Signed-off-by: tison <wander4096@gmail.com>
Signed-off-by: tison <wander4096@gmail.com>
Signed-off-by: tison <wander4096@gmail.com>
Signed-off-by: tison <wander4096@gmail.com>
Signed-off-by: tison <wander4096@gmail.com>
Comment thread mea/src/broadcast/tests.rs Outdated
Comment on lines +182 to +183
// FIXME: wrapping just hit the head, but this should be lagged
assert_eq!(rx2.recv().await, Ok(4));
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to figure out what to do with this case. May or may not be able to work it out.

Perhaps a better "version" strategy and a proper forward solution.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See the new note. Always use u64 as the version field so it won't be overflow in a reasonable real-world use case.

Comment thread mea/src/broadcast/mod.rs Outdated
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR implements a multi-producer, multi-consumer broadcast channel for the mea library. The implementation follows a ring buffer design with atomic operations for lock-free sending and receiver lag detection.

Key Changes:

  • Added broadcast module with Sender and Receiver types supporting multiple producers and consumers
  • Implemented lag detection that reports missed messages when receivers fall behind
  • Added internal RwLock wrapper to handle poison errors consistently

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 5 comments.

Show a summary per file
File Description
mea/src/broadcast/mod.rs Core broadcast channel implementation with send/recv operations and error types
mea/src/broadcast/tests.rs Comprehensive test suite covering basic operations, lag handling, overflow, and edge cases
mea/src/internal/rwlock.rs New internal RwLock wrapper that unwraps poison errors for use in the broadcast buffer
mea/src/internal/mod.rs Exports the new RwLock wrapper for internal use
mea/src/lib.rs Integrates broadcast module into library public API and trait tests
README.md Documents the new broadcast::channel feature in the features list
AGENTS.md Adds project context documentation for AI agents working on the codebase

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread mea/src/broadcast/mod.rs Outdated
Comment thread mea/src/broadcast/mod.rs Outdated
Comment thread mea/src/broadcast/mod.rs Outdated
Comment thread mea/src/broadcast/mod.rs
Comment thread mea/src/broadcast/mod.rs Outdated
Signed-off-by: tison <wander4096@gmail.com>
Signed-off-by: tison <wander4096@gmail.com>
Signed-off-by: tison <wander4096@gmail.com>
Signed-off-by: tison <wander4096@gmail.com>
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 7 out of 7 changed files in this pull request and generated 7 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread mea/src/broadcast/mod.rs
Comment thread mea/src/broadcast/mod.rs Outdated
Comment thread mea/src/broadcast/mod.rs Outdated
Comment thread mea/src/broadcast/tests.rs
Comment thread mea/src/broadcast/tests.rs
Comment thread mea/src/lib.rs Outdated
Comment thread mea/src/lib.rs Outdated
Signed-off-by: tison <wander4096@gmail.com>
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 7 out of 7 changed files in this pull request and generated 1 comment.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread mea/src/internal/rwlock.rs
Signed-off-by: tison <wander4096@gmail.com>
@tisonkun tisonkun enabled auto-merge (squash) December 8, 2025 01:39
@tisonkun tisonkun merged commit 554e293 into main Dec 8, 2025
9 checks passed
@tisonkun tisonkun deleted the broadcast-channel branch December 8, 2025 01:41
Copy link
Copy Markdown
Contributor

@orthur2 orthur2 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can submit a fix PR later.

Comment thread mea/src/broadcast/mod.rs
// wrapping). We treat this as a lag.
drop(slot);

let missed = tail.wrapping_sub(self.head).wrapping_sub(cap);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If slot.version != head (which means a new message was sent), we must reload the latest tail. Otherwise, missed can underflow and head will also be incorrect. I think we need to re-read tail, recompute the diff, and re-check the condition.

Comment thread mea/src/broadcast/mod.rs
/// tx.send(10);
/// assert_eq!(rx.try_recv(), Ok(10));
/// ```
pub fn send(&self, msg: T) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In send function, we fetch_add(1) to bump tail_cnt and effectively publish a new position before writing the slot’s data/version. During that window a receiver can observe the new tail_cnt (diff > 0), read the slot, and find it hasn’t been written yet (old version or msg still None), which is currently treated as overwritten and can trigger errors. Would writing the slot first and only then publishing the new position be better?

Copy link
Copy Markdown
Contributor

@orthur2 orthur2 Dec 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After thinking more carefully, I believe my previous idea might have been wrong. Writing the slot first and then updating tail_cnt is not a good improvement. Because the implementation becomes more complex, and it also introduce blocking.
A more elegant solution I can thinkof is to keep the sender logic unchanged, and instead handle the “data not ready yet” case on the receiver side. By comparing slot.version with head, we can distinguish three states: slot.version < head means the slot hasn’t been written yet, slot.version == head is the readable case, and slot.version > head is the real lagged case. To avoid overflow issues, the comparison should use a wrapping diff (e.g., version_diff = head.wrapping_sub(slot.version) ) rather than direct < and >.”

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Support broadcast channel

3 participants