Skip to content

Add std::os::unix::{register, unregister, Interest} #626

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 32 additions & 9 deletions src/net/driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,31 @@ use crate::io;
use crate::task::{Context, Poll, Waker};
use crate::utils::abort_on_panic;

#[cfg(unix)]
pub use std::os::unix::io::RawFd;

#[cfg(not(unix))]
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
pub enum RawFd {}

/// Data associated with a registered I/O handle.
#[derive(Debug)]
struct Entry {
pub struct Entry {
/// A unique identifier.
token: mio::Token,

/// File descriptor.
fd: Option<RawFd>,

/// Tasks that are blocked on reading from this I/O handle.
readers: Mutex<Vec<Waker>>,
pub readers: Mutex<Vec<Waker>>,

/// Thasks that are blocked on writing to this I/O handle.
writers: Mutex<Vec<Waker>>,
pub writers: Mutex<Vec<Waker>>,
}

/// The state of a networking driver.
struct Reactor {
pub struct Reactor {
/// A mio instance that polls for new events.
poller: mio::Poll,

Expand Down Expand Up @@ -51,14 +61,14 @@ impl Reactor {
};

// Register a dummy I/O handle for waking up the polling thread.
let entry = reactor.register(&reactor.notify_reg.0)?;
let entry = reactor.register(&reactor.notify_reg.0, None)?;
reactor.notify_token = entry.token;

Ok(reactor)
}

/// Registers an I/O event source and returns its associated entry.
fn register(&self, source: &dyn Evented) -> io::Result<Arc<Entry>> {
pub fn register(&self, source: &dyn Evented, fd: Option<RawFd>) -> io::Result<Arc<Entry>> {
let mut entries = self.entries.lock().unwrap();

// Reserve a vacant spot in the slab and use its key as the token value.
Expand All @@ -68,6 +78,7 @@ impl Reactor {
// Allocate an entry and insert it into the slab.
let entry = Arc::new(Entry {
token,
fd,
readers: Mutex::new(Vec::new()),
writers: Mutex::new(Vec::new()),
});
Expand All @@ -82,7 +93,7 @@ impl Reactor {
}

/// Deregisters an I/O event source associated with an entry.
fn deregister(&self, source: &dyn Evented, entry: &Entry) -> io::Result<()> {
pub fn deregister(&self, source: &dyn Evented, entry: &Entry) -> io::Result<()> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Evented shouldn't be part of our pub interface.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This module is pub(crate).

// Deregister the I/O object from the mio instance.
self.poller.deregister(source)?;

Expand All @@ -92,6 +103,18 @@ impl Reactor {
Ok(())
}

/// Deregisters an I/O event source associated with a file descriptor.
#[cfg(all(feature = "unstable", unix))]
pub fn deregister_fd(&self, source: &dyn Evented, fd: RawFd) -> io::Result<()> {
// Deregister the I/O object from the mio instance.
self.poller.deregister(source)?;

// Remove the entry associated with the I/O object.
self.entries.lock().unwrap().retain(|_, e| e.fd != Some(fd));

Ok(())
}

// fn notify(&self) {
// self.notify_reg
// .1
Expand All @@ -101,7 +124,7 @@ impl Reactor {
}

/// The state of the global networking driver.
static REACTOR: Lazy<Reactor> = Lazy::new(|| {
pub static REACTOR: Lazy<Reactor> = Lazy::new(|| {
// Spawn a thread that waits on the poller for new events and wakes up tasks blocked on I/O
// handles.
std::thread::Builder::new()
Expand Down Expand Up @@ -181,7 +204,7 @@ impl<T: Evented> Watcher<T> {
pub fn new(source: T) -> Watcher<T> {
Watcher {
entry: REACTOR
.register(&source)
.register(&source, None)
.expect("cannot register an I/O event source"),
source: Some(source),
}
Expand Down
63 changes: 63 additions & 0 deletions src/os/unix/io.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,44 @@
//! Unix-specific I/O extensions.

#[cfg(feature = "unstable")]
use crate::task::Context;
#[cfg(feature = "unstable")]
use crate::io;

cfg_not_docs! {
pub use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};

#[cfg(feature = "unstable")]
use mio::unix::EventedFd;
#[cfg(feature = "unstable")]
use crate::net::driver::REACTOR;

/// Registers an I/O handle so that the current task gets woken up when it becomes ready.
#[cfg(feature = "unstable")]
pub fn register(cx: &mut Context<'_>, fd: RawFd, interest: Interest) -> io::Result<()> {
let evented = EventedFd(&fd);
let entry = REACTOR.register(&evented, Some(fd))?;
if interest == Interest::Read || interest == Interest::Both {
let mut list = entry.readers.lock().unwrap();
if list.iter().all(|w| !w.will_wake(cx.waker())) {
list.push(cx.waker().clone());
}
}
if interest == Interest::Write || interest == Interest::Both {
let mut list = entry.writers.lock().unwrap();
if list.iter().all(|w| !w.will_wake(cx.waker())) {
list.push(cx.waker().clone());
}
}
Ok(())
}

/// Unregisters an I/O handle.
#[cfg(feature = "unstable")]
pub fn unregister(fd: RawFd) -> io::Result<()> {
let evented = EventedFd(&fd);
REACTOR.deregister_fd(&evented, fd)
}
}

cfg_docs! {
Expand Down Expand Up @@ -51,4 +88,30 @@ cfg_docs! {
/// and must close the descriptor once it's no longer needed.
fn into_raw_fd(self) -> RawFd;
}

/// Registers an I/O handle so that the current task gets woken up when it becomes ready.
#[doc(cfg(unstable))]
pub fn register(cx: &mut Context<'_>, fd: RawFd, interest: Interest) -> io::Result<()> {
unreachable!()
}

/// Unregisters an I/O handle.
#[doc(cfg(unstable))]
pub fn unregister(fd: RawFd) -> io::Result<()> {
unreachable!()
}
}

cfg_unstable! {
/// Decides which possibility a task is interested in.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum Interest {
/// A task is interested in reading from a file descriptor.
Read,
/// A task is interested in writing to a file descriptor.
Write,
/// A task is interested in either being able to write or being able to read from a file
/// descriptor.
Both,
}
}
44 changes: 44 additions & 0 deletions tests/register_rawfd.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#![cfg(all(unix, feature = "unstable"))]

use async_std::future::poll_fn;
use async_std::net::{TcpListener, TcpStream};
use async_std::os::unix::io::{register, unregister, FromRawFd, Interest, IntoRawFd};
use async_std::task::{self, block_on, Poll};

#[test]
fn register_stream() {
block_on(async {
for i in 0..3 {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let t = task::spawn(async move { listener.accept().await });

let stream = TcpStream::connect(&addr).await.unwrap().into_raw_fd();

let mut woken = false;
poll_fn(|cx| {
if woken {
Poll::Ready(())
} else {
match i {
0 => register(cx, stream, Interest::Read).unwrap(),
1 => register(cx, stream, Interest::Write).unwrap(),
2 => register(cx, stream, Interest::Both).unwrap(),
_ => unreachable!(),
}

unregister(stream).unwrap();
woken = true;

Poll::Pending
}
})
.await;

t.await.unwrap();
unsafe {
TcpStream::from_raw_fd(stream);
}
}
});
}