Skip to content

Commit 7213cdc

Browse files
authored
Merge branch 'master' into pin-project-lite
2 parents 7d6ce00 + cb52c5e commit 7213cdc

File tree

6 files changed

+188
-47
lines changed

6 files changed

+188
-47
lines changed

.github/workflows/ci.yml

+4-3
Original file line numberDiff line numberDiff line change
@@ -192,22 +192,23 @@ jobs:
192192
- run: cargo bench --manifest-path futures-util/Cargo.toml --features=bilock,unstable
193193

194194
features:
195-
name: cargo hack check --each-feature
195+
name: cargo hack check --feature-powerset
196196
runs-on: ubuntu-latest
197197
steps:
198198
- uses: actions/checkout@v2
199199
- name: Install Rust
200200
run: rustup update nightly && rustup default nightly
201201
- run: cargo install cargo-hack
202202
# Check each specified feature works properly
203-
# * `--each-feature` - run for each feature which includes --no-default-features and default features of package
203+
# * `--feature-powerset` - run for the feature powerset of the package
204+
# * `--depth 2` - limit the max number of simultaneous feature flags of `--feature-powerset`
204205
# * `--no-dev-deps` - build without dev-dependencies to avoid https://github.com/rust-lang/cargo/issues/4866
205206
# * `--exclude futures-test` - futures-test cannot be compiled with no-default features
206207
# * `--features unstable` - some features cannot be compiled without this feature
207208
# * `--ignore-unknown-features` - some crates doesn't have 'unstable' feature
208209
- run: |
209210
cargo hack check \
210-
--each-feature --no-dev-deps \
211+
--feature-powerset --depth 2 --no-dev-deps \
211212
--workspace --exclude futures-test \
212213
--features unstable --ignore-unknown-features
213214

futures-util/src/future/future/remote_handle.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -96,11 +96,11 @@ impl<Fut: Future> Future for Remote<Fut> {
9696
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
9797
let this = self.project();
9898

99-
if let Poll::Ready(_) = this.tx.as_mut().unwrap().poll_canceled(cx) {
100-
if !this.keep_running.load(Ordering::SeqCst) {
101-
// Cancelled, bail out
102-
return Poll::Ready(())
103-
}
99+
if this.tx.as_mut().unwrap().poll_canceled(cx).is_ready()
100+
&& !this.keep_running.load(Ordering::SeqCst)
101+
{
102+
// Cancelled, bail out
103+
return Poll::Ready(());
104104
}
105105

106106
let output = ready!(this.future.poll(cx));

futures-util/src/sink/mod.rs

+52-36
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,11 @@
66
//! This module is only available when the `sink` feature of this
77
//! library is activated, and it is activated by default.
88
9+
use crate::future::Either;
910
use core::pin::Pin;
1011
use futures_core::future::Future;
1112
use futures_core::stream::{Stream, TryStream};
1213
use futures_core::task::{Context, Poll};
13-
use crate::future::Either;
1414

1515
#[cfg(feature = "compat")]
1616
use crate::compat::CompatSink;
@@ -41,6 +41,9 @@ pub use self::send::Send;
4141
mod send_all;
4242
pub use self::send_all::SendAll;
4343

44+
mod unfold;
45+
pub use self::unfold::{unfold, Unfold};
46+
4447
mod with;
4548
pub use self::with::With;
4649

@@ -69,10 +72,11 @@ pub trait SinkExt<Item>: Sink<Item> {
6972
/// Note that this function consumes the given sink, returning a wrapped
7073
/// version, much like `Iterator::map`.
7174
fn with<U, Fut, F, E>(self, f: F) -> With<Self, Item, U, Fut, F>
72-
where F: FnMut(U) -> Fut,
73-
Fut: Future<Output = Result<Item, E>>,
74-
E: From<Self::Error>,
75-
Self: Sized
75+
where
76+
F: FnMut(U) -> Fut,
77+
Fut: Future<Output = Result<Item, E>>,
78+
E: From<Self::Error>,
79+
Self: Sized,
7680
{
7781
With::new(self, f)
7882
}
@@ -110,9 +114,10 @@ pub trait SinkExt<Item>: Sink<Item> {
110114
/// # });
111115
/// ```
112116
fn with_flat_map<U, St, F>(self, f: F) -> WithFlatMap<Self, Item, U, St, F>
113-
where F: FnMut(U) -> St,
114-
St: Stream<Item = Result<Item, Self::Error>>,
115-
Self: Sized
117+
where
118+
F: FnMut(U) -> St,
119+
St: Stream<Item = Result<Item, Self::Error>>,
120+
Self: Sized,
116121
{
117122
WithFlatMap::new(self, f)
118123
}
@@ -133,8 +138,9 @@ pub trait SinkExt<Item>: Sink<Item> {
133138

134139
/// Transforms the error returned by the sink.
135140
fn sink_map_err<E, F>(self, f: F) -> SinkMapErr<Self, F>
136-
where F: FnOnce(Self::Error) -> E,
137-
Self: Sized,
141+
where
142+
F: FnOnce(Self::Error) -> E,
143+
Self: Sized,
138144
{
139145
SinkMapErr::new(self, f)
140146
}
@@ -143,13 +149,13 @@ pub trait SinkExt<Item>: Sink<Item> {
143149
///
144150
/// If wanting to map errors of a `Sink + Stream`, use `.sink_err_into().err_into()`.
145151
fn sink_err_into<E>(self) -> err_into::SinkErrInto<Self, Item, E>
146-
where Self: Sized,
147-
Self::Error: Into<E>,
152+
where
153+
Self: Sized,
154+
Self::Error: Into<E>,
148155
{
149156
SinkErrInto::new(self)
150157
}
151158

152-
153159
/// Adds a fixed-size buffer to the current sink.
154160
///
155161
/// The resulting sink will buffer up to `capacity` items when the
@@ -164,14 +170,16 @@ pub trait SinkExt<Item>: Sink<Item> {
164170
/// library is activated, and it is activated by default.
165171
#[cfg(feature = "alloc")]
166172
fn buffer(self, capacity: usize) -> Buffer<Self, Item>
167-
where Self: Sized,
173+
where
174+
Self: Sized,
168175
{
169176
Buffer::new(self, capacity)
170177
}
171178

172179
/// Close the sink.
173180
fn close(&mut self) -> Close<'_, Self, Item>
174-
where Self: Unpin,
181+
where
182+
Self: Unpin,
175183
{
176184
Close::new(self)
177185
}
@@ -181,9 +189,10 @@ pub trait SinkExt<Item>: Sink<Item> {
181189
/// This adapter clones each incoming item and forwards it to both this as well as
182190
/// the other sink at the same time.
183191
fn fanout<Si>(self, other: Si) -> Fanout<Self, Si>
184-
where Self: Sized,
185-
Item: Clone,
186-
Si: Sink<Item, Error=Self::Error>
192+
where
193+
Self: Sized,
194+
Item: Clone,
195+
Si: Sink<Item, Error = Self::Error>,
187196
{
188197
Fanout::new(self, other)
189198
}
@@ -193,7 +202,8 @@ pub trait SinkExt<Item>: Sink<Item> {
193202
/// This adapter is intended to be used when you want to stop sending to the sink
194203
/// until all current requests are processed.
195204
fn flush(&mut self) -> Flush<'_, Self, Item>
196-
where Self: Unpin,
205+
where
206+
Self: Unpin,
197207
{
198208
Flush::new(self)
199209
}
@@ -205,7 +215,8 @@ pub trait SinkExt<Item>: Sink<Item> {
205215
/// to batch together items to send via `send_all`, rather than flushing
206216
/// between each item.**
207217
fn send(&mut self, item: Item) -> Send<'_, Self, Item>
208-
where Self: Unpin,
218+
where
219+
Self: Unpin,
209220
{
210221
Send::new(self, item)
211222
}
@@ -221,12 +232,10 @@ pub trait SinkExt<Item>: Sink<Item> {
221232
/// Doing `sink.send_all(stream)` is roughly equivalent to
222233
/// `stream.forward(sink)`. The returned future will exhaust all items from
223234
/// `stream` and send them to `self`.
224-
fn send_all<'a, St>(
225-
&'a mut self,
226-
stream: &'a mut St
227-
) -> SendAll<'a, Self, St>
228-
where St: TryStream<Ok = Item, Error = Self::Error> + Stream + Unpin + ?Sized,
229-
Self: Unpin,
235+
fn send_all<'a, St>(&'a mut self, stream: &'a mut St) -> SendAll<'a, Self, St>
236+
where
237+
St: TryStream<Ok = Item, Error = Self::Error> + Stream + Unpin + ?Sized,
238+
Self: Unpin,
230239
{
231240
SendAll::new(self, stream)
232241
}
@@ -237,8 +246,9 @@ pub trait SinkExt<Item>: Sink<Item> {
237246
/// This can be used in combination with the `right_sink` method to write `if`
238247
/// statements that evaluate to different streams in different branches.
239248
fn left_sink<Si2>(self) -> Either<Self, Si2>
240-
where Si2: Sink<Item, Error = Self::Error>,
241-
Self: Sized
249+
where
250+
Si2: Sink<Item, Error = Self::Error>,
251+
Self: Sized,
242252
{
243253
Either::Left(self)
244254
}
@@ -249,8 +259,9 @@ pub trait SinkExt<Item>: Sink<Item> {
249259
/// This can be used in combination with the `left_sink` method to write `if`
250260
/// statements that evaluate to different streams in different branches.
251261
fn right_sink<Si1>(self) -> Either<Si1, Self>
252-
where Si1: Sink<Item, Error = Self::Error>,
253-
Self: Sized
262+
where
263+
Si1: Sink<Item, Error = Self::Error>,
264+
Self: Sized,
254265
{
255266
Either::Right(self)
256267
}
@@ -260,39 +271,44 @@ pub trait SinkExt<Item>: Sink<Item> {
260271
#[cfg(feature = "compat")]
261272
#[cfg_attr(docsrs, doc(cfg(feature = "compat")))]
262273
fn compat(self) -> CompatSink<Self, Item>
263-
where Self: Sized + Unpin,
274+
where
275+
Self: Sized + Unpin,
264276
{
265277
CompatSink::new(self)
266278
}
267-
279+
268280
/// A convenience method for calling [`Sink::poll_ready`] on [`Unpin`]
269281
/// sink types.
270282
fn poll_ready_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
271-
where Self: Unpin
283+
where
284+
Self: Unpin,
272285
{
273286
Pin::new(self).poll_ready(cx)
274287
}
275288

276289
/// A convenience method for calling [`Sink::start_send`] on [`Unpin`]
277290
/// sink types.
278291
fn start_send_unpin(&mut self, item: Item) -> Result<(), Self::Error>
279-
where Self: Unpin
292+
where
293+
Self: Unpin,
280294
{
281295
Pin::new(self).start_send(item)
282296
}
283297

284298
/// A convenience method for calling [`Sink::poll_flush`] on [`Unpin`]
285299
/// sink types.
286300
fn poll_flush_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
287-
where Self: Unpin
301+
where
302+
Self: Unpin,
288303
{
289304
Pin::new(self).poll_flush(cx)
290305
}
291306

292307
/// A convenience method for calling [`Sink::poll_close`] on [`Unpin`]
293308
/// sink types.
294309
fn poll_close_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
295-
where Self: Unpin
310+
where
311+
Self: Unpin,
296312
{
297313
Pin::new(self).poll_close(cx)
298314
}

futures-util/src/sink/unfold.rs

+83
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
use core::{future::Future, pin::Pin};
2+
use futures_core::task::{Context, Poll};
3+
use futures_sink::Sink;
4+
use pin_project::pin_project;
5+
6+
/// Sink for the [`unfold`] function.
7+
#[pin_project]
8+
#[derive(Debug)]
9+
#[must_use = "sinks do nothing unless polled"]
10+
pub struct Unfold<T, F, R> {
11+
state: Option<T>,
12+
function: F,
13+
#[pin]
14+
future: Option<R>,
15+
}
16+
17+
/// Create a sink from a function which processes one item at a time.
18+
///
19+
/// # Examples
20+
///
21+
/// ```
22+
/// # futures::executor::block_on(async {
23+
/// use futures::sink::{self, SinkExt};
24+
///
25+
/// let unfold = sink::unfold(0, |mut sum, i: i32| {
26+
/// async move {
27+
/// sum += i;
28+
/// eprintln!("{}", i);
29+
/// Ok::<_, futures::never::Never>(sum)
30+
/// }
31+
/// });
32+
/// futures::pin_mut!(unfold);
33+
/// unfold.send(5).await?;
34+
/// # Ok::<(), futures::never::Never>(()) }).unwrap();
35+
/// ```
36+
pub fn unfold<T, F, R>(init: T, function: F) -> Unfold<T, F, R> {
37+
Unfold {
38+
state: Some(init),
39+
function,
40+
future: None,
41+
}
42+
}
43+
44+
impl<T, F, R, Item, E> Sink<Item> for Unfold<T, F, R>
45+
where
46+
F: FnMut(T, Item) -> R,
47+
R: Future<Output = Result<T, E>>,
48+
{
49+
type Error = E;
50+
51+
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
52+
self.poll_flush(cx)
53+
}
54+
55+
fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
56+
let mut this = self.project();
57+
debug_assert!(this.future.is_none());
58+
let future = (this.function)(this.state.take().unwrap(), item);
59+
this.future.set(Some(future));
60+
Ok(())
61+
}
62+
63+
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
64+
let mut this = self.project();
65+
Poll::Ready(if let Some(future) = this.future.as_mut().as_pin_mut() {
66+
let result = match ready!(future.poll(cx)) {
67+
Ok(state) => {
68+
*this.state = Some(state);
69+
Ok(())
70+
}
71+
Err(err) => Err(err),
72+
};
73+
this.future.set(None);
74+
result
75+
} else {
76+
Ok(())
77+
})
78+
}
79+
80+
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
81+
self.poll_flush(cx)
82+
}
83+
}

futures/src/lib.rs

+6-3
Original file line numberDiff line numberDiff line change
@@ -280,8 +280,8 @@ pub mod future {
280280
NeverError,
281281

282282
TryFutureExt,
283-
AndThen, ErrInto, FlattenSink, IntoFuture, MapErr, MapOk, OrElse,
284-
InspectOk, InspectErr, TryFlattenStream, UnwrapOrElse,
283+
AndThen, ErrInto, FlattenSink, IntoFuture, MapErr, MapOk, MapOkOrElse, MapInto,
284+
OrElse, OkInto, InspectOk, InspectErr, TryFlatten, TryFlattenStream, UnwrapOrElse,
285285
};
286286

287287
#[cfg(feature = "alloc")]
@@ -348,6 +348,9 @@ pub mod io {
348348
Repeat, ReuniteError, Seek, sink, Sink, Take, Window, Write, WriteAll, WriteHalf,
349349
WriteVectored,
350350
};
351+
352+
#[cfg(feature = "write-all-vectored")]
353+
pub use futures_util::io::WriteAllVectored;
351354
}
352355

353356
#[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
@@ -417,7 +420,7 @@ pub mod sink {
417420

418421
pub use futures_util::sink::{
419422
Close, Flush, Send, SendAll, SinkErrInto, SinkMapErr, With,
420-
SinkExt, Fanout, Drain, drain,
423+
SinkExt, Fanout, Drain, drain, Unfold, unfold,
421424
WithFlatMap,
422425
};
423426

0 commit comments

Comments
 (0)