Skip to content

Commit 65dcaf4

Browse files
authored
Merge pull request #426 from yjhmelody/stream-eq
Add stream eq
2 parents cc949f4 + 1ab3d90 commit 65dcaf4

File tree

2 files changed

+101
-0
lines changed

2 files changed

+101
-0
lines changed

src/stream/stream/eq.rs

+63
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
use std::pin::Pin;
2+
3+
use pin_project_lite::pin_project;
4+
5+
use super::fuse::Fuse;
6+
use crate::future::Future;
7+
use crate::prelude::*;
8+
use crate::stream::Stream;
9+
use crate::task::{Context, Poll};
10+
11+
pin_project! {
12+
// Lexicographically compares the elements of this `Stream` with those
13+
// of another.
14+
#[doc(hidden)]
15+
#[allow(missing_debug_implementations)]
16+
pub struct EqFuture<L: Stream, R: Stream> {
17+
#[pin]
18+
l: Fuse<L>,
19+
#[pin]
20+
r: Fuse<R>,
21+
}
22+
}
23+
24+
impl<L: Stream, R: Stream> EqFuture<L, R>
25+
where
26+
L::Item: PartialEq<R::Item>,
27+
{
28+
pub(super) fn new(l: L, r: R) -> Self {
29+
EqFuture {
30+
l: l.fuse(),
31+
r: r.fuse(),
32+
}
33+
}
34+
}
35+
36+
impl<L: Stream, R: Stream> Future for EqFuture<L, R>
37+
where
38+
L: Stream + Sized,
39+
R: Stream + Sized,
40+
L::Item: PartialEq<R::Item>,
41+
{
42+
type Output = bool;
43+
44+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
45+
let mut this = self.project();
46+
47+
loop {
48+
let l_val = futures_core::ready!(this.l.as_mut().poll_next(cx));
49+
let r_val = futures_core::ready!(this.r.as_mut().poll_next(cx));
50+
51+
if this.l.done && this.r.done {
52+
return Poll::Ready(true);
53+
}
54+
55+
match (l_val, r_val) {
56+
(Some(l), Some(r)) if l != r => {
57+
return Poll::Ready(false);
58+
}
59+
_ => {}
60+
}
61+
}
62+
}
63+
}

src/stream/stream/mod.rs

+38
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ mod any;
2626
mod chain;
2727
mod cmp;
2828
mod enumerate;
29+
mod eq;
2930
mod filter;
3031
mod filter_map;
3132
mod find;
@@ -60,6 +61,7 @@ use all::AllFuture;
6061
use any::AnyFuture;
6162
use cmp::CmpFuture;
6263
use enumerate::Enumerate;
64+
use eq::EqFuture;
6365
use filter_map::FilterMap;
6466
use find::FindFuture;
6567
use find_map::FindMapFuture;
@@ -1622,6 +1624,42 @@ extension_trait! {
16221624
GeFuture::new(self, other)
16231625
}
16241626

1627+
#[doc = r#"
1628+
Determines if the elements of this `Stream` are lexicographically
1629+
equal to those of another.
1630+
1631+
# Examples
1632+
1633+
```
1634+
# fn main() { async_std::task::block_on(async {
1635+
#
1636+
use async_std::prelude::*;
1637+
use std::collections::VecDeque;
1638+
1639+
let single: VecDeque<isize> = vec![1].into_iter().collect();
1640+
let single_eq: VecDeque<isize> = vec![10].into_iter().collect();
1641+
let multi: VecDeque<isize> = vec![1,2].into_iter().collect();
1642+
let multi_eq: VecDeque<isize> = vec![1,5].into_iter().collect();
1643+
assert_eq!(single.clone().eq(single.clone()).await, true);
1644+
assert_eq!(single_eq.clone().eq(single.clone()).await, false);
1645+
assert_eq!(multi.clone().eq(single_eq.clone()).await, false);
1646+
assert_eq!(multi_eq.clone().eq(multi.clone()).await, false);
1647+
#
1648+
# }) }
1649+
```
1650+
"#]
1651+
fn eq<S>(
1652+
self,
1653+
other: S
1654+
) -> impl Future<Output = bool> [EqFuture<Self, S>]
1655+
where
1656+
Self: Sized + Stream,
1657+
S: Sized + Stream,
1658+
<Self as Stream>::Item: PartialEq<S::Item>,
1659+
{
1660+
EqFuture::new(self, other)
1661+
}
1662+
16251663
#[doc = r#"
16261664
Determines if the elements of this `Stream` are lexicographically
16271665
greater than those of another.

0 commit comments

Comments
 (0)