diff --git a/src/collections/binary_heap/extend.rs b/src/collections/binary_heap/extend.rs index 439bf139e..1a9479e14 100644 --- a/src/collections/binary_heap/extend.rs +++ b/src/collections/binary_heap/extend.rs @@ -4,11 +4,14 @@ use std::pin::Pin; use crate::prelude::*; use crate::stream::{self, IntoStream}; -impl stream::Extend for BinaryHeap { +impl stream::Extend for BinaryHeap { fn extend<'a, S: IntoStream + 'a>( &'a mut self, stream: S, - ) -> Pin + 'a>> { + ) -> Pin + 'a + Send>> + where + ::IntoStream: Send, + { let stream = stream.into_stream(); self.reserve(stream.size_hint().0); diff --git a/src/collections/binary_heap/from_stream.rs b/src/collections/binary_heap/from_stream.rs index 6851948e6..614f5f5de 100644 --- a/src/collections/binary_heap/from_stream.rs +++ b/src/collections/binary_heap/from_stream.rs @@ -4,11 +4,14 @@ use std::pin::Pin; use crate::prelude::*; use crate::stream::{self, FromStream, IntoStream}; -impl FromStream for BinaryHeap { +impl FromStream for BinaryHeap { #[inline] fn from_stream<'a, S: IntoStream + 'a>( stream: S, - ) -> Pin + 'a>> { + ) -> Pin + 'a + Send>> + where + ::IntoStream: Send, + { let stream = stream.into_stream(); Box::pin(async move { diff --git a/src/collections/btree_map/extend.rs b/src/collections/btree_map/extend.rs index 19d306ffe..4a1ad0618 100644 --- a/src/collections/btree_map/extend.rs +++ b/src/collections/btree_map/extend.rs @@ -4,11 +4,14 @@ use std::pin::Pin; use crate::prelude::*; use crate::stream::{self, IntoStream}; -impl stream::Extend<(K, V)> for BTreeMap { +impl stream::Extend<(K, V)> for BTreeMap { fn extend<'a, S: IntoStream + 'a>( &'a mut self, stream: S, - ) -> Pin + 'a>> { + ) -> Pin + 'a + Send>> + where + ::IntoStream: Send, + { Box::pin(stream.into_stream().for_each(move |(k, v)| { self.insert(k, v); })) diff --git a/src/collections/btree_map/from_stream.rs b/src/collections/btree_map/from_stream.rs index 853122361..ef01b6e0d 100644 --- a/src/collections/btree_map/from_stream.rs +++ b/src/collections/btree_map/from_stream.rs @@ -4,11 +4,14 @@ use std::pin::Pin; use crate::prelude::*; use crate::stream::{self, FromStream, IntoStream}; -impl FromStream<(K, V)> for BTreeMap { +impl FromStream<(K, V)> for BTreeMap { #[inline] fn from_stream<'a, S: IntoStream + 'a>( stream: S, - ) -> Pin + 'a>> { + ) -> Pin + 'a + Send>> + where + ::IntoStream: Send, + { let stream = stream.into_stream(); Box::pin(async move { diff --git a/src/collections/btree_set/extend.rs b/src/collections/btree_set/extend.rs index 422640b15..0f7421023 100644 --- a/src/collections/btree_set/extend.rs +++ b/src/collections/btree_set/extend.rs @@ -4,11 +4,14 @@ use std::pin::Pin; use crate::prelude::*; use crate::stream::{self, IntoStream}; -impl stream::Extend for BTreeSet { +impl stream::Extend for BTreeSet { fn extend<'a, S: IntoStream + 'a>( &'a mut self, stream: S, - ) -> Pin + 'a>> { + ) -> Pin + 'a + Send>> + where + ::IntoStream: Send, + { Box::pin(stream.into_stream().for_each(move |item| { self.insert(item); })) diff --git a/src/collections/btree_set/from_stream.rs b/src/collections/btree_set/from_stream.rs index 318af9e65..5fdb33e6c 100644 --- a/src/collections/btree_set/from_stream.rs +++ b/src/collections/btree_set/from_stream.rs @@ -4,11 +4,14 @@ use std::pin::Pin; use crate::prelude::*; use crate::stream::{self, FromStream, IntoStream}; -impl FromStream for BTreeSet { +impl FromStream for BTreeSet { #[inline] fn from_stream<'a, S: IntoStream + 'a>( stream: S, - ) -> Pin + 'a>> { + ) -> Pin + 'a + Send>> + where + ::IntoStream: Send, + { let stream = stream.into_stream(); Box::pin(async move { diff --git a/src/collections/hash_map/extend.rs b/src/collections/hash_map/extend.rs index 0f4ce0c6e..15b7e7d46 100644 --- a/src/collections/hash_map/extend.rs +++ b/src/collections/hash_map/extend.rs @@ -7,13 +7,17 @@ use crate::stream::{self, IntoStream}; impl stream::Extend<(K, V)> for HashMap where - K: Eq + Hash, - H: BuildHasher + Default, + K: Eq + Hash + Send, + V: Send, + H: BuildHasher + Default + Send, { fn extend<'a, S: IntoStream + 'a>( &'a mut self, stream: S, - ) -> Pin + 'a>> { + ) -> Pin + 'a + Send>> + where + ::IntoStream: Send, + { let stream = stream.into_stream(); // The following is adapted from the hashbrown source code: diff --git a/src/collections/hash_map/from_stream.rs b/src/collections/hash_map/from_stream.rs index d74a7ccfa..ae3f11a56 100644 --- a/src/collections/hash_map/from_stream.rs +++ b/src/collections/hash_map/from_stream.rs @@ -7,13 +7,17 @@ use crate::stream::{self, FromStream, IntoStream}; impl FromStream<(K, V)> for HashMap where - K: Eq + Hash, - H: BuildHasher + Default, + K: Eq + Hash + Send, + H: BuildHasher + Default + Send, + V: Send, { #[inline] fn from_stream<'a, S: IntoStream + 'a>( stream: S, - ) -> Pin + 'a>> { + ) -> Pin + 'a + Send>> + where + ::IntoStream: Send, + { let stream = stream.into_stream(); Box::pin(async move { diff --git a/src/collections/hash_set/extend.rs b/src/collections/hash_set/extend.rs index ba872b438..0246a1e88 100644 --- a/src/collections/hash_set/extend.rs +++ b/src/collections/hash_set/extend.rs @@ -7,13 +7,16 @@ use crate::stream::{self, IntoStream}; impl stream::Extend for HashSet where - T: Eq + Hash, - H: BuildHasher + Default, + T: Eq + Hash + Send, + H: BuildHasher + Default + Send, { fn extend<'a, S: IntoStream + 'a>( &'a mut self, stream: S, - ) -> Pin + 'a>> { + ) -> Pin + 'a + Send>> + where + ::IntoStream: Send, + { // The Extend impl for HashSet in the standard library delegates to the internal HashMap. // Thus, this impl is just a copy of the async Extend impl for HashMap in this crate. diff --git a/src/collections/hash_set/from_stream.rs b/src/collections/hash_set/from_stream.rs index dc5e61e39..6f640695a 100644 --- a/src/collections/hash_set/from_stream.rs +++ b/src/collections/hash_set/from_stream.rs @@ -7,13 +7,16 @@ use crate::stream::{self, FromStream, IntoStream}; impl FromStream for HashSet where - T: Eq + Hash, - H: BuildHasher + Default, + T: Eq + Hash + Send, + H: BuildHasher + Default + Send, { #[inline] fn from_stream<'a, S: IntoStream + 'a>( stream: S, - ) -> Pin + 'a>> { + ) -> Pin + 'a + Send>> + where + ::IntoStream: Send, + { let stream = stream.into_stream(); Box::pin(async move { diff --git a/src/collections/linked_list/extend.rs b/src/collections/linked_list/extend.rs index b0dff009d..8adc41d73 100644 --- a/src/collections/linked_list/extend.rs +++ b/src/collections/linked_list/extend.rs @@ -4,11 +4,14 @@ use std::pin::Pin; use crate::prelude::*; use crate::stream::{self, IntoStream}; -impl stream::Extend for LinkedList { +impl stream::Extend for LinkedList { fn extend<'a, S: IntoStream + 'a>( &'a mut self, stream: S, - ) -> Pin + 'a>> { + ) -> Pin + 'a + Send>> + where + ::IntoStream: Send, + { let stream = stream.into_stream(); Box::pin(stream.for_each(move |item| self.push_back(item))) } diff --git a/src/collections/linked_list/from_stream.rs b/src/collections/linked_list/from_stream.rs index d93bbb7be..7c3eb738c 100644 --- a/src/collections/linked_list/from_stream.rs +++ b/src/collections/linked_list/from_stream.rs @@ -4,11 +4,14 @@ use std::pin::Pin; use crate::prelude::*; use crate::stream::{self, FromStream, IntoStream}; -impl FromStream for LinkedList { +impl FromStream for LinkedList { #[inline] fn from_stream<'a, S: IntoStream + 'a>( stream: S, - ) -> Pin + 'a>> { + ) -> Pin + 'a + Send>> + where + ::IntoStream: Send, + { let stream = stream.into_stream(); Box::pin(async move { diff --git a/src/collections/vec_deque/extend.rs b/src/collections/vec_deque/extend.rs index dd2ddce3c..9dea92231 100644 --- a/src/collections/vec_deque/extend.rs +++ b/src/collections/vec_deque/extend.rs @@ -4,11 +4,14 @@ use std::pin::Pin; use crate::prelude::*; use crate::stream::{self, IntoStream}; -impl stream::Extend for VecDeque { +impl stream::Extend for VecDeque { fn extend<'a, S: IntoStream + 'a>( &'a mut self, stream: S, - ) -> Pin + 'a>> { + ) -> Pin + 'a + Send>> + where + ::IntoStream: Send, + { let stream = stream.into_stream(); self.reserve(stream.size_hint().0); diff --git a/src/collections/vec_deque/from_stream.rs b/src/collections/vec_deque/from_stream.rs index 241bd74e9..3e0719ab2 100644 --- a/src/collections/vec_deque/from_stream.rs +++ b/src/collections/vec_deque/from_stream.rs @@ -4,11 +4,14 @@ use std::pin::Pin; use crate::prelude::*; use crate::stream::{self, FromStream, IntoStream}; -impl FromStream for VecDeque { +impl FromStream for VecDeque { #[inline] fn from_stream<'a, S: IntoStream + 'a>( stream: S, - ) -> Pin + 'a>> { + ) -> Pin + 'a + Send>> + where + ::IntoStream: Send, + { let stream = stream.into_stream(); Box::pin(async move { diff --git a/src/option/from_stream.rs b/src/option/from_stream.rs index de929ca94..7931d97a6 100644 --- a/src/option/from_stream.rs +++ b/src/option/from_stream.rs @@ -4,7 +4,7 @@ use crate::prelude::*; use crate::stream::{FromStream, IntoStream}; use std::convert::identity; -impl FromStream> for Option +impl FromStream> for Option where V: FromStream, { @@ -14,7 +14,10 @@ where #[inline] fn from_stream<'a, S: IntoStream> + 'a>( stream: S, - ) -> Pin + 'a>> { + ) -> Pin + 'a + Send>> + where + ::IntoStream: Send, + { let stream = stream.into_stream(); Box::pin(async move { diff --git a/src/path/pathbuf.rs b/src/path/pathbuf.rs index e684df89f..f9370cbab 100644 --- a/src/path/pathbuf.rs +++ b/src/path/pathbuf.rs @@ -323,7 +323,10 @@ impl> stream::Extend

for PathBuf { fn extend<'a, S: IntoStream + 'a>( &'a mut self, stream: S, - ) -> Pin + 'a>> { + ) -> Pin + 'a + Send>> + where + ::IntoStream: Send, + { let stream = stream.into_stream(); Box::pin(async move { @@ -337,11 +340,14 @@ impl> stream::Extend

for PathBuf { } #[cfg(feature = "unstable")] -impl<'b, P: AsRef + 'b> FromStream

for PathBuf { +impl<'b, P: AsRef + 'b + Send> FromStream

for PathBuf { #[inline] fn from_stream<'a, S: IntoStream + 'a>( stream: S, - ) -> Pin + 'a>> { + ) -> Pin + 'a + Send>> + where + ::IntoStream: Send, + { let stream = stream.into_stream(); Box::pin(async move { diff --git a/src/result/from_stream.rs b/src/result/from_stream.rs index 8a8e0eaf3..68fa535fa 100644 --- a/src/result/from_stream.rs +++ b/src/result/from_stream.rs @@ -5,6 +5,8 @@ use crate::stream::{FromStream, IntoStream}; impl FromStream> for Result where + T: Send, + E: Send, V: FromStream, { /// Takes each element in the stream: if it is an `Err`, no further @@ -30,7 +32,10 @@ where #[inline] fn from_stream<'a, S: IntoStream> + 'a>( stream: S, - ) -> Pin + 'a>> { + ) -> Pin + 'a + Send>> + where + ::IntoStream: Send, + { let stream = stream.into_stream(); Box::pin(async move { diff --git a/src/stream/extend.rs b/src/stream/extend.rs index 702cbcac6..0c7d41049 100644 --- a/src/stream/extend.rs +++ b/src/stream/extend.rs @@ -34,7 +34,9 @@ pub trait Extend { fn extend<'a, T: IntoStream + 'a>( &'a mut self, stream: T, - ) -> Pin + 'a>>; + ) -> Pin + 'a + Send>> + where + ::IntoStream: Send; } /// Extends a collection with the contents of a stream. @@ -69,6 +71,7 @@ pub async fn extend<'a, C, T, S>(collection: &mut C, stream: S) where C: Extend, S: IntoStream + 'a, + ::IntoStream: Send, { Extend::extend(collection, stream).await } diff --git a/src/stream/from_stream.rs b/src/stream/from_stream.rs index 12d89e813..c4001917c 100644 --- a/src/stream/from_stream.rs +++ b/src/stream/from_stream.rs @@ -72,7 +72,10 @@ use crate::stream::IntoStream; /// impl FromStream for MyCollection { /// fn from_stream<'a, S: IntoStream + 'a>( /// stream: S, -/// ) -> Pin + 'a>> { +/// ) -> Pin + 'a + Send>> +/// where +/// ::IntoStream: Send, +/// { /// let stream = stream.into_stream(); /// /// Box::pin(async move { @@ -107,12 +110,12 @@ use crate::stream::IntoStream; /// assert_eq!(c.0, vec![5, 5, 5, 5, 5]); /// # /// # Ok(()) }) } -///``` +/// ``` /// /// [`IntoStream`]: trait.IntoStream.html #[cfg(feature = "unstable")] #[cfg_attr(feature = "docs", doc(cfg(unstable)))] -pub trait FromStream { +pub trait FromStream { /// Creates a value from a stream. /// /// # Examples @@ -135,5 +138,7 @@ pub trait FromStream { /// ``` fn from_stream<'a, S: IntoStream + 'a>( stream: S, - ) -> Pin + 'a>>; + ) -> Pin + 'a + Send>> + where + ::IntoStream: Send; } diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 4be0eb5f9..4e074a2f3 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -1888,10 +1888,11 @@ extension_trait! { #[cfg_attr(feature = "docs", doc(cfg(unstable)))] fn collect<'a, B>( self, - ) -> impl Future + 'a [Pin + 'a>>] + ) -> impl Future + 'a [Pin + 'a + Send>>] where - Self: Sized + 'a, + Self: Sized + 'a + Send, B: FromStream, + Self::Item: Send, { FromStream::from_stream(self) } diff --git a/src/string/extend.rs b/src/string/extend.rs index 55bec0c55..eae824cbd 100644 --- a/src/string/extend.rs +++ b/src/string/extend.rs @@ -8,7 +8,10 @@ impl stream::Extend for String { fn extend<'a, S: IntoStream + 'a>( &'a mut self, stream: S, - ) -> Pin + 'a>> { + ) -> Pin + 'a + Send>> + where + ::IntoStream: Send, + { let stream = stream.into_stream(); self.reserve(stream.size_hint().0); @@ -26,7 +29,10 @@ impl<'b> stream::Extend<&'b char> for String { fn extend<'a, S: IntoStream + 'a>( &'a mut self, stream: S, - ) -> Pin + 'a>> { + ) -> Pin + 'a + Send>> + where + ::IntoStream: Send, + { let stream = stream.into_stream(); Box::pin(async move { @@ -43,7 +49,10 @@ impl<'b> stream::Extend<&'b str> for String { fn extend<'a, S: IntoStream + 'a>( &'a mut self, stream: S, - ) -> Pin + 'a>> { + ) -> Pin + 'a + Send>> + where + ::IntoStream: Send, + { let stream = stream.into_stream(); Box::pin(async move { @@ -60,7 +69,10 @@ impl stream::Extend for String { fn extend<'a, S: IntoStream + 'a>( &'a mut self, stream: S, - ) -> Pin + 'a>> { + ) -> Pin + 'a + Send>> + where + ::IntoStream: Send, + { let stream = stream.into_stream(); Box::pin(async move { @@ -77,7 +89,10 @@ impl<'b> stream::Extend> for String { fn extend<'a, S: IntoStream> + 'a>( &'a mut self, stream: S, - ) -> Pin + 'a>> { + ) -> Pin + 'a + Send>> + where + ::IntoStream: Send, + { let stream = stream.into_stream(); Box::pin(async move { diff --git a/src/string/from_stream.rs b/src/string/from_stream.rs index 375ac3715..d9e824681 100644 --- a/src/string/from_stream.rs +++ b/src/string/from_stream.rs @@ -8,7 +8,10 @@ impl FromStream for String { #[inline] fn from_stream<'a, S: IntoStream + 'a>( stream: S, - ) -> Pin + 'a>> { + ) -> Pin + 'a + Send>> + where + ::IntoStream: Send, + { let stream = stream.into_stream(); Box::pin(async move { @@ -23,7 +26,10 @@ impl<'b> FromStream<&'b char> for String { #[inline] fn from_stream<'a, S: IntoStream + 'a>( stream: S, - ) -> Pin + 'a>> { + ) -> Pin + 'a + Send>> + where + ::IntoStream: Send, + { let stream = stream.into_stream(); Box::pin(async move { @@ -38,7 +44,10 @@ impl<'b> FromStream<&'b str> for String { #[inline] fn from_stream<'a, S: IntoStream + 'a>( stream: S, - ) -> Pin + 'a>> { + ) -> Pin + 'a + Send>> + where + ::IntoStream: Send, + { let stream = stream.into_stream(); Box::pin(async move { @@ -53,7 +62,10 @@ impl FromStream for String { #[inline] fn from_stream<'a, S: IntoStream + 'a>( stream: S, - ) -> Pin + 'a>> { + ) -> Pin + 'a + Send>> + where + ::IntoStream: Send, + { let stream = stream.into_stream(); Box::pin(async move { @@ -68,7 +80,10 @@ impl<'b> FromStream> for String { #[inline] fn from_stream<'a, S: IntoStream> + 'a>( stream: S, - ) -> Pin + 'a>> { + ) -> Pin + 'a + Send>> + where + ::IntoStream: Send, + { let stream = stream.into_stream(); Box::pin(async move { diff --git a/src/unit/extend.rs b/src/unit/extend.rs index 55c8e0d08..ffc0c2d9d 100644 --- a/src/unit/extend.rs +++ b/src/unit/extend.rs @@ -4,10 +4,13 @@ use crate::prelude::*; use crate::stream::{self, IntoStream}; impl stream::Extend<()> for () { - fn extend<'a, T: IntoStream + 'a>( + fn extend<'a, S: IntoStream + 'a>( &'a mut self, - stream: T, - ) -> Pin + 'a>> { + stream: S, + ) -> Pin + 'a + Send>> + where + ::IntoStream: Send, + { let stream = stream.into_stream(); Box::pin(async move { diff --git a/src/unit/from_stream.rs b/src/unit/from_stream.rs index 7b784383d..e88758323 100644 --- a/src/unit/from_stream.rs +++ b/src/unit/from_stream.rs @@ -7,7 +7,10 @@ impl FromStream<()> for () { #[inline] fn from_stream<'a, S: IntoStream + 'a>( stream: S, - ) -> Pin + 'a>> { + ) -> Pin + 'a + Send>> + where + ::IntoStream: Send, + { Box::pin(stream.into_stream().for_each(drop)) } } diff --git a/src/vec/extend.rs b/src/vec/extend.rs index 302fc7a87..717338ab7 100644 --- a/src/vec/extend.rs +++ b/src/vec/extend.rs @@ -3,11 +3,14 @@ use std::pin::Pin; use crate::prelude::*; use crate::stream::{self, IntoStream}; -impl stream::Extend for Vec { +impl stream::Extend for Vec { fn extend<'a, S: IntoStream + 'a>( &'a mut self, stream: S, - ) -> Pin + 'a>> { + ) -> Pin + 'a + Send>> + where + ::IntoStream: Send, + { let stream = stream.into_stream(); self.reserve(stream.size_hint().0); diff --git a/src/vec/from_stream.rs b/src/vec/from_stream.rs index e88e8202e..95a1f98c9 100644 --- a/src/vec/from_stream.rs +++ b/src/vec/from_stream.rs @@ -6,13 +6,13 @@ use std::sync::Arc; use crate::prelude::*; use crate::stream::{self, FromStream, IntoStream}; -impl FromStream for Vec { +impl FromStream for Vec { #[inline] fn from_stream<'a, S: IntoStream>( stream: S, - ) -> Pin + 'a>> + ) -> Pin + 'a + Send>> where - ::IntoStream: 'a, + ::IntoStream: 'a + Send, { let stream = stream.into_stream(); @@ -24,11 +24,14 @@ impl FromStream for Vec { } } -impl<'b, T: Clone> FromStream for Cow<'b, [T]> { +impl<'b, T: Clone + Send> FromStream for Cow<'b, [T]> { #[inline] fn from_stream<'a, S: IntoStream + 'a>( stream: S, - ) -> Pin + 'a>> { + ) -> Pin + 'a + Send>> + where + ::IntoStream: Send + { let stream = stream.into_stream(); Box::pin(async move { @@ -37,11 +40,14 @@ impl<'b, T: Clone> FromStream for Cow<'b, [T]> { } } -impl FromStream for Box<[T]> { +impl FromStream for Box<[T]> { #[inline] fn from_stream<'a, S: IntoStream + 'a>( stream: S, - ) -> Pin + 'a>> { + ) -> Pin + 'a + Send>> + where + ::IntoStream: Send + { let stream = stream.into_stream(); Box::pin(async move { @@ -50,11 +56,14 @@ impl FromStream for Box<[T]> { } } -impl FromStream for Rc<[T]> { +impl FromStream for Rc<[T]> { #[inline] fn from_stream<'a, S: IntoStream + 'a>( stream: S, - ) -> Pin + 'a>> { + ) -> Pin + 'a + Send>> + where + ::IntoStream: Send + { let stream = stream.into_stream(); Box::pin(async move { @@ -63,11 +72,14 @@ impl FromStream for Rc<[T]> { } } -impl FromStream for Arc<[T]> { +impl FromStream for Arc<[T]> { #[inline] fn from_stream<'a, S: IntoStream + 'a>( stream: S, - ) -> Pin + 'a>> { + ) -> Pin + 'a + Send>> + where + ::IntoStream: Send + { let stream = stream.into_stream(); Box::pin(async move { diff --git a/tests/collect.rs b/tests/collect.rs new file mode 100644 index 000000000..d24484f4e --- /dev/null +++ b/tests/collect.rs @@ -0,0 +1,20 @@ +#[cfg(feature = "unstable")] +#[test] +fn test_send() -> async_std::io::Result<()> { + use async_std::prelude::*; + use async_std::{stream, task}; + + task::block_on(async { + fn test_send_trait(_: &T) {} + + let stream = stream::repeat(1u8).take(10); + test_send_trait(&stream); + + let fut = stream.collect::>(); + + // This line triggers a compilation error + test_send_trait(&fut); + + Ok(()) + }) +}