Skip to content

Commit a3fde16

Browse files
committed
init limit functionality
Updates the traits to support limiting
1 parent db896c0 commit a3fde16

File tree

8 files changed

+70
-15
lines changed

8 files changed

+70
-15
lines changed

src/from_stream.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,12 @@ where
4343
this.stream.poll_next(cx)
4444
}
4545

46-
fn set_limit(mut self, limit: impl Into<Option<usize>>) -> Self {
46+
fn limit(mut self, limit: impl Into<Option<usize>>) -> Self {
4747
self.limit = limit.into();
4848
self
4949
}
5050

51-
fn limit(&self) -> Option<usize> {
51+
fn get_limit(&self) -> Option<usize> {
5252
self.limit
5353
}
5454
}

src/lib.rs

+2
Original file line numberDiff line numberDiff line change
@@ -56,3 +56,5 @@ pub use par_stream::{ForEach, Map, NextFuture, ParallelStream, Take};
5656

5757
pub mod prelude;
5858
pub mod vec;
59+
60+
pub(crate) mod utils;

src/par_stream/for_each.rs

+1-3
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@ pin_project_lite::pin_project! {
1919
exhausted: Arc<AtomicBool>,
2020
// Count how many tasks are executing.
2121
ref_count: Arc<AtomicU64>,
22-
// Max concurrency limit.
23-
limit: Option<usize>,
2422
}
2523
}
2624

@@ -35,13 +33,13 @@ impl ForEach {
3533
let exhausted = Arc::new(AtomicBool::new(false));
3634
let ref_count = Arc::new(AtomicU64::new(0));
3735
let (sender, receiver): (Sender<()>, Receiver<()>) = sync::channel(1);
36+
let _limit = stream.get_limit();
3837

3938
// Initialize the return type here to prevent borrowing issues.
4039
let this = Self {
4140
receiver,
4241
exhausted: exhausted.clone(),
4342
ref_count: ref_count.clone(),
44-
limit: stream.limit(),
4543
};
4644

4745
task::spawn(async move {

src/par_stream/map.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ impl<T: Send + 'static> Map<T> {
2727
Fut: Future<Output = T> + Send,
2828
{
2929
let (sender, receiver) = sync::channel(1);
30-
let limit = stream.limit();
30+
let limit = stream.get_limit();
3131
task::spawn(async move {
3232
while let Some(item) = stream.next().await {
3333
let sender = sender.clone();
@@ -49,12 +49,12 @@ impl<T: Send + 'static> ParallelStream for Map<T> {
4949
this.receiver.poll_next(cx)
5050
}
5151

52-
fn set_limit(mut self, limit: impl Into<Option<usize>>) -> Self {
52+
fn limit(mut self, limit: impl Into<Option<usize>>) -> Self {
5353
self.limit = limit.into();
5454
self
5555
}
5656

57-
fn limit(&self) -> Option<usize> {
57+
fn get_limit(&self) -> Option<usize> {
5858
self.limit
5959
}
6060
}

src/par_stream/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,10 @@ pub trait ParallelStream: Sized + Send + Sync + Unpin + 'static {
2222
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
2323

2424
/// Set a max concurrency limit
25-
fn set_limit(self, limit: impl Into<Option<usize>>) -> Self;
25+
fn limit(self, limit: impl Into<Option<usize>>) -> Self;
2626

2727
/// Get the max concurrency limit
28-
fn limit(&self) -> Option<usize>;
28+
fn get_limit(&self) -> Option<usize>;
2929

3030
/// Applies `f` to each item of this stream in parallel, producing a new
3131
/// stream with the results.

src/par_stream/take.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ pin_project! {
2626
impl<S: ParallelStream> Take<S> {
2727
pub(super) fn new(stream: S, remaining: usize) -> Self {
2828
Self {
29-
limit: stream.limit(),
29+
limit: stream.get_limit(),
3030
remaining,
3131
stream,
3232
}
@@ -50,12 +50,12 @@ impl<S: ParallelStream> ParallelStream for Take<S> {
5050
}
5151
}
5252

53-
fn set_limit(mut self, limit: impl Into<Option<usize>>) -> Self {
53+
fn limit(mut self, limit: impl Into<Option<usize>>) -> Self {
5454
self.limit = limit.into();
5555
self
5656
}
5757

58-
fn limit(&self) -> Option<usize> {
58+
fn get_limit(&self) -> Option<usize> {
5959
self.limit
6060
}
6161
}

src/utils.rs

+55
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
// use core::pin::Pin;
2+
// use core::task::{Context, Poll};
3+
4+
// use std::sync::atomic::{AtomicUsize, Ordering};
5+
// use std::sync::Arc;
6+
7+
// use async_std::stream::Stream;
8+
9+
// /// A stream that has a max concurrency of N.
10+
// pub(crate) struct LimitStream {
11+
// limit: Option<usize>,
12+
// ref_count: Arc<AtomicUsize>,
13+
// }
14+
15+
// impl LimitStream {
16+
// /// Create a new instance of LimitStream.
17+
// pub(crate) fn new(limit: Option<usize>) -> Self {
18+
// Self {
19+
// limit,
20+
// ref_count: Arc::new(AtomicUsize::new(0)),
21+
// }
22+
// }
23+
// }
24+
25+
// #[derive(Debug)]
26+
// pub(crate) struct Guard {
27+
// limit: Option<usize>,
28+
// ref_count: Arc<AtomicUsize>,
29+
// }
30+
31+
// impl Guard {
32+
// fn new(limit: Option<usize>, ref_count: Arc<AtomicUsize>) -> Self {
33+
// Self { limit, ref_count }
34+
// }
35+
// }
36+
37+
// impl Drop for Guard {
38+
// fn drop(&mut self) {
39+
// if self.limit.is_some() {
40+
// self.ref_count.fetch_sub(1, Ordering::SeqCst);
41+
// }
42+
// }
43+
// }
44+
45+
// impl Stream for LimitStream {
46+
// type Item = Guard;
47+
48+
// fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
49+
// if self.limit.is_none() {
50+
// let guard = Guard::new(self.limit, self.ref_count.clone());
51+
// return Poll::Ready(Some(guard));
52+
// }
53+
// todo!();
54+
// }
55+
// }

src/vec.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,12 @@ impl<T: Send + Sync + 'static> ParallelStream for IntoParStream<T> {
2828
this.stream.poll_next(cx)
2929
}
3030

31-
fn set_limit(mut self, limit: impl Into<Option<usize>>) -> Self {
31+
fn limit(mut self, limit: impl Into<Option<usize>>) -> Self {
3232
self.limit = limit.into();
3333
self
3434
}
3535

36-
fn limit(&self) -> Option<usize> {
36+
fn get_limit(&self) -> Option<usize> {
3737
self.limit
3838
}
3939
}

0 commit comments

Comments
 (0)