Skip to content

Commit fbe9a18

Browse files
authored
Merge pull request #5 from srijs/futures-0.2
convert to futures 0.2 (alpha)
2 parents 924b578 + 682e792 commit fbe9a18

File tree

9 files changed

+73
-58
lines changed

9 files changed

+73
-58
lines changed

Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,4 @@ Timeouts and intervals for futures.
1212
"""
1313

1414
[dependencies]
15-
futures = "0.1.15"
15+
futures = "0.2.0-alpha"

src/delay.rs

+8-8
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use std::sync::atomic::Ordering::SeqCst;
1010
use std::time::{Duration, Instant};
1111

1212
use futures::{Future, Poll, Async};
13-
use futures::task::AtomicTask;
13+
use futures::task::{self, AtomicWaker};
1414

1515
use arc_list::Node;
1616
use {TimerHandle, ScheduledTimer};
@@ -57,7 +57,7 @@ impl Delay {
5757
let state = Arc::new(Node::new(ScheduledTimer {
5858
at: Mutex::new(Some(at)),
5959
state: AtomicUsize::new(0),
60-
task: AtomicTask::new(),
60+
waker: AtomicWaker::new(),
6161
inner: handle.inner,
6262
slot: Mutex::new(None),
6363
}));
@@ -69,7 +69,7 @@ impl Delay {
6969
return Delay { state: None, when: at }
7070
}
7171

72-
inner.task.notify();
72+
inner.waker.wake();
7373
Delay {
7474
state: Some(state),
7575
when: at,
@@ -126,7 +126,7 @@ impl Delay {
126126
// If we fail to push our node then we've become an inert timer, so
127127
// we'll want to clear our `state` field accordingly
128128
timeouts.list.push(state)?;
129-
timeouts.task.notify();
129+
timeouts.waker.wake();
130130
}
131131

132132
Ok(())
@@ -141,7 +141,7 @@ impl Future for Delay {
141141
type Item = ();
142142
type Error = io::Error;
143143

144-
fn poll(&mut self) -> Poll<(), io::Error> {
144+
fn poll(&mut self, cx: &mut task::Context) -> Poll<(), io::Error> {
145145
let state = match self.state {
146146
Some(ref state) => state,
147147
None => return Err(io::Error::new(io::ErrorKind::Other,
@@ -151,7 +151,7 @@ impl Future for Delay {
151151
return Ok(Async::Ready(()))
152152
}
153153

154-
state.task.register();
154+
state.waker.register(cx.waker());
155155

156156
// Now that we've registered, do the full check of our own internal
157157
// state. If we've fired the first bit is set, and if we've been
@@ -160,7 +160,7 @@ impl Future for Delay {
160160
n if n & 0b01 != 0 => Ok(Async::Ready(())),
161161
n if n & 0b10 != 0 => Err(io::Error::new(io::ErrorKind::Other,
162162
"timer has gone away")),
163-
_ => Ok(Async::NotReady),
163+
_ => Ok(Async::Pending),
164164
}
165165
}
166166
}
@@ -174,7 +174,7 @@ impl Drop for Delay {
174174
if let Some(timeouts) = state.inner.upgrade() {
175175
*state.at.lock().unwrap() = None;
176176
if timeouts.list.push(state).is_ok() {
177-
timeouts.task.notify();
177+
timeouts.waker.wake();
178178
}
179179
}
180180
}

src/ext.rs

+12-11
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ pub trait FutureExt: Future + Sized {
3030
///
3131
/// use std::time::Duration;
3232
/// use futures::prelude::*;
33+
/// use futures::executor::block_on;
3334
/// use futures_timer::FutureExt;
3435
///
3536
/// # fn long_future() -> futures::future::FutureResult<(), std::io::Error> {
@@ -40,7 +41,7 @@ pub trait FutureExt: Future + Sized {
4041
/// let future = long_future();
4142
/// let timed_out = future.timeout(Duration::from_secs(1));
4243
///
43-
/// match timed_out.wait() {
44+
/// match block_on(timed_out) {
4445
/// Ok(item) => println!("got {:?} within enough time!", item),
4546
/// Err(_) => println!("took too long to produce the item"),
4647
/// }
@@ -86,16 +87,16 @@ impl<F> Future for Timeout<F>
8687
type Item = F::Item;
8788
type Error = F::Error;
8889

89-
fn poll(&mut self) -> Poll<F::Item, F::Error> {
90-
match self.future.poll()? {
91-
Async::NotReady => {}
90+
fn poll(&mut self, cx: &mut task::Context) -> Poll<F::Item, F::Error> {
91+
match self.future.poll(cx)? {
92+
Async::Pending => {}
9293
other => return Ok(other)
9394
}
9495

95-
if self.timeout.poll()?.is_ready() {
96+
if self.timeout.poll(cx)?.is_ready() {
9697
Err(io::Error::new(io::ErrorKind::TimedOut, "future timed out").into())
9798
} else {
98-
Ok(Async::NotReady)
99+
Ok(Async::Pending)
99100
}
100101
}
101102
}
@@ -142,20 +143,20 @@ impl<S> Stream for TimeoutStream<S>
142143
type Item = S::Item;
143144
type Error = S::Error;
144145

145-
fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> {
146-
match self.stream.poll() {
147-
Ok(Async::NotReady) => {}
146+
fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<S::Item>, S::Error> {
147+
match self.stream.poll_next(cx) {
148+
Ok(Async::Pending) => {}
148149
other => {
149150
self.timeout.reset(self.dur);
150151
return other
151152
}
152153
}
153154

154-
if self.timeout.poll()?.is_ready() {
155+
if self.timeout.poll(cx)?.is_ready() {
155156
self.timeout.reset(self.dur);
156157
Err(io::Error::new(io::ErrorKind::TimedOut, "stream item timed out").into())
157158
} else {
158-
Ok(Async::NotReady)
159+
Ok(Async::Pending)
159160
}
160161
}
161162
}

src/global.rs

+21-9
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ use std::sync::atomic::{AtomicBool, Ordering};
44
use std::thread;
55
use std::time::Instant;
66

7-
use futures::executor::{spawn, Notify};
7+
use futures::prelude::*;
8+
use futures::executor::{SpawnError, Executor};
89

910
use {TimerHandle, Timer};
1011

@@ -50,15 +51,18 @@ impl Drop for HelperThread {
5051
}
5152
}
5253

53-
fn run(timer: Timer, done: Arc<AtomicBool>) {
54-
let mut timer = spawn(timer);
54+
fn run(mut timer: Timer, done: Arc<AtomicBool>) {
5555
let me = Arc::new(ThreadUnpark {
5656
thread: thread::current(),
5757
});
58+
let mut local_map = task::LocalMap::new();
59+
let waker = task::Waker::from(me);
60+
let mut exec = NonFunctionalExecutor;
61+
let mut cx = task::Context::new(&mut local_map, &waker, &mut exec);
5862
while !done.load(Ordering::SeqCst) {
59-
drop(timer.poll_future_notify(&me, 0));
60-
timer.get_mut().advance();
61-
match timer.get_mut().next_event() {
63+
drop(timer.poll(&mut cx));
64+
timer.advance();
65+
match timer.next_event() {
6266
// Ok, block for the specified time
6367
Some(when) => {
6468
let now = Instant::now();
@@ -79,8 +83,16 @@ struct ThreadUnpark {
7983
thread: thread::Thread,
8084
}
8185

82-
impl Notify for ThreadUnpark {
83-
fn notify(&self, _unpark_id: usize) {
84-
self.thread.unpark()
86+
impl task::Wake for ThreadUnpark {
87+
fn wake(arc_self: &Arc<Self>) {
88+
arc_self.thread.unpark()
89+
}
90+
}
91+
92+
struct NonFunctionalExecutor;
93+
94+
impl Executor for NonFunctionalExecutor {
95+
fn spawn(&mut self, _: Box<Future<Item = (), Error = Never> + 'static + Send>) -> Result<(), SpawnError> {
96+
Err(SpawnError::shutdown())
8597
}
8698
}

src/interval.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,9 @@ impl Stream for Interval {
5858
type Item = ();
5959
type Error = io::Error;
6060

61-
fn poll(&mut self) -> Poll<Option<()>, io::Error> {
62-
if self.delay.poll()?.is_not_ready() {
63-
return Ok(Async::NotReady)
61+
fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<()>, io::Error> {
62+
if self.delay.poll(cx)?.is_pending() {
63+
return Ok(Async::Pending)
6464
}
6565
let next = next_interval(delay::fires_at(&self.delay),
6666
Instant::now(),

src/lib.rs

+10-10
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,8 @@ use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT};
7575
use std::sync::{Arc, Weak, Mutex};
7676
use std::time::Instant;
7777

78-
use futures::task::AtomicTask;
79-
use futures::{Future, Async, Poll};
78+
use futures::task::AtomicWaker;
79+
use futures::{Future, Async, Poll, task};
8080

8181
use arc_list::{ArcList, Node};
8282
use heap::{Heap, Slot};
@@ -129,12 +129,12 @@ struct Inner {
129129
list: ArcList<ScheduledTimer>,
130130

131131
/// The blocked `Timer` task to receive notifications to the `list` above.
132-
task: AtomicTask,
132+
waker: AtomicWaker,
133133
}
134134

135135
/// Shared state between the `Timer` and a `Delay`.
136136
struct ScheduledTimer {
137-
task: AtomicTask,
137+
waker: AtomicWaker,
138138

139139
// The lowest bit here is whether the timer has fired or not, the second
140140
// lowest bit is whether the timer has been invalidated, and all the other
@@ -164,7 +164,7 @@ impl Timer {
164164
Timer {
165165
inner: Arc::new(Inner {
166166
list: ArcList::new(),
167-
task: AtomicTask::new(),
167+
waker: AtomicWaker::new(),
168168
}),
169169
timer_heap: Heap::new(),
170170
}
@@ -210,7 +210,7 @@ impl Timer {
210210
*heap_timer.node.slot.lock().unwrap() = None;
211211
let bits = heap_timer.gen << 2;
212212
match heap_timer.node.state.compare_exchange(bits, bits | 0b01, SeqCst, SeqCst) {
213-
Ok(_) => heap_timer.node.task.notify(),
213+
Ok(_) => heap_timer.node.waker.wake(),
214214
Err(_b) => {}
215215
}
216216
}
@@ -249,16 +249,16 @@ impl Timer {
249249

250250
fn invalidate(&mut self, node: Arc<Node<ScheduledTimer>>) {
251251
node.state.fetch_or(0b10, SeqCst);
252-
node.task.notify();
252+
node.waker.wake();
253253
}
254254
}
255255

256256
impl Future for Timer {
257257
type Item = ();
258258
type Error = ();
259259

260-
fn poll(&mut self) -> Poll<(), ()> {
261-
self.inner.task.register();
260+
fn poll(&mut self, cx: &mut task::Context) -> Poll<(), ()> {
261+
self.inner.waker.register(cx.waker());
262262
let mut list = self.inner.list.take();
263263
while let Some(node) = list.pop() {
264264
let at = *node.at.lock().unwrap();
@@ -267,7 +267,7 @@ impl Future for Timer {
267267
None => self.remove(node),
268268
}
269269
}
270-
Ok(Async::NotReady)
270+
Ok(Async::Pending)
271271
}
272272
}
273273

tests/interval.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,15 @@ extern crate futures_timer;
44
use std::time::{Instant, Duration};
55

66
use futures::prelude::*;
7+
use futures::executor::block_on;
78
use futures_timer::Interval;
89

910
#[test]
1011
fn single() {
1112
let dur = Duration::from_millis(10);
1213
let start = Instant::now();
1314
let interval = Interval::new(dur);
14-
interval.take(1).collect().wait().unwrap();
15+
block_on(interval.take(1).collect()).unwrap();
1516
assert!(start.elapsed() >= dur);
1617
}
1718

@@ -20,7 +21,7 @@ fn two_times() {
2021
let dur = Duration::from_millis(10);
2122
let start = Instant::now();
2223
let interval = Interval::new(dur);
23-
let result = interval.take(2).collect().wait().unwrap();
24+
let result = block_on(interval.take(2).collect()).unwrap();
2425
assert!(start.elapsed() >= dur*2);
2526
assert_eq!(result, vec![(), ()]);
2627
}

tests/smoke.rs

+11-10
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use std::time::{Instant, Duration};
55

66
use futures::future;
77
use futures::prelude::*;
8+
use futures::executor::block_on;
89
use futures_timer::{Timer, Delay};
910

1011
fn far_future() -> Instant {
@@ -16,7 +17,7 @@ fn works() {
1617
let i = Instant::now();
1718
let dur = Duration::from_millis(100);
1819
let d = Delay::new(dur);
19-
d.wait().unwrap();
20+
block_on(d).unwrap();
2021
assert!(i.elapsed() > dur);
2122
}
2223

@@ -25,7 +26,7 @@ fn error_after_inert() {
2526
let t = Timer::new();
2627
let handle = t.handle();
2728
drop(t);
28-
assert!(Delay::new_handle(far_future(), handle).poll().is_err());
29+
assert!(block_on(Delay::new_handle(far_future(), handle)).is_err());
2930
}
3031

3132
#[test]
@@ -34,20 +35,20 @@ fn drop_makes_inert() {
3435
let handle = t.handle();
3536
let timeout = Delay::new_handle(far_future(), handle);
3637
drop(t);
37-
assert!(timeout.wait().is_err());
38+
assert!(block_on(timeout).is_err());
3839
}
3940

4041
#[test]
4142
fn reset() {
4243
let i = Instant::now();
4344
let dur = Duration::from_millis(100);
4445
let mut d = Delay::new(dur);
45-
future::poll_fn(|| d.poll()).wait().unwrap();
46+
block_on(future::poll_fn(|cx| d.poll(cx))).unwrap();
4647
assert!(i.elapsed() > dur);
4748

4849
let i = Instant::now();
4950
d.reset(dur);
50-
future::poll_fn(|| d.poll()).wait().unwrap();
51+
block_on(future::poll_fn(|cx| d.poll(cx))).unwrap();
5152
assert!(i.elapsed() > dur);
5253
}
5354

@@ -57,12 +58,12 @@ fn drop_timer_wakes() {
5758
let handle = t.handle();
5859
let mut timeout = Delay::new_handle(far_future(), handle);
5960
let mut t = Some(t);
60-
assert!(future::poll_fn(|| {
61-
match timeout.poll() {
62-
Ok(Async::NotReady) => {}
61+
assert!(block_on(future::poll_fn(|cx| {
62+
match timeout.poll(cx) {
63+
Ok(Async::Pending) => {}
6364
other => return other,
6465
}
6566
drop(t.take());
66-
Ok(Async::NotReady)
67-
}).wait().is_err());
67+
Ok(Async::Pending)
68+
})).is_err());
6869
}

0 commit comments

Comments
 (0)