Skip to content

Commit 45819a3

Browse files
authored
Merge pull request #5 from tyranron/support-futures
Support Future and Stream traits
2 parents daf93b5 + f2a85b1 commit 45819a3

5 files changed

Lines changed: 164 additions & 5 deletions

File tree

.github/workflows/rust.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,6 @@ jobs:
1010
steps:
1111
- uses: actions/checkout@v1
1212
- name: Build
13-
run: cargo build --verbose
13+
run: cargo build --all-features --verbose
1414
- name: Run tests
15-
run: cargo test --verbose
15+
run: cargo test --all-features --verbose

Cargo.toml

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
[package]
22
name = "send_wrapper"
3-
version = "0.4.0"
3+
version = "0.5.0"
4+
edition = "2018"
45
authors = ["Thomas Keh"]
56
license = "MIT/Apache-2.0"
67
description = """
@@ -13,3 +14,13 @@ readme = "README.md"
1314
repository = "https://github.com/thk1/send_wrapper"
1415
documentation = "https://docs.rs/send_wrapper"
1516
categories = ["rust-patterns"]
17+
18+
[features]
19+
futures = ["futures-core"]
20+
21+
[dependencies]
22+
futures-core = { version = "0.3", optional = true }
23+
24+
[dev-dependencies]
25+
futures-executor = "0.3"
26+
futures-util = "0.3"

README.md

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ can leave the main thread at all, like required in the given
1717

1818
[Documentation](https://docs.rs/send_wrapper)
1919

20+
21+
22+
2023
# Examples
2124

2225
```rust
@@ -64,12 +67,48 @@ let value = wrapped_value.deref();
6467
// let mut value: &mut NonSendType = &mut wrapped_value;
6568
```
6669

70+
71+
## Using with `Future`
72+
73+
To use `SendWrapper` on `Future`s, you should enable `futures` Cargo feature first:
74+
```toml
75+
send_wrapper = { version = "0.5", features = ["futures"] }
76+
```
77+
78+
And then, just wrap your `Future` (or `Stream`):
79+
```rust
80+
use futures::{executor, future::{self, BoxFuture}};
81+
use send_wrapper::SendWrapper;
82+
83+
// `Rc` is a `!Send` type,
84+
let value = Rc::new(42);
85+
// so this `Future` is `!Send` too as increments `Rc`'s inner counter.
86+
let future = future::lazy(|_| value.clone());
87+
88+
// We now wrap the `future` with `SendWrapper` (value is moved inside),
89+
let wrapped_future = SendWrapper::new(future);
90+
// so now it's `Send` + `Sync` (`BoxFuture` trait object contains `Send` requirement).
91+
let boxed_future: BoxFuture<_> = Box::pin(wrapped_future);
92+
93+
let t = thread::spawn(move || {
94+
// This would panic (because `future` is polled in wrong thread):
95+
// executor::block_on(boxed_future)
96+
});
97+
```
98+
99+
100+
101+
67102
# License
68103

69104
`send_wrapper` is distributed under the terms of both the MIT license and the Apache License (Version 2.0).
70105

71106
See LICENSE-APACHE, and LICENSE-MIT for details.
72107

108+
109+
110+
111+
73112
[Rust]: https://www.rust-lang.org
74113
[`Send`]: https://doc.rust-lang.org/std/marker/trait.Send.html
75114
[`gtk-rs`]: http://gtk-rs.org/

src/futures.rs

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
//! [`Future`] and [`Stream`] support for [`SendWrapper`].
2+
//!
3+
//! [`Future`]: std::future::Future
4+
//! [`Stream`]: futures_core::Stream
5+
6+
use std::{
7+
future::Future,
8+
ops::{Deref as _, DerefMut as _},
9+
pin::Pin,
10+
task,
11+
};
12+
13+
use futures_core::Stream;
14+
15+
use super::SendWrapper;
16+
17+
const POLL_ERROR: &'static str =
18+
"Polling SendWrapper<T> variable from a thread different to the one it has been created with.";
19+
20+
impl<F: Future> Future for SendWrapper<F> {
21+
type Output = F::Output;
22+
23+
/// Polls this [`SendWrapper`] [`Future`].
24+
///
25+
/// # Panics
26+
/// Polling panics if it is done from a different thread than the one the [`SendWrapper`]
27+
/// instance has been created with.
28+
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
29+
if !self.valid() {
30+
panic!(POLL_ERROR);
31+
}
32+
// This is safe as `SendWrapper` itself points to the inner `Future`.
33+
// So, as long as `SendWrapper` is pinned, the inner `Future` is pinned too.
34+
unsafe { self.map_unchecked_mut(Self::deref_mut) }.poll(cx)
35+
}
36+
}
37+
38+
impl<S: Stream> Stream for SendWrapper<S> {
39+
type Item = S::Item;
40+
41+
/// Polls this [`SendWrapper`] [`Stream`].
42+
///
43+
/// # Panics
44+
/// Polling panics if it is done from a different thread than the one the [`SendWrapper`]
45+
/// instance has been created with.
46+
fn poll_next(
47+
self: Pin<&mut Self>,
48+
cx: &mut task::Context<'_>,
49+
) -> task::Poll<Option<Self::Item>> {
50+
if !self.valid() {
51+
panic!(POLL_ERROR);
52+
}
53+
// This is safe as `SendWrapper` itself points to the inner `Stream`.
54+
// So, as long as `SendWrapper` is pinned, the inner `Stream` is pinned too.
55+
unsafe { self.map_unchecked_mut(Self::deref_mut) }.poll_next(cx)
56+
}
57+
58+
#[inline]
59+
fn size_hint(&self) -> (usize, Option<usize>) {
60+
self.deref().size_hint()
61+
}
62+
}
63+
64+
#[cfg(test)]
65+
mod tests {
66+
use std::thread;
67+
68+
use futures_executor as executor;
69+
use futures_util::{future, stream, StreamExt};
70+
71+
use crate::SendWrapper;
72+
73+
#[test]
74+
fn test_future() {
75+
let w1 = SendWrapper::new(future::ready(42));
76+
let w2 = w1.clone();
77+
assert_eq!(
78+
format!("{:?}", executor::block_on(w1)),
79+
format!("{:?}", executor::block_on(w2)),
80+
);
81+
}
82+
83+
#[test]
84+
fn test_future_panic() {
85+
let w = SendWrapper::new(future::ready(42));
86+
let t = thread::spawn(move || executor::block_on(w));
87+
assert!(t.join().is_err());
88+
}
89+
90+
#[test]
91+
fn test_stream() {
92+
let mut w1 = SendWrapper::new(stream::once(future::ready(42)));
93+
let mut w2 = SendWrapper::new(stream::once(future::ready(42)));
94+
assert_eq!(
95+
format!("{:?}", executor::block_on(w1.next())),
96+
format!("{:?}", executor::block_on(w2.next())),
97+
);
98+
}
99+
100+
#[test]
101+
fn test_stream_panic() {
102+
let mut w = SendWrapper::new(stream::once(future::ready(42)));
103+
let t = thread::spawn(move || executor::block_on(w.next()));
104+
assert!(t.join().is_err());
105+
}
106+
}

src/lib.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,9 @@
8080
//! [`GTK+`]: https://www.gtk.org/
8181
//! [using `Glib`]: http://gtk-rs.org/docs/glib/source/fn.idle_add.html
8282
83+
#[cfg(feature = "futures")]
84+
pub mod futures;
85+
8386
use std::fmt;
8487
use std::marker::Send;
8588
use std::ops::{Deref, DerefMut, Drop};
@@ -232,13 +235,13 @@ impl<T: Clone> Clone for SendWrapper<T> {
232235

233236
#[cfg(test)]
234237
mod tests {
235-
236238
use std::ops::Deref;
237239
use std::rc::Rc;
238240
use std::sync::mpsc::channel;
239241
use std::sync::Arc;
240242
use std::thread;
241-
use SendWrapper;
243+
244+
use super::SendWrapper;
242245

243246
#[test]
244247
fn test_deref() {

0 commit comments

Comments
 (0)