From 5ab1b26b93a634fd2a01210e1fe8c551329afd31 Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Sat, 31 Aug 2019 00:54:43 +0900 Subject: [PATCH] Add stream::from_fn --- futures-util/src/stream/from_fn.rs | 98 ++++++++++++++++++++++++++++++ futures-util/src/stream/mod.rs | 3 + futures-util/src/stream/unfold.rs | 10 ++- futures/src/lib.rs | 7 ++- 4 files changed, 109 insertions(+), 9 deletions(-) create mode 100644 futures-util/src/stream/from_fn.rs diff --git a/futures-util/src/stream/from_fn.rs b/futures-util/src/stream/from_fn.rs new file mode 100644 index 0000000000..e5a0d3de10 --- /dev/null +++ b/futures-util/src/stream/from_fn.rs @@ -0,0 +1,98 @@ +use core::fmt; +use core::pin::Pin; +use futures_core::future::Future; +use futures_core::stream::Stream; +use futures_core::task::{Context, Poll}; +use pin_utils::{unsafe_pinned, unsafe_unpinned}; + +/// Creates a new stream where each iteration calls the provided closure +/// `F: FnMut() -> Option`. +/// +/// This allows creating a custom stream with any behavior +/// without using the more verbose syntax of creating a dedicated type +/// and implementing the `Stream` trait for it. +/// +/// Note that the `FromFn` stream doesn’t make assumptions about the behavior of the closure, +/// and therefore conservatively does not implement [`FusedStream`](futures_core::stream::FusedStream). +/// +/// The closure can use captures and its environment to track state across iterations. Depending on +/// how the stream is used, this may require specifying the `move` keyword on the closure. +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::future; +/// use futures::stream::{self, StreamExt}; +/// +/// let mut count = 0; +/// let stream = stream::from_fn(|| { +/// // Increment our count. This is why we started at zero. +/// count += 1; +/// +/// // Check to see if we've finished counting or not. +/// if count < 6 { +/// future::ready(Some(count)) +/// } else { +/// future::ready(None) +/// } +/// }); +/// assert_eq!(stream.collect::>().await, &[1, 2, 3, 4, 5]); +/// # }); +/// ``` +pub fn from_fn(f: F) -> FromFn +where + F: FnMut() -> Fut, + Fut: Future>, +{ + FromFn { f, fut: None } +} + +/// Stream for the [`from_fn`] function. +#[must_use = "streams do nothing unless polled"] +pub struct FromFn { + f: F, + fut: Option, +} + +impl Unpin for FromFn {} + +impl fmt::Debug for FromFn +where + Fut: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("FromFn").field("fut", &self.fut).finish() + } +} + +impl FromFn { + unsafe_unpinned!(f: F); + unsafe_pinned!(fut: Option); +} + +impl Stream for FromFn +where + F: FnMut() -> Fut, + Fut: Future>, +{ + type Item = Item; + + #[inline] + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.fut.is_none() { + let fut = (self.as_mut().f())(); + self.as_mut().fut().set(Some(fut)); + } + + self.as_mut() + .fut() + .as_pin_mut() + .unwrap() + .poll(cx) + .map(|item| { + self.as_mut().fut().set(None); + item + }) + } +} diff --git a/futures-util/src/stream/mod.rs b/futures-util/src/stream/mod.rs index 1ce75ee121..44e684dc1f 100644 --- a/futures-util/src/stream/mod.rs +++ b/futures-util/src/stream/mod.rs @@ -59,6 +59,9 @@ pub use self::forward::Forward; mod for_each; pub use self::for_each::ForEach; +mod from_fn; +pub use self::from_fn::{from_fn, FromFn}; + mod fuse; pub use self::fuse::Fuse; diff --git a/futures-util/src/stream/unfold.rs b/futures-util/src/stream/unfold.rs index 4774a70f4f..2ad1795286 100644 --- a/futures-util/src/stream/unfold.rs +++ b/futures-util/src/stream/unfold.rs @@ -32,21 +32,19 @@ use pin_utils::{unsafe_pinned, unsafe_unpinned}; /// /// ``` /// # futures::executor::block_on(async { -/// use futures::future; /// use futures::stream::{self, StreamExt}; /// -/// let stream = stream::unfold(0, |state| { +/// let stream = stream::unfold(0, |state| async move { /// if state <= 2 { /// let next_state = state + 1; /// let yielded = state * 2; -/// future::ready(Some((yielded, next_state))) +/// Some((yielded, next_state)) /// } else { -/// future::ready(None) +/// None /// } /// }); /// -/// let result = stream.collect::>().await; -/// assert_eq!(result, vec![0, 2, 4]); +/// assert_eq!(stream.collect::>().await, vec![0, 2, 4]); /// # }); /// ``` pub fn unfold(init: T, f: F) -> Unfold diff --git a/futures/src/lib.rs b/futures/src/lib.rs index 747ffd59b6..98de00a901 100644 --- a/futures/src/lib.rs +++ b/futures/src/lib.rs @@ -407,12 +407,13 @@ pub mod stream { pub use futures_core::stream::{BoxStream, LocalBoxStream}; pub use futures_util::stream::{ - iter, Iter, - repeat, Repeat, empty, Empty, - pending, Pending, + from_fn, FromFn, + iter, Iter, once, Once, + pending, Pending, poll_fn, PollFn, + repeat, Repeat, select, Select, unfold, Unfold,