Skip to content

Simpler selection futures #2112

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions futures-util/src/future/either.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use futures_sink::Sink;

/// Combines two different futures, streams, or sinks having the same associated types into a single
/// type.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Either<A, B> {
/// First branch of the type
Left(A),
Expand Down Expand Up @@ -280,10 +280,7 @@ mod if_std {
A: AsyncBufRead,
B: AsyncBufRead,
{
fn poll_fill_buf(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<&[u8]>> {
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
unsafe {
match self.get_unchecked_mut() {
Either::Left(x) => Pin::new_unchecked(x).poll_fill_buf(cx),
Expand Down
83 changes: 83 additions & 0 deletions futures-util/src/future/first.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
use crate::future::Either;
use core::pin::Pin;
use futures_core::future::Future;
use futures_core::task::{Context, Poll};
use pin_utils::unsafe_pinned;

/// Future for the [`first()`] function.
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[derive(Debug, Clone, Default)]
pub struct First<F1, F2> {
future1: F1,
future2: F2,
}

impl<F1: Unpin, F2: Unpin> Unpin for First<F1, F2> {}

impl<F1, F2> First<F1, F2> {
unsafe_pinned!(future1: F1);
unsafe_pinned!(future2: F2);
Comment on lines +18 to +19
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use pin_project instead of pin_utils::unsafe_pinned. (Since #2128, we have been using it.)

}

impl<F1: Future, F2: Future> Future for First<F1, F2> {
type Output = Either<F1::Output, F2::Output>;

#[inline]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.as_mut().future1().poll(cx) {
Poll::Ready(out) => Poll::Ready(Either::Left(out)),
Poll::Pending => match self.future2().poll(cx) {
Poll::Ready(out) => Poll::Ready(Either::Right(out)),
Poll::Pending => Poll::Pending,
},
}
}
}

// We don't provide FusedFuture, because the overhead of implementing it (
// which requires a separate bool or Option field) is precisely the same as
// calling .fuse()

/// Waits for either one of two differently-typed futures to complete.
///
/// This function will return a new future which awaits for either one of both
/// futures to complete. The returned future will finish with the value of
/// whichever future finishes first.
///
/// The future will discard the future that didn't complete; see `select` for
/// a future that will instead return the incomplete future.
///
/// Note that this function consumes the receiving futures and returns a
/// wrapped version of them.
///
/// Also note that if both this and the second future have the same
/// output type you can use the `Either::into_immer` method to
/// conveniently extract out the value at the end.
pub fn first<F1, F2>(future1: F1, future2: F2) -> First<F1, F2> {
First { future1, future2 }
}

#[test]
fn test_first() {
use crate::future::{pending, ready, FutureExt};
use crate::task::noop_waker_ref;

let mut context = Context::from_waker(noop_waker_ref());

assert_eq!(
first(ready(10), ready(20)).poll_unpin(&mut context),
Poll::Ready(Either::Left(10))
);
assert_eq!(
first(ready(10), pending::<()>()).poll_unpin(&mut context),
Poll::Ready(Either::Left(10))
);
assert_eq!(
first(pending::<()>(), ready(20)).poll_unpin(&mut context),
Poll::Ready(Either::Right(20))
);
assert_eq!(
first(pending::<()>(), pending::<()>()).poll_unpin(&mut context),
Poll::Pending
);
}
111 changes: 111 additions & 0 deletions futures-util/src/future/first_all.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
use alloc::vec::Vec;
use core::iter::FromIterator;
use core::pin::Pin;
use futures_core::future::Future;
use futures_core::task::{Context, Poll};

/// Future for the [`first_all()`] function.
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[derive(Debug, Clone)]
pub struct FirstAll<F> {
// Critical safety invariant: after FirstAll is created, this vector can
// never be reallocated, in order to ensure that Pin is upheld.
futures: Vec<F>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recommend to use Pin<Box<[F]>> + this helper function, instead of Vec<F> + Pin::new_unchecked

}

// Safety: once created, the contents of the vector don't change, and they'll
// remain in place permanently.
impl<F> Unpin for FirstAll<F> {}

impl<F: Future> Future for FirstAll<F> {
type Output = F::Output;

#[inline]
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
match this.futures.iter_mut().find_map(move |fut| {
// Safety: we promise that the future is never moved out of the vec,
// and that the vec never reallocates once FirstAll has been created
// (specifically after the first poll)
let pinned = unsafe { Pin::new_unchecked(fut) };
match pinned.poll(cx) {
Poll::Ready(out) => Some(out),
Poll::Pending => None,
}
}) {
Some(out) => Poll::Ready(out),
None => Poll::Pending,
}
}
}

// We don't provide FusedFuture, because the overhead of implementing it (
// which requires clearing the vector after Ready is returned) is precisely
// the same as using .fuse()

impl<Fut: Future> FromIterator<Fut> for FirstAll<Fut> {
fn from_iter<T: IntoIterator<Item = Fut>>(iter: T) -> Self {
first_all(iter)
}
}

/// Creates a new future which will return the result of the first completed
/// future out of a list.
///
/// The returned future will wait for any future within `futures` to be ready.
/// Upon completion the item resolved will be returned.
///
/// The remaining futures will be discarded when the returned future is
/// dropped; see `select_all` for a version that returns the incomplete
/// futures if you need to poll over them further.
///
/// This function is only available when the `std` or `alloc` feature of this
/// library is activated, and it is activated by default.
///
/// # Panics
///
/// This function will panic if the iterator specified contains no items.
pub fn first_all<I>(futures: I) -> FirstAll<I::Item>
where
I: IntoIterator,
I::Item: Future,
{
let futures = Vec::from_iter(futures);
assert!(!futures.is_empty(), "Need at least 1 future for first_any");
FirstAll { futures }
}

#[test]
fn test_first_all() {
use crate::future::FutureExt;
use crate::task::noop_waker_ref;
use futures_channel::oneshot::channel;

let mut futures = vec![];
let mut senders = vec![];

for _ in 0..10 {
let (send, recv) = channel();
futures.push(recv);
senders.push(send);
}

let (send, recv) = channel();
futures.push(recv);

for _ in 0..10 {
let (send, recv) = channel();
futures.push(recv);
senders.push(send);
}

let mut fut = first_all(futures);
let mut context = Context::from_waker(noop_waker_ref());

let poll = fut.poll_unpin(&mut context);
assert_eq!(poll, Poll::Pending);

send.send(10).unwrap();
let poll = fut.poll_unpin(&mut context);
assert_eq!(poll, Poll::Ready(Ok(10)));
}
Loading