Skip to content

Commit e938527

Browse files
yoshuawuytsStjepan Glavina
authored and
Stjepan Glavina
committed
add stream::interval (#298)
* add stream::interval Signed-off-by: Yoshua Wuyts <[email protected]> * fix tests Signed-off-by: Yoshua Wuyts <[email protected]> * cargo fmt Signed-off-by: Yoshua Wuyts <[email protected]> * cross-docs Signed-off-by: Yoshua Wuyts <[email protected]> * update deps Signed-off-by: Yoshua Wuyts <[email protected]>
1 parent 35fc85a commit e938527

File tree

5 files changed

+217
-13
lines changed

5 files changed

+217
-13
lines changed

Diff for: Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ crossbeam-channel = "0.3.9"
3232
crossbeam-deque = "0.7.1"
3333
futures-core-preview = "=0.3.0-alpha.19"
3434
futures-io-preview = "=0.3.0-alpha.19"
35-
futures-timer = "0.4.0"
35+
futures-timer = "1.0.2"
3636
lazy_static = "1.4.0"
3737
log = { version = "0.4.8", features = ["kv_unstable"] }
3838
memchr = "2.2.1"

Diff for: src/io/timeout.rs

+11-11
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ use std::pin::Pin;
22
use std::task::{Context, Poll};
33
use std::time::Duration;
44

5-
use futures_core::future::TryFuture;
65
use futures_timer::Delay;
6+
use pin_utils::unsafe_pinned;
77

88
use crate::future::Future;
99
use crate::io;
@@ -36,39 +36,39 @@ pub async fn timeout<F, T>(dur: Duration, f: F) -> io::Result<T>
3636
where
3737
F: Future<Output = io::Result<T>>,
3838
{
39-
let f = TimeoutFuture {
39+
Timeout {
4040
timeout: Delay::new(dur),
4141
future: f,
42-
};
43-
f.await
42+
}
43+
.await
4444
}
4545

46-
// Future returned by the [`io::timeout`](./fn.timeout.html) function.
46+
/// Future returned by the `FutureExt::timeout` method.
4747
#[derive(Debug)]
48-
pub struct TimeoutFuture<F, T>
48+
pub struct Timeout<F, T>
4949
where
5050
F: Future<Output = io::Result<T>>,
5151
{
5252
future: F,
5353
timeout: Delay,
5454
}
5555

56-
impl<F, T> TimeoutFuture<F, T>
56+
impl<F, T> Timeout<F, T>
5757
where
5858
F: Future<Output = io::Result<T>>,
5959
{
60-
pin_utils::unsafe_pinned!(future: F);
61-
pin_utils::unsafe_pinned!(timeout: Delay);
60+
unsafe_pinned!(future: F);
61+
unsafe_pinned!(timeout: Delay);
6262
}
6363

64-
impl<F, T> Future for TimeoutFuture<F, T>
64+
impl<F, T> Future for Timeout<F, T>
6565
where
6666
F: Future<Output = io::Result<T>>,
6767
{
6868
type Output = io::Result<T>;
6969

7070
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
71-
match self.as_mut().future().try_poll(cx) {
71+
match self.as_mut().future().poll(cx) {
7272
Poll::Pending => {}
7373
other => return other,
7474
}

Diff for: src/stream/interval.rs

+198
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
use std::pin::Pin;
2+
use std::task::{Context, Poll};
3+
use std::time::{Duration, Instant};
4+
5+
use futures_core::future::Future;
6+
use futures_core::stream::Stream;
7+
use pin_utils::unsafe_pinned;
8+
9+
use futures_timer::Delay;
10+
11+
/// Creates a new stream that yields at a set interval.
12+
///
13+
/// The stream first yields after `dur`, and continues to yield every
14+
/// `dur` after that. The stream accounts for time elapsed between calls, and
15+
/// will adjust accordingly to prevent time skews.
16+
///
17+
/// Each interval may be slightly longer than the specified duration, but never
18+
/// less.
19+
///
20+
/// Note that intervals are not intended for high resolution timers, but rather
21+
/// they will likely fire some granularity after the exact instant that they're
22+
/// otherwise indicated to fire at.
23+
///
24+
/// See also: [`task::sleep`].
25+
///
26+
/// [`task::sleep`]: ../task/fn.sleep.html
27+
///
28+
/// # Examples
29+
///
30+
/// Basic example:
31+
///
32+
/// ```no_run
33+
/// use async_std::prelude::*;
34+
/// use async_std::stream;
35+
/// use std::time::Duration;
36+
///
37+
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
38+
/// #
39+
/// let mut interval = stream::interval(Duration::from_secs(4));
40+
/// while let Some(_) = interval.next().await {
41+
/// println!("prints every four seconds");
42+
/// }
43+
/// #
44+
/// # Ok(()) }) }
45+
/// ```
46+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
47+
#[doc(inline)]
48+
pub fn interval(dur: Duration) -> Interval {
49+
Interval {
50+
delay: Delay::new(dur),
51+
interval: dur,
52+
}
53+
}
54+
55+
/// A stream representing notifications at fixed interval
56+
///
57+
#[derive(Debug)]
58+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
59+
#[doc(inline)]
60+
pub struct Interval {
61+
delay: Delay,
62+
interval: Duration,
63+
}
64+
65+
impl Interval {
66+
unsafe_pinned!(delay: Delay);
67+
}
68+
69+
impl Stream for Interval {
70+
type Item = ();
71+
72+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
73+
if Pin::new(&mut *self).delay().poll(cx).is_pending() {
74+
return Poll::Pending;
75+
}
76+
let when = Instant::now();
77+
let next = next_interval(when, Instant::now(), self.interval);
78+
self.delay.reset(next);
79+
Poll::Ready(Some(()))
80+
}
81+
}
82+
83+
/// Converts Duration object to raw nanoseconds if possible
84+
///
85+
/// This is useful to divide intervals.
86+
///
87+
/// While technically for large duration it's impossible to represent any
88+
/// duration as nanoseconds, the largest duration we can represent is about
89+
/// 427_000 years. Large enough for any interval we would use or calculate in
90+
/// tokio.
91+
fn duration_to_nanos(dur: Duration) -> Option<u64> {
92+
dur.as_secs()
93+
.checked_mul(1_000_000_000)
94+
.and_then(|v| v.checked_add(u64::from(dur.subsec_nanos())))
95+
}
96+
97+
fn next_interval(prev: Instant, now: Instant, interval: Duration) -> Instant {
98+
let new = prev + interval;
99+
if new > now {
100+
return new;
101+
}
102+
103+
let spent_ns = duration_to_nanos(now.duration_since(prev)).expect("interval should be expired");
104+
let interval_ns =
105+
duration_to_nanos(interval).expect("interval is less that 427 thousand years");
106+
let mult = spent_ns / interval_ns + 1;
107+
assert!(
108+
mult < (1 << 32),
109+
"can't skip more than 4 billion intervals of {:?} \
110+
(trying to skip {})",
111+
interval,
112+
mult
113+
);
114+
prev + interval * (mult as u32)
115+
}
116+
117+
#[cfg(test)]
118+
mod test {
119+
use super::next_interval;
120+
use std::time::{Duration, Instant};
121+
122+
struct Timeline(Instant);
123+
124+
impl Timeline {
125+
fn new() -> Timeline {
126+
Timeline(Instant::now())
127+
}
128+
fn at(&self, millis: u64) -> Instant {
129+
self.0 + Duration::from_millis(millis)
130+
}
131+
fn at_ns(&self, sec: u64, nanos: u32) -> Instant {
132+
self.0 + Duration::new(sec, nanos)
133+
}
134+
}
135+
136+
fn dur(millis: u64) -> Duration {
137+
Duration::from_millis(millis)
138+
}
139+
140+
// The math around Instant/Duration isn't 100% precise due to rounding
141+
// errors, see #249 for more info
142+
fn almost_eq(a: Instant, b: Instant) -> bool {
143+
if a == b {
144+
true
145+
} else if a > b {
146+
a - b < Duration::from_millis(1)
147+
} else {
148+
b - a < Duration::from_millis(1)
149+
}
150+
}
151+
152+
#[test]
153+
fn norm_next() {
154+
let tm = Timeline::new();
155+
assert!(almost_eq(
156+
next_interval(tm.at(1), tm.at(2), dur(10)),
157+
tm.at(11)
158+
));
159+
assert!(almost_eq(
160+
next_interval(tm.at(7777), tm.at(7788), dur(100)),
161+
tm.at(7877)
162+
));
163+
assert!(almost_eq(
164+
next_interval(tm.at(1), tm.at(1000), dur(2100)),
165+
tm.at(2101)
166+
));
167+
}
168+
169+
#[test]
170+
fn fast_forward() {
171+
let tm = Timeline::new();
172+
assert!(almost_eq(
173+
next_interval(tm.at(1), tm.at(1000), dur(10)),
174+
tm.at(1001)
175+
));
176+
assert!(almost_eq(
177+
next_interval(tm.at(7777), tm.at(8888), dur(100)),
178+
tm.at(8977)
179+
));
180+
assert!(almost_eq(
181+
next_interval(tm.at(1), tm.at(10000), dur(2100)),
182+
tm.at(10501)
183+
));
184+
}
185+
186+
/// TODO: this test actually should be successful, but since we can't
187+
/// multiply Duration on anything larger than u32 easily we decided
188+
/// to allow it to fail for now
189+
#[test]
190+
#[should_panic(expected = "can't skip more than 4 billion intervals")]
191+
fn large_skip() {
192+
let tm = Timeline::new();
193+
assert_eq!(
194+
next_interval(tm.at_ns(0, 1), tm.at_ns(25, 0), Duration::new(0, 2)),
195+
tm.at_ns(25, 1)
196+
);
197+
}
198+
}

Diff for: src/stream/mod.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,15 @@ cfg_if! {
4444
mod extend;
4545
mod from_stream;
4646
mod into_stream;
47+
mod interval;
4748

4849
pub use double_ended_stream::DoubleEndedStream;
4950
pub use exact_size_stream::ExactSizeStream;
5051
pub use extend::Extend;
5152
pub use from_stream::FromStream;
52-
pub use into_stream::IntoStream;
5353
pub use fused_stream::FusedStream;
54+
pub use into_stream::IntoStream;
55+
pub use interval::{interval, Interval};
5456

5557
pub use stream::Merge;
5658
}

Diff for: src/task/sleep.rs

+4
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@ use crate::io;
1111
///
1212
/// [`std::thread::sleep`]: https://doc.rust-lang.org/std/thread/fn.sleep.html
1313
///
14+
/// See also: [`stream::interval`].
15+
///
16+
/// [`stream::interval`]: ../stream/fn.interval.html
17+
///
1418
/// # Examples
1519
///
1620
/// ```

0 commit comments

Comments
 (0)