diff --git a/futures-channel/Cargo.toml b/futures-channel/Cargo.toml index 44da97753c..0ba7ac3266 100644 --- a/futures-channel/Cargo.toml +++ b/futures-channel/Cargo.toml @@ -18,5 +18,5 @@ default = ["std"] futures-core = { path = "../futures-core", version = "0.2", default-features = false } [dev-dependencies] -futures = { path = "../futures", version = "0.2", default-features = false } -futures-executor = { path = "../futures-executor", version = "0.2", default-features = false } +futures = { path = "../futures", version = "0.2", default-features = true } +futures-executor = { path = "../futures-executor", version = "0.2", default-features = true } diff --git a/futures-channel/benches/sync_mpsc.rs b/futures-channel/benches/sync_mpsc.rs index 49ff91f579..f6d8087e1c 100755 --- a/futures-channel/benches/sync_mpsc.rs +++ b/futures-channel/benches/sync_mpsc.rs @@ -1,5 +1,6 @@ #![feature(test)] +#[macro_use] extern crate futures; extern crate futures_channel; extern crate test; @@ -111,17 +112,11 @@ impl Stream for TestSender { type Error = (); fn poll(&mut self, cx: &mut task::Context) -> Poll, Self::Error> { - match self.tx.start_send(cx, self.last + 1) { - Err(_) => panic!(), - Ok(Ok(())) => { - self.last += 1; - assert_eq!(Ok(Async::Ready(())), self.tx.flush(cx)); - Ok(Async::Ready(Some(self.last))) - } - Ok(Err(_)) => { - Ok(Async::Pending) - } - } + try_ready!(self.tx.poll_ready(cx).map_err(|_| ())); + self.tx.start_send(self.last + 1).unwrap(); + self.last += 1; + assert!(self.tx.poll_flush(cx).unwrap().is_ready()); + Ok(Async::Ready(Some(self.last))) } } diff --git a/futures-channel/src/mpsc/mod.rs b/futures-channel/src/mpsc/mod.rs index 910caaebf8..7160458129 100644 --- a/futures-channel/src/mpsc/mod.rs +++ b/futures-channel/src/mpsc/mod.rs @@ -129,60 +129,63 @@ pub struct Receiver { #[derive(Debug)] pub struct UnboundedReceiver(Receiver); -/// Error type for sending, used when the receiving end of a channel is -/// dropped -#[derive(Clone, PartialEq, Eq)] -pub struct SendError(T); +/// The error type of ` as Sink>` +/// +/// It will contain a value of type `T` if one was passed to `start_send` +/// after the channel was closed. +pub struct ChannelClosed(Option); /// Error type returned from `try_send` #[derive(Clone, PartialEq, Eq)] -pub struct TrySendError { - kind: TrySendErrorKind, +pub struct TryChannelClosed { + kind: TryChannelClosedKind, } #[derive(Clone, PartialEq, Eq)] -enum TrySendErrorKind { +enum TryChannelClosedKind { Full(T), Disconnected(T), } -impl fmt::Debug for SendError { +impl fmt::Debug for ChannelClosed { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - fmt.debug_tuple("SendError") + fmt.debug_tuple("ChannelClosed") .field(&"...") .finish() } } -impl fmt::Display for SendError { +impl fmt::Display for ChannelClosed { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { write!(fmt, "send failed because receiver is gone") } } -impl Error for SendError +impl Error for ChannelClosed { fn description(&self) -> &str { "send failed because receiver is gone" } } -impl SendError { +impl ChannelClosed { /// Returns the message that was attempted to be sent but failed. - pub fn into_inner(self) -> T { + /// This method returns `None` if no item was being sent when the + /// error occurred. + pub fn into_inner(self) -> Option { self.0 } } -impl fmt::Debug for TrySendError { +impl fmt::Debug for TryChannelClosed { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - fmt.debug_tuple("TrySendError") + fmt.debug_tuple("TryChannelClosed") .field(&"...") .finish() } } -impl fmt::Display for TrySendError { +impl fmt::Display for TryChannelClosed { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { if self.is_full() { write!(fmt, "send failed because channel is full") @@ -192,7 +195,7 @@ impl fmt::Display for TrySendError { } } -impl Error for TrySendError { +impl Error for TryChannelClosed { fn description(&self) -> &str { if self.is_full() { "send failed because channel is full" @@ -202,10 +205,10 @@ impl Error for TrySendError { } } -impl TrySendError { +impl TryChannelClosed { /// Returns true if this error is a result of the channel being full pub fn is_full(&self) -> bool { - use self::TrySendErrorKind::*; + use self::TryChannelClosedKind::*; match self.kind { Full(_) => true, @@ -215,7 +218,7 @@ impl TrySendError { /// Returns true if this error is a result of the receiver being dropped pub fn is_disconnected(&self) -> bool { - use self::TrySendErrorKind::*; + use self::TryChannelClosedKind::*; match self.kind { Disconnected(_) => true, @@ -225,7 +228,7 @@ impl TrySendError { /// Returns the message that was attempted to be sent but failed. pub fn into_inner(self) -> T { - use self::TrySendErrorKind::*; + use self::TryChannelClosedKind::*; match self.kind { Full(v) | Disconnected(v) => v, @@ -395,47 +398,34 @@ impl Sender { /// It is not recommended to call this function from inside of a future, /// only from an external thread where you've otherwise arranged to be /// notified when the channel is no longer full. - pub fn try_send(&mut self, msg: T) -> Result<(), TrySendError> { + pub fn try_send(&mut self, msg: T) -> Result<(), TryChannelClosed> { // If the sender is currently blocked, reject the message if !self.poll_unparked(None).is_ready() { - return Err(TrySendError { - kind: TrySendErrorKind::Full(msg), + return Err(TryChannelClosed { + kind: TryChannelClosedKind::Full(msg), }); } // The channel has capacity to accept the message, so send it self.do_send(Some(msg), None) - .map_err(|SendError(v)| { - TrySendError { - kind: TrySendErrorKind::Disconnected(v), + .map_err(|ChannelClosed(v)| { + TryChannelClosed { + kind: TryChannelClosedKind::Disconnected(v.unwrap()), } }) } /// Attempt to start sending a message on the channel. - /// - /// If there is not room on the channel, this function will return - /// `Ok(Err(msg))`, and the current `Task` will be - /// awoken when the channel is ready to receive more messages. - /// - /// On successful completion, this function will return `Ok(Ok(()))`. - pub fn start_send(&mut self, cx: &mut task::Context, msg: T) -> Result, SendError> { - // If the sender is currently blocked, reject the message before doing - // any work. - if !self.poll_unparked(Some(cx)).is_ready() { - return Ok(Err(msg)); - } - - // The channel has capacity to accept the message, so send it. - self.do_send(Some(msg), Some(cx))?; - - Ok(Ok(())) + /// This function should only be called after `poll_ready` has responded + /// that the channel is ready to receive a message. + pub fn start_send(&mut self, msg: T) -> Result<(), ChannelClosed> { + self.do_send(Some(msg), None) } // Do the send without failing // None means close fn do_send(&mut self, msg: Option, cx: Option<&mut task::Context>) - -> Result<(), SendError> + -> Result<(), ChannelClosed> { // First, increment the number of messages contained by the channel. // This operation will also atomically determine if the sender task @@ -456,7 +446,7 @@ impl Sender { // num-senders + buffer + 1 // if let Some(msg) = msg { - return Err(SendError(msg)); + return Err(ChannelClosed(Some(msg))); } else { return Ok(()); } @@ -482,10 +472,10 @@ impl Sender { // Do the send without parking current task. // // To be called from unbounded sender. - fn do_send_nb(&self, msg: T) -> Result<(), SendError> { + fn do_send_nb(&self, msg: T) -> Result<(), ChannelClosed> { match self.inc_num_messages(false) { Some(park_self) => assert!(!park_self), - None => return Err(SendError(msg)), + None => return Err(ChannelClosed(Some(msg))), }; self.queue_push_and_signal(Some(msg)); @@ -601,15 +591,15 @@ impl Sender { /// /// Returns `Ok(Async::Ready(_))` if there is sufficient capacity, or returns /// `Ok(Async::Pending)` if the channel is not guaranteed to have capacity. Returns - /// `Err(SendError(_))` if the receiver has been dropped. + /// `Err(ChannelClosed(_))` if the receiver has been dropped. /// /// # Panics /// /// This method will panic if called from outside the context of a task or future. - pub fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), SendError<()>> { + pub fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), ChannelClosed> { let state = decode_state(self.inner.state.load(SeqCst)); if !state.is_open { - return Err(SendError(())); + return Err(ChannelClosed(None)); } Ok(self.poll_unparked(Some(cx))) @@ -643,16 +633,16 @@ impl Sender { } impl UnboundedSender { + /// Check if the channel is ready to receive a message. + pub fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), ChannelClosed> { + self.0.poll_ready(cx) + } + /// Attempt to start sending a message on the channel. - /// - /// If there is not room on the channel, this function will return - /// `Ok(Err(msg))`, and the current `Task` will be - /// awoken when the channel is ready to receive more messages. - /// - /// On a successful `start_send`, this function will return - /// `Ok(Ok(())`. - pub fn start_send(&mut self, cx: &mut task::Context, msg: T) -> Result, SendError> { - self.0.start_send(cx, msg) + /// This function should only be called after `poll_ready` has been used to + /// verify that the channel is ready to receive a message. + pub fn start_send(&mut self, msg: T) -> Result<(), ChannelClosed> { + self.0.start_send(msg) } /// Sends the provided message along this channel. @@ -660,7 +650,7 @@ impl UnboundedSender { /// This is an unbounded sender, so this function differs from `Sink::send` /// by ensuring the return type reflects that the channel is always ready to /// receive messages. - pub fn unbounded_send(&self, msg: T) -> Result<(), SendError> { + pub fn unbounded_send(&self, msg: T) -> Result<(), ChannelClosed> { self.0.do_send_nb(msg) } } diff --git a/futures-channel/tests/mpsc.rs b/futures-channel/tests/mpsc.rs index 82345aca05..6f2e17319d 100644 --- a/futures-channel/tests/mpsc.rs +++ b/futures-channel/tests/mpsc.rs @@ -32,24 +32,22 @@ fn send_recv_no_buffer() { // Run on a task context let f = poll_fn(move |cx| { - assert!(tx.flush(cx).unwrap().is_ready()); + assert!(tx.poll_flush(cx).unwrap().is_ready()); assert!(tx.poll_ready(cx).unwrap().is_ready()); // Send first message - let res = tx.start_send(cx, 1).unwrap(); - assert!(res.is_ok()); + assert!(tx.start_send(1).is_ok()); assert!(tx.poll_ready(cx).unwrap().is_not_ready()); // Send second message - let res = tx.start_send(cx, 2).unwrap(); - assert!(res.is_err()); + assert!(tx.poll_ready(cx).unwrap().is_not_ready()); // Take the value assert_eq!(rx.poll(cx).unwrap(), Async::Ready(Some(1))); assert!(tx.poll_ready(cx).unwrap().is_ready()); - let res = tx.start_send(cx, 2).unwrap(); - assert!(res.is_ok()); + assert!(tx.poll_ready(cx).unwrap().is_ready()); + assert!(tx.start_send(2).is_ok()); assert!(tx.poll_ready(cx).unwrap().is_not_ready()); // Take the value @@ -430,7 +428,7 @@ fn stress_poll_ready() { // (asserting that it doesn't attempt to block). while self.count > 0 { try_ready!(self.sender.poll_ready(cx).map_err(|_| ())); - assert!(self.sender.start_send(cx, self.count).unwrap().is_ok()); + assert!(self.sender.start_send(self.count).is_ok()); self.count -= 1; } Ok(Async::Ready(())) @@ -502,7 +500,7 @@ fn try_send_2() { let th = thread::spawn(|| { block_on(poll_fn(|cx| { - assert!(tx.start_send(cx, "fail").unwrap().is_err()); + assert!(tx.poll_ready(cx).unwrap().is_not_ready()); Ok::<_, ()>(Async::Ready(())) })).unwrap(); diff --git a/futures-sink/src/channel_impls.rs b/futures-sink/src/channel_impls.rs index 009f237fa6..03affa70f7 100644 --- a/futures-sink/src/channel_impls.rs +++ b/futures-sink/src/channel_impls.rs @@ -1,62 +1,66 @@ -use {Async, Sink, AsyncSink, StartSend, Poll}; +use {Async, Sink, Poll}; use futures_core::task; -use futures_channel::mpsc::{Sender, SendError, UnboundedSender}; - -fn res_to_async_sink(res: Result<(), T>) -> AsyncSink { - match res { - Ok(()) => AsyncSink::Ready, - Err(x) => AsyncSink::Pending(x), - } -} +use futures_channel::mpsc::{Sender, ChannelClosed, UnboundedSender}; impl Sink for Sender { type SinkItem = T; - type SinkError = SendError; + type SinkError = ChannelClosed; - fn start_send(&mut self, cx: &mut task::Context, msg: T) -> StartSend> { - self.start_send(cx, msg).map(res_to_async_sink) + fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> { + self.poll_ready(cx) } - fn flush(&mut self, _: &mut task::Context) -> Poll<(), SendError> { - Ok(Async::Ready(())) + fn start_send(&mut self, msg: T) -> Result<(), Self::SinkError> { + self.start_send(msg) } - fn close(&mut self, _: &mut task::Context) -> Poll<(), SendError> { + fn start_close(&mut self) -> Result<(), Self::SinkError> { + Ok(()) + } + + fn poll_flush(&mut self, _: &mut task::Context) -> Poll<(), Self::SinkError> { Ok(Async::Ready(())) } } impl Sink for UnboundedSender { type SinkItem = T; - type SinkError = SendError; + type SinkError = ChannelClosed; - fn start_send(&mut self, cx: &mut task::Context, msg: T) -> StartSend> { - self.start_send(cx, msg).map(res_to_async_sink) + fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> { + self.poll_ready(cx) } - fn flush(&mut self, _: &mut task::Context) -> Poll<(), SendError> { - Ok(Async::Ready(())) + fn start_send(&mut self, msg: T) -> Result<(), Self::SinkError> { + self.start_send(msg) } - fn close(&mut self, _: &mut task::Context) -> Poll<(), SendError> { + fn start_close(&mut self) -> Result<(), Self::SinkError> { + Ok(()) + } + + fn poll_flush(&mut self, _: &mut task::Context) -> Poll<(), Self::SinkError> { Ok(Async::Ready(())) } } impl<'a, T> Sink for &'a UnboundedSender { type SinkItem = T; - type SinkError = SendError; + type SinkError = ChannelClosed; + + fn poll_ready(&mut self, _: &mut task::Context) -> Poll<(), Self::SinkError> { + Ok(Async::Ready(())) + } - fn start_send(&mut self, _: &mut task::Context, msg: T) -> StartSend> { - self.unbounded_send(msg)?; - Ok(AsyncSink::Ready) + fn start_send(&mut self, msg: T) -> Result<(), Self::SinkError> { + self.unbounded_send(msg) } - fn flush(&mut self, _: &mut task::Context) -> Poll<(), SendError> { - Ok(Async::Ready(())) + fn start_close(&mut self) -> Result<(), Self::SinkError> { + Ok(()) } - fn close(&mut self, _: &mut task::Context) -> Poll<(), SendError> { + fn poll_flush(&mut self, _: &mut task::Context) -> Poll<(), Self::SinkError> { Ok(Async::Ready(())) } } diff --git a/futures-sink/src/lib.rs b/futures-sink/src/lib.rs index 1d72d93a19..76bf133abc 100644 --- a/futures-sink/src/lib.rs +++ b/futures-sink/src/lib.rs @@ -21,75 +21,32 @@ macro_rules! if_std { )*) } -use futures_core::Poll; -use futures_core::task; - -/// The result of an asynchronous attempt to send a value to a sink. -#[derive(Copy, Clone, Debug, PartialEq)] -pub enum AsyncSink { - /// The `start_send` attempt succeeded, so the sending process has - /// *started*; you must use `Sink::flush` to drive the send - /// to completion. - Ready, - - /// The `start_send` attempt failed due to the sink being full. The value - /// being sent is returned, and the current `Task` will be automatically - /// notified again once the sink has room. - Pending(T), -} - -impl AsyncSink { - /// Change the Pending value of this `AsyncSink` with the closure provided - pub fn map(self, f: F) -> AsyncSink - where F: FnOnce(T) -> U, - { - match self { - AsyncSink::Ready => AsyncSink::Ready, - AsyncSink::Pending(t) => AsyncSink::Pending(f(t)), - } - } - - /// Returns whether this is `AsyncSink::Ready` - pub fn is_ready(&self) -> bool { - match *self { - AsyncSink::Ready => true, - AsyncSink::Pending(_) => false, - } - } - - /// Returns whether this is `AsyncSink::Pending` - pub fn is_not_ready(&self) -> bool { - !self.is_ready() - } -} - -/// Return type of the `Sink::start_send` method, indicating the outcome of a -/// send attempt. See `AsyncSink` for more details. -pub type StartSend = Result, E>; +use futures_core::{Async, Poll, task}; if_std! { mod channel_impls; - use futures_core::Async; use futures_core::never::Never; impl Sink for ::std::vec::Vec { type SinkItem = T; type SinkError = Never; - fn start_send(&mut self, _: &mut task::Context, item: Self::SinkItem) - -> StartSend - { + fn poll_ready(&mut self, _: &mut task::Context) -> Poll<(), Self::SinkError> { + Ok(Async::Ready(())) + } + + fn start_send(&mut self, item: Self::SinkItem) -> Result<(), Self::SinkError> { self.push(item); - Ok(::AsyncSink::Ready) + Ok(()) } - fn flush(&mut self, _: &mut task::Context) -> Poll<(), Self::SinkError> { - Ok(::Async::Ready(())) + fn start_close(&mut self) -> Result<(), Self::SinkError> { + Ok(()) } - fn close(&mut self, _: &mut task::Context) -> Poll<(), Self::SinkError> { - Ok(::Async::Ready(())) + fn poll_flush(&mut self, _: &mut task::Context) -> Poll<(), Self::SinkError> { + Ok(Async::Ready(())) } } @@ -101,17 +58,20 @@ if_std! { type SinkItem = S::SinkItem; type SinkError = S::SinkError; - fn start_send(&mut self, cx: &mut task::Context, item: Self::SinkItem) - -> StartSend { - (**self).start_send(cx, item) + fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> { + (**self).poll_ready(cx) + } + + fn start_send(&mut self, item: Self::SinkItem) -> Result<(), Self::SinkError> { + (**self).start_send(item) } - fn flush(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> { - (**self).flush(cx) + fn start_close(&mut self) -> Result<(), Self::SinkError> { + (**self).start_close() } - fn close(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> { - (**self).close(cx) + fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> { + (**self).poll_flush(cx) } } } @@ -154,6 +114,9 @@ pub trait Sink { /// The type of value produced by the sink when an error occurs. type SinkError; + /// Check if the sink is ready to start sending a value. + fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError>; + /// Begin the process of sending a value to the sink. /// /// As the name suggests, this method only *begins* the process of sending @@ -192,8 +155,12 @@ pub trait Sink { /// /// - It is called outside of the context of a task. /// - A previous call to `start_send` or `flush` yielded an error. - fn start_send(&mut self, cx: &mut task::Context, item: Self::SinkItem) - -> StartSend; + fn start_send(&mut self, item: Self::SinkItem) + -> Result<(), Self::SinkError>; + + + /// Set the `Sink` to start closing. + fn start_close(&mut self) -> Result<(), Self::SinkError>; /// Flush all output from this sink, if necessary. /// @@ -243,90 +210,26 @@ pub trait Sink { /// In the 0.2 release series of futures this method will be renamed to /// `flush`. For 0.1, however, the breaking change is not happening /// yet. - fn flush(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError>; - - /// A method to indicate that no more values will ever be pushed into this - /// sink. - /// - /// This method is used to indicate that a sink will no longer even be given - /// another value by the caller. That is, the `start_send` method above will - /// be called no longer (nor `flush`). This method is intended to - /// model "graceful shutdown" in various protocols where the intent to shut - /// down is followed by a little more blocking work. - /// - /// Callers of this function should work it it in a similar fashion to - /// `flush`. Once called it may return `Pending` which indicates - /// that more external work needs to happen to make progress. The current - /// task will be scheduled to receive a notification in such an event, - /// however. - /// - /// Note that this function will imply `flush` above. That is, if a - /// sink has buffered data, then it'll be flushed out during a `close` - /// operation. It is not necessary to have `flush` return `Ready` - /// before a `close` is called. Once a `close` is called, though, - /// `flush` cannot be called. - /// - /// # Return value - /// - /// This function, like `flush`, returns a `Poll`. The value is - /// `Ready` once the close operation has completed. At that point it should - /// be safe to drop the sink and deallocate associated resources. - /// - /// If the value returned is `Pending` then the sink is not yet closed and - /// work needs to be done to close it. The work has been scheduled and the - /// current task will receive a notification when it's next ready to call - /// this method again. - /// - /// Finally, this function may also return an error. - /// - /// # Errors - /// - /// This function will return an `Err` if any operation along the way during - /// the close operation fails. An error typically is fatal for a sink and is - /// unable to be recovered from, but in specific situations this may not - /// always be true. - /// - /// Note that it's also typically an error to call `start_send` or - /// `flush` after the `close` function is called. This method will - /// *initiate* a close, and continuing to send values after that (or attempt - /// to flush) may result in strange behavior, panics, errors, etc. Once this - /// method is called, it must be the only method called on this `Sink`. - /// - /// # Panics - /// - /// This method may panic or cause panics if: - /// - /// * It is called outside the context of a future's task - /// * It is called and then `start_send` or `flush` is called - /// - /// # Compatibility notes - /// - /// Note that this function is currently by default a provided function, - /// defaulted to calling `flush` above. This function was added - /// in the 0.1 series of the crate as a backwards-compatible addition. It - /// is intended that in the 0.2 series the method will no longer be a - /// default method. - /// - /// It is highly recommended to consider this method a required method and - /// to implement it whenever you implement `Sink` locally. It is especially - /// crucial to be sure to close inner sinks, if applicable. - fn close(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError>; + fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError>; } impl<'a, S: ?Sized + Sink> Sink for &'a mut S { type SinkItem = S::SinkItem; type SinkError = S::SinkError; - fn start_send(&mut self, cx: &mut task::Context, item: Self::SinkItem) - -> StartSend { - (**self).start_send(cx, item) + fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> { + (**self).poll_ready(cx) + } + + fn start_send(&mut self, item: Self::SinkItem) -> Result<(), Self::SinkError> { + (**self).start_send(item) } - fn flush(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> { - (**self).flush(cx) + fn start_close(&mut self) -> Result<(), Self::SinkError> { + (**self).start_close() } - fn close(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> { - (**self).close(cx) + fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> { + (**self).poll_flush(cx) } } diff --git a/futures-util/src/lib.rs b/futures-util/src/lib.rs index 3cbcd3330e..a6efc7a2b9 100644 --- a/futures-util/src/lib.rs +++ b/futures-util/src/lib.rs @@ -15,6 +15,26 @@ macro_rules! if_std { )*) } +macro_rules! delegate_sink { + ($field:ident) => { + fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> { + self.$field.poll_ready(cx) + } + + fn start_send(&mut self, item: Self::SinkItem) -> Result<(), Self::SinkError> { + self.$field.start_send(item) + } + + fn start_close(&mut self) -> Result<(), Self::SinkError> { + self.$field.start_close() + } + + fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> { + self.$field.poll_flush(cx) + } + } +} + #[macro_use] #[cfg(feature = "std")] extern crate std; diff --git a/futures-util/src/sink/buffer.rs b/futures-util/src/sink/buffer.rs index 017f744927..d05bd99951 100644 --- a/futures-util/src/sink/buffer.rs +++ b/futures-util/src/sink/buffer.rs @@ -2,7 +2,7 @@ use std::collections::VecDeque; use futures_core::{Poll, Async, Stream}; use futures_core::task; -use futures_sink::{StartSend, AsyncSink, Sink}; +use futures_sink::Sink; /// Sink for the `Sink::buffer` combinator, which buffers up to some fixed /// number of values when the underlying sink is unable to accept them. @@ -14,6 +14,10 @@ pub struct Buffer { // Track capacity separately from the `VecDeque`, which may be rounded up cap: usize, + + // Whether or not to try closing the inner sink once the current buffer has + // been flushed into it. + do_close: bool, } pub fn new(sink: S, amt: usize) -> Buffer { @@ -21,6 +25,7 @@ pub fn new(sink: S, amt: usize) -> Buffer { sink: sink, buf: VecDeque::with_capacity(amt), cap: amt, + do_close: false, } } @@ -44,14 +49,13 @@ impl Buffer { } fn try_empty_buffer(&mut self, cx: &mut task::Context) -> Poll<(), S::SinkError> { + try_ready!(self.sink.poll_ready(cx)); while let Some(item) = self.buf.pop_front() { - if let AsyncSink::Pending(item) = self.sink.start_send(cx, item)? { - self.buf.push_front(item); - - return Ok(Async::Pending); + self.sink.start_send(item)?; + if self.buf.len() != 0 { + try_ready!(self.sink.poll_ready(cx)); } } - Ok(Async::Ready(())) } } @@ -70,38 +74,42 @@ impl Sink for Buffer { type SinkItem = S::SinkItem; type SinkError = S::SinkError; - fn start_send(&mut self, cx: &mut task::Context, item: Self::SinkItem) -> StartSend { + fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> { if self.cap == 0 { - return self.sink.start_send(cx, item); + return self.sink.poll_ready(cx); } self.try_empty_buffer(cx)?; - if self.buf.len() == self.cap { - return Ok(AsyncSink::Pending(item)); - } - self.buf.push_back(item); - Ok(AsyncSink::Ready) + + Ok(if self.buf.len() >= self.cap { + Async::Pending + } else { + Async::Ready(()) + }) } - fn flush(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> { + fn start_send(&mut self, item: Self::SinkItem) -> Result<(), Self::SinkError> { if self.cap == 0 { - return self.sink.flush(cx); + self.sink.start_send(item) + } else { + self.buf.push_back(item); + Ok(()) } + } - try_ready!(self.try_empty_buffer(cx)); - debug_assert!(self.buf.is_empty()); - self.sink.flush(cx) + fn start_close(&mut self) -> Result<(), Self::SinkError> { + self.do_close = true; + Ok(()) } - fn close(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> { - if self.cap == 0 { - return self.sink.close(cx); - } + fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> { + try_ready!(self.try_empty_buffer(cx)); + debug_assert!(self.buf.is_empty()); - if self.buf.len() > 0 { - try_ready!(self.try_empty_buffer(cx)); + if self.do_close { + self.sink.start_close()?; + self.do_close = false; } - assert_eq!(self.buf.len(), 0); - self.sink.close(cx) + self.sink.poll_flush(cx) } } diff --git a/futures-util/src/sink/close.rs b/futures-util/src/sink/close.rs index f9dec0c7b4..618ed0d402 100644 --- a/futures-util/src/sink/close.rs +++ b/futures-util/src/sink/close.rs @@ -8,13 +8,14 @@ use futures_sink::Sink; #[must_use = "futures do nothing unless polled"] pub struct Close { sink: Option, + started_close: bool, } /// A future that completes when the sink has finished closing. /// /// The sink itself is returned after closeing is complete. pub fn close(sink: S) -> Close { - Close { sink: Some(sink) } + Close { sink: Some(sink), started_close: false } } impl Close { @@ -43,7 +44,11 @@ impl Future for Close { fn poll(&mut self, cx: &mut task::Context) -> Poll { let mut sink = self.sink.take().expect("Attempted to poll Close after it completed"); - if sink.close(cx)?.is_ready() { + if !self.started_close { + sink.start_close()?; + self.started_close = true; + } + if sink.poll_flush(cx)?.is_ready() { Ok(Async::Ready(sink)) } else { self.sink = Some(sink); diff --git a/futures-util/src/sink/fanout.rs b/futures-util/src/sink/fanout.rs index d641aff0eb..8cb6382c3d 100644 --- a/futures-util/src/sink/fanout.rs +++ b/futures-util/src/sink/fanout.rs @@ -1,9 +1,8 @@ use core::fmt::{Debug, Formatter, Result as FmtResult}; -use core::mem::replace; use futures_core::{Async, Poll}; use futures_core::task; -use futures_sink::{AsyncSink, Sink, StartSend}; +use futures_sink::{ Sink}; /// Sink that clones incoming items and forwards them to two sinks at the same time. /// @@ -51,38 +50,31 @@ impl Sink for Fanout type SinkItem = A::SinkItem; type SinkError = A::SinkError; - fn start_send( - &mut self, - cx: &mut task::Context, - item: Self::SinkItem - ) -> StartSend { - // Attempt to complete processing any outstanding requests. + fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> { self.left.keep_flushing(cx)?; self.right.keep_flushing(cx)?; - // Only if both downstream sinks are ready, start sending the next item. if self.left.is_ready() && self.right.is_ready() { - self.left.state = self.left.sink.start_send(cx, item.clone())?; - self.right.state = self.right.sink.start_send(cx, item)?; - Ok(AsyncSink::Ready) - } else { - Ok(AsyncSink::Pending(item)) - } - } - - fn flush(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> { - let left_async = self.left.flush(cx)?; - let right_async = self.right.flush(cx)?; - // Only if both downstream sinks are ready, signal readiness. - if left_async.is_ready() && right_async.is_ready() { Ok(Async::Ready(())) } else { Ok(Async::Pending) } } - fn close(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> { - let left_async = self.left.close(cx)?; - let right_async = self.right.close(cx)?; + fn start_send(&mut self, item: Self::SinkItem) -> Result<(), Self::SinkError> { + self.left.sink.start_send(item.clone())?; + self.right.sink.start_send(item)?; + Ok(()) + } + + fn start_close(&mut self) -> Result<(), Self::SinkError> { + self.left.start_close(); + self.right.start_close(); + Ok(()) + } + + fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> { + let left_async = self.left.poll_flush(cx)?; + let right_async = self.right.poll_flush(cx)?; // Only if both downstream sinks are ready, signal readiness. if left_async.is_ready() && right_async.is_ready() { Ok(Async::Ready(())) @@ -95,44 +87,47 @@ impl Sink for Fanout #[derive(Debug)] struct Downstream { sink: S, - state: AsyncSink + state: Option, + do_close: bool, } impl Downstream { fn new(sink: S) -> Self { - Downstream { sink: sink, state: AsyncSink::Ready } + Downstream { sink: sink, state: None, do_close: false } } fn is_ready(&self) -> bool { - self.state.is_ready() + self.state.is_none() + } + + fn start_close(&mut self) { + self.do_close = true; } fn keep_flushing(&mut self, cx: &mut task::Context) -> Result<(), S::SinkError> { - if let AsyncSink::Pending(item) = replace(&mut self.state, AsyncSink::Ready) { - self.state = self.sink.start_send(cx, item)?; + if let Async::Ready(()) = self.sink.poll_ready(cx)? { + if let Some(item) = self.state.take() { + self.sink.start_send(item)?; + } } Ok(()) } - fn flush(&mut self, cx: &mut task::Context) -> Poll<(), S::SinkError> { + fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), S::SinkError> { self.keep_flushing(cx)?; - let async = self.sink.flush(cx)?; + let async = self.sink.poll_flush(cx)?; + + if self.is_ready() && self.do_close { + self.sink.start_close()?; + self.do_close = false; + } + // Only if all values have been sent _and_ the underlying // sink is completely flushed, signal readiness. - if self.state.is_ready() && async.is_ready() { + if self.state.is_none() && async.is_ready() { Ok(Async::Ready(())) } else { Ok(Async::Pending) } } - - fn close(&mut self, cx: &mut task::Context) -> Poll<(), S::SinkError> { - self.keep_flushing(cx)?; - // If all items have been flushed, initiate close. - if self.state.is_ready() { - self.sink.close(cx) - } else { - Ok(Async::Pending) - } - } } diff --git a/futures-util/src/sink/flush.rs b/futures-util/src/sink/flush.rs index d13cfd3b5c..cbfa409347 100644 --- a/futures-util/src/sink/flush.rs +++ b/futures-util/src/sink/flush.rs @@ -46,7 +46,7 @@ impl Future for Flush { fn poll(&mut self, cx: &mut task::Context) -> Poll { let mut sink = self.sink.take().expect("Attempted to poll Flush after it completed"); - if sink.flush(cx)?.is_ready() { + if sink.poll_flush(cx)?.is_ready() { Ok(Async::Ready(sink)) } else { self.sink = Some(sink); diff --git a/futures-util/src/sink/from_err.rs b/futures-util/src/sink/from_err.rs index c3433faf32..b5a1364684 100644 --- a/futures-util/src/sink/from_err.rs +++ b/futures-util/src/sink/from_err.rs @@ -1,37 +1,35 @@ -use core::marker::PhantomData; - use futures_core::{Stream, Poll}; use futures_core::task; -use futures_sink::{Sink, StartSend}; +use futures_sink::{Sink}; +use sink::{SinkExt, SinkMapErr}; /// A sink combinator to change the error type of a sink. /// /// This is created by the `Sink::from_err` method. #[derive(Debug)] #[must_use = "futures do nothing unless polled"] -pub struct SinkFromErr { - sink: S, - f: PhantomData +pub struct SinkFromErr { + sink: SinkMapErr E>, } pub fn new(sink: S) -> SinkFromErr - where S: Sink + where S: Sink, + E: From { SinkFromErr { - sink: sink, - f: PhantomData + sink: SinkExt::sink_map_err(sink, Into::into), } } -impl SinkFromErr { +impl SinkFromErr { /// Get a shared reference to the inner sink. pub fn get_ref(&self) -> &S { - &self.sink + self.sink.get_ref() } /// Get a mutable reference to the inner sink. pub fn get_mut(&mut self) -> &mut S { - &mut self.sink + self.sink.get_mut() } /// Consumes this combinator, returning the underlying sink. @@ -39,7 +37,7 @@ impl SinkFromErr { /// Note that this may discard intermediate state of this combinator, so /// care should be taken to avoid losing resources when this is called. pub fn into_inner(self) -> S { - self.sink + self.sink.into_inner() } } @@ -50,20 +48,10 @@ impl Sink for SinkFromErr type SinkItem = S::SinkItem; type SinkError = E; - fn start_send(&mut self, cx: &mut task::Context, item: Self::SinkItem) -> StartSend { - self.sink.start_send(cx, item).map_err(|e| e.into()) - } - - fn flush(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> { - self.sink.flush(cx).map_err(|e| e.into()) - } - - fn close(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> { - self.sink.close(cx).map_err(|e| e.into()) - } + delegate_sink!(sink); } -impl Stream for SinkFromErr { +impl Stream for SinkFromErr { type Item = S::Item; type Error = S::Error; diff --git a/futures-util/src/sink/map_err.rs b/futures-util/src/sink/map_err.rs index b1e2a03c5a..fe3d526117 100644 --- a/futures-util/src/sink/map_err.rs +++ b/futures-util/src/sink/map_err.rs @@ -1,6 +1,6 @@ use futures_core::{Poll, Stream}; use futures_core::task; -use futures_sink::{Sink, StartSend}; +use futures_sink::{Sink}; /// Sink for the `Sink::sink_map_err` combinator. #[derive(Debug)] @@ -14,7 +14,7 @@ pub fn new(s: S, f: F) -> SinkMapErr { SinkMapErr { sink: s, f: Some(f) } } -impl SinkMapErr { +impl SinkMapErr { /// Get a shared reference to the inner sink. pub fn get_ref(&self) -> &S { &self.sink @@ -32,6 +32,10 @@ impl SinkMapErr { pub fn into_inner(self) -> S { self.sink } + + fn expect_f(&mut self) -> F { + self.f.take().expect("cannot use MapErr after an error") + } } impl Sink for SinkMapErr @@ -41,16 +45,20 @@ impl Sink for SinkMapErr type SinkItem = S::SinkItem; type SinkError = E; - fn start_send(&mut self, cx: &mut task::Context, item: Self::SinkItem) -> StartSend { - self.sink.start_send(cx, item).map_err(|e| self.f.take().expect("cannot use MapErr after an error")(e)) + fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> { + self.sink.poll_ready(cx).map_err(|e| self.expect_f()(e)) + } + + fn start_send(&mut self, item: Self::SinkItem) -> Result<(), Self::SinkError> { + self.sink.start_send(item).map_err(|e| self.expect_f()(e)) } - fn flush(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> { - self.sink.flush(cx).map_err(|e| self.f.take().expect("cannot use MapErr after an error")(e)) + fn start_close(&mut self) -> Result<(), Self::SinkError> { + self.sink.start_close().map_err(|e| self.expect_f()(e)) } - fn close(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> { - self.sink.close(cx).map_err(|e| self.f.take().expect("cannot use MapErr after an error")(e)) + fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> { + self.sink.poll_flush(cx).map_err(|e| self.expect_f()(e)) } } diff --git a/futures-util/src/sink/send.rs b/futures-util/src/sink/send.rs index 9f7aa4ef02..455849f3ea 100644 --- a/futures-util/src/sink/send.rs +++ b/futures-util/src/sink/send.rs @@ -1,6 +1,6 @@ use futures_core::{Poll, Async, Future}; use futures_core::task; -use futures_sink::{Sink, AsyncSink}; +use futures_sink::{Sink}; /// Future for the `Sink::send` combinator, which sends a value to a sink and /// then waits until the sink has fully flushed. @@ -48,15 +48,20 @@ impl Future for Send { fn poll(&mut self, cx: &mut task::Context) -> Poll { if let Some(item) = self.item.take() { - if let AsyncSink::Pending(item) = self.sink_mut().start_send(cx, item)? { - self.item = Some(item); - return Ok(Async::Pending); + match self.sink_mut().poll_ready(cx)? { + Async::Ready(()) => { + self.sink_mut().start_send(item)?; + } + Async::Pending => { + self.item = Some(item); + return Ok(Async::Pending); + } } } // we're done sending the item, but want to block on flushing the // sink - try_ready!(self.sink_mut().flush(cx)); + try_ready!(self.sink_mut().poll_flush(cx)); // now everything's emptied, so return the sink for further use Ok(Async::Ready(self.take_sink())) diff --git a/futures-util/src/sink/send_all.rs b/futures-util/src/sink/send_all.rs index b7a4860aab..fce8c9445a 100644 --- a/futures-util/src/sink/send_all.rs +++ b/futures-util/src/sink/send_all.rs @@ -1,6 +1,6 @@ use futures_core::{Poll, Async, Future, Stream}; use futures_core::task; -use futures_sink::{Sink, AsyncSink}; +use futures_sink::{Sink}; use stream::{StreamExt, Fuse}; @@ -50,11 +50,16 @@ impl SendAll fn try_start_send(&mut self, cx: &mut task::Context, item: U::Item) -> Poll<(), T::SinkError> { debug_assert!(self.buffered.is_none()); - if let AsyncSink::Pending(item) = self.sink_mut().start_send(cx, item)? { - self.buffered = Some(item); - return Ok(Async::Pending) + match self.sink_mut().poll_ready(cx)? { + Async::Ready(()) => { + self.sink_mut().start_send(item)?; + Ok(Async::Ready(())) + } + Async::Pending => { + self.buffered = Some(item); + Ok(Async::Pending) + } } - Ok(Async::Ready(())) } } @@ -77,11 +82,11 @@ impl Future for SendAll match self.stream_mut().poll(cx)? { Async::Ready(Some(item)) => try_ready!(self.try_start_send(cx, item)), Async::Ready(None) => { - try_ready!(self.sink_mut().flush(cx)); + try_ready!(self.sink_mut().poll_flush(cx)); return Ok(Async::Ready(self.take_result())) } Async::Pending => { - try_ready!(self.sink_mut().flush(cx)); + try_ready!(self.sink_mut().poll_flush(cx)); return Ok(Async::Pending) } } diff --git a/futures-util/src/sink/with.rs b/futures-util/src/sink/with.rs index 0fdab3dd1a..664e298905 100644 --- a/futures-util/src/sink/with.rs +++ b/futures-util/src/sink/with.rs @@ -3,7 +3,7 @@ use core::marker::PhantomData; use futures_core::{IntoFuture, Future, Poll, Async, Stream}; use futures_core::task; -use futures_sink::{Sink, AsyncSink, StartSend}; +use futures_sink::{Sink}; /// Sink for the `Sink::with` combinator, chaining a computation to run *prior* /// to pushing a value into the underlying sink. @@ -15,6 +15,7 @@ pub struct With Fut: IntoFuture, { sink: S, + do_close: bool, f: F, state: State, _phantom: PhantomData, @@ -46,6 +47,7 @@ pub fn new(sink: S, f: F) -> With With { state: State::Empty, sink: sink, + do_close: false, f: f, _phantom: PhantomData, } @@ -105,9 +107,12 @@ impl With } } State::Buffered(item) => { - if let AsyncSink::Pending(item) = self.sink.start_send(cx, item)? { - self.state = State::Buffered(item); - break + match self.sink.poll_ready(cx)? { + Async::Ready(()) => self.sink.start_send(item)?, + Async::Pending => { + self.state = State::Buffered(item); + break + } } } } @@ -130,24 +135,26 @@ impl Sink for With type SinkItem = U; type SinkError = Fut::Error; - fn start_send(&mut self, cx: &mut task::Context, item: Self::SinkItem) -> StartSend { - if self.poll(cx)?.is_not_ready() { - return Ok(AsyncSink::Pending(item)) - } + fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> { + self.poll(cx) + } + + fn start_send(&mut self, item: Self::SinkItem) -> Result<(), Self::SinkError> { self.state = State::Process((self.f)(item).into_future()); - Ok(AsyncSink::Ready) + Ok(()) } - fn flush(&mut self, cx: &mut task::Context) -> Poll<(), Fut::Error> { - // poll ourselves first, to push data downward - let me_ready = self.poll(cx)?; - // always propagate `flush` downward to attempt to make progress - try_ready!(self.sink.flush(cx)); - Ok(me_ready) + fn start_close(&mut self) -> Result<(), Self::SinkError> { + self.do_close = true; + Ok(()) } - fn close(&mut self, cx: &mut task::Context) -> Poll<(), Fut::Error> { + fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> { try_ready!(self.poll(cx)); - Ok(self.sink.close(cx)?) + if self.do_close { + self.sink.start_close()?; + self.do_close = false; + } + self.sink.poll_flush(cx).map_err(Into::into) } } diff --git a/futures-util/src/sink/with_flat_map.rs b/futures-util/src/sink/with_flat_map.rs index 1eaec3deba..82330c1b77 100644 --- a/futures-util/src/sink/with_flat_map.rs +++ b/futures-util/src/sink/with_flat_map.rs @@ -2,7 +2,7 @@ use core::marker::PhantomData; use futures_core::{Poll, Async, Stream}; use futures_core::task; -use futures_sink::{Sink, StartSend, AsyncSink}; +use futures_sink::Sink; /// Sink for the `Sink::with_flat_map` combinator, chaining a computation that returns an iterator /// to run prior to pushing a value into the underlying sink @@ -15,6 +15,7 @@ where St: Stream, { sink: S, + do_close: bool, f: F, stream: Option, buffer: Option, @@ -29,6 +30,7 @@ where { WithFlatMap { sink: sink, + do_close: false, f: f, stream: None, buffer: None, @@ -62,17 +64,23 @@ where fn try_empty_stream(&mut self, cx: &mut task::Context) -> Poll<(), S::SinkError> { if let Some(x) = self.buffer.take() { - if let AsyncSink::Pending(x) = self.sink.start_send(cx, x)? { - self.buffer = Some(x); - return Ok(Async::Pending); + match self.sink.poll_ready(cx)? { + Async::Ready(()) => self.sink.start_send(x)?, + Async::Pending => { + self.buffer = Some(x); + return Ok(Async::Pending); + } } } if let Some(mut stream) = self.stream.take() { while let Some(x) = try_ready!(stream.poll(cx)) { - if let AsyncSink::Pending(x) = self.sink.start_send(cx, x)? { - self.stream = Some(stream); - self.buffer = Some(x); - return Ok(Async::Pending); + match self.sink.poll_ready(cx)? { + Async::Ready(()) => self.sink.start_send(x)?, + Async::Pending => { + self.stream = Some(stream); + self.buffer = Some(x); + return Ok(Async::Pending); + } } } } @@ -101,26 +109,30 @@ where { type SinkItem = U; type SinkError = S::SinkError; - fn start_send(&mut self, cx: &mut task::Context, i: Self::SinkItem) -> StartSend { - if self.try_empty_stream(cx)?.is_not_ready() { - return Ok(AsyncSink::Pending(i)); - } + + fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> { + self.try_empty_stream(cx) + } + + fn start_send(&mut self, i: Self::SinkItem) -> Result<(), Self::SinkError> { assert!(self.stream.is_none()); self.stream = Some((self.f)(i)); - self.try_empty_stream(cx)?; - Ok(AsyncSink::Ready) + Ok(()) } - fn flush(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> { - if self.try_empty_stream(cx)?.is_not_ready() { - return Ok(Async::Pending); - } - self.sink.flush(cx) + + fn start_close(&mut self) -> Result<(), Self::SinkError> { + self.do_close = true; + Ok(()) } - fn close(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> { + + fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> { if self.try_empty_stream(cx)?.is_not_ready() { return Ok(Async::Pending); } - assert!(self.stream.is_none()); - self.sink.close(cx) + if self.do_close { + self.sink.start_close()?; + self.do_close = false; + } + self.sink.poll_flush(cx) } } diff --git a/futures-util/src/stream/and_then.rs b/futures-util/src/stream/and_then.rs index c29b582660..dbf3a4f06b 100644 --- a/futures-util/src/stream/and_then.rs +++ b/futures-util/src/stream/and_then.rs @@ -1,6 +1,6 @@ use futures_core::{IntoFuture, Future, Poll, Async, Stream}; use futures_core::task; -use futures_sink::{Sink, StartSend}; +use futures_sink::{Sink}; /// A stream combinator which chains a computation onto values produced by a /// stream. @@ -62,17 +62,7 @@ impl Sink for AndThen type SinkItem = S::SinkItem; type SinkError = S::SinkError; - fn start_send(&mut self, cx: &mut task::Context, item: S::SinkItem) -> StartSend { - self.stream.start_send(cx, item) - } - - fn flush(&mut self, cx: &mut task::Context) -> Poll<(), S::SinkError> { - self.stream.flush(cx) - } - - fn close(&mut self, cx: &mut task::Context) -> Poll<(), S::SinkError> { - self.stream.close(cx) - } + delegate_sink!(stream); } impl Stream for AndThen diff --git a/futures-util/src/stream/buffer_unordered.rs b/futures-util/src/stream/buffer_unordered.rs index 0596f75140..d3e4574218 100644 --- a/futures-util/src/stream/buffer_unordered.rs +++ b/futures-util/src/stream/buffer_unordered.rs @@ -2,7 +2,7 @@ use std::fmt; use futures_core::{Async, IntoFuture, Poll, Stream}; use futures_core::task; -use futures_sink::{Sink, StartSend}; +use futures_sink::{Sink}; use stream::{Fuse, FuturesUnordered}; @@ -119,15 +119,5 @@ impl Sink for BufferUnordered type SinkItem = S::SinkItem; type SinkError = S::SinkError; - fn start_send(&mut self, cx: &mut task::Context, item: S::SinkItem) -> StartSend { - self.stream.start_send(cx, item) - } - - fn flush(&mut self, cx: &mut task::Context) -> Poll<(), S::SinkError> { - self.stream.flush(cx) - } - - fn close(&mut self, cx: &mut task::Context) -> Poll<(), S::SinkError> { - self.stream.close(cx) - } + delegate_sink!(stream); } diff --git a/futures-util/src/stream/buffered.rs b/futures-util/src/stream/buffered.rs index f886ee9374..d12eb10dcd 100644 --- a/futures-util/src/stream/buffered.rs +++ b/futures-util/src/stream/buffered.rs @@ -2,7 +2,7 @@ use std::fmt; use futures_core::{Async, IntoFuture, Poll, Stream}; use futures_core::task; -use futures_sink::{Sink, StartSend}; +use futures_sink::{Sink}; use stream::{Fuse, FuturesOrdered}; @@ -84,18 +84,8 @@ impl Sink for Buffered { type SinkItem = S::SinkItem; type SinkError = S::SinkError; - - fn start_send(&mut self, cx: &mut task::Context, item: S::SinkItem) -> StartSend { - self.stream.start_send(cx, item) - } - - fn flush(&mut self, cx: &mut task::Context) -> Poll<(), S::SinkError> { - self.stream.flush(cx) - } - - fn close(&mut self, cx: &mut task::Context) -> Poll<(), S::SinkError> { - self.stream.close(cx) - } + + delegate_sink!(stream); } impl Stream for Buffered diff --git a/futures-util/src/stream/chunks.rs b/futures-util/src/stream/chunks.rs index de9fee8df8..ace3a1a174 100644 --- a/futures-util/src/stream/chunks.rs +++ b/futures-util/src/stream/chunks.rs @@ -3,7 +3,7 @@ use std::prelude::v1::*; use futures_core::{Async, Poll, Stream}; use futures_core::task; -use futures_sink::{Sink, StartSend}; +use futures_sink::{Sink}; use stream::Fuse; @@ -41,17 +41,7 @@ impl Sink for Chunks type SinkItem = S::SinkItem; type SinkError = S::SinkError; - fn start_send(&mut self, cx: &mut task::Context, item: S::SinkItem) -> StartSend { - self.stream.start_send(cx, item) - } - - fn flush(&mut self, cx: &mut task::Context) -> Poll<(), S::SinkError> { - self.stream.flush(cx) - } - - fn close(&mut self, cx: &mut task::Context) -> Poll<(), S::SinkError> { - self.stream.close(cx) - } + delegate_sink!(stream); } diff --git a/futures-util/src/stream/filter.rs b/futures-util/src/stream/filter.rs index bf4d12305a..fdfba24583 100644 --- a/futures-util/src/stream/filter.rs +++ b/futures-util/src/stream/filter.rs @@ -1,6 +1,6 @@ use futures_core::{Async, Future, IntoFuture, Poll, Stream}; use futures_core::task; -use futures_sink::{Sink, StartSend}; +use futures_sink::{Sink}; /// A stream combinator used to filter the results of a stream and only yield /// some values. @@ -69,17 +69,7 @@ impl Sink for Filter type SinkItem = S::SinkItem; type SinkError = S::SinkError; - fn start_send(&mut self, cx: &mut task::Context, item: S::SinkItem) -> StartSend { - self.stream.start_send(cx, item) - } - - fn flush(&mut self, cx: &mut task::Context) -> Poll<(), S::SinkError> { - self.stream.flush(cx) - } - - fn close(&mut self, cx: &mut task::Context) -> Poll<(), S::SinkError> { - self.stream.close(cx) - } + delegate_sink!(stream); } impl Stream for Filter diff --git a/futures-util/src/stream/filter_map.rs b/futures-util/src/stream/filter_map.rs index 2aea1ea2f1..fbb27a36b5 100644 --- a/futures-util/src/stream/filter_map.rs +++ b/futures-util/src/stream/filter_map.rs @@ -1,6 +1,6 @@ use futures_core::{Async, Future, IntoFuture, Poll, Stream}; use futures_core::task; -use futures_sink::{Sink, StartSend}; +use futures_sink::{Sink}; /// A combinator used to filter the results of a stream and simultaneously map /// them to a different type. @@ -68,17 +68,7 @@ impl Sink for FilterMap type SinkItem = S::SinkItem; type SinkError = S::SinkError; - fn start_send(&mut self, cx: &mut task::Context, item: S::SinkItem) -> StartSend { - self.stream.start_send(cx, item) - } - - fn flush(&mut self, cx: &mut task::Context) -> Poll<(), S::SinkError> { - self.stream.flush(cx) - } - - fn close(&mut self, cx: &mut task::Context) -> Poll<(), S::SinkError> { - self.stream.close(cx) - } + delegate_sink!(stream); } impl Stream for FilterMap diff --git a/futures-util/src/stream/flatten.rs b/futures-util/src/stream/flatten.rs index bc3f3f99cf..2cdeb1b871 100644 --- a/futures-util/src/stream/flatten.rs +++ b/futures-util/src/stream/flatten.rs @@ -1,6 +1,6 @@ use futures_core::{Poll, Async, Stream}; use futures_core::task; -use futures_sink::{Sink, StartSend}; +use futures_sink::{Sink}; /// A combinator used to flatten a stream-of-streams into one long stream of /// elements. @@ -57,17 +57,7 @@ impl Sink for Flatten type SinkItem = S::SinkItem; type SinkError = S::SinkError; - fn start_send(&mut self, cx: &mut task::Context, item: S::SinkItem) -> StartSend { - self.stream.start_send(cx, item) - } - - fn flush(&mut self, cx: &mut task::Context) -> Poll<(), S::SinkError> { - self.stream.flush(cx) - } - - fn close(&mut self, cx: &mut task::Context) -> Poll<(), S::SinkError> { - self.stream.close(cx) - } + delegate_sink!(stream); } impl Stream for Flatten diff --git a/futures-util/src/stream/forward.rs b/futures-util/src/stream/forward.rs index 6521613f7f..3da3470040 100644 --- a/futures-util/src/stream/forward.rs +++ b/futures-util/src/stream/forward.rs @@ -1,6 +1,6 @@ use futures_core::{Async, Future, Poll, Stream}; use futures_core::task; -use futures_sink::{Sink, AsyncSink}; +use futures_sink::{Sink}; use stream::{StreamExt, Fuse}; @@ -66,14 +66,15 @@ impl Forward fn try_start_send(&mut self, cx: &mut task::Context, item: T::Item) -> Poll<(), U::SinkError> { debug_assert!(self.buffered.is_none()); - if let AsyncSink::Pending(item) = self.sink_mut() - .take().expect("Attempted to poll Forward after completion") - .start_send(cx, item)? { - self.buffered = Some(item); - return Ok(Async::Pending) + let sink_mut = self.sink_mut().take().expect("Attempted to poll Forward after completion"); + if let Async::Ready(()) = sink_mut.poll_ready(cx)? { + sink_mut.start_send(item)?; + return Ok(Async::Ready(())) + } } - Ok(Async::Ready(())) + self.buffered = Some(item); + Ok(Async::Pending) } } @@ -99,11 +100,11 @@ impl Future for Forward { Async::Ready(Some(item)) => try_ready!(self.try_start_send(cx, item)), Async::Ready(None) => { - try_ready!(self.sink_mut().take().expect("Attempted to poll Forward after completion").flush(cx)); + try_ready!(self.sink_mut().take().expect("Attempted to poll Forward after completion").poll_flush(cx)); return Ok(Async::Ready(self.take_result())) } Async::Pending => { - try_ready!(self.sink_mut().take().expect("Attempted to poll Forward after completion").flush(cx)); + try_ready!(self.sink_mut().take().expect("Attempted to poll Forward after completion").poll_flush(cx)); return Ok(Async::Pending) } } diff --git a/futures-util/src/stream/from_err.rs b/futures-util/src/stream/from_err.rs index 8b6de39c21..acfdedfe27 100644 --- a/futures-util/src/stream/from_err.rs +++ b/futures-util/src/stream/from_err.rs @@ -2,7 +2,7 @@ use core::marker::PhantomData; use futures_core::{Async, Poll, Stream}; use futures_core::task; -use futures_sink::{Sink, StartSend}; +use futures_sink::{Sink}; /// A stream combinator to change the error type of a stream. /// @@ -66,16 +66,6 @@ impl> Stream for FromErr { impl Sink for FromErr { type SinkItem = S::SinkItem; type SinkError = S::SinkError; - - fn start_send(&mut self, cx: &mut task::Context, item: Self::SinkItem) -> StartSend { - self.stream.start_send(cx, item) - } - - fn flush(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> { - self.stream.flush(cx) - } - - fn close(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> { - self.stream.close(cx) - } + + delegate_sink!(stream); } diff --git a/futures-util/src/stream/fuse.rs b/futures-util/src/stream/fuse.rs index b4bfdff508..3e1b05888a 100644 --- a/futures-util/src/stream/fuse.rs +++ b/futures-util/src/stream/fuse.rs @@ -1,6 +1,6 @@ use futures_core::{Poll, Async, Stream}; use futures_core::task; -use futures_sink::{Sink, StartSend}; +use futures_sink::{Sink}; /// A stream which "fuse"s a stream once it's terminated. /// @@ -21,17 +21,7 @@ impl Sink for Fuse type SinkItem = S::SinkItem; type SinkError = S::SinkError; - fn start_send(&mut self, cx: &mut task::Context, item: S::SinkItem) -> StartSend { - self.stream.start_send(cx, item) - } - - fn flush(&mut self, cx: &mut task::Context) -> Poll<(), S::SinkError> { - self.stream.flush(cx) - } - - fn close(&mut self, cx: &mut task::Context) -> Poll<(), S::SinkError> { - self.stream.close(cx) - } + delegate_sink!(stream); } pub fn new(s: S) -> Fuse { diff --git a/futures-util/src/stream/inspect.rs b/futures-util/src/stream/inspect.rs index 541a6b4e4b..9fc6669736 100644 --- a/futures-util/src/stream/inspect.rs +++ b/futures-util/src/stream/inspect.rs @@ -1,6 +1,6 @@ use futures_core::{Stream, Poll, Async}; use futures_core::task; -use futures_sink::{Sink, StartSend}; +use futures_sink::{Sink}; /// Do something with the items of a stream, passing it on. /// @@ -54,17 +54,7 @@ impl Sink for Inspect type SinkItem = S::SinkItem; type SinkError = S::SinkError; - fn start_send(&mut self, cx: &mut task::Context, item: S::SinkItem) -> StartSend { - self.stream.start_send(cx, item) - } - - fn flush(&mut self, cx: &mut task::Context) -> Poll<(), S::SinkError> { - self.stream.flush(cx) - } - - fn close(&mut self, cx: &mut task::Context) -> Poll<(), S::SinkError> { - self.stream.close(cx) - } + delegate_sink!(stream); } impl Stream for Inspect diff --git a/futures-util/src/stream/inspect_err.rs b/futures-util/src/stream/inspect_err.rs index 418949203c..2952154991 100644 --- a/futures-util/src/stream/inspect_err.rs +++ b/futures-util/src/stream/inspect_err.rs @@ -1,6 +1,6 @@ use futures_core::{Stream, Poll}; use futures_core::task; -use futures_sink::{Sink, StartSend}; +use futures_sink::{Sink}; /// Do something with the error of a stream, passing it on. /// @@ -54,17 +54,7 @@ impl Sink for InspectErr type SinkItem = S::SinkItem; type SinkError = S::SinkError; - fn start_send(&mut self, cx: &mut task::Context, item: S::SinkItem) -> StartSend { - self.stream.start_send(cx, item) - } - - fn flush(&mut self, cx: &mut task::Context) -> Poll<(), S::SinkError> { - self.stream.flush(cx) - } - - fn close(&mut self, cx: &mut task::Context) -> Poll<(), S::SinkError> { - self.stream.close(cx) - } + delegate_sink!(stream); } impl Stream for InspectErr diff --git a/futures-util/src/stream/map.rs b/futures-util/src/stream/map.rs index ab5b604ef7..880b63740b 100644 --- a/futures-util/src/stream/map.rs +++ b/futures-util/src/stream/map.rs @@ -1,6 +1,6 @@ use futures_core::{Async, Poll, Stream}; use futures_core::task; -use futures_sink::{Sink, StartSend}; +use futures_sink::{Sink}; /// A stream combinator which will change the type of a stream from one /// type to another. @@ -55,17 +55,7 @@ impl Sink for Map type SinkItem = S::SinkItem; type SinkError = S::SinkError; - fn start_send(&mut self, cx: &mut task::Context, item: S::SinkItem) -> StartSend { - self.stream.start_send(cx, item) - } - - fn flush(&mut self, cx: &mut task::Context) -> Poll<(), S::SinkError> { - self.stream.flush(cx) - } - - fn close(&mut self, cx: &mut task::Context) -> Poll<(), S::SinkError> { - self.stream.close(cx) - } + delegate_sink!(stream); } impl Stream for Map diff --git a/futures-util/src/stream/map_err.rs b/futures-util/src/stream/map_err.rs index 46469fa5a1..d3b06eaba2 100644 --- a/futures-util/src/stream/map_err.rs +++ b/futures-util/src/stream/map_err.rs @@ -1,6 +1,6 @@ use futures_core::{Poll, Stream}; use futures_core::task; -use futures_sink::{Sink, StartSend}; +use futures_sink::{Sink}; /// A stream combinator which will change the error type of a stream from one /// type to another. @@ -55,17 +55,7 @@ impl Sink for MapErr type SinkItem = S::SinkItem; type SinkError = S::SinkError; - fn start_send(&mut self, cx: &mut task::Context, item: S::SinkItem) -> StartSend { - self.stream.start_send(cx, item) - } - - fn flush(&mut self, cx: &mut task::Context) -> Poll<(), S::SinkError> { - self.stream.flush(cx) - } - - fn close(&mut self, cx: &mut task::Context) -> Poll<(), S::SinkError> { - self.stream.close(cx) - } + delegate_sink!(stream); } impl Stream for MapErr diff --git a/futures-util/src/stream/or_else.rs b/futures-util/src/stream/or_else.rs index 8acdcb4b46..04f9b29c3a 100644 --- a/futures-util/src/stream/or_else.rs +++ b/futures-util/src/stream/or_else.rs @@ -1,6 +1,6 @@ use futures_core::{IntoFuture, Future, Poll, Async, Stream}; use futures_core::task; -use futures_sink::{Sink, StartSend}; +use futures_sink::{Sink}; /// A stream combinator which chains a computation onto errors produced by a /// stream. @@ -35,17 +35,7 @@ impl Sink for OrElse type SinkItem = S::SinkItem; type SinkError = S::SinkError; - fn start_send(&mut self, cx: &mut task::Context, item: S::SinkItem) -> StartSend { - self.stream.start_send(cx, item) - } - - fn flush(&mut self, cx: &mut task::Context) -> Poll<(), S::SinkError> { - self.stream.flush(cx) - } - - fn close(&mut self, cx: &mut task::Context) -> Poll<(), S::SinkError> { - self.stream.close(cx) - } + delegate_sink!(stream); } impl Stream for OrElse diff --git a/futures-util/src/stream/peek.rs b/futures-util/src/stream/peek.rs index 33054e7014..95f05a5ba7 100644 --- a/futures-util/src/stream/peek.rs +++ b/futures-util/src/stream/peek.rs @@ -1,6 +1,6 @@ use futures_core::{Async, Poll, Stream}; use futures_core::task; -use futures_sink::{Sink, StartSend}; +use futures_sink::{Sink}; use stream::{StreamExt, Fuse}; @@ -31,17 +31,7 @@ impl Sink for Peekable type SinkItem = S::SinkItem; type SinkError = S::SinkError; - fn start_send(&mut self, cx: &mut task::Context, item: S::SinkItem) -> StartSend { - self.stream.start_send(cx, item) - } - - fn flush(&mut self, cx: &mut task::Context) -> Poll<(), S::SinkError> { - self.stream.flush(cx) - } - - fn close(&mut self, cx: &mut task::Context) -> Poll<(), S::SinkError> { - self.stream.close(cx) - } + delegate_sink!(stream); } impl Stream for Peekable { diff --git a/futures-util/src/stream/skip.rs b/futures-util/src/stream/skip.rs index 19ddc90449..f261c40655 100644 --- a/futures-util/src/stream/skip.rs +++ b/futures-util/src/stream/skip.rs @@ -1,6 +1,6 @@ use futures_core::{Poll, Async, Stream}; use futures_core::task; -use futures_sink::{StartSend, Sink}; +use futures_sink::{ Sink}; /// A stream combinator which skips a number of elements before continuing. /// @@ -53,17 +53,7 @@ impl Sink for Skip type SinkItem = S::SinkItem; type SinkError = S::SinkError; - fn start_send(&mut self, cx: &mut task::Context, item: S::SinkItem) -> StartSend { - self.stream.start_send(cx, item) - } - - fn flush(&mut self, cx: &mut task::Context) -> Poll<(), S::SinkError> { - self.stream.flush(cx) - } - - fn close(&mut self, cx: &mut task::Context) -> Poll<(), S::SinkError> { - self.stream.close(cx) - } + delegate_sink!(stream); } impl Stream for Skip diff --git a/futures-util/src/stream/skip_while.rs b/futures-util/src/stream/skip_while.rs index 4c29d53d6f..97c5feb6ea 100644 --- a/futures-util/src/stream/skip_while.rs +++ b/futures-util/src/stream/skip_while.rs @@ -1,6 +1,6 @@ use futures_core::{Async, Poll, IntoFuture, Future, Stream}; use futures_core::task; -use futures_sink::{StartSend, Sink}; +use futures_sink::{ Sink}; /// A stream combinator which skips elements of a stream while a predicate /// holds. @@ -60,17 +60,7 @@ impl Sink for SkipWhile type SinkItem = S::SinkItem; type SinkError = S::SinkError; - fn start_send(&mut self, cx: &mut task::Context, item: S::SinkItem) -> StartSend { - self.stream.start_send(cx, item) - } - - fn flush(&mut self, cx: &mut task::Context) -> Poll<(), S::SinkError> { - self.stream.flush(cx) - } - - fn close(&mut self, cx: &mut task::Context) -> Poll<(), S::SinkError> { - self.stream.close(cx) - } + delegate_sink!(stream); } impl Stream for SkipWhile diff --git a/futures-util/src/stream/split.rs b/futures-util/src/stream/split.rs index 5faa42192c..e86f12e3b2 100644 --- a/futures-util/src/stream/split.rs +++ b/futures-util/src/stream/split.rs @@ -4,7 +4,7 @@ use std::fmt; use futures_core::{Stream, Poll, Async}; use futures_core::task; -use futures_sink::{StartSend, Sink, AsyncSink}; +use futures_sink::{ Sink}; use lock::BiLock; @@ -13,7 +13,7 @@ use lock::BiLock; #[derive(Debug)] pub struct SplitStream(BiLock); -impl SplitStream { +impl SplitStream { /// Attempts to put the two "halves" of a split `Stream + Sink` back /// together. Succeeds only if the `SplitStream` and `SplitSink` are /// a matching pair originating from the same call to `Stream::split`. @@ -34,16 +34,31 @@ impl Stream for SplitStream { } } +#[allow(bad_style)] +fn SplitSink(lock: BiLock) -> SplitSink { + SplitSink { + lock, + slot: None, + do_close: false, + } +} + /// A `Sink` part of the split pair #[derive(Debug)] -pub struct SplitSink(BiLock); +pub struct SplitSink { + lock: BiLock, + slot: Option, + // Whether or not to attempt to close the underlying sink when + // `slot` is emptied. + do_close: bool, +} -impl SplitSink { +impl SplitSink { /// Attempts to put the two "halves" of a split `Stream + Sink` back /// together. Succeeds only if the `SplitStream` and `SplitSink` are /// a matching pair originating from the same call to `Stream::split`. pub fn reunite(self, other: SplitStream) -> Result> { - self.0.reunite(other.0).map_err(|err| { + self.lock.reunite(other.0).map_err(|err| { ReuniteError(SplitSink(err.0), SplitStream(err.1)) }) } @@ -53,25 +68,38 @@ impl Sink for SplitSink { type SinkItem = S::SinkItem; type SinkError = S::SinkError; - fn start_send(&mut self, cx: &mut task::Context, item: S::SinkItem) - -> StartSend - { - match self.0.poll_lock(cx) { - Async::Ready(mut inner) => inner.start_send(cx, item), - Async::Pending => Ok(AsyncSink::Pending(item)), + fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), S::SinkError> { + loop { + if self.slot.is_none() { + return Ok(Async::Ready(())); + } + try_ready!(self.poll_flush(cx)); } } - fn flush(&mut self, cx: &mut task::Context) -> Poll<(), S::SinkError> { - match self.0.poll_lock(cx) { - Async::Ready(mut inner) => inner.flush(cx), - Async::Pending => Ok(Async::Pending), - } + fn start_send(&mut self, item: S::SinkItem) -> Result<(), S::SinkError> { + self.slot = Some(item); + Ok(()) } - fn close(&mut self, cx: &mut task::Context) -> Poll<(), S::SinkError> { - match self.0.poll_lock(cx) { - Async::Ready(mut inner) => inner.close(cx), + fn start_close(&mut self) -> Result<(), S::SinkError> { + self.do_close = true; + Ok(()) + } + + fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), S::SinkError> { + match self.lock.poll_lock(cx) { + Async::Ready(mut inner) => { + if self.slot.is_some() { + try_ready!(inner.poll_ready(cx)); + inner.start_send(self.slot.take().unwrap())?; + } + if self.do_close { + inner.start_close()?; + self.do_close = false; + } + inner.poll_flush(cx) + } Async::Pending => Ok(Async::Pending), } } @@ -86,9 +114,9 @@ pub fn split(s: S) -> (SplitSink, SplitStream) { /// Error indicating a `SplitSink` and `SplitStream` were not two halves /// of a `Stream + Split`, and thus could not be `reunite`d. -pub struct ReuniteError(pub SplitSink, pub SplitStream); +pub struct ReuniteError(pub SplitSink, pub SplitStream); -impl fmt::Debug for ReuniteError { +impl fmt::Debug for ReuniteError { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { fmt.debug_tuple("ReuniteError") .field(&"...") @@ -96,13 +124,13 @@ impl fmt::Debug for ReuniteError { } } -impl fmt::Display for ReuniteError { +impl fmt::Display for ReuniteError { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { write!(fmt, "tried to reunite a SplitStream and SplitSink that don't form a pair") } } -impl Error for ReuniteError { +impl Error for ReuniteError { fn description(&self) -> &str { "tried to reunite a SplitStream and SplitSink that don't form a pair" } diff --git a/futures-util/src/stream/take.rs b/futures-util/src/stream/take.rs index 42ac7755bc..fb75ac8459 100644 --- a/futures-util/src/stream/take.rs +++ b/futures-util/src/stream/take.rs @@ -1,6 +1,6 @@ use futures_core::{Async, Poll, Stream}; use futures_core::task; -use futures_sink::{StartSend, Sink}; +use futures_sink::{ Sink}; /// A stream combinator which returns a maximum number of elements. /// @@ -53,17 +53,7 @@ impl Sink for Take type SinkItem = S::SinkItem; type SinkError = S::SinkError; - fn start_send(&mut self, cx: &mut task::Context, item: S::SinkItem) -> StartSend { - self.stream.start_send(cx, item) - } - - fn flush(&mut self, cx: &mut task::Context) -> Poll<(), S::SinkError> { - self.stream.flush(cx) - } - - fn close(&mut self, cx: &mut task::Context) -> Poll<(), S::SinkError> { - self.stream.close(cx) - } + delegate_sink!(stream); } impl Stream for Take diff --git a/futures-util/src/stream/take_while.rs b/futures-util/src/stream/take_while.rs index 79d55eb3ae..028b29c714 100644 --- a/futures-util/src/stream/take_while.rs +++ b/futures-util/src/stream/take_while.rs @@ -1,6 +1,6 @@ use futures_core::{Async, Poll, IntoFuture, Future, Stream}; use futures_core::task; -use futures_sink::{StartSend, Sink}; +use futures_sink::{ Sink}; /// A stream combinator which takes elements from a stream while a predicate /// holds. @@ -60,17 +60,7 @@ impl Sink for TakeWhile type SinkItem = S::SinkItem; type SinkError = S::SinkError; - fn start_send(&mut self, cx: &mut task::Context, item: S::SinkItem) -> StartSend { - self.stream.start_send(cx, item) - } - - fn flush(&mut self, cx: &mut task::Context) -> Poll<(), S::SinkError> { - self.stream.flush(cx) - } - - fn close(&mut self, cx: &mut task::Context) -> Poll<(), S::SinkError> { - self.stream.close(cx) - } + delegate_sink!(stream); } impl Stream for TakeWhile diff --git a/futures-util/src/stream/then.rs b/futures-util/src/stream/then.rs index 84c841c37a..c9bdeba2d9 100644 --- a/futures-util/src/stream/then.rs +++ b/futures-util/src/stream/then.rs @@ -1,6 +1,6 @@ use futures_core::{Async, IntoFuture, Future, Poll, Stream}; use futures_core::task; -use futures_sink::{StartSend, Sink}; +use futures_sink::{ Sink}; /// A stream combinator which chains a computation onto each item produced by a /// stream. @@ -35,17 +35,7 @@ impl Sink for Then type SinkItem = S::SinkItem; type SinkError = S::SinkError; - fn start_send(&mut self, cx: &mut task::Context, item: S::SinkItem) -> StartSend { - self.stream.start_send(cx, item) - } - - fn flush(&mut self, cx: &mut task::Context) -> Poll<(), S::SinkError> { - self.stream.flush(cx) - } - - fn close(&mut self, cx: &mut task::Context) -> Poll<(), S::SinkError> { - self.stream.close(cx) - } + delegate_sink!(stream); } impl Stream for Then diff --git a/futures/src/lib.rs b/futures/src/lib.rs index bbe57359f3..38e51753fd 100644 --- a/futures/src/lib.rs +++ b/futures/src/lib.rs @@ -246,11 +246,7 @@ pub mod prelude { Poll, }; - pub use futures_sink::{ - Sink, - AsyncSink, - StartSend, - }; + pub use futures_sink::Sink; pub use futures_util::{ FutureExt,