Skip to content

Commit 6c1b5eb

Browse files
authored
Merge pull request #667 from olegnn/option_take_while
Use `take_while` instead of `scan` in `impl` of `Product`, `Sum` and `FromStream` for `Option` and `Result`
2 parents beb8d24 + ed7ddac commit 6c1b5eb

File tree

7 files changed

+113
-82
lines changed

7 files changed

+113
-82
lines changed

Diff for: src/option/from_stream.rs

+10-11
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use std::pin::Pin;
22

33
use crate::prelude::*;
44
use crate::stream::{FromStream, IntoStream};
5+
use std::convert::identity;
56

67
impl<T, V> FromStream<Option<T>> for Option<V>
78
where
@@ -17,24 +18,22 @@ where
1718
let stream = stream.into_stream();
1819

1920
Box::pin(async move {
20-
// Using `scan` here because it is able to stop the stream early
21+
// Using `take_while` here because it is able to stop the stream early
2122
// if a failure occurs
22-
let mut found_error = false;
23+
let mut found_none = false;
2324
let out: V = stream
24-
.scan((), |_, elem| {
25-
match elem {
26-
Some(elem) => Some(elem),
27-
None => {
28-
found_error = true;
29-
// Stop processing the stream on error
30-
None
31-
}
25+
.take_while(|elem| {
26+
elem.is_some() || {
27+
found_none = true;
28+
// Stop processing the stream on `None`
29+
false
3230
}
3331
})
32+
.filter_map(identity)
3433
.collect()
3534
.await;
3635

37-
if found_error { None } else { Some(out) }
36+
if found_none { None } else { Some(out) }
3837
})
3938
}
4039
}

Diff for: src/option/product.rs

+16-17
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
use std::pin::Pin;
22

33
use crate::prelude::*;
4-
use crate::stream::{Stream, Product};
4+
use crate::stream::{Product, Stream};
5+
use std::convert::identity;
56

67
impl<T, U> Product<Option<U>> for Option<T>
78
where
@@ -36,29 +37,27 @@ where
3637
```
3738
"#]
3839
fn product<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Option<T>> + 'a>>
39-
where S: Stream<Item = Option<U>> + 'a
40+
where
41+
S: Stream<Item = Option<U>> + 'a,
4042
{
4143
Box::pin(async move {
42-
// Using `scan` here because it is able to stop the stream early
44+
// Using `take_while` here because it is able to stop the stream early
4345
// if a failure occurs
4446
let mut found_none = false;
45-
let out = <T as Product<U>>::product(stream
46-
.scan((), |_, elem| {
47-
match elem {
48-
Some(elem) => Some(elem),
49-
None => {
47+
let out = <T as Product<U>>::product(
48+
stream
49+
.take_while(|elem| {
50+
elem.is_some() || {
5051
found_none = true;
51-
// Stop processing the stream on error
52-
None
52+
// Stop processing the stream on `None`
53+
false
5354
}
54-
}
55-
})).await;
55+
})
56+
.filter_map(identity),
57+
)
58+
.await;
5659

57-
if found_none {
58-
None
59-
} else {
60-
Some(out)
61-
}
60+
if found_none { None } else { Some(out) }
6261
})
6362
}
6463
}

Diff for: src/option/sum.rs

+15-16
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use std::pin::Pin;
22

33
use crate::prelude::*;
44
use crate::stream::{Stream, Sum};
5+
use std::convert::identity;
56

67
impl<T, U> Sum<Option<U>> for Option<T>
78
where
@@ -31,29 +32,27 @@ where
3132
```
3233
"#]
3334
fn sum<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Option<T>> + 'a>>
34-
where S: Stream<Item = Option<U>> + 'a
35+
where
36+
S: Stream<Item = Option<U>> + 'a,
3537
{
3638
Box::pin(async move {
37-
// Using `scan` here because it is able to stop the stream early
39+
// Using `take_while` here because it is able to stop the stream early
3840
// if a failure occurs
3941
let mut found_none = false;
40-
let out = <T as Sum<U>>::sum(stream
41-
.scan((), |_, elem| {
42-
match elem {
43-
Some(elem) => Some(elem),
44-
None => {
42+
let out = <T as Sum<U>>::sum(
43+
stream
44+
.take_while(|elem| {
45+
elem.is_some() || {
4546
found_none = true;
46-
// Stop processing the stream on error
47-
None
47+
// Stop processing the stream on `None`
48+
false
4849
}
49-
}
50-
})).await;
50+
})
51+
.filter_map(identity),
52+
)
53+
.await;
5154

52-
if found_none {
53-
None
54-
} else {
55-
Some(out)
56-
}
55+
if found_none { None } else { Some(out) }
5756
})
5857
}
5958
}

Diff for: src/result/from_stream.rs

+20-12
Original file line numberDiff line numberDiff line change
@@ -34,26 +34,34 @@ where
3434
let stream = stream.into_stream();
3535

3636
Box::pin(async move {
37-
// Using `scan` here because it is able to stop the stream early
37+
// Using `take_while` here because it is able to stop the stream early
3838
// if a failure occurs
39+
let mut is_error = false;
3940
let mut found_error = None;
4041
let out: V = stream
41-
.scan((), |_, elem| {
42-
match elem {
43-
Ok(elem) => Some(elem),
44-
Err(err) => {
45-
found_error = Some(err);
46-
// Stop processing the stream on error
47-
None
48-
}
42+
.take_while(|elem| {
43+
// Stop processing the stream on `Err`
44+
!is_error
45+
&& (elem.is_ok() || {
46+
is_error = true;
47+
// Capture first `Err`
48+
true
49+
})
50+
})
51+
.filter_map(|elem| match elem {
52+
Ok(value) => Some(value),
53+
Err(err) => {
54+
found_error = Some(err);
55+
None
4956
}
5057
})
5158
.collect()
5259
.await;
5360

54-
match found_error {
55-
Some(err) => Err(err),
56-
None => Ok(out),
61+
if is_error {
62+
Err(found_error.unwrap())
63+
} else {
64+
Ok(out)
5765
}
5866
})
5967
}

Diff for: src/result/product.rs

+26-13
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::pin::Pin;
22

33
use crate::prelude::*;
4-
use crate::stream::{Stream, Product};
4+
use crate::stream::{Product, Stream};
55

66
impl<T, U, E> Product<Result<U, E>> for Result<T, E>
77
where
@@ -36,26 +36,39 @@ where
3636
```
3737
"#]
3838
fn product<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>
39-
where S: Stream<Item = Result<U, E>> + 'a
39+
where
40+
S: Stream<Item = Result<U, E>> + 'a,
4041
{
4142
Box::pin(async move {
42-
// Using `scan` here because it is able to stop the stream early
43+
// Using `take_while` here because it is able to stop the stream early
4344
// if a failure occurs
45+
let mut is_error = false;
4446
let mut found_error = None;
45-
let out = <T as Product<U>>::product(stream
46-
.scan((), |_, elem| {
47-
match elem {
48-
Ok(elem) => Some(elem),
47+
let out = <T as Product<U>>::product(
48+
stream
49+
.take_while(|elem| {
50+
// Stop processing the stream on `Err`
51+
!is_error
52+
&& (elem.is_ok() || {
53+
is_error = true;
54+
// Capture first `Err`
55+
true
56+
})
57+
})
58+
.filter_map(|elem| match elem {
59+
Ok(value) => Some(value),
4960
Err(err) => {
5061
found_error = Some(err);
51-
// Stop processing the stream on error
5262
None
5363
}
54-
}
55-
})).await;
56-
match found_error {
57-
Some(err) => Err(err),
58-
None => Ok(out)
64+
}),
65+
)
66+
.await;
67+
68+
if is_error {
69+
Err(found_error.unwrap())
70+
} else {
71+
Ok(out)
5972
}
6073
})
6174
}

Diff for: src/result/sum.rs

+25-12
Original file line numberDiff line numberDiff line change
@@ -36,26 +36,39 @@ where
3636
```
3737
"#]
3838
fn sum<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>
39-
where S: Stream<Item = Result<U, E>> + 'a
39+
where
40+
S: Stream<Item = Result<U, E>> + 'a,
4041
{
4142
Box::pin(async move {
42-
// Using `scan` here because it is able to stop the stream early
43+
// Using `take_while` here because it is able to stop the stream early
4344
// if a failure occurs
45+
let mut is_error = false;
4446
let mut found_error = None;
45-
let out = <T as Sum<U>>::sum(stream
46-
.scan((), |_, elem| {
47-
match elem {
48-
Ok(elem) => Some(elem),
47+
let out = <T as Sum<U>>::sum(
48+
stream
49+
.take_while(|elem| {
50+
// Stop processing the stream on `Err`
51+
!is_error
52+
&& (elem.is_ok() || {
53+
is_error = true;
54+
// Capture first `Err`
55+
true
56+
})
57+
})
58+
.filter_map(|elem| match elem {
59+
Ok(value) => Some(value),
4960
Err(err) => {
5061
found_error = Some(err);
51-
// Stop processing the stream on error
5262
None
5363
}
54-
}
55-
})).await;
56-
match found_error {
57-
Some(err) => Err(err),
58-
None => Ok(out)
64+
}),
65+
)
66+
.await;
67+
68+
if is_error {
69+
Err(found_error.unwrap())
70+
} else {
71+
Ok(out)
5972
}
6073
})
6174
}

Diff for: src/unit/from_stream.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,6 @@ impl FromStream<()> for () {
88
fn from_stream<'a, S: IntoStream<Item = ()> + 'a>(
99
stream: S,
1010
) -> Pin<Box<dyn Future<Output = Self> + 'a>> {
11-
Box::pin(stream.into_stream().for_each(|_| ()))
11+
Box::pin(stream.into_stream().for_each(drop))
1212
}
1313
}

0 commit comments

Comments
 (0)