Skip to content

Commit a0c9442

Browse files
bors[bot]montekki
andauthored
Merge #166
166: adds stream::nth combinator r=yoshuawuyts a=montekki Implements `nth` combinator. --- Stdlib: https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.nth Ref: #129 Co-authored-by: Fedor Sakharov <[email protected]>
2 parents c3f6a51 + 43b7523 commit a0c9442

File tree

2 files changed

+101
-0
lines changed

2 files changed

+101
-0
lines changed

src/stream/stream/mod.rs

+60
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ mod all;
2525
mod any;
2626
mod min_by;
2727
mod next;
28+
mod nth;
2829
mod take;
2930

3031
pub use take::Take;
@@ -33,6 +34,7 @@ use all::AllFuture;
3334
use any::AnyFuture;
3435
use min_by::MinByFuture;
3536
use next::NextFuture;
37+
use nth::NthFuture;
3638

3739
use std::cmp::Ordering;
3840
use std::marker::PhantomData;
@@ -161,6 +163,64 @@ pub trait Stream {
161163
MinByFuture::new(self, compare)
162164
}
163165

166+
/// Returns the nth element of the stream.
167+
///
168+
/// # Examples
169+
///
170+
/// Basic usage:
171+
///
172+
/// ```
173+
/// # fn main() { async_std::task::block_on(async {
174+
/// #
175+
/// use std::collections::VecDeque;
176+
/// use async_std::stream::Stream;
177+
///
178+
/// let mut s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();
179+
///
180+
/// let second = s.nth(1).await;
181+
/// assert_eq!(second, Some(2));
182+
/// #
183+
/// # }) }
184+
/// ```
185+
/// Calling `nth()` multiple times:
186+
///
187+
/// ```
188+
/// # fn main() { async_std::task::block_on(async {
189+
/// #
190+
/// use std::collections::VecDeque;
191+
/// use async_std::stream::Stream;
192+
///
193+
/// let mut s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();
194+
///
195+
/// let second = s.nth(0).await;
196+
/// assert_eq!(second, Some(1));
197+
///
198+
/// let second = s.nth(0).await;
199+
/// assert_eq!(second, Some(2));
200+
/// #
201+
/// # }) }
202+
/// ```
203+
/// Returning `None` if the stream finished before returning `n` elements:
204+
/// ```
205+
/// # fn main() { async_std::task::block_on(async {
206+
/// #
207+
/// use std::collections::VecDeque;
208+
/// use async_std::stream::Stream;
209+
///
210+
/// let mut s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();
211+
///
212+
/// let fourth = s.nth(4).await;
213+
/// assert_eq!(fourth, None);
214+
/// #
215+
/// # }) }
216+
/// ```
217+
fn nth(&mut self, n: usize) -> ret!('_, NthFuture, Option<Self::Item>)
218+
where
219+
Self: Sized,
220+
{
221+
NthFuture::new(self, n)
222+
}
223+
164224
/// Tests if every element of the stream matches a predicate.
165225
///
166226
/// `all()` takes a closure that returns `true` or `false`. It applies

src/stream/stream/nth.rs

+41
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
use std::pin::Pin;
2+
use std::task::{Context, Poll};
3+
4+
#[allow(missing_debug_implementations)]
5+
pub struct NthFuture<'a, S> {
6+
stream: &'a mut S,
7+
n: usize,
8+
}
9+
10+
impl<'a, S> NthFuture<'a, S> {
11+
pin_utils::unsafe_pinned!(stream: &'a mut S);
12+
pin_utils::unsafe_unpinned!(n: usize);
13+
14+
pub(crate) fn new(stream: &'a mut S, n: usize) -> Self {
15+
NthFuture { stream, n }
16+
}
17+
}
18+
19+
impl<'a, S> futures_core::future::Future for NthFuture<'a, S>
20+
where
21+
S: futures_core::stream::Stream + Unpin + Sized,
22+
{
23+
type Output = Option<S::Item>;
24+
25+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
26+
use futures_core::stream::Stream;
27+
28+
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
29+
match next {
30+
Some(v) => match self.n {
31+
0 => Poll::Ready(Some(v)),
32+
_ => {
33+
*self.as_mut().n() -= 1;
34+
cx.waker().wake_by_ref();
35+
Poll::Pending
36+
}
37+
},
38+
None => Poll::Ready(None),
39+
}
40+
}
41+
}

0 commit comments

Comments
 (0)