Skip to content

Commit c8475ca

Browse files
bors[bot]sunjay
andauthored
Merge #207
207: Added the ability to collect a stream of results r=yoshuawuyts a=sunjay As requested here: https://twitter.com/yoshuawuyts/status/1174026374316773377 The standard library has a very useful implementation of `FromIterator` that takes an iterator of `Result<T, E>` values and is able to produce a value of type `Result<Vec<T>, E>`. I asked for this in `async-std` and @yoshuawuyts recommended that I contribute the impl. It turns out that the implementation in the standard library is even more general than I initially thought. It allows any collection that implements `FromIterator` to be collected from an iterator of `Result<T, E>` values. That means that you can collect into `Result<Vec<T>, E>`, `Result<HashSet<T>, E>`, etc. I wanted to add a similarly generic impl for this crate so we can also support collecting into any collection that implements `FromStream`. The implementation for this is based heavily on [what exists in `std`](https://github.com/rust-lang/rust/blob/9150f844e2624eb013ec78ca08c1d416e6644026/src/libcore/result.rs#L1379-L1429). I made a new `result` module since that's where this impl is in `std`. I still wanted to maintain the conventions of this repo, so I copied the `vec` module that @yoshuawuyts created in #125. Much like in that PR, the new `result` module is private. There is a doctest in the documentation for `collect` that both teaches that this feature exists and tests that it works in some simple cases. ## Documentation Screenshot ![image](https://user-images.githubusercontent.com/530939/65075935-de89ae00-d965-11e9-9cd6-8b19b694ed3e.png) Co-authored-by: Sunjay Varma <[email protected]>
2 parents 4f9e7d3 + c87dab2 commit c8475ca

File tree

4 files changed

+73
-0
lines changed

4 files changed

+73
-0
lines changed

Diff for: src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ pub mod io;
4848
pub mod net;
4949
pub mod os;
5050
pub mod prelude;
51+
mod result;
5152
pub mod stream;
5253
pub mod sync;
5354
pub mod task;

Diff for: src/result/from_stream.rs

+47
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
use crate::stream::{FromStream, IntoStream, Stream};
2+
3+
use std::pin::Pin;
4+
5+
impl<T: Send, E: Send, V> FromStream<Result<T, E>> for Result<V, E>
6+
where
7+
V: FromStream<T>,
8+
{
9+
/// Takes each element in the stream: if it is an `Err`, no further
10+
/// elements are taken, and the `Err` is returned. Should no `Err`
11+
/// occur, a container with the values of each `Result` is returned.
12+
#[inline]
13+
fn from_stream<'a, S: IntoStream<Item = Result<T, E>>>(
14+
stream: S,
15+
) -> Pin<Box<dyn core::future::Future<Output = Self> + Send + 'a>>
16+
where
17+
<S as IntoStream>::IntoStream: Send + 'a,
18+
{
19+
let stream = stream.into_stream();
20+
21+
Pin::from(Box::new(async move {
22+
pin_utils::pin_mut!(stream);
23+
24+
// Using `scan` here because it is able to stop the stream early
25+
// if a failure occurs
26+
let mut found_error = None;
27+
let out: V = stream
28+
.scan((), |_, elem| {
29+
match elem {
30+
Ok(elem) => Some(elem),
31+
Err(err) => {
32+
found_error = Some(err);
33+
// Stop processing the stream on error
34+
None
35+
}
36+
}
37+
})
38+
.collect()
39+
.await;
40+
41+
match found_error {
42+
Some(err) => Err(err),
43+
None => Ok(out),
44+
}
45+
}))
46+
}
47+
}

Diff for: src/result/mod.rs

+9
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
//! The Rust core error handling type
2+
//!
3+
//! This module provides the `Result<T, E>` type for returning and
4+
//! propagating errors.
5+
6+
mod from_stream;
7+
8+
#[doc(inline)]
9+
pub use std::result::Result;

Diff for: src/stream/stream/mod.rs

+16
Original file line numberDiff line numberDiff line change
@@ -726,6 +726,22 @@ pub trait Stream {
726726
/// let buf: Vec<u8> = s.collect().await;
727727
///
728728
/// assert_eq!(buf, vec![9; 3]);
729+
///
730+
/// // You can also collect streams of Result values
731+
/// // into any collection that implements FromStream
732+
/// let s = stream::repeat(Ok(9)).take(3);
733+
/// // We are using Vec here, but other collections
734+
/// // are supported as well
735+
/// let buf: Result<Vec<u8>, ()> = s.collect().await;
736+
///
737+
/// assert_eq!(buf, Ok(vec![9; 3]));
738+
///
739+
/// // The stream will stop on the first Err and
740+
/// // return that instead
741+
/// let s = stream::repeat(Err(5)).take(3);
742+
/// let buf: Result<Vec<u8>, u8> = s.collect().await;
743+
///
744+
/// assert_eq!(buf, Err(5));
729745
/// #
730746
/// # }) }
731747
/// ```

0 commit comments

Comments
 (0)