From 0918fe15e516971ba03906a0017c257da28ae64d Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Fri, 30 Aug 2019 16:18:53 +0200 Subject: [PATCH 1/5] init stream::from_fn Signed-off-by: Yoshua Wuyts --- src/stream/from_fn.rs | 56 +++++++++++++++++++++++++++++++++++++++++++ src/stream/mod.rs | 2 ++ 2 files changed, 58 insertions(+) create mode 100644 src/stream/from_fn.rs diff --git a/src/stream/from_fn.rs b/src/stream/from_fn.rs new file mode 100644 index 000000000..9c5c0ca60 --- /dev/null +++ b/src/stream/from_fn.rs @@ -0,0 +1,56 @@ +use std::fmt; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use futures::stream::Stream; + +/// Creates a new stream where each iteration calls the provided closure. +/// +/// This allows creating a custom iterator with any behavior +/// without using the more verbose syntax of creating a dedicated type +/// and implementing the `Iterator` trait for it. +#[inline] +pub fn from_fn(f: F) -> FromFn + where F: FnMut() -> Box>>, + F: Unpin +{ + FromFn{ + closure: f, + fut: None, + } +} + +/// A stream where each iteration calls the provided closure. +/// +/// This `struct` is created by the [`stream::from_fn`] function. +/// See its documentation for more. +/// +/// [`stream::from_fn`]: fn.from_fn.html +#[derive(Clone)] +pub struct FromFn { + closure: F, + fut: Option>>>, +} + +impl Stream for FromFn + where F: FnMut() -> Box>> +{ + type Item = T; + + #[inline] + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if let None = self.fut { + self.fut = Some((self.closure)()); + } + + let pinned = unsafe { Pin::new_unchecked(&mut self.fut.unwrap()) }; + pinned.poll(cx) + } +} + +impl fmt::Debug for FromFn { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("FromFn").finish() + } +} diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 8dcc6d54a..19fa3b8f4 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -22,11 +22,13 @@ //! ``` pub use empty::{empty, Empty}; +pub use from_fn::{from_fn, FromFn}; pub use once::{once, Once}; pub use repeat::{repeat, Repeat}; pub use stream::{Stream, Take}; mod empty; +mod from_fn; mod once; mod repeat; mod stream; From ac4727e814c641793afb7162403e416677f2870b Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Fri, 30 Aug 2019 16:24:04 +0200 Subject: [PATCH 2/5] final logic Signed-off-by: Yoshua Wuyts --- src/stream/from_fn.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/stream/from_fn.rs b/src/stream/from_fn.rs index 9c5c0ca60..56ab1c81a 100644 --- a/src/stream/from_fn.rs +++ b/src/stream/from_fn.rs @@ -3,6 +3,7 @@ use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; +use futures::ready; use futures::stream::Stream; /// Creates a new stream where each iteration calls the provided closure. @@ -27,7 +28,6 @@ pub fn from_fn(f: F) -> FromFn /// See its documentation for more. /// /// [`stream::from_fn`]: fn.from_fn.html -#[derive(Clone)] pub struct FromFn { closure: F, fut: Option>>>, @@ -44,8 +44,11 @@ impl Stream for FromFn self.fut = Some((self.closure)()); } - let pinned = unsafe { Pin::new_unchecked(&mut self.fut.unwrap()) }; - pinned.poll(cx) + let pinned = unsafe { Pin::new_unchecked(&mut self.fut.as_mut().unwrap()) }; + let out = ready!(pinned.poll(cx)); + + self.fut = None; + Poll::Ready(out) } } From a2d8e6173e06001d4b1227dd15d8b9c5a0d4b4c2 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Fri, 30 Aug 2019 16:28:43 +0200 Subject: [PATCH 3/5] from_fn example Signed-off-by: Yoshua Wuyts --- src/stream/from_fn.rs | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/src/stream/from_fn.rs b/src/stream/from_fn.rs index 56ab1c81a..1f6eb1ae4 100644 --- a/src/stream/from_fn.rs +++ b/src/stream/from_fn.rs @@ -11,6 +11,28 @@ use futures::stream::Stream; /// This allows creating a custom iterator with any behavior /// without using the more verbose syntax of creating a dedicated type /// and implementing the `Iterator` trait for it. +/// +/// # Examples +/// +/// ``` +/// # fn main() { async_std::task::block_on(async { +/// # +/// let mut count = 0; +/// let counter = async_std::stream::from_fn(|| async move { +/// // Increment our count. This is why we started at zero. +/// count += 1; +/// +/// // Check to see if we've finished counting or not. +/// if count < 6 { +/// Some(count) +/// } else { +/// None +/// } +/// }); +/// assert_eq!(counter.collect::>().await, &[1, 2, 3, 4, 5]); +/// # +/// # }) } +/// ``` #[inline] pub fn from_fn(f: F) -> FromFn where F: FnMut() -> Box>>, From 4ad4209cd5fd2e3600281eab6a043f937a8aef48 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Fri, 30 Aug 2019 20:04:36 +0200 Subject: [PATCH 4/5] use taiki-e version, and atomic example Signed-off-by: Yoshua Wuyts --- src/stream/from_fn.rs | 117 ++++++++++++++++++++++++------------------ 1 file changed, 68 insertions(+), 49 deletions(-) diff --git a/src/stream/from_fn.rs b/src/stream/from_fn.rs index 1f6eb1ae4..afcfa8f9a 100644 --- a/src/stream/from_fn.rs +++ b/src/stream/from_fn.rs @@ -1,81 +1,100 @@ +use futures::stream::Stream; +use pin_utils::{unsafe_pinned, unsafe_unpinned}; + use std::fmt; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; -use futures::ready; -use futures::stream::Stream; - -/// Creates a new stream where each iteration calls the provided closure. +/// Creates a new stream where each iteration calls the provided closure +/// `F: FnMut() -> Option`. /// -/// This allows creating a custom iterator with any behavior +/// This allows creating a custom stream with any behavior /// without using the more verbose syntax of creating a dedicated type -/// and implementing the `Iterator` trait for it. +/// and implementing the `Stream` trait for it. +/// +/// Note that the `FromFn` stream doesn’t make assumptions about the behavior of the closure, +/// and therefore conservatively does not implement [`FusedStream`](futures_core::stream::FusedStream). +/// +/// The closure can use captures and its environment to track state across iterations. Depending on +/// how the stream is used, this may require specifying the `move` keyword on the closure. /// /// # Examples -/// +/// /// ``` -/// # fn main() { async_std::task::block_on(async { -/// # -/// let mut count = 0; -/// let counter = async_std::stream::from_fn(|| async move { +/// # futures::executor::block_on(async { +/// use async_std::{future, stream}; +/// use std::sync::atomic::{AtomicUsize, Ordering}; +/// +/// static COUNT: AtomicUsize = AtomicUsize::new(0); +/// let stream = stream::from_fn(|| { /// // Increment our count. This is why we started at zero. -/// count += 1; +/// +/// let count = COUNT.fetch_add(1, Ordering::SeqCst); /// /// // Check to see if we've finished counting or not. /// if count < 6 { -/// Some(count) +/// future::ready(Some(count)) /// } else { -/// None +/// future::ready(None) /// } /// }); -/// assert_eq!(counter.collect::>().await, &[1, 2, 3, 4, 5]); -/// # -/// # }) } +/// assert_eq!(stream.collect::>().await, &[1, 2, 3, 4, 5]); +/// # }); /// ``` -#[inline] -pub fn from_fn(f: F) -> FromFn - where F: FnMut() -> Box>>, - F: Unpin +pub fn from_fn(f: F) -> FromFn +where + F: FnMut() -> Fut, + Fut: Future>, +{ + FromFn { f, fut: None } +} + +/// Stream for the [`from_fn`] function. +#[must_use = "streams do nothing unless polled"] +pub struct FromFn { + f: F, + fut: Option, +} + +impl Unpin for FromFn {} + +impl fmt::Debug for FromFn +where + Fut: fmt::Debug, { - FromFn{ - closure: f, - fut: None, + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("FromFn").field("fut", &self.fut).finish() } } -/// A stream where each iteration calls the provided closure. -/// -/// This `struct` is created by the [`stream::from_fn`] function. -/// See its documentation for more. -/// -/// [`stream::from_fn`]: fn.from_fn.html -pub struct FromFn { - closure: F, - fut: Option>>>, +impl FromFn { + unsafe_unpinned!(f: F); + unsafe_pinned!(fut: Option); } -impl Stream for FromFn - where F: FnMut() -> Box>> +impl Stream for FromFn +where + F: FnMut() -> Fut, + Fut: Future>, { - type Item = T; + type Item = Item; #[inline] fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if let None = self.fut { - self.fut = Some((self.closure)()); + if self.fut.is_none() { + let fut = (self.as_mut().f())(); + self.as_mut().fut().set(Some(fut)); } - - let pinned = unsafe { Pin::new_unchecked(&mut self.fut.as_mut().unwrap()) }; - let out = ready!(pinned.poll(cx)); - self.fut = None; - Poll::Ready(out) - } -} - -impl fmt::Debug for FromFn { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("FromFn").finish() + self.as_mut() + .fut() + .as_pin_mut() + .unwrap() + .poll(cx) + .map(|item| { + self.as_mut().fut().set(None); + item + }) } } From 4d392984cba2afda520abe3a21eda341a508b705 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Fri, 30 Aug 2019 20:11:21 +0200 Subject: [PATCH 5/5] docs Signed-off-by: Yoshua Wuyts --- src/stream/from_fn.rs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/stream/from_fn.rs b/src/stream/from_fn.rs index afcfa8f9a..b1d59e5e0 100644 --- a/src/stream/from_fn.rs +++ b/src/stream/from_fn.rs @@ -6,8 +6,7 @@ use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; -/// Creates a new stream where each iteration calls the provided closure -/// `F: FnMut() -> Option`. +/// Creates a new stream where each iteration calls the provided closure. /// /// This allows creating a custom stream with any behavior /// without using the more verbose syntax of creating a dedicated type @@ -50,7 +49,12 @@ where FromFn { f, fut: None } } -/// Stream for the [`from_fn`] function. +/// A stream where each iteration calls the provided closure. +/// +/// This `struct` is created by the [`stream::from_fn`] function. +/// See its documentation for more. +/// +/// [`stream::from_fn`]: fn.from_fn.html #[must_use = "streams do nothing unless polled"] pub struct FromFn { f: F, @@ -64,7 +68,7 @@ where Fut: fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("FromFn").field("fut", &self.fut).finish() + f.debug_struct("FromFn").finish() } }