Skip to content

Commit 4af796b

Browse files
committed
add ParallelStream::find_map async-rs#2
Signed-off-by: Benjamin Coenen <[email protected]>
1 parent 8182694 commit 4af796b

File tree

2 files changed

+113
-0
lines changed

2 files changed

+113
-0
lines changed

src/par_stream/find_map.rs

+100
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
// use async_std::prelude::*;
2+
use async_std::future::Future;
3+
use async_std::sync::{self, Receiver};
4+
use async_std::task;
5+
6+
use std::pin::Pin;
7+
use std::task::{Context, Poll};
8+
9+
use crate::ParallelStream;
10+
11+
pin_project_lite::pin_project! {
12+
/// A parallel stream that FindMaps value of another stream with a function.
13+
#[derive(Debug)]
14+
pub struct FindMap<T> {
15+
#[pin]
16+
receiver: Receiver<T>,
17+
limit: Option<usize>,
18+
already_sent: bool,
19+
}
20+
}
21+
22+
impl<T> FindMap<T>
23+
where
24+
T: Send + 'static,
25+
{
26+
/// Create a new instance of `FindMap`.
27+
pub fn new<S, F, Fut>(mut stream: S, mut f: F) -> Self
28+
where
29+
S: ParallelStream,
30+
F: FnMut(S::Item) -> Fut + Send + Sync + Copy + 'static,
31+
Fut: Future<Output = Option<T>> + Send,
32+
{
33+
let (sender, receiver) = sync::channel(1);
34+
let limit = stream.get_limit();
35+
task::spawn(async move {
36+
while let Some(item) = stream.next().await {
37+
let sender = sender.clone();
38+
task::spawn(async move {
39+
let res = f(item).await;
40+
if let Some(res) = res {
41+
sender.send(res).await;
42+
}
43+
});
44+
}
45+
});
46+
FindMap {
47+
receiver,
48+
limit,
49+
already_sent: false,
50+
}
51+
}
52+
}
53+
54+
impl<T> ParallelStream for FindMap<T>
55+
where
56+
T: Send + 'static,
57+
{
58+
type Item = T;
59+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
60+
use async_std::prelude::*;
61+
let this = self.project();
62+
if *this.already_sent {
63+
return Poll::Ready(None);
64+
}
65+
if let Poll::Ready(elt) = this.receiver.poll_next(cx) {
66+
if let Some(elt) = elt {
67+
*this.already_sent = true;
68+
return Poll::Ready(Some(elt));
69+
}
70+
return Poll::Ready(None);
71+
}
72+
Poll::Pending
73+
}
74+
75+
fn limit(mut self, limit: impl Into<Option<usize>>) -> Self {
76+
self.limit = limit.into();
77+
self
78+
}
79+
80+
fn get_limit(&self) -> Option<usize> {
81+
self.limit
82+
}
83+
}
84+
85+
#[async_std::test]
86+
async fn smoke() {
87+
let s = async_std::stream::from_iter(vec![1, 1, 2, 3, 2]);
88+
let mut output: Vec<usize> = vec![];
89+
let mut stream = crate::from_stream(s).find_map(|n| async move {
90+
if n % 2 == 0 {
91+
Some(42usize)
92+
} else {
93+
None
94+
}
95+
});
96+
while let Some(n) = stream.next().await {
97+
output.push(n);
98+
}
99+
assert_eq!(output, vec![42]);
100+
}

src/par_stream/mod.rs

+13
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,13 @@ use std::pin::Pin;
55

66
use crate::FromParallelStream;
77

8+
pub use find_map::FindMap;
89
pub use for_each::ForEach;
910
pub use map::Map;
1011
pub use next::NextFuture;
1112
pub use take::Take;
1213

14+
mod find_map;
1315
mod for_each;
1416
mod map;
1517
mod next;
@@ -29,6 +31,17 @@ pub trait ParallelStream: Sized + Send + Sync + Unpin + 'static {
2931
/// Get the max concurrency limit
3032
fn get_limit(&self) -> Option<usize>;
3133

34+
/// Applies `f` to each item of this stream in parallel, producing a new
35+
/// stream with the results.
36+
fn find_map<F, T, Fut>(self, f: F) -> FindMap<T>
37+
where
38+
F: FnMut(Self::Item) -> Fut + Send + Sync + Copy + 'static,
39+
T: Send + 'static,
40+
Fut: Future<Output = Option<T>> + Send,
41+
{
42+
FindMap::new(self, f)
43+
}
44+
3245
/// Applies `f` to each item of this stream in parallel, producing a new
3346
/// stream with the results.
3447
fn map<F, T, Fut>(self, f: F) -> Map<T>

0 commit comments

Comments
 (0)