Skip to content

Commit 1c62e0f

Browse files
committed
feat(futures-util/stream): unzip operator
closes: rust-lang#2234
1 parent bde29da commit 1c62e0f

File tree

2 files changed

+109
-0
lines changed

2 files changed

+109
-0
lines changed

futures-util/src/stream/stream/mod.rs

+43
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@ mod collect;
2929
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
3030
pub use self::collect::Collect;
3131

32+
mod unzip;
33+
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
34+
pub use self::unzip::Unzip;
35+
3236
mod concat;
3337
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
3438
pub use self::concat::Concat;
@@ -477,6 +481,45 @@ pub trait StreamExt: Stream {
477481
assert_future::<C, _>(Collect::new(self))
478482
}
479483

484+
/// Converts a stream of pairs into a future, which
485+
/// resolves to pair of containers.
486+
///
487+
/// `unzip()` produces a future, which resolves to two
488+
/// collections: one from the left elements of the pairs,
489+
/// and one from the right elements.
490+
///
491+
/// The returned future will be resolved when the stream terminates.
492+
///
493+
/// # Examples
494+
///
495+
/// ```
496+
/// # futures::executor::block_on(async {
497+
/// use futures::channel::mpsc;
498+
/// use futures::stream::StreamExt;
499+
/// use std::thread;
500+
///
501+
/// let (tx, rx) = mpsc::unbounded();
502+
///
503+
/// thread::spawn(move || {
504+
/// tx.unbounded_send((1, 2)).unwrap();
505+
/// tx.unbounded_send((3, 4)).unwrap();
506+
/// tx.unbounded_send((5, 6)).unwrap();
507+
/// });
508+
///
509+
/// let (o1, o2): (Vec<_>, Vec<_>) = rx.unzip().await;
510+
/// assert_eq!(o1, vec![1, 3, 5]);
511+
/// assert_eq!(o2, vec![2, 4, 6]);
512+
/// # });
513+
/// ```
514+
fn unzip<A, B, FromA, FromB>(self) -> Unzip<Self, FromA, FromB>
515+
where
516+
FromA: Default + Extend<A>,
517+
FromB: Default + Extend<B>,
518+
Self: Sized + Stream<Item = (A, B)>,
519+
{
520+
assert_future::<(FromA, FromB), _>(Unzip::new(self))
521+
}
522+
480523
/// Concatenate all items of a stream into a single extendable
481524
/// destination, returning a future representing the end result.
482525
///
+66
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
use core::mem;
2+
use core::pin::Pin;
3+
use futures_core::future::{FusedFuture, Future};
4+
use futures_core::stream::{FusedStream, Stream};
5+
use futures_core::task::{Context, Poll};
6+
use pin_project::pin_project;
7+
8+
/// Future for the [`unzip`](super::StreamExt::unzip) method.
9+
#[pin_project]
10+
#[derive(Debug)]
11+
#[must_use = "futures do nothing unless you `.await` or poll them"]
12+
pub struct Unzip<St, FromA, FromB> {
13+
#[pin]
14+
stream: St,
15+
left: FromA,
16+
right: FromB,
17+
}
18+
19+
impl<St: Stream, FromA: Default, FromB: Default> Unzip<St, FromA, FromB> {
20+
fn finish(self: Pin<&mut Self>) -> (FromA, FromB) {
21+
let this = self.project();
22+
(
23+
mem::take(this.left),
24+
mem::take(this.right),
25+
)
26+
}
27+
28+
pub(super) fn new(stream: St) -> Self {
29+
Self {
30+
stream,
31+
left: Default::default(),
32+
right: Default::default(),
33+
}
34+
}
35+
}
36+
37+
impl<St, A, B, FromA, FromB> FusedFuture for Unzip<St, FromA, FromB>
38+
where St: FusedStream<Item = (A, B)>,
39+
FromA: Default + Extend<A>,
40+
FromB: Default + Extend<B>,
41+
{
42+
fn is_terminated(&self) -> bool {
43+
self.stream.is_terminated()
44+
}
45+
}
46+
47+
impl<St, A, B, FromA, FromB> Future for Unzip<St, FromA, FromB>
48+
where St: Stream<Item = (A, B)>,
49+
FromA: Default + Extend<A>,
50+
FromB: Default + Extend<B>,
51+
{
52+
type Output = (FromA, FromB);
53+
54+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<(FromA, FromB)> {
55+
let mut this = self.as_mut().project();
56+
loop {
57+
match ready!(this.stream.as_mut().poll_next(cx)) {
58+
Some(e) => {
59+
this.left.extend(Some(e.0));
60+
this.right.extend(Some(e.1));
61+
},
62+
None => return Poll::Ready(self.finish()),
63+
}
64+
}
65+
}
66+
}

0 commit comments

Comments
 (0)