Skip to content

Commit 55bdea4

Browse files
committed
adds stream::filter_map combinator
1 parent ba43a05 commit 55bdea4

File tree

2 files changed

+89
-0
lines changed

2 files changed

+89
-0
lines changed

src/stream/stream/filter_map.rs

+48
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
use std::marker::PhantomData;
2+
use std::pin::Pin;
3+
use std::task::{Context, Poll};
4+
5+
/// A stream that both filters and maps.
6+
#[derive(Clone, Debug)]
7+
pub struct FilterMap<S, F, T, B> {
8+
stream: S,
9+
f: F,
10+
__from: PhantomData<T>,
11+
__to: PhantomData<B>,
12+
}
13+
14+
impl<S, F, T, B> FilterMap<S, F, T, B> {
15+
pin_utils::unsafe_pinned!(stream: S);
16+
pin_utils::unsafe_unpinned!(f: F);
17+
18+
pub(crate) fn new(stream: S, f: F) -> Self {
19+
FilterMap {
20+
stream,
21+
f,
22+
__from: PhantomData,
23+
__to: PhantomData,
24+
}
25+
}
26+
}
27+
28+
impl<S, F, B> futures_core::stream::Stream for FilterMap<S, F, S::Item, B>
29+
where
30+
S: futures_core::stream::Stream,
31+
F: FnMut(S::Item) -> Option<B>,
32+
{
33+
type Item = B;
34+
35+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
36+
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
37+
match next {
38+
Some(v) => match (self.as_mut().f())(v) {
39+
Some(b) => Poll::Ready(Some(b)),
40+
None => {
41+
cx.waker().wake_by_ref();
42+
Poll::Pending
43+
}
44+
},
45+
None => Poll::Ready(None),
46+
}
47+
}
48+
}

src/stream/stream/mod.rs

+41
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
2424
mod all;
2525
mod any;
26+
mod filter_map;
2627
mod min_by;
2728
mod next;
2829
mod take;
@@ -31,6 +32,7 @@ pub use take::Take;
3132

3233
use all::AllFuture;
3334
use any::AnyFuture;
35+
use filter_map::FilterMap;
3436
use min_by::MinByFuture;
3537
use next::NextFuture;
3638

@@ -128,6 +130,45 @@ pub trait Stream {
128130
}
129131
}
130132

133+
/// Both filters and maps a stream.
134+
///
135+
/// # Examples
136+
///
137+
/// Basic usage:
138+
///
139+
/// ```
140+
///
141+
/// # fn main() { async_std::task::block_on(async {
142+
/// #
143+
/// use std::collections::VecDeque;
144+
/// use async_std::stream::Stream;
145+
///
146+
/// let s: VecDeque<&str> = vec!["1", "lol", "3", "NaN", "5"].into_iter().collect();
147+
///
148+
/// let mut parsed = s.filter_map(|a| a.parse::<u32>().ok());
149+
///
150+
/// let one = parsed.next().await;
151+
/// assert_eq!(one, Some(1));
152+
///
153+
/// let three = parsed.next().await;
154+
/// assert_eq!(three, Some(3));
155+
///
156+
/// let five = parsed.next().await;
157+
/// assert_eq!(five, Some(5));
158+
///
159+
/// let end = parsed.next().await;
160+
/// assert_eq!(end, None);
161+
///
162+
/// #
163+
/// # }) }
164+
fn filter_map<B, F>(self, f: F) -> FilterMap<Self, F, Self::Item, B>
165+
where
166+
Self: Sized,
167+
F: FnMut(Self::Item) -> Option<B>,
168+
{
169+
FilterMap::new(self, f)
170+
}
171+
131172
/// Returns the element that gives the minimum value with respect to the
132173
/// specified comparison function. If several elements are equally minimum,
133174
/// the first element is returned. If the stream is empty, `None` is returned.

0 commit comments

Comments
 (0)