Skip to content

Fixed flat_map and flatten #701

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Jun 18, 2020
19 changes: 13 additions & 6 deletions src/stream/stream/flat_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,22 @@ where
let mut this = self.project();
loop {
if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() {
if let item @ Some(_) = futures_core::ready!(inner.poll_next(cx)) {
return Poll::Ready(item);
let next_item = futures_core::ready!(inner.poll_next(cx));

if next_item.is_some() {
return Poll::Ready(next_item);
} else {
this.inner_stream.set(None);
}
}

match futures_core::ready!(this.stream.as_mut().poll_next(cx)) {
None => return Poll::Ready(None),
Some(inner) => this.inner_stream.set(Some(inner.into_stream())),
let inner = futures_core::ready!(this.stream.as_mut().poll_next(cx));

if inner.is_some() {
this.inner_stream.set(inner.map(IntoStream::into_stream));
} else {
return Poll::Ready(None);
}
}
}
}
}
17 changes: 12 additions & 5 deletions src/stream/stream/flatten.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,21 @@ where
let mut this = self.project();
loop {
if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() {
if let item @ Some(_) = futures_core::ready!(inner.poll_next(cx)) {
return Poll::Ready(item);
let next_item = futures_core::ready!(inner.poll_next(cx));

if next_item.is_some() {
return Poll::Ready(next_item);
} else {
this.inner_stream.set(None);
}
}

match futures_core::ready!(this.stream.as_mut().poll_next(cx)) {
None => return Poll::Ready(None),
Some(inner) => this.inner_stream.set(Some(inner.into_stream())),
let inner = futures_core::ready!(this.stream.as_mut().poll_next(cx));

if inner.is_some() {
this.inner_stream.set(inner.map(IntoStream::into_stream));
} else {
return Poll::Ready(None);
}
}
}
Expand Down
75 changes: 75 additions & 0 deletions tests/stream.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::convert::identity;
use std::marker::Unpin;
use std::pin::Pin;
use std::task::{Context, Poll};

Expand Down Expand Up @@ -98,3 +100,76 @@ fn merge_works_with_unfused_streams() {
});
assert_eq!(xs, vec![92, 92]);
}

struct S<T>(T);

impl<T: Stream + Unpin> Stream for S<T> {
type Item = T::Item;

fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
unsafe { Pin::new_unchecked(&mut self.0) }.poll_next(ctx)
}
}

struct StrictOnce {
polled: bool,
}

impl Stream for StrictOnce {
type Item = ();

fn poll_next(mut self: Pin<&mut Self>, _: &mut Context) -> Poll<Option<Self::Item>> {
assert!(!self.polled, "Polled after completion!");
self.polled = true;
Poll::Ready(None)
}
}

struct Interchanger {
polled: bool,
}

impl Stream for Interchanger {
type Item = S<Box<dyn Stream<Item = ()> + Unpin>>;

fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
if self.polled {
self.polled = false;
ctx.waker().wake_by_ref();
Poll::Pending
} else {
self.polled = true;
Poll::Ready(Some(S(Box::new(StrictOnce { polled: false }))))
}
}
}

// https://github.com/async-rs/async-std/pull/701
#[test]
fn flat_map_doesnt_poll_completed_inner_stream() {
task::block_on(async {
assert_eq!(
Interchanger { polled: false }
.take(2)
.flat_map(identity)
.count()
.await,
0
);
});
}

// https://github.com/async-rs/async-std/pull/701
#[test]
fn flatten_doesnt_poll_completed_inner_stream() {
task::block_on(async {
assert_eq!(
Interchanger { polled: false }
.take(2)
.flatten()
.count()
.await,
0
);
});
}