Skip to content

Commit db896c0

Browse files
committed
Init get/set limit methods
1 parent e53eed6 commit db896c0

File tree

6 files changed

+63
-7
lines changed

6 files changed

+63
-7
lines changed

src/from_stream.rs

+11
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ pin_project! {
1717
pub struct FromStream<S> {
1818
#[pin]
1919
stream: S,
20+
limit: Option<usize>,
2021
}
2122
}
2223

@@ -26,6 +27,7 @@ where
2627
S: Send + Sync,
2728
{
2829
FromStream {
30+
limit: None,
2931
stream: stream.into_stream(),
3032
}
3133
}
@@ -40,4 +42,13 @@ where
4042
let this = self.project();
4143
this.stream.poll_next(cx)
4244
}
45+
46+
fn set_limit(mut self, limit: impl Into<Option<usize>>) -> Self {
47+
self.limit = limit.into();
48+
self
49+
}
50+
51+
fn limit(&self) -> Option<usize> {
52+
self.limit
53+
}
4354
}

src/par_stream/for_each.rs

+5-2
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,14 @@ pin_project_lite::pin_project! {
1919
exhausted: Arc<AtomicBool>,
2020
// Count how many tasks are executing.
2121
ref_count: Arc<AtomicU64>,
22+
// Max concurrency limit.
23+
limit: Option<usize>,
2224
}
2325
}
2426

2527
impl ForEach {
2628
/// Create a new instance of `ForEach`.
27-
pub fn new<S, F, Fut>(mut input: S, mut f: F) -> Self
29+
pub fn new<S, F, Fut>(mut stream: S, mut f: F) -> Self
2830
where
2931
S: ParallelStream,
3032
F: FnMut(S::Item) -> Fut + Send + Sync + Copy + 'static,
@@ -39,10 +41,11 @@ impl ForEach {
3941
receiver,
4042
exhausted: exhausted.clone(),
4143
ref_count: ref_count.clone(),
44+
limit: stream.limit(),
4245
};
4346

4447
task::spawn(async move {
45-
while let Some(item) = input.next().await {
48+
while let Some(item) = stream.next().await {
4649
let sender = sender.clone();
4750
let exhausted = exhausted.clone();
4851
let ref_count = ref_count.clone();

src/par_stream/map.rs

+12-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ pin_project_lite::pin_project! {
1414
pub struct Map<T> {
1515
#[pin]
1616
receiver: Receiver<T>,
17+
limit: Option<usize>,
1718
}
1819
}
1920

@@ -26,6 +27,7 @@ impl<T: Send + 'static> Map<T> {
2627
Fut: Future<Output = T> + Send,
2728
{
2829
let (sender, receiver) = sync::channel(1);
30+
let limit = stream.limit();
2931
task::spawn(async move {
3032
while let Some(item) = stream.next().await {
3133
let sender = sender.clone();
@@ -35,7 +37,7 @@ impl<T: Send + 'static> Map<T> {
3537
});
3638
}
3739
});
38-
Map { receiver }
40+
Map { receiver, limit }
3941
}
4042
}
4143

@@ -46,6 +48,15 @@ impl<T: Send + 'static> ParallelStream for Map<T> {
4648
let this = self.project();
4749
this.receiver.poll_next(cx)
4850
}
51+
52+
fn set_limit(mut self, limit: impl Into<Option<usize>>) -> Self {
53+
self.limit = limit.into();
54+
self
55+
}
56+
57+
fn limit(&self) -> Option<usize> {
58+
self.limit
59+
}
4960
}
5061

5162
#[async_std::test]

src/par_stream/mod.rs

+6
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,12 @@ pub trait ParallelStream: Sized + Send + Sync + Unpin + 'static {
2121
/// Attempts to receive the next item from the stream.
2222
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
2323

24+
/// Set a max concurrency limit
25+
fn set_limit(self, limit: impl Into<Option<usize>>) -> Self;
26+
27+
/// Get the max concurrency limit
28+
fn limit(&self) -> Option<usize>;
29+
2430
/// Applies `f` to each item of this stream in parallel, producing a new
2531
/// stream with the results.
2632
fn map<F, T, Fut>(self, f: F) -> Map<T>

src/par_stream/take.rs

+18-4
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,19 @@ pin_project! {
1717
#[derive(Clone, Debug)]
1818
pub struct Take<S> {
1919
#[pin]
20-
pub(crate) stream: S,
21-
pub(crate) remaining: usize,
20+
stream: S,
21+
remaining: usize,
22+
limit: Option<usize>,
2223
}
2324
}
2425

25-
impl<S> Take<S> {
26+
impl<S: ParallelStream> Take<S> {
2627
pub(super) fn new(stream: S, remaining: usize) -> Self {
27-
Self { stream, remaining }
28+
Self {
29+
limit: stream.limit(),
30+
remaining,
31+
stream,
32+
}
2833
}
2934
}
3035

@@ -44,4 +49,13 @@ impl<S: ParallelStream> ParallelStream for Take<S> {
4449
Poll::Ready(next)
4550
}
4651
}
52+
53+
fn set_limit(mut self, limit: impl Into<Option<usize>>) -> Self {
54+
self.limit = limit.into();
55+
self
56+
}
57+
58+
fn limit(&self) -> Option<usize> {
59+
self.limit
60+
}
4761
}

src/vec.rs

+11
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ pin_project_lite::pin_project! {
1717
pub struct IntoParStream<T> {
1818
#[pin]
1919
stream: FromStream<FromIter<vec::IntoIter<T>>>,
20+
limit: Option<usize>,
2021
}
2122
}
2223

@@ -26,6 +27,15 @@ impl<T: Send + Sync + 'static> ParallelStream for IntoParStream<T> {
2627
let this = self.project();
2728
this.stream.poll_next(cx)
2829
}
30+
31+
fn set_limit(mut self, limit: impl Into<Option<usize>>) -> Self {
32+
self.limit = limit.into();
33+
self
34+
}
35+
36+
fn limit(&self) -> Option<usize> {
37+
self.limit
38+
}
2939
}
3040

3141
impl<T: Send + Sync + 'static> IntoParallelStream for Vec<T> {
@@ -36,6 +46,7 @@ impl<T: Send + Sync + 'static> IntoParallelStream for Vec<T> {
3646
fn into_par_stream(self) -> Self::IntoParStream {
3747
IntoParStream {
3848
stream: from_stream(from_iter(self)),
49+
limit: None,
3950
}
4051
}
4152
}

0 commit comments

Comments
 (0)