Skip to content

New scheduler resilient to blocking #631

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ default = [
"async-task",
"crossbeam-channel",
"crossbeam-deque",
"crossbeam-queue",
"futures-timer",
"kv-log-macro",
"log",
Expand Down Expand Up @@ -56,6 +57,7 @@ async-task = { version = "1.0.0", optional = true }
broadcaster = { version = "0.2.6", optional = true, default-features = false, features = ["default-channels"] }
crossbeam-channel = { version = "0.4.0", optional = true }
crossbeam-deque = { version = "0.7.2", optional = true }
crossbeam-queue = { version = "0.2.0", optional = true }
crossbeam-utils = { version = "0.7.0", optional = true }
futures-core = { version = "0.3.1", optional = true }
futures-io = { version = "0.3.1", optional = true }
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ cfg_default! {
pub mod fs;
pub mod path;
pub mod net;
pub(crate) mod rt;
}

cfg_unstable! {
Expand Down
1 change: 0 additions & 1 deletion src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,5 @@ pub use tcp::{Incoming, TcpListener, TcpStream};
pub use udp::UdpSocket;

mod addr;
pub(crate) mod driver;
mod tcp;
mod udp;
2 changes: 1 addition & 1 deletion src/net/tcp/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::pin::Pin;

use crate::future;
use crate::io;
use crate::net::driver::Watcher;
use crate::rt::Watcher;
use crate::net::{TcpStream, ToSocketAddrs};
use crate::stream::Stream;
use crate::task::{Context, Poll};
Expand Down
2 changes: 1 addition & 1 deletion src/net/tcp/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::pin::Pin;

use crate::future;
use crate::io::{self, Read, Write};
use crate::net::driver::Watcher;
use crate::rt::Watcher;
use crate::net::ToSocketAddrs;
use crate::task::{spawn_blocking, Context, Poll};
use crate::utils::Context as _;
Expand Down
4 changes: 2 additions & 2 deletions src/net/udp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use std::net::SocketAddr;
use std::net::{Ipv4Addr, Ipv6Addr};

use crate::future;
use crate::net::driver::Watcher;
use crate::net::ToSocketAddrs;
use crate::rt::Watcher;
use crate::utils::Context as _;

/// A UDP socket.
Expand Down Expand Up @@ -102,7 +102,7 @@ impl UdpSocket {
/// ```no_run
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::net::UdpSocket;
/// use async_std::net::UdpSocket;
///
/// let socket = UdpSocket::bind("127.0.0.1:0").await?;
/// let addr = socket.local_addr()?;
Expand Down
2 changes: 1 addition & 1 deletion src/os/unix/net/datagram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use mio_uds;
use super::SocketAddr;
use crate::future;
use crate::io;
use crate::net::driver::Watcher;
use crate::rt::Watcher;
use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use crate::path::Path;
use crate::task::spawn_blocking;
Expand Down
2 changes: 1 addition & 1 deletion src/os/unix/net/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use super::SocketAddr;
use super::UnixStream;
use crate::future;
use crate::io;
use crate::net::driver::Watcher;
use crate::rt::Watcher;
use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use crate::path::Path;
use crate::stream::Stream;
Expand Down
2 changes: 1 addition & 1 deletion src/os/unix/net/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use mio_uds;

use super::SocketAddr;
use crate::io::{self, Read, Write};
use crate::net::driver::Watcher;
use crate::rt::Watcher;
use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use crate::path::Path;
use crate::task::{spawn_blocking, Context, Poll};
Expand Down
23 changes: 23 additions & 0 deletions src/rt/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
//! The runtime.

use std::thread;

use once_cell::sync::Lazy;

use crate::utils::abort_on_panic;

pub use reactor::{Reactor, Watcher};
pub use runtime::Runtime;

mod reactor;
mod runtime;

/// The global runtime.
pub static RUNTIME: Lazy<Runtime> = Lazy::new(|| {
thread::Builder::new()
.name("async-std/runtime".to_string())
.spawn(|| abort_on_panic(|| RUNTIME.run()))
.expect("cannot start a runtime thread");

Runtime::new()
});
109 changes: 40 additions & 69 deletions src/net/driver/mod.rs → src/rt/reactor.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use std::fmt;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use mio::{self, Evented};
use once_cell::sync::Lazy;
use slab::Slab;

use crate::io;
use crate::rt::RUNTIME;
use crate::task::{Context, Poll, Waker};
use crate::utils::abort_on_panic;

/// Data associated with a registered I/O handle.
#[derive(Debug)]
Expand All @@ -18,15 +18,18 @@ struct Entry {
/// Tasks that are blocked on reading from this I/O handle.
readers: Mutex<Vec<Waker>>,

/// Thasks that are blocked on writing to this I/O handle.
/// Tasks that are blocked on writing to this I/O handle.
writers: Mutex<Vec<Waker>>,
}

/// The state of a networking driver.
struct Reactor {
pub struct Reactor {
/// A mio instance that polls for new events.
poller: mio::Poll,

/// A list into which mio stores events.
events: Mutex<mio::Events>,

/// A collection of registered I/O handles.
entries: Mutex<Slab<Arc<Entry>>>,

Expand All @@ -39,12 +42,13 @@ struct Reactor {

impl Reactor {
/// Creates a new reactor for polling I/O events.
fn new() -> io::Result<Reactor> {
pub fn new() -> io::Result<Reactor> {
let poller = mio::Poll::new()?;
let notify_reg = mio::Registration::new2();

let mut reactor = Reactor {
poller,
events: Mutex::new(mio::Events::with_capacity(1000)),
entries: Mutex::new(Slab::new()),
notify_reg,
notify_token: mio::Token(0),
Expand Down Expand Up @@ -92,72 +96,60 @@ impl Reactor {
Ok(())
}

// fn notify(&self) {
// self.notify_reg
// .1
// .set_readiness(mio::Ready::readable())
// .unwrap();
// }
}
/// Notifies the reactor so that polling stops blocking.
pub fn notify(&self) -> io::Result<()> {
self.notify_reg.1.set_readiness(mio::Ready::readable())
}

/// Waits on the poller for new events and wakes up tasks blocked on I/O handles.
///
/// Returns `Ok(true)` if at least one new task was woken.
pub fn poll(&self, timeout: Option<Duration>) -> io::Result<bool> {
let mut events = self.events.lock().unwrap();

/// The state of the global networking driver.
static REACTOR: Lazy<Reactor> = Lazy::new(|| {
// Spawn a thread that waits on the poller for new events and wakes up tasks blocked on I/O
// handles.
std::thread::Builder::new()
.name("async-std/net".to_string())
.spawn(move || {
// If the driver thread panics, there's not much we can do. It is not a
// recoverable error and there is no place to propagate it into so we just abort.
abort_on_panic(|| {
main_loop().expect("async networking thread has panicked");
})
})
.expect("cannot start a thread driving blocking tasks");

Reactor::new().expect("cannot initialize reactor")
});

/// Waits on the poller for new events and wakes up tasks blocked on I/O handles.
fn main_loop() -> io::Result<()> {
let reactor = &REACTOR;
let mut events = mio::Events::with_capacity(1000);

loop {
// Block on the poller until at least one new event comes in.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or until timeout.

reactor.poller.poll(&mut events, None)?;
self.poller.poll(&mut events, timeout)?;

// Lock the entire entry table while we're processing new events.
let entries = reactor.entries.lock().unwrap();
let entries = self.entries.lock().unwrap();

// The number of woken tasks.
let mut progress = false;

for event in events.iter() {
let token = event.token();

if token == reactor.notify_token {
if token == self.notify_token {
// If this is the notification token, we just need the notification state.
reactor.notify_reg.1.set_readiness(mio::Ready::empty())?;
self.notify_reg.1.set_readiness(mio::Ready::empty())?;
} else {
// Otherwise, look for the entry associated with this token.
if let Some(entry) = entries.get(token.0) {
// Set the readiness flags from this I/O event.
let readiness = event.readiness();

// Wake up reader tasks blocked on this I/O handle.
if !(readiness & reader_interests()).is_empty() {
let reader_interests = mio::Ready::all() - mio::Ready::writable();
if !(readiness & reader_interests).is_empty() {
for w in entry.readers.lock().unwrap().drain(..) {
w.wake();
progress = true;
}
}

// Wake up writer tasks blocked on this I/O handle.
if !(readiness & writer_interests()).is_empty() {
let writer_interests = mio::Ready::all() - mio::Ready::readable();
if !(readiness & writer_interests).is_empty() {
for w in entry.writers.lock().unwrap().drain(..) {
w.wake();
progress = true;
}
}
}
}
}

Ok(progress)
}
}

Expand All @@ -180,7 +172,8 @@ impl<T: Evented> Watcher<T> {
/// lifetime of the returned I/O handle.
pub fn new(source: T) -> Watcher<T> {
Watcher {
entry: REACTOR
entry: RUNTIME
.reactor()
.register(&source)
.expect("cannot register an I/O event source"),
source: Some(source),
Expand Down Expand Up @@ -264,7 +257,8 @@ impl<T: Evented> Watcher<T> {
#[allow(dead_code)]
pub fn into_inner(mut self) -> T {
let source = self.source.take().unwrap();
REACTOR
RUNTIME
.reactor()
.deregister(&source, &self.entry)
.expect("cannot deregister I/O event source");
source
Expand All @@ -274,7 +268,8 @@ impl<T: Evented> Watcher<T> {
impl<T: Evented> Drop for Watcher<T> {
fn drop(&mut self) {
if let Some(ref source) = self.source {
REACTOR
RUNTIME
.reactor()
.deregister(source, &self.entry)
.expect("cannot deregister I/O event source");
}
Expand All @@ -289,27 +284,3 @@ impl<T: Evented + fmt::Debug> fmt::Debug for Watcher<T> {
.finish()
}
}

/// Returns a mask containing flags that interest tasks reading from I/O handles.
#[inline]
fn reader_interests() -> mio::Ready {
mio::Ready::all() - mio::Ready::writable()
}

/// Returns a mask containing flags that interest tasks writing into I/O handles.
#[inline]
fn writer_interests() -> mio::Ready {
mio::Ready::writable() | hup()
}

/// Returns a flag containing the hangup status.
#[inline]
fn hup() -> mio::Ready {
#[cfg(unix)]
let ready = mio::unix::UnixReady::hup().into();

#[cfg(not(unix))]
let ready = mio::Ready::empty();

ready
}
Loading