From 31cf932d808bdfb3cbdf024968b333f23d510547 Mon Sep 17 00:00:00 2001 From: k-nasa Date: Sat, 16 Nov 2019 00:24:59 +0900 Subject: [PATCH 1/5] wip: Add stream unzip --- src/stream/stream/mod.rs | 13 ++++++++++ src/stream/stream/unzip.rs | 53 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 66 insertions(+) create mode 100644 src/stream/stream/unzip.rs diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 893858375..900bde3a9 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -130,6 +130,7 @@ cfg_unstable! { pub use flat_map::FlatMap; pub use timeout::{TimeoutError, Timeout}; pub use throttle::Throttle; + pub use unzip::UnzipFuture; mod count; mod merge; @@ -138,6 +139,7 @@ cfg_unstable! { mod partition; mod timeout; mod throttle; + mod unzip; } extension_trait! { @@ -1717,6 +1719,17 @@ extension_trait! { Zip::new(self, other) } + #[cfg(feature = "unstable")] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] + fn unzip(self) -> impl Future [UnzipFuture] + where + FromA: Default + Extend, + FromB: Default + Extend, + Self: Stream + Sized, + { + UnzipFuture::new(self) + } + #[doc = r#" Transforms a stream into a collection. diff --git a/src/stream/stream/unzip.rs b/src/stream/stream/unzip.rs new file mode 100644 index 000000000..ef1ff3a06 --- /dev/null +++ b/src/stream/stream/unzip.rs @@ -0,0 +1,53 @@ +use std::future::Future; +use std::pin::Pin; + +use pin_project_lite::pin_project; + +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +pin_project! { + #[cfg(all(feature = "default", feature = "unstable"))] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] + pub struct UnzipFuture { + #[pin] + stream: S, + res: (FromA, FromB), + } +} + +impl UnzipFuture +where + FromA: Default, + FromB: Default, +{ + pub(super) fn new(stream: S) -> Self { + UnzipFuture { + stream, + res: (FromA::default(), FromB::default()), + } + } +} + +impl Future for UnzipFuture +where + S: Stream, + FromA: Default + Extend + Copy, + FromB: Default + Extend + Copy, +{ + type Output = (FromA, FromB); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + let next = futures_core::ready!(this.stream.as_mut().poll_next(cx)); + + match next { + Some((a, b)) => { + this.res.0.extend(Some(a)); + this.res.1.extend(Some(b)); + Poll::Pending + } + None => Poll::Ready(*this.res), + } + } +} From 603b3c508559129bf2f866291511e35db0a93c4d Mon Sep 17 00:00:00 2001 From: k-nasa Date: Sat, 16 Nov 2019 01:16:35 +0900 Subject: [PATCH 2/5] add: Add stream unzip --- src/stream/stream/mod.rs | 2 +- src/stream/stream/unzip.rs | 30 ++++++++++++++++++------------ 2 files changed, 19 insertions(+), 13 deletions(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 900bde3a9..04aa4d68e 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -124,13 +124,13 @@ cfg_unstable! { use count::CountFuture; use partition::PartitionFuture; + use unzip::UnzipFuture; pub use merge::Merge; pub use flatten::Flatten; pub use flat_map::FlatMap; pub use timeout::{TimeoutError, Timeout}; pub use throttle::Throttle; - pub use unzip::UnzipFuture; mod count; mod merge; diff --git a/src/stream/stream/unzip.rs b/src/stream/stream/unzip.rs index ef1ff3a06..4f5dfa198 100644 --- a/src/stream/stream/unzip.rs +++ b/src/stream/stream/unzip.rs @@ -7,12 +7,13 @@ use crate::stream::Stream; use crate::task::{Context, Poll}; pin_project! { + #[derive(Clone, Debug)] #[cfg(all(feature = "default", feature = "unstable"))] #[cfg_attr(feature = "docs", doc(cfg(unstable)))] - pub struct UnzipFuture { + pub struct UnzipFuture { #[pin] stream: S, - res: (FromA, FromB), + res: Option<(FromA, FromB)>, } } @@ -24,7 +25,7 @@ where pub(super) fn new(stream: S) -> Self { UnzipFuture { stream, - res: (FromA::default(), FromB::default()), + res: Some((FromA::default(), FromB::default())), } } } @@ -32,22 +33,27 @@ where impl Future for UnzipFuture where S: Stream, - FromA: Default + Extend + Copy, - FromB: Default + Extend + Copy, + FromA: Default + Extend, + FromB: Default + Extend, { type Output = (FromA, FromB); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut this = self.project(); - let next = futures_core::ready!(this.stream.as_mut().poll_next(cx)); - match next { - Some((a, b)) => { - this.res.0.extend(Some(a)); - this.res.1.extend(Some(b)); - Poll::Pending + loop { + let next = futures_core::ready!(this.stream.as_mut().poll_next(cx)); + + match next { + Some((a, b)) => { + let mut res = this.res.take().unwrap(); + res.0.extend(Some(a)); + res.1.extend(Some(b)); + + *this.res = Some(res); + } + None => return Poll::Ready(this.res.take().unwrap()), } - None => Poll::Ready(*this.res), } } } From 91ee4c7b9fddcf0c245d9527b75fec2642846702 Mon Sep 17 00:00:00 2001 From: k-nasa Date: Sat, 16 Nov 2019 01:16:48 +0900 Subject: [PATCH 3/5] doc: Add stream unzip doc --- src/stream/stream/mod.rs | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 04aa4d68e..e0e9b7e44 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -1719,6 +1719,33 @@ extension_trait! { Zip::new(self, other) } + #[doc = r#" + Converts an stream of pairs into a pair of containers. + + unzip() consumes an entire stream of pairs, producing two collections: one from the left elements of the pairs, and one from the right elements. + + This function is, in some sense, the opposite of [`zip`]. + + [`zip`]: trait.Stream.html#method.zip + + # Example + + ``` + # fn main() { async_std::task::block_on(async { + # + use async_std::prelude::*; + use async_std::stream; + + let s = stream::from_iter(vec![(1,2), (3,4)]); + + let (left, right): (Vec<_>, Vec<_>) = s.unzip().await; + + assert_eq!(left, [1, 3]); + assert_eq!(right, [2, 4]); + # + # }) } + ``` + "#] #[cfg(feature = "unstable")] #[cfg_attr(feature = "docs", doc(cfg(unstable)))] fn unzip(self) -> impl Future [UnzipFuture] From a05b6a38104c26a1ffd2eb7ed6e4e32912c856fb Mon Sep 17 00:00:00 2001 From: k-nasa Date: Sat, 16 Nov 2019 01:36:53 +0900 Subject: [PATCH 4/5] fix: mutable ref --- src/stream/stream/unzip.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/stream/stream/unzip.rs b/src/stream/stream/unzip.rs index 4f5dfa198..e0832ff71 100644 --- a/src/stream/stream/unzip.rs +++ b/src/stream/stream/unzip.rs @@ -46,11 +46,9 @@ where match next { Some((a, b)) => { - let mut res = this.res.take().unwrap(); + let res = this.res.as_mut().unwrap(); res.0.extend(Some(a)); res.1.extend(Some(b)); - - *this.res = Some(res); } None => return Poll::Ready(this.res.take().unwrap()), } From d146d95a3934ab214c8ec22c2cb029c562b60ef0 Mon Sep 17 00:00:00 2001 From: nasa Date: Wed, 20 Nov 2019 21:38:42 +0900 Subject: [PATCH 5/5] Update src/stream/stream/mod.rs Co-Authored-By: Taiki Endo --- src/stream/stream/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index e0e9b7e44..51c413900 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -1722,7 +1722,7 @@ extension_trait! { #[doc = r#" Converts an stream of pairs into a pair of containers. - unzip() consumes an entire stream of pairs, producing two collections: one from the left elements of the pairs, and one from the right elements. + `unzip()` consumes an entire stream of pairs, producing two collections: one from the left elements of the pairs, and one from the right elements. This function is, in some sense, the opposite of [`zip`].