Skip to content

Refactor Sink trait #765

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Feb 17, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions futures-channel/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@ default = ["std"]
futures-core = { path = "../futures-core", version = "0.2", default-features = false }

[dev-dependencies]
futures = { path = "../futures", version = "0.2", default-features = false }
futures-executor = { path = "../futures-executor", version = "0.2", default-features = false }
futures = { path = "../futures", version = "0.2", default-features = true }
futures-executor = { path = "../futures-executor", version = "0.2", default-features = true }
17 changes: 6 additions & 11 deletions futures-channel/benches/sync_mpsc.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#![feature(test)]

#[macro_use]
extern crate futures;
extern crate futures_channel;
extern crate test;
Expand Down Expand Up @@ -111,17 +112,11 @@ impl Stream for TestSender {
type Error = ();

fn poll(&mut self, cx: &mut task::Context) -> Poll<Option<Self::Item>, Self::Error> {
match self.tx.start_send(cx, self.last + 1) {
Err(_) => panic!(),
Ok(Ok(())) => {
self.last += 1;
assert_eq!(Ok(Async::Ready(())), self.tx.flush(cx));
Ok(Async::Ready(Some(self.last)))
}
Ok(Err(_)) => {
Ok(Async::Pending)
}
}
try_ready!(self.tx.poll_ready(cx).map_err(|_| ()));
self.tx.start_send(self.last + 1).unwrap();
self.last += 1;
assert!(self.tx.poll_flush(cx).unwrap().is_ready());
Ok(Async::Ready(Some(self.last)))
}
}

Expand Down
112 changes: 51 additions & 61 deletions futures-channel/src/mpsc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,60 +129,63 @@ pub struct Receiver<T> {
#[derive(Debug)]
pub struct UnboundedReceiver<T>(Receiver<T>);

/// Error type for sending, used when the receiving end of a channel is
/// dropped
#[derive(Clone, PartialEq, Eq)]
pub struct SendError<T>(T);
/// The error type of `<Sender<T> as Sink>`
///
/// It will contain a value of type `T` if one was passed to `start_send`
/// after the channel was closed.
pub struct ChannelClosed<T>(Option<T>);

/// Error type returned from `try_send`
#[derive(Clone, PartialEq, Eq)]
pub struct TrySendError<T> {
kind: TrySendErrorKind<T>,
pub struct TryChannelClosed<T> {
kind: TryChannelClosedKind<T>,
}

#[derive(Clone, PartialEq, Eq)]
enum TrySendErrorKind<T> {
enum TryChannelClosedKind<T> {
Full(T),
Disconnected(T),
}

impl<T> fmt::Debug for SendError<T> {
impl<T> fmt::Debug for ChannelClosed<T> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_tuple("SendError")
fmt.debug_tuple("ChannelClosed")
.field(&"...")
.finish()
}
}

impl<T> fmt::Display for SendError<T> {
impl<T> fmt::Display for ChannelClosed<T> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "send failed because receiver is gone")
}
}

impl<T: Any> Error for SendError<T>
impl<T: Any> Error for ChannelClosed<T>
{
fn description(&self) -> &str {
"send failed because receiver is gone"
}
}

impl<T> SendError<T> {
impl<T> ChannelClosed<T> {
/// Returns the message that was attempted to be sent but failed.
pub fn into_inner(self) -> T {
/// This method returns `None` if no item was being sent when the
/// error occurred.
pub fn into_inner(self) -> Option<T> {
self.0
}
}

impl<T> fmt::Debug for TrySendError<T> {
impl<T> fmt::Debug for TryChannelClosed<T> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_tuple("TrySendError")
fmt.debug_tuple("TryChannelClosed")
.field(&"...")
.finish()
}
}

impl<T> fmt::Display for TrySendError<T> {
impl<T> fmt::Display for TryChannelClosed<T> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
if self.is_full() {
write!(fmt, "send failed because channel is full")
Expand All @@ -192,7 +195,7 @@ impl<T> fmt::Display for TrySendError<T> {
}
}

impl<T: Any> Error for TrySendError<T> {
impl<T: Any> Error for TryChannelClosed<T> {
fn description(&self) -> &str {
if self.is_full() {
"send failed because channel is full"
Expand All @@ -202,10 +205,10 @@ impl<T: Any> Error for TrySendError<T> {
}
}

impl<T> TrySendError<T> {
impl<T> TryChannelClosed<T> {
/// Returns true if this error is a result of the channel being full
pub fn is_full(&self) -> bool {
use self::TrySendErrorKind::*;
use self::TryChannelClosedKind::*;

match self.kind {
Full(_) => true,
Expand All @@ -215,7 +218,7 @@ impl<T> TrySendError<T> {

/// Returns true if this error is a result of the receiver being dropped
pub fn is_disconnected(&self) -> bool {
use self::TrySendErrorKind::*;
use self::TryChannelClosedKind::*;

match self.kind {
Disconnected(_) => true,
Expand All @@ -225,7 +228,7 @@ impl<T> TrySendError<T> {

/// Returns the message that was attempted to be sent but failed.
pub fn into_inner(self) -> T {
use self::TrySendErrorKind::*;
use self::TryChannelClosedKind::*;

match self.kind {
Full(v) | Disconnected(v) => v,
Expand Down Expand Up @@ -395,47 +398,34 @@ impl<T> Sender<T> {
/// It is not recommended to call this function from inside of a future,
/// only from an external thread where you've otherwise arranged to be
/// notified when the channel is no longer full.
pub fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
pub fn try_send(&mut self, msg: T) -> Result<(), TryChannelClosed<T>> {
// If the sender is currently blocked, reject the message
if !self.poll_unparked(None).is_ready() {
return Err(TrySendError {
kind: TrySendErrorKind::Full(msg),
return Err(TryChannelClosed {
kind: TryChannelClosedKind::Full(msg),
});
}

// The channel has capacity to accept the message, so send it
self.do_send(Some(msg), None)
.map_err(|SendError(v)| {
TrySendError {
kind: TrySendErrorKind::Disconnected(v),
.map_err(|ChannelClosed(v)| {
TryChannelClosed {
kind: TryChannelClosedKind::Disconnected(v.unwrap()),
}
})
}

/// Attempt to start sending a message on the channel.
///
/// If there is not room on the channel, this function will return
/// `Ok(Err(msg))`, and the current `Task` will be
/// awoken when the channel is ready to receive more messages.
///
/// On successful completion, this function will return `Ok(Ok(()))`.
pub fn start_send(&mut self, cx: &mut task::Context, msg: T) -> Result<Result<(), T>, SendError<T>> {
// If the sender is currently blocked, reject the message before doing
// any work.
if !self.poll_unparked(Some(cx)).is_ready() {
return Ok(Err(msg));
}

// The channel has capacity to accept the message, so send it.
self.do_send(Some(msg), Some(cx))?;

Ok(Ok(()))
/// This function should only be called after `poll_ready` has responded
/// that the channel is ready to receive a message.
pub fn start_send(&mut self, msg: T) -> Result<(), ChannelClosed<T>> {
self.do_send(Some(msg), None)
}

// Do the send without failing
// None means close
fn do_send(&mut self, msg: Option<T>, cx: Option<&mut task::Context>)
-> Result<(), SendError<T>>
-> Result<(), ChannelClosed<T>>
{
// First, increment the number of messages contained by the channel.
// This operation will also atomically determine if the sender task
Expand All @@ -456,7 +446,7 @@ impl<T> Sender<T> {
// num-senders + buffer + 1
//
if let Some(msg) = msg {
return Err(SendError(msg));
return Err(ChannelClosed(Some(msg)));
} else {
return Ok(());
}
Expand All @@ -482,10 +472,10 @@ impl<T> Sender<T> {
// Do the send without parking current task.
//
// To be called from unbounded sender.
fn do_send_nb(&self, msg: T) -> Result<(), SendError<T>> {
fn do_send_nb(&self, msg: T) -> Result<(), ChannelClosed<T>> {
match self.inc_num_messages(false) {
Some(park_self) => assert!(!park_self),
None => return Err(SendError(msg)),
None => return Err(ChannelClosed(Some(msg))),
};

self.queue_push_and_signal(Some(msg));
Expand Down Expand Up @@ -601,15 +591,15 @@ impl<T> Sender<T> {
///
/// Returns `Ok(Async::Ready(_))` if there is sufficient capacity, or returns
/// `Ok(Async::Pending)` if the channel is not guaranteed to have capacity. Returns
/// `Err(SendError(_))` if the receiver has been dropped.
/// `Err(ChannelClosed(_))` if the receiver has been dropped.
///
/// # Panics
///
/// This method will panic if called from outside the context of a task or future.
pub fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), SendError<()>> {
pub fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), ChannelClosed<T>> {
let state = decode_state(self.inner.state.load(SeqCst));
if !state.is_open {
return Err(SendError(()));
return Err(ChannelClosed(None));
}

Ok(self.poll_unparked(Some(cx)))
Expand Down Expand Up @@ -643,24 +633,24 @@ impl<T> Sender<T> {
}

impl<T> UnboundedSender<T> {
/// Check if the channel is ready to receive a message.
pub fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), ChannelClosed<T>> {
self.0.poll_ready(cx)
}

/// Attempt to start sending a message on the channel.
///
/// If there is not room on the channel, this function will return
/// `Ok(Err(msg))`, and the current `Task` will be
/// awoken when the channel is ready to receive more messages.
///
/// On a successful `start_send`, this function will return
/// `Ok(Ok(())`.
pub fn start_send(&mut self, cx: &mut task::Context, msg: T) -> Result<Result<(), T>, SendError<T>> {
self.0.start_send(cx, msg)
/// This function should only be called after `poll_ready` has been used to
/// verify that the channel is ready to receive a message.
pub fn start_send(&mut self, msg: T) -> Result<(), ChannelClosed<T>> {
self.0.start_send(msg)
}

/// Sends the provided message along this channel.
///
/// This is an unbounded sender, so this function differs from `Sink::send`
/// by ensuring the return type reflects that the channel is always ready to
/// receive messages.
pub fn unbounded_send(&self, msg: T) -> Result<(), SendError<T>> {
pub fn unbounded_send(&self, msg: T) -> Result<(), ChannelClosed<T>> {
self.0.do_send_nb(msg)
}
}
Expand Down
16 changes: 7 additions & 9 deletions futures-channel/tests/mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,24 +32,22 @@ fn send_recv_no_buffer() {

// Run on a task context
let f = poll_fn(move |cx| {
assert!(tx.flush(cx).unwrap().is_ready());
assert!(tx.poll_flush(cx).unwrap().is_ready());
assert!(tx.poll_ready(cx).unwrap().is_ready());

// Send first message
let res = tx.start_send(cx, 1).unwrap();
assert!(res.is_ok());
assert!(tx.start_send(1).is_ok());
assert!(tx.poll_ready(cx).unwrap().is_not_ready());

// Send second message
let res = tx.start_send(cx, 2).unwrap();
assert!(res.is_err());
assert!(tx.poll_ready(cx).unwrap().is_not_ready());

// Take the value
assert_eq!(rx.poll(cx).unwrap(), Async::Ready(Some(1)));
assert!(tx.poll_ready(cx).unwrap().is_ready());

let res = tx.start_send(cx, 2).unwrap();
assert!(res.is_ok());
assert!(tx.poll_ready(cx).unwrap().is_ready());
assert!(tx.start_send(2).is_ok());
assert!(tx.poll_ready(cx).unwrap().is_not_ready());

// Take the value
Expand Down Expand Up @@ -430,7 +428,7 @@ fn stress_poll_ready() {
// (asserting that it doesn't attempt to block).
while self.count > 0 {
try_ready!(self.sender.poll_ready(cx).map_err(|_| ()));
assert!(self.sender.start_send(cx, self.count).unwrap().is_ok());
assert!(self.sender.start_send(self.count).is_ok());
self.count -= 1;
}
Ok(Async::Ready(()))
Expand Down Expand Up @@ -502,7 +500,7 @@ fn try_send_2() {

let th = thread::spawn(|| {
block_on(poll_fn(|cx| {
assert!(tx.start_send(cx, "fail").unwrap().is_err());
assert!(tx.poll_ready(cx).unwrap().is_not_ready());
Ok::<_, ()>(Async::Ready(()))
})).unwrap();

Expand Down
Loading