Skip to content

feat: Add sink::from_fn #2254

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions futures-util/src/sink/from_fn.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
use core::{future::Future, pin::Pin};
use futures_core::task::{Context, Poll};
use futures_sink::Sink;
use pin_project::pin_project;

/// Sink for the [`from_fn`] function.
#[pin_project]
#[derive(Debug)]
#[must_use = "sinks do nothing unless polled"]
pub struct FromFn<F, R> {
function: F,
#[pin]
future: Option<R>,
}

/// Create a sink from a function which processes one item at a time.
///
/// # Examples
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::sink::{self, SinkExt};
///
/// let from_fn = sink::from_fn(|i: i32| {
/// async move {
/// eprintln!("{}", i);
/// Ok::<_, futures::never::Never>(())
/// }
/// });
/// futures::pin_mut!(from_fn);
/// from_fn.send(5).await?;
/// # Ok::<(), futures::never::Never>(()) }).unwrap();
/// ```
pub fn from_fn<F, R>(function: F) -> FromFn<F, R> {
FromFn {
function,
future: None,
}
}

impl<F, R, T, E> Sink<T> for FromFn<F, R>
where
F: FnMut(T) -> R,
R: Future<Output = Result<(), E>>,
{
type Error = E;

fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.poll_flush(cx)
}

fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
let mut this = self.project();
debug_assert!(this.future.is_none());
let future = (this.function)(item);
this.future.set(Some(future));
Ok(())
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let mut this = self.project();
Poll::Ready(if let Some(future) = this.future.as_mut().as_pin_mut() {
let result = ready!(future.poll(cx));
this.future.set(None);
result
} else {
Ok(())
})
}

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.poll_flush(cx)
}
}
5 changes: 4 additions & 1 deletion futures-util/src/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ pub use self::fanout::Fanout;
mod flush;
pub use self::flush::Flush;

mod from_fn;
pub use self::from_fn::{FromFn, from_fn};

mod err_into;
pub use self::err_into::SinkErrInto;

Expand Down Expand Up @@ -264,7 +267,7 @@ pub trait SinkExt<Item>: Sink<Item> {
{
CompatSink::new(self)
}

/// A convenience method for calling [`Sink::poll_ready`] on [`Unpin`]
/// sink types.
fn poll_ready_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
Expand Down
2 changes: 1 addition & 1 deletion futures/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ pub mod sink {
pub use futures_sink::Sink;

pub use futures_util::sink::{
Close, Flush, Send, SendAll, SinkErrInto, SinkMapErr, With,
Close, Flush, FromFn, from_fn, Send, SendAll, SinkErrInto, SinkMapErr, With,
SinkExt, Fanout, Drain, drain,
WithFlatMap,
};
Expand Down
55 changes: 48 additions & 7 deletions futures/tests/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ mod unwrap {

mod flag_cx {
use futures::task::{self, ArcWake, Context};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// An Unpark struct that records unpark events for inspection
pub struct Flag(AtomicBool);
Expand Down Expand Up @@ -129,7 +129,10 @@ mod manual_flush {
Ok(())
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
fn poll_flush(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
if self.data.is_empty() {
Poll::Ready(Ok(()))
} else {
Expand Down Expand Up @@ -325,9 +328,9 @@ fn mpsc_blocking_start_send() {
use futures::executor::block_on;
use futures::future::{self, FutureExt};

use start_send_fut::StartSendFut;
use flag_cx::flag_cx;
use sassert_next::sassert_next;
use start_send_fut::StartSendFut;
use unwrap::unwrap;

let (mut tx, mut rx) = mpsc::channel::<i32>(0);
Expand Down Expand Up @@ -461,8 +464,8 @@ fn with_flush_propagate() {
use futures::sink::{Sink, SinkExt};
use std::pin::Pin;

use manual_flush::ManualFlush;
use flag_cx::flag_cx;
use manual_flush::ManualFlush;
use unwrap::unwrap;

let mut sink = ManualFlush::new().with(future::ok::<Option<i32>, ()>);
Expand Down Expand Up @@ -508,10 +511,10 @@ fn buffer() {
use futures::future::FutureExt;
use futures::sink::SinkExt;

use start_send_fut::StartSendFut;
use allowance::manual_allow;
use flag_cx::flag_cx;
use start_send_fut::StartSendFut;
use unwrap::unwrap;
use allowance::manual_allow;

let (sink, allow) = manual_allow::<i32>();
let sink = sink.buffer(2);
Expand Down Expand Up @@ -553,8 +556,8 @@ fn fanout_backpressure() {
use futures::sink::SinkExt;
use futures::stream::StreamExt;

use start_send_fut::StartSendFut;
use flag_cx::flag_cx;
use start_send_fut::StartSendFut;
use unwrap::unwrap;

let (left_send, mut left_recv) = mpsc::channel(0);
Expand Down Expand Up @@ -610,6 +613,44 @@ fn sink_map_err() {
);
}

#[test]
fn sink_from_fn() {
use futures::channel::mpsc;
use futures::executor::block_on;
use futures::future::poll_fn;
use futures::sink::{self, Sink, SinkExt};
use futures::task::Poll;

block_on(poll_fn(|cx| {
let (tx, mut rx) = mpsc::channel(1);
let from_fn = sink::from_fn(|i: i32| {
let mut tx = tx.clone();
async move {
tx.send(i).await.unwrap();
Ok::<_, String>(())
}
});
futures::pin_mut!(from_fn);
assert_eq!(from_fn.as_mut().start_send(1), Ok(()));
assert_eq!(from_fn.as_mut().poll_flush(cx), Poll::Ready(Ok(())));
assert_eq!(rx.try_next().unwrap(), Some(1));

assert_eq!(from_fn.as_mut().poll_ready(cx), Poll::Ready(Ok(())));
assert_eq!(from_fn.as_mut().start_send(2), Ok(()));
assert_eq!(from_fn.as_mut().poll_ready(cx), Poll::Ready(Ok(())));
assert_eq!(from_fn.as_mut().start_send(3), Ok(()));
assert_eq!(rx.try_next().unwrap(), Some(2));
assert!(rx.try_next().is_err());
assert_eq!(from_fn.as_mut().poll_ready(cx), Poll::Ready(Ok(())));
assert_eq!(from_fn.as_mut().start_send(4), Ok(()));
assert_eq!(from_fn.as_mut().poll_flush(cx), Poll::Pending); // Channel full
assert_eq!(rx.try_next().unwrap(), Some(3));
assert_eq!(rx.try_next().unwrap(), Some(4));

Poll::Ready(())
}))
}

#[test]
fn err_into() {
use futures::channel::mpsc;
Expand Down