Skip to content

Commit 4b0e217

Browse files
Merge pull request #1240 from carllerche/fix-mpsc-poll-complete
fix mpsc::Sender::poll_complete impl
2 parents 12ca605 + 4707a1a commit 4b0e217

File tree

4 files changed

+31
-2
lines changed

4 files changed

+31
-2
lines changed

benches/sync_mpsc.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#![feature(test)]
22

3+
#[macro_use]
34
extern crate futures;
45
extern crate test;
56

@@ -106,7 +107,6 @@ impl Stream for TestSender {
106107
Err(_) => panic!(),
107108
Ok(AsyncSink::Ready) => {
108109
self.last += 1;
109-
assert_eq!(Ok(Async::Ready(())), self.tx.poll_complete());
110110
Ok(Async::Ready(Some(self.last)))
111111
}
112112
Ok(AsyncSink::NotReady(_)) => {

src/sync/mpsc/mod.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -649,7 +649,14 @@ impl<T> Sink for Sender<T> {
649649
}
650650

651651
fn poll_complete(&mut self) -> Poll<(), SendError<T>> {
652-
Ok(Async::Ready(()))
652+
self.poll_ready()
653+
// At this point, the value cannot be returned and `SendError`
654+
// cannot be created with a `T` without breaking backwards
655+
// comptibility. This means we cannot return an error.
656+
//
657+
// That said, there is also no guarantee that a `poll_complete`
658+
// returning `Ok` implies the receiver sees the message.
659+
.or_else(|_| Ok(().into()))
653660
}
654661

655662
fn close(&mut self) -> Poll<(), SendError<T>> {

tests/mpsc.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -548,3 +548,19 @@ fn try_send_fail() {
548548
assert_eq!(rx.next(), Some(Ok("goodbye")));
549549
assert!(rx.next().is_none());
550550
}
551+
552+
#[test]
553+
fn bounded_is_really_bounded() {
554+
use futures::Async::*;
555+
let (mut tx, mut rx) = mpsc::channel(0);
556+
lazy(|| {
557+
assert!(tx.start_send(1).unwrap().is_ready());
558+
// Not ready until we receive
559+
assert!(!tx.poll_complete().unwrap().is_ready());
560+
// Receive the value
561+
assert_eq!(rx.poll().unwrap(), Ready(Some(1)));
562+
// Now the sender is ready
563+
assert!(tx.poll_complete().unwrap().is_ready());
564+
Ok::<_, ()>(())
565+
}).wait().unwrap();
566+
}

tests/sink.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -392,6 +392,12 @@ fn fanout_backpressure() {
392392
let (item, right_recv) = right_recv.into_future().wait().unwrap();
393393
assert_eq!(item, Some(1));
394394
assert!(flag.get());
395+
let (item, left_recv) = left_recv.into_future().wait().unwrap();
396+
assert_eq!(item, Some(2));
397+
assert!(flag.get());
398+
assert!(task.poll_future_notify(&flag, 0).unwrap().is_not_ready());
399+
let (item, right_recv) = right_recv.into_future().wait().unwrap();
400+
assert_eq!(item, Some(2));
395401
match task.poll_future_notify(&flag, 0).unwrap() {
396402
Async::Ready(_) => {
397403
},

0 commit comments

Comments
 (0)