Skip to content

Commit 3c6d41c

Browse files
authored
Merge pull request #541 from yjhmelody/stream-partition
add stream-partition
2 parents 837604b + d76b32e commit 3c6d41c

File tree

2 files changed

+102
-0
lines changed

2 files changed

+102
-0
lines changed

src/stream/stream/mod.rs

+42
Original file line numberDiff line numberDiff line change
@@ -120,8 +120,11 @@ cfg_unstable! {
120120

121121
use crate::stream::into_stream::IntoStream;
122122
use crate::stream::{FromStream, Product, Sum};
123+
use crate::stream::Extend;
123124

124125
use count::CountFuture;
126+
use partition::PartitionFuture;
127+
125128
pub use merge::Merge;
126129
pub use flatten::Flatten;
127130
pub use flat_map::FlatMap;
@@ -132,6 +135,7 @@ cfg_unstable! {
132135
mod merge;
133136
mod flatten;
134137
mod flat_map;
138+
mod partition;
135139
mod timeout;
136140
mod throttle;
137141
}
@@ -1308,6 +1312,44 @@ extension_trait! {
13081312
FoldFuture::new(self, init, f)
13091313
}
13101314

1315+
#[doc = r#"
1316+
A combinator that applies a function to every element in a stream
1317+
creating two collections from it.
1318+
1319+
# Examples
1320+
1321+
Basic usage:
1322+
1323+
```
1324+
# fn main() { async_std::task::block_on(async {
1325+
#
1326+
use async_std::prelude::*;
1327+
use async_std::stream;
1328+
1329+
let (even, odd): (Vec<i32>, Vec<i32>) = stream::from_iter(vec![1, 2, 3])
1330+
.partition(|&n| n % 2 == 0).await;
1331+
1332+
assert_eq!(even, vec![2]);
1333+
assert_eq!(odd, vec![1, 3]);
1334+
1335+
#
1336+
# }) }
1337+
```
1338+
"#]
1339+
#[cfg(feature = "unstable")]
1340+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
1341+
fn partition<B, F>(
1342+
self,
1343+
f: F,
1344+
) -> impl Future<Output = (B, B)> [PartitionFuture<Self, F, B>]
1345+
where
1346+
Self: Sized,
1347+
F: FnMut(&Self::Item) -> bool,
1348+
B: Default + Extend<Self::Item>,
1349+
{
1350+
PartitionFuture::new(self, f)
1351+
}
1352+
13111353
#[doc = r#"
13121354
Call a closure on each element of the stream.
13131355

src/stream/stream/partition.rs

+60
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
use std::future::Future;
2+
use std::pin::Pin;
3+
use std::default::Default;
4+
use pin_project_lite::pin_project;
5+
6+
use crate::stream::Stream;
7+
use crate::task::{Context, Poll};
8+
9+
pin_project! {
10+
#[derive(Debug)]
11+
#[allow(missing_debug_implementations)]
12+
#[cfg(all(feature = "default", feature = "unstable"))]
13+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
14+
pub struct PartitionFuture<S, F, B> {
15+
#[pin]
16+
stream: S,
17+
f: F,
18+
res: Option<(B, B)>,
19+
}
20+
}
21+
22+
impl<S, F, B: Default> PartitionFuture<S, F, B> {
23+
pub(super) fn new(stream: S, f: F) -> Self {
24+
Self {
25+
stream,
26+
f,
27+
res: Some((B::default(), B::default())),
28+
}
29+
}
30+
}
31+
32+
impl<S, F, B> Future for PartitionFuture<S, F, B>
33+
where
34+
S: Stream + Sized,
35+
F: FnMut(&S::Item) -> bool,
36+
B: Default + Extend<S::Item>,
37+
{
38+
type Output = (B, B);
39+
40+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
41+
let mut this = self.project();
42+
43+
loop {
44+
let next = futures_core::ready!(this.stream.as_mut().poll_next(cx));
45+
46+
match next {
47+
Some(v) => {
48+
let mut res = this.res.take().unwrap();
49+
match (this.f)(&v) {
50+
true => res.0.extend(Some(v)),
51+
false => res.1.extend(Some(v)),
52+
};
53+
54+
*this.res = Some(res);
55+
}
56+
None => return Poll::Ready(this.res.take().unwrap()),
57+
}
58+
}
59+
}
60+
}

0 commit comments

Comments
 (0)