diff --git a/futures-util/src/sink/from_fn.rs b/futures-util/src/sink/from_fn.rs new file mode 100644 index 0000000000..4b0871b459 --- /dev/null +++ b/futures-util/src/sink/from_fn.rs @@ -0,0 +1,74 @@ +use core::{future::Future, pin::Pin}; +use futures_core::task::{Context, Poll}; +use futures_sink::Sink; +use pin_project::pin_project; + +/// Sink for the [`from_fn`] function. +#[pin_project] +#[derive(Debug)] +#[must_use = "sinks do nothing unless polled"] +pub struct FromFn { + function: F, + #[pin] + future: Option, +} + +/// Create a sink from a function which processes one item at a time. +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::sink::{self, SinkExt}; +/// +/// let from_fn = sink::from_fn(|i: i32| { +/// async move { +/// eprintln!("{}", i); +/// Ok::<_, futures::never::Never>(()) +/// } +/// }); +/// futures::pin_mut!(from_fn); +/// from_fn.send(5).await?; +/// # Ok::<(), futures::never::Never>(()) }).unwrap(); +/// ``` +pub fn from_fn(function: F) -> FromFn { + FromFn { + function, + future: None, + } +} + +impl Sink for FromFn +where + F: FnMut(T) -> R, + R: Future>, +{ + type Error = E; + + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.poll_flush(cx) + } + + fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { + let mut this = self.project(); + debug_assert!(this.future.is_none()); + let future = (this.function)(item); + this.future.set(Some(future)); + Ok(()) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + Poll::Ready(if let Some(future) = this.future.as_mut().as_pin_mut() { + let result = ready!(future.poll(cx)); + this.future.set(None); + result + } else { + Ok(()) + }) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.poll_flush(cx) + } +} diff --git a/futures-util/src/sink/mod.rs b/futures-util/src/sink/mod.rs index b0e2c83f00..3bba6bf7c4 100644 --- a/futures-util/src/sink/mod.rs +++ b/futures-util/src/sink/mod.rs @@ -29,6 +29,9 @@ pub use self::fanout::Fanout; mod flush; pub use self::flush::Flush; +mod from_fn; +pub use self::from_fn::{FromFn, from_fn}; + mod err_into; pub use self::err_into::SinkErrInto; @@ -264,7 +267,7 @@ pub trait SinkExt: Sink { { CompatSink::new(self) } - + /// A convenience method for calling [`Sink::poll_ready`] on [`Unpin`] /// sink types. fn poll_ready_unpin(&mut self, cx: &mut Context<'_>) -> Poll> diff --git a/futures/src/lib.rs b/futures/src/lib.rs index 66f1be0983..18c26d8aae 100644 --- a/futures/src/lib.rs +++ b/futures/src/lib.rs @@ -416,7 +416,7 @@ pub mod sink { pub use futures_sink::Sink; pub use futures_util::sink::{ - Close, Flush, Send, SendAll, SinkErrInto, SinkMapErr, With, + Close, Flush, FromFn, from_fn, Send, SendAll, SinkErrInto, SinkMapErr, With, SinkExt, Fanout, Drain, drain, WithFlatMap, }; diff --git a/futures/tests/sink.rs b/futures/tests/sink.rs index 8ed201e394..645ceb5a4f 100644 --- a/futures/tests/sink.rs +++ b/futures/tests/sink.rs @@ -32,8 +32,8 @@ mod unwrap { mod flag_cx { use futures::task::{self, ArcWake, Context}; - use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; + use std::sync::Arc; // An Unpark struct that records unpark events for inspection pub struct Flag(AtomicBool); @@ -129,7 +129,10 @@ mod manual_flush { Ok(()) } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_flush( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { if self.data.is_empty() { Poll::Ready(Ok(())) } else { @@ -325,9 +328,9 @@ fn mpsc_blocking_start_send() { use futures::executor::block_on; use futures::future::{self, FutureExt}; - use start_send_fut::StartSendFut; use flag_cx::flag_cx; use sassert_next::sassert_next; + use start_send_fut::StartSendFut; use unwrap::unwrap; let (mut tx, mut rx) = mpsc::channel::(0); @@ -461,8 +464,8 @@ fn with_flush_propagate() { use futures::sink::{Sink, SinkExt}; use std::pin::Pin; - use manual_flush::ManualFlush; use flag_cx::flag_cx; + use manual_flush::ManualFlush; use unwrap::unwrap; let mut sink = ManualFlush::new().with(future::ok::, ()>); @@ -508,10 +511,10 @@ fn buffer() { use futures::future::FutureExt; use futures::sink::SinkExt; - use start_send_fut::StartSendFut; + use allowance::manual_allow; use flag_cx::flag_cx; + use start_send_fut::StartSendFut; use unwrap::unwrap; - use allowance::manual_allow; let (sink, allow) = manual_allow::(); let sink = sink.buffer(2); @@ -553,8 +556,8 @@ fn fanout_backpressure() { use futures::sink::SinkExt; use futures::stream::StreamExt; - use start_send_fut::StartSendFut; use flag_cx::flag_cx; + use start_send_fut::StartSendFut; use unwrap::unwrap; let (left_send, mut left_recv) = mpsc::channel(0); @@ -610,6 +613,44 @@ fn sink_map_err() { ); } +#[test] +fn sink_from_fn() { + use futures::channel::mpsc; + use futures::executor::block_on; + use futures::future::poll_fn; + use futures::sink::{self, Sink, SinkExt}; + use futures::task::Poll; + + block_on(poll_fn(|cx| { + let (tx, mut rx) = mpsc::channel(1); + let from_fn = sink::from_fn(|i: i32| { + let mut tx = tx.clone(); + async move { + tx.send(i).await.unwrap(); + Ok::<_, String>(()) + } + }); + futures::pin_mut!(from_fn); + assert_eq!(from_fn.as_mut().start_send(1), Ok(())); + assert_eq!(from_fn.as_mut().poll_flush(cx), Poll::Ready(Ok(()))); + assert_eq!(rx.try_next().unwrap(), Some(1)); + + assert_eq!(from_fn.as_mut().poll_ready(cx), Poll::Ready(Ok(()))); + assert_eq!(from_fn.as_mut().start_send(2), Ok(())); + assert_eq!(from_fn.as_mut().poll_ready(cx), Poll::Ready(Ok(()))); + assert_eq!(from_fn.as_mut().start_send(3), Ok(())); + assert_eq!(rx.try_next().unwrap(), Some(2)); + assert!(rx.try_next().is_err()); + assert_eq!(from_fn.as_mut().poll_ready(cx), Poll::Ready(Ok(()))); + assert_eq!(from_fn.as_mut().start_send(4), Ok(())); + assert_eq!(from_fn.as_mut().poll_flush(cx), Poll::Pending); // Channel full + assert_eq!(rx.try_next().unwrap(), Some(3)); + assert_eq!(rx.try_next().unwrap(), Some(4)); + + Poll::Ready(()) + })) +} + #[test] fn err_into() { use futures::channel::mpsc;