Skip to content

Commit f30ba82

Browse files
committed
Add a method to learn about cancel on Complete
This commits adds a `poll_cancel` method to learn about when the `Oneshot` half of a complete/oneshot pair has gone away. This can then be used to detect when a computation is no longer wanted. Closes rust-lang#63
1 parent df0e550 commit f30ba82

2 files changed

Lines changed: 141 additions & 1 deletion

File tree

src/oneshot.rs

Lines changed: 77 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
use std::sync::Arc;
2+
use std::sync::atomic::{AtomicBool, Ordering};
23

34
use {Future, Poll};
45
use slot::{Slot, Token};
5-
use task;
6+
use lock::Lock;
7+
use task::{self, TaskHandle};
68

79
/// A future representing the completion of a computation happening elsewhere in
810
/// memory.
@@ -24,6 +26,8 @@ pub struct Complete<T> {
2426

2527
struct Inner<T> {
2628
slot: Slot<Option<T>>,
29+
oneshot_gone: AtomicBool,
30+
notify_cancel: Lock<Option<TaskHandle>>,
2731
}
2832

2933
/// Creates a new in-memory oneshot used to represent completing a computation.
@@ -55,6 +59,8 @@ struct Inner<T> {
5559
pub fn oneshot<T>() -> (Complete<T>, Oneshot<T>) {
5660
let inner = Arc::new(Inner {
5761
slot: Slot::new(None),
62+
oneshot_gone: AtomicBool::new(false),
63+
notify_cancel: Lock::new(None),
5864
});
5965
let oneshot = Oneshot {
6066
inner: inner.clone(),
@@ -78,6 +84,57 @@ impl<T> Complete<T> {
7884
self.send(Some(t))
7985
}
8086

87+
/// Polls this `Complete` half to detect whether the `Oneshot` this has
88+
/// paired with has gone away.
89+
///
90+
/// This function can be used to learn about when the `Oneshot` (consumer)
91+
/// half has gone away and nothing will be able to receive a message sent
92+
/// from `complete`.
93+
///
94+
/// Like `Future::poll`, this function will panic if it's not called from
95+
/// within the context of a task. In otherwords, this should only ever be
96+
/// called from inside another future.
97+
///
98+
/// If `Poll::Ok` is returned then it means that the `Oneshot` has
99+
/// disappeared and the result this `Complete` would otherwise produce
100+
/// should no longer be produced.
101+
///
102+
/// If `Poll::NotReady` is returned then the `Oneshot` is still alive and
103+
/// may be able to receive a message if sent. The current task, however,
104+
/// is scheduled to receive a notification if the corresponding `Oneshot`
105+
/// goes away.
106+
pub fn poll_cancel(&mut self) -> Poll<(), ()> {
107+
// Fast path up first, just read the flag and see if our other half is
108+
// gone.
109+
if self.inner.oneshot_gone.load(Ordering::SeqCst) {
110+
return Poll::Ok(())
111+
}
112+
113+
// If our other half is not gone then we need to park our current task
114+
// and move it into the `notify_cancel` slot to get notified when it's
115+
// actually gone.
116+
//
117+
// If `try_lock` fails, then the `Oneshot` is in the process of using
118+
// it, so we can deduce that it's now in the process of going away and
119+
// hence we're canceled. If it succeeds then we just store our handle.
120+
//
121+
// Crucially we then check `oneshot_gone` *again* before we return.
122+
// While we were storing our handle inside `notify_cancel` the `Oneshot`
123+
// may have been dropped. The first thing it does is set the flag, and
124+
// if it fails to acquire the lock it assumes that we'll see the flag
125+
// later on. So... we then try to see the flag later on!
126+
let handle = task::park();
127+
match self.inner.notify_cancel.try_lock() {
128+
Some(mut p) => *p = Some(handle),
129+
None => return Poll::Ok(()),
130+
}
131+
if self.inner.oneshot_gone.load(Ordering::SeqCst) {
132+
Poll::Ok(())
133+
} else {
134+
Poll::NotReady
135+
}
136+
}
137+
81138
fn send(&mut self, t: Option<T>) {
82139
if let Err(e) = self.inner.slot.try_produce(t) {
83140
self.inner.slot.on_empty(Some(e.into_inner()), |slot, item| {
@@ -125,8 +182,27 @@ impl<T> Future for Oneshot<T> {
125182

126183
impl<T> Drop for Oneshot<T> {
127184
fn drop(&mut self) {
185+
// First up, if we squirreled away a task to get notified once the
186+
// oneshot was filled in, we cancel that notification. We'll never end
187+
// up actually receiving data (as we're being dropped) so no need to
188+
// hold onto the task.
128189
if let Some(cancel_token) = self.cancel_token.take() {
129190
self.inner.slot.cancel(cancel_token)
130191
}
192+
193+
// Next up, inform the `Complete` half that we're going away. First up
194+
// we flag ourselves as gone, and next we'll attempt to wake up any
195+
// handle that was stored.
196+
//
197+
// If we fail to acquire the lock on the handle, that means that a
198+
// `Complete` is in the process of storing one, and it'll check
199+
// `oneshot_gone` on its way out to see our write here.
200+
self.inner.oneshot_gone.store(true, Ordering::SeqCst);
201+
if let Some(mut handle) = self.inner.notify_cancel.try_lock() {
202+
if let Some(task) = handle.take() {
203+
drop(handle);
204+
task.unpark()
205+
}
206+
}
131207
}
132208
}

tests/oneshot.rs

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
extern crate futures;
2+
3+
use std::sync::mpsc::{channel, Sender};
4+
use std::thread;
5+
6+
use futures::{oneshot, Complete, Future, Poll};
7+
use futures::task::Task;
8+
9+
#[test]
10+
fn smoke_poll() {
11+
let (mut tx, rx) = oneshot::<u32>();
12+
Task::new().enter(|| {
13+
assert!(tx.poll_cancel().is_not_ready());
14+
assert!(tx.poll_cancel().is_not_ready());
15+
drop(rx);
16+
assert!(tx.poll_cancel().is_ready());
17+
assert!(tx.poll_cancel().is_ready());
18+
})
19+
}
20+
21+
#[test]
22+
fn cancel_notifies() {
23+
let (tx, rx) = oneshot::<u32>();
24+
let (tx2, rx2) = channel();
25+
26+
WaitForCancel { tx: tx }.then(move |v| tx2.send(v)).forget();
27+
drop(rx);
28+
rx2.recv().unwrap().unwrap();
29+
}
30+
31+
struct WaitForCancel {
32+
tx: Complete<u32>,
33+
}
34+
35+
impl Future for WaitForCancel {
36+
type Item = ();
37+
type Error = ();
38+
39+
fn poll(&mut self) -> Poll<(), ()> {
40+
self.tx.poll_cancel()
41+
}
42+
}
43+
44+
#[test]
45+
fn cancel_lots() {
46+
let (tx, rx) = channel::<(Complete<_>, Sender<_>)>();
47+
let t = thread::spawn(move || {
48+
for (tx, tx2) in rx {
49+
WaitForCancel { tx: tx }.then(move |v| tx2.send(v)).forget();
50+
}
51+
52+
});
53+
54+
for _ in 0..20000 {
55+
let (otx, orx) = oneshot::<u32>();
56+
let (tx2, rx2) = channel();
57+
tx.send((otx, tx2)).unwrap();
58+
drop(orx);
59+
rx2.recv().unwrap().unwrap();
60+
}
61+
drop(tx);
62+
63+
t.join().unwrap();
64+
}

0 commit comments

Comments
 (0)