Skip to content

Commit 3e5aa4b

Browse files
authored
Merge pull request #765 from cramertj/refactor-sink
Refactor Sink trait
2 parents eb9f603 + 0fba3db commit 3e5aa4b

40 files changed

+452
-684
lines changed

futures-channel/Cargo.toml

+2-2
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,5 @@ default = ["std"]
1818
futures-core = { path = "../futures-core", version = "0.2", default-features = false }
1919

2020
[dev-dependencies]
21-
futures = { path = "../futures", version = "0.2", default-features = false }
22-
futures-executor = { path = "../futures-executor", version = "0.2", default-features = false }
21+
futures = { path = "../futures", version = "0.2", default-features = true }
22+
futures-executor = { path = "../futures-executor", version = "0.2", default-features = true }

futures-channel/benches/sync_mpsc.rs

+6-11
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#![feature(test)]
22

3+
#[macro_use]
34
extern crate futures;
45
extern crate futures_channel;
56
extern crate test;
@@ -111,17 +112,11 @@ impl Stream for TestSender {
111112
type Error = ();
112113

113114
fn poll(&mut self, cx: &mut task::Context) -> Poll<Option<Self::Item>, Self::Error> {
114-
match self.tx.start_send(cx, self.last + 1) {
115-
Err(_) => panic!(),
116-
Ok(Ok(())) => {
117-
self.last += 1;
118-
assert_eq!(Ok(Async::Ready(())), self.tx.flush(cx));
119-
Ok(Async::Ready(Some(self.last)))
120-
}
121-
Ok(Err(_)) => {
122-
Ok(Async::Pending)
123-
}
124-
}
115+
try_ready!(self.tx.poll_ready(cx).map_err(|_| ()));
116+
self.tx.start_send(self.last + 1).unwrap();
117+
self.last += 1;
118+
assert!(self.tx.poll_flush(cx).unwrap().is_ready());
119+
Ok(Async::Ready(Some(self.last)))
125120
}
126121
}
127122

futures-channel/src/mpsc/mod.rs

+51-61
Original file line numberDiff line numberDiff line change
@@ -129,60 +129,63 @@ pub struct Receiver<T> {
129129
#[derive(Debug)]
130130
pub struct UnboundedReceiver<T>(Receiver<T>);
131131

132-
/// Error type for sending, used when the receiving end of a channel is
133-
/// dropped
134-
#[derive(Clone, PartialEq, Eq)]
135-
pub struct SendError<T>(T);
132+
/// The error type of `<Sender<T> as Sink>`
133+
///
134+
/// It will contain a value of type `T` if one was passed to `start_send`
135+
/// after the channel was closed.
136+
pub struct ChannelClosed<T>(Option<T>);
136137

137138
/// Error type returned from `try_send`
138139
#[derive(Clone, PartialEq, Eq)]
139-
pub struct TrySendError<T> {
140-
kind: TrySendErrorKind<T>,
140+
pub struct TryChannelClosed<T> {
141+
kind: TryChannelClosedKind<T>,
141142
}
142143

143144
#[derive(Clone, PartialEq, Eq)]
144-
enum TrySendErrorKind<T> {
145+
enum TryChannelClosedKind<T> {
145146
Full(T),
146147
Disconnected(T),
147148
}
148149

149-
impl<T> fmt::Debug for SendError<T> {
150+
impl<T> fmt::Debug for ChannelClosed<T> {
150151
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
151-
fmt.debug_tuple("SendError")
152+
fmt.debug_tuple("ChannelClosed")
152153
.field(&"...")
153154
.finish()
154155
}
155156
}
156157

157-
impl<T> fmt::Display for SendError<T> {
158+
impl<T> fmt::Display for ChannelClosed<T> {
158159
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
159160
write!(fmt, "send failed because receiver is gone")
160161
}
161162
}
162163

163-
impl<T: Any> Error for SendError<T>
164+
impl<T: Any> Error for ChannelClosed<T>
164165
{
165166
fn description(&self) -> &str {
166167
"send failed because receiver is gone"
167168
}
168169
}
169170

170-
impl<T> SendError<T> {
171+
impl<T> ChannelClosed<T> {
171172
/// Returns the message that was attempted to be sent but failed.
172-
pub fn into_inner(self) -> T {
173+
/// This method returns `None` if no item was being sent when the
174+
/// error occurred.
175+
pub fn into_inner(self) -> Option<T> {
173176
self.0
174177
}
175178
}
176179

177-
impl<T> fmt::Debug for TrySendError<T> {
180+
impl<T> fmt::Debug for TryChannelClosed<T> {
178181
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
179-
fmt.debug_tuple("TrySendError")
182+
fmt.debug_tuple("TryChannelClosed")
180183
.field(&"...")
181184
.finish()
182185
}
183186
}
184187

185-
impl<T> fmt::Display for TrySendError<T> {
188+
impl<T> fmt::Display for TryChannelClosed<T> {
186189
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
187190
if self.is_full() {
188191
write!(fmt, "send failed because channel is full")
@@ -192,7 +195,7 @@ impl<T> fmt::Display for TrySendError<T> {
192195
}
193196
}
194197

195-
impl<T: Any> Error for TrySendError<T> {
198+
impl<T: Any> Error for TryChannelClosed<T> {
196199
fn description(&self) -> &str {
197200
if self.is_full() {
198201
"send failed because channel is full"
@@ -202,10 +205,10 @@ impl<T: Any> Error for TrySendError<T> {
202205
}
203206
}
204207

205-
impl<T> TrySendError<T> {
208+
impl<T> TryChannelClosed<T> {
206209
/// Returns true if this error is a result of the channel being full
207210
pub fn is_full(&self) -> bool {
208-
use self::TrySendErrorKind::*;
211+
use self::TryChannelClosedKind::*;
209212

210213
match self.kind {
211214
Full(_) => true,
@@ -215,7 +218,7 @@ impl<T> TrySendError<T> {
215218

216219
/// Returns true if this error is a result of the receiver being dropped
217220
pub fn is_disconnected(&self) -> bool {
218-
use self::TrySendErrorKind::*;
221+
use self::TryChannelClosedKind::*;
219222

220223
match self.kind {
221224
Disconnected(_) => true,
@@ -225,7 +228,7 @@ impl<T> TrySendError<T> {
225228

226229
/// Returns the message that was attempted to be sent but failed.
227230
pub fn into_inner(self) -> T {
228-
use self::TrySendErrorKind::*;
231+
use self::TryChannelClosedKind::*;
229232

230233
match self.kind {
231234
Full(v) | Disconnected(v) => v,
@@ -395,47 +398,34 @@ impl<T> Sender<T> {
395398
/// It is not recommended to call this function from inside of a future,
396399
/// only from an external thread where you've otherwise arranged to be
397400
/// notified when the channel is no longer full.
398-
pub fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
401+
pub fn try_send(&mut self, msg: T) -> Result<(), TryChannelClosed<T>> {
399402
// If the sender is currently blocked, reject the message
400403
if !self.poll_unparked(None).is_ready() {
401-
return Err(TrySendError {
402-
kind: TrySendErrorKind::Full(msg),
404+
return Err(TryChannelClosed {
405+
kind: TryChannelClosedKind::Full(msg),
403406
});
404407
}
405408

406409
// The channel has capacity to accept the message, so send it
407410
self.do_send(Some(msg), None)
408-
.map_err(|SendError(v)| {
409-
TrySendError {
410-
kind: TrySendErrorKind::Disconnected(v),
411+
.map_err(|ChannelClosed(v)| {
412+
TryChannelClosed {
413+
kind: TryChannelClosedKind::Disconnected(v.unwrap()),
411414
}
412415
})
413416
}
414417

415418
/// Attempt to start sending a message on the channel.
416-
///
417-
/// If there is not room on the channel, this function will return
418-
/// `Ok(Err(msg))`, and the current `Task` will be
419-
/// awoken when the channel is ready to receive more messages.
420-
///
421-
/// On successful completion, this function will return `Ok(Ok(()))`.
422-
pub fn start_send(&mut self, cx: &mut task::Context, msg: T) -> Result<Result<(), T>, SendError<T>> {
423-
// If the sender is currently blocked, reject the message before doing
424-
// any work.
425-
if !self.poll_unparked(Some(cx)).is_ready() {
426-
return Ok(Err(msg));
427-
}
428-
429-
// The channel has capacity to accept the message, so send it.
430-
self.do_send(Some(msg), Some(cx))?;
431-
432-
Ok(Ok(()))
419+
/// This function should only be called after `poll_ready` has responded
420+
/// that the channel is ready to receive a message.
421+
pub fn start_send(&mut self, msg: T) -> Result<(), ChannelClosed<T>> {
422+
self.do_send(Some(msg), None)
433423
}
434424

435425
// Do the send without failing
436426
// None means close
437427
fn do_send(&mut self, msg: Option<T>, cx: Option<&mut task::Context>)
438-
-> Result<(), SendError<T>>
428+
-> Result<(), ChannelClosed<T>>
439429
{
440430
// First, increment the number of messages contained by the channel.
441431
// This operation will also atomically determine if the sender task
@@ -456,7 +446,7 @@ impl<T> Sender<T> {
456446
// num-senders + buffer + 1
457447
//
458448
if let Some(msg) = msg {
459-
return Err(SendError(msg));
449+
return Err(ChannelClosed(Some(msg)));
460450
} else {
461451
return Ok(());
462452
}
@@ -482,10 +472,10 @@ impl<T> Sender<T> {
482472
// Do the send without parking current task.
483473
//
484474
// To be called from unbounded sender.
485-
fn do_send_nb(&self, msg: T) -> Result<(), SendError<T>> {
475+
fn do_send_nb(&self, msg: T) -> Result<(), ChannelClosed<T>> {
486476
match self.inc_num_messages(false) {
487477
Some(park_self) => assert!(!park_self),
488-
None => return Err(SendError(msg)),
478+
None => return Err(ChannelClosed(Some(msg))),
489479
};
490480

491481
self.queue_push_and_signal(Some(msg));
@@ -601,15 +591,15 @@ impl<T> Sender<T> {
601591
///
602592
/// Returns `Ok(Async::Ready(_))` if there is sufficient capacity, or returns
603593
/// `Ok(Async::Pending)` if the channel is not guaranteed to have capacity. Returns
604-
/// `Err(SendError(_))` if the receiver has been dropped.
594+
/// `Err(ChannelClosed(_))` if the receiver has been dropped.
605595
///
606596
/// # Panics
607597
///
608598
/// This method will panic if called from outside the context of a task or future.
609-
pub fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), SendError<()>> {
599+
pub fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), ChannelClosed<T>> {
610600
let state = decode_state(self.inner.state.load(SeqCst));
611601
if !state.is_open {
612-
return Err(SendError(()));
602+
return Err(ChannelClosed(None));
613603
}
614604

615605
Ok(self.poll_unparked(Some(cx)))
@@ -643,24 +633,24 @@ impl<T> Sender<T> {
643633
}
644634

645635
impl<T> UnboundedSender<T> {
636+
/// Check if the channel is ready to receive a message.
637+
pub fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), ChannelClosed<T>> {
638+
self.0.poll_ready(cx)
639+
}
640+
646641
/// Attempt to start sending a message on the channel.
647-
///
648-
/// If there is not room on the channel, this function will return
649-
/// `Ok(Err(msg))`, and the current `Task` will be
650-
/// awoken when the channel is ready to receive more messages.
651-
///
652-
/// On a successful `start_send`, this function will return
653-
/// `Ok(Ok(())`.
654-
pub fn start_send(&mut self, cx: &mut task::Context, msg: T) -> Result<Result<(), T>, SendError<T>> {
655-
self.0.start_send(cx, msg)
642+
/// This function should only be called after `poll_ready` has been used to
643+
/// verify that the channel is ready to receive a message.
644+
pub fn start_send(&mut self, msg: T) -> Result<(), ChannelClosed<T>> {
645+
self.0.start_send(msg)
656646
}
657647

658648
/// Sends the provided message along this channel.
659649
///
660650
/// This is an unbounded sender, so this function differs from `Sink::send`
661651
/// by ensuring the return type reflects that the channel is always ready to
662652
/// receive messages.
663-
pub fn unbounded_send(&self, msg: T) -> Result<(), SendError<T>> {
653+
pub fn unbounded_send(&self, msg: T) -> Result<(), ChannelClosed<T>> {
664654
self.0.do_send_nb(msg)
665655
}
666656
}

futures-channel/tests/mpsc.rs

+7-9
Original file line numberDiff line numberDiff line change
@@ -32,24 +32,22 @@ fn send_recv_no_buffer() {
3232

3333
// Run on a task context
3434
let f = poll_fn(move |cx| {
35-
assert!(tx.flush(cx).unwrap().is_ready());
35+
assert!(tx.poll_flush(cx).unwrap().is_ready());
3636
assert!(tx.poll_ready(cx).unwrap().is_ready());
3737

3838
// Send first message
39-
let res = tx.start_send(cx, 1).unwrap();
40-
assert!(res.is_ok());
39+
assert!(tx.start_send(1).is_ok());
4140
assert!(tx.poll_ready(cx).unwrap().is_not_ready());
4241

4342
// Send second message
44-
let res = tx.start_send(cx, 2).unwrap();
45-
assert!(res.is_err());
43+
assert!(tx.poll_ready(cx).unwrap().is_not_ready());
4644

4745
// Take the value
4846
assert_eq!(rx.poll(cx).unwrap(), Async::Ready(Some(1)));
4947
assert!(tx.poll_ready(cx).unwrap().is_ready());
5048

51-
let res = tx.start_send(cx, 2).unwrap();
52-
assert!(res.is_ok());
49+
assert!(tx.poll_ready(cx).unwrap().is_ready());
50+
assert!(tx.start_send(2).is_ok());
5351
assert!(tx.poll_ready(cx).unwrap().is_not_ready());
5452

5553
// Take the value
@@ -430,7 +428,7 @@ fn stress_poll_ready() {
430428
// (asserting that it doesn't attempt to block).
431429
while self.count > 0 {
432430
try_ready!(self.sender.poll_ready(cx).map_err(|_| ()));
433-
assert!(self.sender.start_send(cx, self.count).unwrap().is_ok());
431+
assert!(self.sender.start_send(self.count).is_ok());
434432
self.count -= 1;
435433
}
436434
Ok(Async::Ready(()))
@@ -502,7 +500,7 @@ fn try_send_2() {
502500

503501
let th = thread::spawn(|| {
504502
block_on(poll_fn(|cx| {
505-
assert!(tx.start_send(cx, "fail").unwrap().is_err());
503+
assert!(tx.poll_ready(cx).unwrap().is_not_ready());
506504
Ok::<_, ()>(Async::Ready(()))
507505
})).unwrap();
508506

0 commit comments

Comments
 (0)