Skip to content

Commit b942d0a

Browse files
committed
add stream-min
1 parent da795de commit b942d0a

File tree

2 files changed

+97
-0
lines changed

2 files changed

+97
-0
lines changed

src/stream/stream/min.rs

+60
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
use std::marker::PhantomData;
2+
use std::cmp::{Ordering, Ord};
3+
use std::pin::Pin;
4+
5+
use pin_project_lite::pin_project;
6+
7+
use crate::future::Future;
8+
use crate::stream::Stream;
9+
use crate::task::{Context, Poll};
10+
11+
pin_project! {
12+
#[doc(hidden)]
13+
#[allow(missing_debug_implementations)]
14+
pub struct MinFuture<S, F, T> {
15+
#[pin]
16+
stream: S,
17+
_compare: PhantomData<F>,
18+
min: Option<T>,
19+
}
20+
}
21+
22+
impl<S, F, T> MinFuture<S, F, T> {
23+
pub(super) fn new(stream: S) -> Self {
24+
Self {
25+
stream,
26+
_compare: PhantomData,
27+
min: None,
28+
}
29+
}
30+
}
31+
32+
impl<S, F> Future for MinFuture<S, F, S::Item>
33+
where
34+
S: Stream,
35+
S::Item: Ord,
36+
F: FnMut(&S::Item, &S::Item) -> Ordering,
37+
{
38+
type Output = Option<S::Item>;
39+
40+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
41+
let this = self.project();
42+
let next = futures_core::ready!(this.stream.poll_next(cx));
43+
44+
match next {
45+
Some(new) => {
46+
cx.waker().wake_by_ref();
47+
match this.min.take() {
48+
None => *this.min = Some(new),
49+
50+
Some(old) => match new.cmp(&old) {
51+
Ordering::Less => *this.min = Some(new),
52+
_ => *this.min = Some(old),
53+
},
54+
}
55+
Poll::Pending
56+
}
57+
None => Poll::Ready(this.min.take()),
58+
}
59+
}
60+
}

src/stream/stream/mod.rs

+37
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ mod le;
4141
mod lt;
4242
mod map;
4343
mod max_by;
44+
mod min;
4445
mod min_by;
4546
mod min_by_key;
4647
mod next;
@@ -71,6 +72,7 @@ use last::LastFuture;
7172
use le::LeFuture;
7273
use lt::LtFuture;
7374
use max_by::MaxByFuture;
75+
use min::MinFuture;
7476
use min_by::MinByFuture;
7577
use min_by_key::MinByKeyFuture;
7678
use next::NextFuture;
@@ -753,6 +755,41 @@ extension_trait! {
753755
self,
754756
compare: F,
755757
) -> impl Future<Output = Option<Self::Item>> [MinByFuture<Self, F, Self::Item>]
758+
where
759+
Self: Sized,
760+
F: FnMut(&Self::Item, &Self::Item) -> Ordering,
761+
{
762+
MinByFuture::new(self, compare)
763+
}
764+
765+
#[doc = r#"
766+
Returns the element that gives the minimum value. If several elements are equally minimum,
767+
the first element is returned. If the stream is empty, `None` is returned.
768+
769+
# Examples
770+
771+
```
772+
# fn main() { async_std::task::block_on(async {
773+
#
774+
use std::collections::VecDeque;
775+
776+
use async_std::prelude::*;
777+
778+
let s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();
779+
780+
let min = s.clone().min().await;
781+
assert_eq!(min, Some(1));
782+
783+
let min = VecDeque::<usize>::new().min().await;
784+
assert_eq!(min, None);
785+
#
786+
# }) }
787+
```
788+
"#]
789+
fn min_by<F>(
790+
self,
791+
compare: F,
792+
) -> impl Future<Output = Option<Self::Item>> [MinByFuture<Self, F, Self::Item>]
756793
where
757794
Self: Sized,
758795
F: FnMut(&Self::Item, &Self::Item) -> Ordering,

0 commit comments

Comments
 (0)