Skip to content

Commit 12ca605

Browse files
Merge pull request #1241 from carllerche/fix-mpsc-close-race
fix race with dropping mpsc::Receiver
2 parents 0c15b63 + 7b38d8f commit 12ca605

File tree

2 files changed

+168
-16
lines changed

2 files changed

+168
-16
lines changed

src/sync/mpsc/mod.rs

+37-16
Original file line numberDiff line numberDiff line change
@@ -813,6 +813,12 @@ impl<T> Receiver<T> {
813813
loop {
814814
match unsafe { self.inner.message_queue.pop() } {
815815
PopResult::Data(msg) => {
816+
// If there are any parked task handles in the parked queue,
817+
// pop one and unpark it.
818+
self.unpark_one();
819+
// Decrement number of messages
820+
self.dec_num_messages();
821+
816822
return Async::Ready(msg);
817823
}
818824
PopResult::Empty => {
@@ -863,7 +869,7 @@ impl<T> Receiver<T> {
863869
let state = decode_state(curr);
864870

865871
// If the channel is closed, then there is no need to park.
866-
if !state.is_open && state.num_messages == 0 {
872+
if state.is_closed() {
867873
return TryPark::Closed;
868874
}
869875

@@ -904,8 +910,8 @@ impl<T> Stream for Receiver<T> {
904910
fn poll(&mut self) -> Poll<Option<T>, ()> {
905911
loop {
906912
// Try to read a message off of the message queue.
907-
let msg = match self.next_message() {
908-
Async::Ready(msg) => msg,
913+
match self.next_message() {
914+
Async::Ready(msg) => return Ok(Async::Ready(msg)),
909915
Async::NotReady => {
910916
// There are no messages to read, in this case, attempt to
911917
// park. The act of parking will verify that the channel is
@@ -929,17 +935,7 @@ impl<T> Stream for Receiver<T> {
929935
}
930936
}
931937
}
932-
};
933-
934-
// If there are any parked task handles in the parked queue, pop
935-
// one and unpark it.
936-
self.unpark_one();
937-
938-
// Decrement number of messages
939-
self.dec_num_messages();
940-
941-
// Return the message
942-
return Ok(Async::Ready(msg));
938+
}
943939
}
944940
}
945941
}
@@ -948,8 +944,27 @@ impl<T> Drop for Receiver<T> {
948944
fn drop(&mut self) {
949945
// Drain the channel of all pending messages
950946
self.close();
951-
while self.next_message().is_ready() {
952-
// ...
947+
948+
loop {
949+
match self.next_message() {
950+
Async::Ready(_) => {}
951+
Async::NotReady => {
952+
let curr = self.inner.state.load(SeqCst);
953+
let state = decode_state(curr);
954+
955+
// If the channel is closed, then there is no need to park.
956+
if state.is_closed() {
957+
return;
958+
}
959+
960+
// TODO: Spinning isn't ideal, it might be worth
961+
// investigating using a condvar or some other strategy
962+
// here. That said, if this case is hit, then another thread
963+
// is about to push the value into the queue and this isn't
964+
// the only spinlock in the impl right now.
965+
thread::yield_now();
966+
}
967+
}
953968
}
954969
}
955970
}
@@ -1125,6 +1140,12 @@ impl<T> Inner<T> {
11251140
unsafe impl<T: Send> Send for Inner<T> {}
11261141
unsafe impl<T: Send> Sync for Inner<T> {}
11271142

1143+
impl State {
1144+
fn is_closed(&self) -> bool {
1145+
!self.is_open && self.num_messages == 0
1146+
}
1147+
}
1148+
11281149
/*
11291150
*
11301151
* ===== Helpers =====

tests/mpsc-close.rs

+131
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
extern crate futures;
22

3+
use std::sync::{Arc, Weak};
34
use std::thread;
5+
use std::time::{Duration, Instant};
46

57
use futures::prelude::*;
68
use futures::sync::mpsc::*;
9+
use futures::task;
710

811
#[test]
912
fn smoke() {
@@ -19,3 +22,131 @@ fn smoke() {
1922

2023
t.join().unwrap()
2124
}
25+
26+
// Stress test that `try_send()`s occurring concurrently with receiver
27+
// close/drops don't appear as successful sends.
28+
#[test]
29+
fn stress_try_send_as_receiver_closes() {
30+
const AMT: usize = 10000;
31+
// To provide variable timing characteristics (in the hopes of
32+
// reproducing the collision that leads to a race), we busy-re-poll
33+
// the test MPSC receiver a variable number of times before actually
34+
// stopping. We vary this countdown between 1 and the following
35+
// value.
36+
const MAX_COUNTDOWN: usize = 20;
37+
// When we detect that a successfully sent item is still in the
38+
// queue after a disconnect, we spin for up to 100ms to confirm that
39+
// it is a persistent condition and not a concurrency illusion.
40+
const SPIN_TIMEOUT_S: u64 = 10;
41+
const SPIN_SLEEP_MS: u64 = 10;
42+
struct TestRx {
43+
rx: Receiver<Arc<()>>,
44+
// The number of times to query `rx` before dropping it.
45+
poll_count: usize
46+
}
47+
struct TestTask {
48+
command_rx: Receiver<TestRx>,
49+
test_rx: Option<Receiver<Arc<()>>>,
50+
countdown: usize,
51+
}
52+
impl TestTask {
53+
/// Create a new TestTask
54+
fn new() -> (TestTask, Sender<TestRx>) {
55+
let (command_tx, command_rx) = channel::<TestRx>(0);
56+
(
57+
TestTask {
58+
command_rx: command_rx,
59+
test_rx: None,
60+
countdown: 0, // 0 means no countdown is in progress.
61+
},
62+
command_tx,
63+
)
64+
}
65+
}
66+
impl Future for TestTask {
67+
type Item = ();
68+
type Error = ();
69+
fn poll(&mut self) -> Poll<(), ()> {
70+
// Poll the test channel, if one is present.
71+
if let Some(ref mut rx) = self.test_rx {
72+
if let Ok(Async::Ready(v)) = rx.poll() {
73+
let _ = v.expect("test finished unexpectedly!");
74+
}
75+
self.countdown -= 1;
76+
// Busy-poll until the countdown is finished.
77+
task::current().notify();
78+
}
79+
// Accept any newly submitted MPSC channels for testing.
80+
match self.command_rx.poll()? {
81+
Async::Ready(Some(TestRx { rx, poll_count })) => {
82+
self.test_rx = Some(rx);
83+
self.countdown = poll_count;
84+
task::current().notify();
85+
},
86+
Async::Ready(None) => return Ok(Async::Ready(())),
87+
_ => {},
88+
}
89+
if self.countdown == 0 {
90+
// Countdown complete -- drop the Receiver.
91+
self.test_rx = None;
92+
}
93+
Ok(Async::NotReady)
94+
}
95+
}
96+
let (f, mut cmd_tx) = TestTask::new();
97+
let bg = thread::spawn(move || f.wait());
98+
for i in 0..AMT {
99+
let (mut test_tx, rx) = channel(0);
100+
let poll_count = i % MAX_COUNTDOWN;
101+
cmd_tx.try_send(TestRx { rx: rx, poll_count: poll_count }).unwrap();
102+
let mut prev_weak: Option<Weak<()>> = None;
103+
let mut attempted_sends = 0;
104+
let mut successful_sends = 0;
105+
loop {
106+
// Create a test item.
107+
let item = Arc::new(());
108+
let weak = Arc::downgrade(&item);
109+
match test_tx.try_send(item) {
110+
Ok(_) => {
111+
prev_weak = Some(weak);
112+
successful_sends += 1;
113+
}
114+
Err(ref e) if e.is_full() => {}
115+
Err(ref e) if e.is_disconnected() => {
116+
// Test for evidence of the race condition.
117+
if let Some(prev_weak) = prev_weak {
118+
if prev_weak.upgrade().is_some() {
119+
// The previously sent item is still allocated.
120+
// However, there appears to be some aspect of the
121+
// concurrency that can legitimately cause the Arc
122+
// to be momentarily valid. Spin for up to 100ms
123+
// waiting for the previously sent item to be
124+
// dropped.
125+
let t0 = Instant::now();
126+
let mut spins = 0;
127+
loop {
128+
if prev_weak.upgrade().is_none() {
129+
break;
130+
}
131+
assert!(t0.elapsed() < Duration::from_secs(SPIN_TIMEOUT_S),
132+
"item not dropped on iteration {} after \
133+
{} sends ({} successful). spin=({})",
134+
i, attempted_sends, successful_sends, spins
135+
);
136+
spins += 1;
137+
thread::sleep(Duration::from_millis(SPIN_SLEEP_MS));
138+
}
139+
}
140+
}
141+
break;
142+
}
143+
Err(ref e) => panic!("unexpected error: {}", e),
144+
}
145+
attempted_sends += 1;
146+
}
147+
}
148+
drop(cmd_tx);
149+
bg.join()
150+
.expect("background thread join")
151+
.expect("background thread result");
152+
}

0 commit comments

Comments
 (0)