Skip to content

Commit 6674dc0

Browse files
authored
Merge pull request #739 from devashishdxt/futures-timer-update
Update futures-timer to 3.0.2
2 parents 2dbebe5 + 68fa054 commit 6674dc0

File tree

3 files changed

+6
-123
lines changed

3 files changed

+6
-123
lines changed

Diff for: Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ crossbeam-deque = { version = "0.7.3", optional = true }
6161
crossbeam-utils = { version = "0.7.2", optional = true }
6262
futures-core = { version = "0.3.4", optional = true, default-features = false }
6363
futures-io = { version = "0.3.4", optional = true }
64-
futures-timer = { version = "2.0.2", optional = true }
64+
futures-timer = { version = "3.0.2", optional = true }
6565
kv-log-macro = { version = "1.0.4", optional = true }
6666
log = { version = "0.4.8", features = ["kv_unstable"], optional = true }
6767
memchr = { version = "2.3.3", optional = true }

Diff for: src/stream/interval.rs

+3-120
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::pin::Pin;
22
use std::task::{Context, Poll};
3-
use std::time::{Duration, Instant};
3+
use std::time::Duration;
44

55
use crate::future::Future;
66
use crate::stream::Stream;
@@ -71,125 +71,8 @@ impl Stream for Interval {
7171
if Pin::new(&mut self.delay).poll(cx).is_pending() {
7272
return Poll::Pending;
7373
}
74-
let when = Instant::now();
75-
let next = next_interval(when, Instant::now(), self.interval);
76-
self.delay.reset(next);
74+
let interval = self.interval;
75+
self.delay.reset(interval);
7776
Poll::Ready(Some(()))
7877
}
7978
}
80-
81-
/// Converts Duration object to raw nanoseconds if possible
82-
///
83-
/// This is useful to divide intervals.
84-
///
85-
/// While technically for large duration it's impossible to represent any
86-
/// duration as nanoseconds, the largest duration we can represent is about
87-
/// 427_000 years. Large enough for any interval we would use or calculate in
88-
/// async-std.
89-
fn duration_to_nanos(dur: Duration) -> Option<u64> {
90-
dur.as_secs()
91-
.checked_mul(1_000_000_000)
92-
.and_then(|v| v.checked_add(u64::from(dur.subsec_nanos())))
93-
}
94-
95-
fn next_interval(prev: Instant, now: Instant, interval: Duration) -> Instant {
96-
let new = prev + interval;
97-
if new > now {
98-
return new;
99-
}
100-
101-
let spent_ns = duration_to_nanos(now.duration_since(prev)).expect("interval should be expired");
102-
let interval_ns =
103-
duration_to_nanos(interval).expect("interval is less that 427 thousand years");
104-
let mult = spent_ns / interval_ns + 1;
105-
assert!(
106-
mult < (1 << 32),
107-
"can't skip more than 4 billion intervals of {:?} \
108-
(trying to skip {})",
109-
interval,
110-
mult
111-
);
112-
prev + interval * (mult as u32)
113-
}
114-
115-
#[cfg(test)]
116-
mod test {
117-
use super::next_interval;
118-
use std::cmp::Ordering;
119-
use std::time::{Duration, Instant};
120-
121-
struct Timeline(Instant);
122-
123-
impl Timeline {
124-
fn new() -> Timeline {
125-
Timeline(Instant::now())
126-
}
127-
fn at(&self, millis: u64) -> Instant {
128-
self.0 + Duration::from_millis(millis)
129-
}
130-
fn at_ns(&self, sec: u64, nanos: u32) -> Instant {
131-
self.0 + Duration::new(sec, nanos)
132-
}
133-
}
134-
135-
fn dur(millis: u64) -> Duration {
136-
Duration::from_millis(millis)
137-
}
138-
139-
// The math around Instant/Duration isn't 100% precise due to rounding
140-
// errors, see #249 for more info
141-
fn almost_eq(a: Instant, b: Instant) -> bool {
142-
match a.cmp(&b) {
143-
Ordering::Equal => true,
144-
Ordering::Greater => a - b < Duration::from_millis(1),
145-
Ordering::Less => b - a < Duration::from_millis(1),
146-
}
147-
}
148-
149-
#[test]
150-
fn norm_next() {
151-
let tm = Timeline::new();
152-
assert!(almost_eq(
153-
next_interval(tm.at(1), tm.at(2), dur(10)),
154-
tm.at(11)
155-
));
156-
assert!(almost_eq(
157-
next_interval(tm.at(7777), tm.at(7788), dur(100)),
158-
tm.at(7877)
159-
));
160-
assert!(almost_eq(
161-
next_interval(tm.at(1), tm.at(1000), dur(2100)),
162-
tm.at(2101)
163-
));
164-
}
165-
166-
#[test]
167-
fn fast_forward() {
168-
let tm = Timeline::new();
169-
assert!(almost_eq(
170-
next_interval(tm.at(1), tm.at(1000), dur(10)),
171-
tm.at(1001)
172-
));
173-
assert!(almost_eq(
174-
next_interval(tm.at(7777), tm.at(8888), dur(100)),
175-
tm.at(8977)
176-
));
177-
assert!(almost_eq(
178-
next_interval(tm.at(1), tm.at(10000), dur(2100)),
179-
tm.at(10501)
180-
));
181-
}
182-
183-
/// TODO: this test actually should be successful, but since we can't
184-
/// multiply Duration on anything larger than u32 easily we decided
185-
/// to allow it to fail for now
186-
#[test]
187-
#[should_panic(expected = "can't skip more than 4 billion intervals")]
188-
fn large_skip() {
189-
let tm = Timeline::new();
190-
assert_eq!(
191-
next_interval(tm.at_ns(0, 1), tm.at_ns(25, 0), Duration::new(0, 2)),
192-
tm.at_ns(25, 1)
193-
);
194-
}
195-
}

Diff for: src/stream/stream/throttle.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::future::Future;
22
use std::pin::Pin;
3-
use std::time::{Duration, Instant};
3+
use std::time::Duration;
44

55
use futures_timer::Delay;
66
use pin_project_lite::pin_project;
@@ -59,7 +59,7 @@ impl<S: Stream> Stream for Throttle<S> {
5959
Poll::Ready(None) => Poll::Ready(None),
6060
Poll::Ready(Some(v)) => {
6161
*this.blocked = true;
62-
this.delay.reset(Instant::now() + *this.duration);
62+
this.delay.reset(*this.duration);
6363
Poll::Ready(Some(v))
6464
}
6565
}

0 commit comments

Comments
 (0)