Skip to content

Commit 997cda8

Browse files
committed
add ParallelStream::filter async-rs#2
Signed-off-by: Benjamin Coenen <[email protected]>
1 parent 8182694 commit 997cda8

File tree

2 files changed

+93
-0
lines changed

2 files changed

+93
-0
lines changed

src/par_stream/filter.rs

+80
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
// use async_std::prelude::*;
2+
use async_std::future::Future;
3+
use async_std::sync::{self, Receiver};
4+
use async_std::task;
5+
6+
use std::pin::Pin;
7+
use std::task::{Context, Poll};
8+
9+
use crate::ParallelStream;
10+
11+
pin_project_lite::pin_project! {
12+
/// A parallel stream that filters value of another stream with a function.
13+
#[derive(Debug)]
14+
pub struct Filter<S> where S: ParallelStream {
15+
#[pin]
16+
receiver: Receiver<S::Item>,
17+
limit: Option<usize>,
18+
}
19+
}
20+
21+
impl<S> Filter<S>
22+
where
23+
S: ParallelStream,
24+
{
25+
/// Create a new instance of `Filter`.
26+
pub fn new<F, Fut>(mut stream: S, mut f: F) -> Self
27+
where
28+
S: ParallelStream,
29+
F: FnMut(&S::Item) -> Fut + Send + Sync + Copy + 'static,
30+
Fut: Future<Output = bool> + Send,
31+
S::Item: Sync,
32+
{
33+
let (sender, receiver) = sync::channel(1);
34+
let limit = stream.get_limit();
35+
task::spawn(async move {
36+
while let Some(item) = stream.next().await {
37+
let sender = sender.clone();
38+
task::spawn(async move {
39+
let res = f(&item).await;
40+
if res {
41+
sender.send(item).await;
42+
}
43+
});
44+
}
45+
});
46+
Filter { receiver, limit }
47+
}
48+
}
49+
50+
impl<S> ParallelStream for Filter<S>
51+
where
52+
S: ParallelStream,
53+
{
54+
type Item = S::Item;
55+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
56+
use async_std::prelude::*;
57+
let this = self.project();
58+
this.receiver.poll_next(cx)
59+
}
60+
61+
fn limit(mut self, limit: impl Into<Option<usize>>) -> Self {
62+
self.limit = limit.into();
63+
self
64+
}
65+
66+
fn get_limit(&self) -> Option<usize> {
67+
self.limit
68+
}
69+
}
70+
71+
#[async_std::test]
72+
async fn smoke() {
73+
let s = async_std::stream::from_iter(vec![2, 1, 2, 3, 2]);
74+
let mut output: Vec<usize> = vec![];
75+
let mut stream = crate::from_stream(s).filter(|&n| async move { n % 2 == 0 });
76+
while let Some(n) = stream.next().await {
77+
output.push(n);
78+
}
79+
assert_eq!(output, vec![2usize; 3]);
80+
}

src/par_stream/mod.rs

+13
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,13 @@ use std::pin::Pin;
55

66
use crate::FromParallelStream;
77

8+
pub use filter::Filter;
89
pub use for_each::ForEach;
910
pub use map::Map;
1011
pub use next::NextFuture;
1112
pub use take::Take;
1213

14+
mod filter;
1315
mod for_each;
1416
mod map;
1517
mod next;
@@ -40,6 +42,17 @@ pub trait ParallelStream: Sized + Send + Sync + Unpin + 'static {
4042
Map::new(self, f)
4143
}
4244

45+
/// Applies `f` to each item of this stream in parallel, producing a new
46+
/// stream with items filtered.
47+
fn filter<F, Fut>(self, f: F) -> Filter<Self>
48+
where
49+
F: FnMut(&Self::Item) -> Fut + Send + Sync + Copy + 'static,
50+
Fut: Future<Output = bool> + Send,
51+
Self::Item: Sync,
52+
{
53+
Filter::new(self, f)
54+
}
55+
4356
/// Applies `f` to each item of this stream in parallel, producing a new
4457
/// stream with the results.
4558
fn next(&mut self) -> NextFuture<'_, Self> {

0 commit comments

Comments
 (0)