Skip to content

Commit 57974ae

Browse files
twittner
authored and
Stjepan Glavina
committed
Use non-blocking connect for TcpStream. (#687)
* Use non-blocking connect for TcpStream. Instead of spawning a background thread which is unaware of any timeouts but continues to run until the TCP stack decides that the remote is not reachable we use mio's non-blocking connect. mio's `TcpStream::connect` returns immediately but the actual connection is usually just in progress and we have to be sure the socket is writeable before we can consider the connection as established. * Add Watcher::{poll_read_ready, poll_write_ready}. Following a suggestion of @stjepang we offer methods to check for read/write readiness of a `Watcher` instead of the previous approach to accept a set of `Waker`s when registering an event source. The changes relative to master are smaller and both methods look more useful in other contexts. Also the code is more robust w.r.t. wakeups of the `Waker` from clones outside the `Reactor`. I am not sure if we need to add protection mechanisms against spurious wakeups from mio. Currently we treat the `Poll::Ready(())` of `Watcher::poll_write_ready` as proof that the non-blocking connect has finished, but if the event from mio was a spurious one, it might still be ongoing.
1 parent 57f9fb7 commit 57974ae

File tree

2 files changed

+95
-27
lines changed

2 files changed

+95
-27
lines changed

Diff for: src/net/driver/mod.rs

+77-12
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,30 @@ struct Entry {
1616
token: mio::Token,
1717

1818
/// Tasks that are blocked on reading from this I/O handle.
19-
readers: Mutex<Vec<Waker>>,
19+
readers: Mutex<Readers>,
2020

2121
/// Tasks that are blocked on writing to this I/O handle.
22-
writers: Mutex<Vec<Waker>>,
22+
writers: Mutex<Writers>,
23+
}
24+
25+
/// The set of `Waker`s interested in read readiness.
26+
#[derive(Debug)]
27+
struct Readers {
28+
/// Flag indicating read readiness.
29+
/// (cf. `Watcher::poll_read_ready`)
30+
ready: bool,
31+
/// The `Waker`s blocked on reading.
32+
wakers: Vec<Waker>
33+
}
34+
35+
/// The set of `Waker`s interested in write readiness.
36+
#[derive(Debug)]
37+
struct Writers {
38+
/// Flag indicating write readiness.
39+
/// (cf. `Watcher::poll_write_ready`)
40+
ready: bool,
41+
/// The `Waker`s blocked on writing.
42+
wakers: Vec<Waker>
2343
}
2444

2545
/// The state of a networking driver.
@@ -68,8 +88,8 @@ impl Reactor {
6888
// Allocate an entry and insert it into the slab.
6989
let entry = Arc::new(Entry {
7090
token,
71-
readers: Mutex::new(Vec::new()),
72-
writers: Mutex::new(Vec::new()),
91+
readers: Mutex::new(Readers { ready: false, wakers: Vec::new() }),
92+
writers: Mutex::new(Writers { ready: false, wakers: Vec::new() }),
7393
});
7494
vacant.insert(entry.clone());
7595

@@ -144,14 +164,18 @@ fn main_loop() -> io::Result<()> {
144164

145165
// Wake up reader tasks blocked on this I/O handle.
146166
if !(readiness & reader_interests()).is_empty() {
147-
for w in entry.readers.lock().unwrap().drain(..) {
167+
let mut readers = entry.readers.lock().unwrap();
168+
readers.ready = true;
169+
for w in readers.wakers.drain(..) {
148170
w.wake();
149171
}
150172
}
151173

152174
// Wake up writer tasks blocked on this I/O handle.
153175
if !(readiness & writer_interests()).is_empty() {
154-
for w in entry.writers.lock().unwrap().drain(..) {
176+
let mut writers = entry.writers.lock().unwrap();
177+
writers.ready = true;
178+
for w in writers.wakers.drain(..) {
155179
w.wake();
156180
}
157181
}
@@ -207,7 +231,7 @@ impl<T: Evented> Watcher<T> {
207231
}
208232

209233
// Lock the waker list.
210-
let mut list = self.entry.readers.lock().unwrap();
234+
let mut readers = self.entry.readers.lock().unwrap();
211235

212236
// Try running the operation again.
213237
match f(self.source.as_ref().unwrap()) {
@@ -216,10 +240,12 @@ impl<T: Evented> Watcher<T> {
216240
}
217241

218242
// Register the task if it isn't registered already.
219-
if list.iter().all(|w| !w.will_wake(cx.waker())) {
220-
list.push(cx.waker().clone());
243+
if readers.wakers.iter().all(|w| !w.will_wake(cx.waker())) {
244+
readers.wakers.push(cx.waker().clone());
221245
}
222246

247+
readers.ready = false;
248+
223249
Poll::Pending
224250
}
225251

@@ -242,7 +268,7 @@ impl<T: Evented> Watcher<T> {
242268
}
243269

244270
// Lock the waker list.
245-
let mut list = self.entry.writers.lock().unwrap();
271+
let mut writers = self.entry.writers.lock().unwrap();
246272

247273
// Try running the operation again.
248274
match f(self.source.as_ref().unwrap()) {
@@ -251,10 +277,49 @@ impl<T: Evented> Watcher<T> {
251277
}
252278

253279
// Register the task if it isn't registered already.
254-
if list.iter().all(|w| !w.will_wake(cx.waker())) {
255-
list.push(cx.waker().clone());
280+
if writers.wakers.iter().all(|w| !w.will_wake(cx.waker())) {
281+
writers.wakers.push(cx.waker().clone());
256282
}
257283

284+
writers.ready = false;
285+
286+
Poll::Pending
287+
}
288+
289+
/// Polls the inner I/O source until a non-blocking read can be performed.
290+
///
291+
/// If non-blocking reads are currently not possible, the `Waker`
292+
/// will be saved and notified when it can read non-blocking
293+
/// again.
294+
#[allow(dead_code)]
295+
pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<()> {
296+
// Lock the waker list.
297+
let mut readers = self.entry.readers.lock().unwrap();
298+
if readers.ready {
299+
return Poll::Ready(())
300+
}
301+
// Register the task if it isn't registered already.
302+
if readers.wakers.iter().all(|w| !w.will_wake(cx.waker())) {
303+
readers.wakers.push(cx.waker().clone());
304+
}
305+
Poll::Pending
306+
}
307+
308+
/// Polls the inner I/O source until a non-blocking write can be performed.
309+
///
310+
/// If non-blocking writes are currently not possible, the `Waker`
311+
/// will be saved and notified when it can write non-blocking
312+
/// again.
313+
pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<()> {
314+
// Lock the waker list.
315+
let mut writers = self.entry.writers.lock().unwrap();
316+
if writers.ready {
317+
return Poll::Ready(())
318+
}
319+
// Register the task if it isn't registered already.
320+
if writers.wakers.iter().all(|w| !w.will_wake(cx.waker())) {
321+
writers.wakers.push(cx.waker().clone());
322+
}
258323
Poll::Pending
259324
}
260325

Diff for: src/net/tcp/stream.rs

+18-15
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,7 @@ use crate::future;
66
use crate::io::{self, Read, Write};
77
use crate::net::driver::Watcher;
88
use crate::net::ToSocketAddrs;
9-
use crate::task::{spawn_blocking, Context, Poll};
10-
use crate::utils::Context as _;
9+
use crate::task::{Context, Poll};
1110

1211
/// A TCP stream between a local and a remote socket.
1312
///
@@ -77,20 +76,24 @@ impl TcpStream {
7776
.await?;
7877

7978
for addr in addrs {
80-
let res = spawn_blocking(move || {
81-
let std_stream = std::net::TcpStream::connect(addr)
82-
.context(|| format!("could not connect to {}", addr))?;
83-
let mio_stream = mio::net::TcpStream::from_stream(std_stream)
84-
.context(|| format!("could not open async connection to {}", addr))?;
85-
Ok(TcpStream {
86-
watcher: Watcher::new(mio_stream),
87-
})
88-
})
89-
.await;
79+
// mio's TcpStream::connect is non-blocking and may just be in progress
80+
// when it returns with `Ok`. We therefore wait for write readiness to
81+
// be sure the connection has either been established or there was an
82+
// error which we check for afterwards.
83+
let watcher = match mio::net::TcpStream::connect(&addr) {
84+
Ok(s) => Watcher::new(s),
85+
Err(e) => {
86+
last_err = Some(e);
87+
continue
88+
}
89+
};
9090

91-
match res {
92-
Ok(stream) => return Ok(stream),
93-
Err(err) => last_err = Some(err),
91+
future::poll_fn(|cx| watcher.poll_write_ready(cx)).await;
92+
93+
match watcher.get_ref().take_error() {
94+
Ok(None) => return Ok(TcpStream { watcher }),
95+
Ok(Some(e)) => last_err = Some(e),
96+
Err(e) => last_err = Some(e)
9497
}
9598
}
9699

0 commit comments

Comments
 (0)