Skip to content

Commit b66da31

Browse files
committed
Add std::os::unix::{register, unregister, Interest}
Closes async-rs#293, based on async-rs#293 (comment)
1 parent 83a488b commit b66da31

File tree

3 files changed

+124
-9
lines changed

3 files changed

+124
-9
lines changed

Diff for: src/net/driver/mod.rs

+25-9
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::fmt;
22
use std::sync::{Arc, Mutex};
3+
use std::os::raw::c_int;
34

45
use mio::{self, Evented};
56
use once_cell::sync::Lazy;
@@ -11,19 +12,22 @@ use crate::utils::abort_on_panic;
1112

1213
/// Data associated with a registered I/O handle.
1314
#[derive(Debug)]
14-
struct Entry {
15+
pub struct Entry {
1516
/// A unique identifier.
1617
token: mio::Token,
1718

19+
/// File descriptor.
20+
fd: Option<c_int>,
21+
1822
/// Tasks that are blocked on reading from this I/O handle.
19-
readers: Mutex<Vec<Waker>>,
23+
pub readers: Mutex<Vec<Waker>>,
2024

2125
/// Thasks that are blocked on writing to this I/O handle.
22-
writers: Mutex<Vec<Waker>>,
26+
pub writers: Mutex<Vec<Waker>>,
2327
}
2428

2529
/// The state of a networking driver.
26-
struct Reactor {
30+
pub struct Reactor {
2731
/// A mio instance that polls for new events.
2832
poller: mio::Poll,
2933

@@ -51,14 +55,14 @@ impl Reactor {
5155
};
5256

5357
// Register a dummy I/O handle for waking up the polling thread.
54-
let entry = reactor.register(&reactor.notify_reg.0)?;
58+
let entry = reactor.register(&reactor.notify_reg.0, None)?;
5559
reactor.notify_token = entry.token;
5660

5761
Ok(reactor)
5862
}
5963

6064
/// Registers an I/O event source and returns its associated entry.
61-
fn register(&self, source: &dyn Evented) -> io::Result<Arc<Entry>> {
65+
pub fn register(&self, source: &dyn Evented, fd: Option<c_int>) -> io::Result<Arc<Entry>> {
6266
let mut entries = self.entries.lock().unwrap();
6367

6468
// Reserve a vacant spot in the slab and use its key as the token value.
@@ -68,6 +72,7 @@ impl Reactor {
6872
// Allocate an entry and insert it into the slab.
6973
let entry = Arc::new(Entry {
7074
token,
75+
fd,
7176
readers: Mutex::new(Vec::new()),
7277
writers: Mutex::new(Vec::new()),
7378
});
@@ -82,7 +87,7 @@ impl Reactor {
8287
}
8388

8489
/// Deregisters an I/O event source associated with an entry.
85-
fn deregister(&self, source: &dyn Evented, entry: &Entry) -> io::Result<()> {
90+
pub fn deregister(&self, source: &dyn Evented, entry: &Entry) -> io::Result<()> {
8691
// Deregister the I/O object from the mio instance.
8792
self.poller.deregister(source)?;
8893

@@ -92,6 +97,17 @@ impl Reactor {
9297
Ok(())
9398
}
9499

100+
/// Deregisters an I/O event source associated with a file descriptor.
101+
pub fn deregister_fd(&self, source: &dyn Evented, fd: c_int) -> io::Result<()> {
102+
// Deregister the I/O object from the mio instance.
103+
self.poller.deregister(source)?;
104+
105+
// Remove the entry associated with the I/O object.
106+
self.entries.lock().unwrap().retain(|_, e| e.fd != Some(fd));
107+
108+
Ok(())
109+
}
110+
95111
// fn notify(&self) {
96112
// self.notify_reg
97113
// .1
@@ -101,7 +117,7 @@ impl Reactor {
101117
}
102118

103119
/// The state of the global networking driver.
104-
static REACTOR: Lazy<Reactor> = Lazy::new(|| {
120+
pub static REACTOR: Lazy<Reactor> = Lazy::new(|| {
105121
// Spawn a thread that waits on the poller for new events and wakes up tasks blocked on I/O
106122
// handles.
107123
std::thread::Builder::new()
@@ -181,7 +197,7 @@ impl<T: Evented> Watcher<T> {
181197
pub fn new(source: T) -> Watcher<T> {
182198
Watcher {
183199
entry: REACTOR
184-
.register(&source)
200+
.register(&source, None)
185201
.expect("cannot register an I/O event source"),
186202
source: Some(source),
187203
}

Diff for: src/os/unix/io.rs

+58
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,39 @@
11
//! Unix-specific I/O extensions.
22
3+
use crate::task::Context;
4+
use crate::io;
5+
36
cfg_not_docs! {
47
pub use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
8+
use mio::unix::EventedFd;
9+
use crate::net::driver::REACTOR;
10+
11+
/// Registers an I/O handle so that the current task gets woken up when it becomes ready.
12+
#[cfg(feature = "unstable")]
13+
pub fn register(cx: &mut Context<'_>, fd: RawFd, interest: Interest) -> io::Result<()> {
14+
let evented = EventedFd(&fd);
15+
let entry = REACTOR.register(&evented, Some(fd))?;
16+
if interest == Interest::Read || interest == Interest::Both {
17+
let mut list = entry.readers.lock().unwrap();
18+
if list.iter().all(|w| !w.will_wake(cx.waker())) {
19+
list.push(cx.waker().clone());
20+
}
21+
}
22+
if interest == Interest::Write || interest == Interest::Both {
23+
let mut list = entry.writers.lock().unwrap();
24+
if list.iter().all(|w| !w.will_wake(cx.waker())) {
25+
list.push(cx.waker().clone());
26+
}
27+
}
28+
Ok(())
29+
}
30+
31+
/// Unregisters an I/O handle.
32+
#[cfg(feature = "unstable")]
33+
pub fn unregister(fd: RawFd) -> io::Result<()> {
34+
let evented = EventedFd(&fd);
35+
REACTOR.deregister_fd(&evented, fd)
36+
}
537
}
638

739
cfg_docs! {
@@ -51,4 +83,30 @@ cfg_docs! {
5183
/// and must close the descriptor once it's no longer needed.
5284
fn into_raw_fd(self) -> RawFd;
5385
}
86+
87+
/// Registers an I/O handle so that the current task gets woken up when it becomes ready.
88+
#[doc(cfg(unstable))]
89+
pub fn register(cx: &mut Context<'_>, fd: RawFd, interest: Interest) -> io::Result<()> {
90+
unreachable!()
91+
}
92+
93+
/// Unregisters an I/O handle.
94+
#[doc(cfg(unstable))]
95+
pub fn unregister(fd: RawFd) -> io::Result<()> {
96+
unreachable!()
97+
}
98+
}
99+
100+
cfg_unstable! {
101+
/// Decides which possibility a task is interested in.
102+
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
103+
pub enum Interest {
104+
/// A task is interested in reading from a file descriptor.
105+
Read,
106+
/// A task is interested in writing to a file descriptor.
107+
Write,
108+
/// A task is interested in either being able to write or being able to read from a file
109+
/// descriptor.
110+
Both,
111+
}
54112
}

Diff for: tests/register_rawfd.rs

+41
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
#![cfg(all(unix, feature = "unstable"))]
2+
3+
use async_std::os::unix::io::{register, unregister, Interest, IntoRawFd, FromRawFd};
4+
use async_std::future::poll_fn;
5+
use async_std::net::{TcpListener, TcpStream};
6+
use async_std::task::{self, block_on, Poll};
7+
8+
#[test]
9+
fn register_stream() {
10+
block_on(async {
11+
for i in 0..3 {
12+
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
13+
let addr = listener.local_addr().unwrap();
14+
let t = task::spawn(async move { listener.accept().await });
15+
16+
let stream = TcpStream::connect(&addr).await.unwrap().into_raw_fd();
17+
18+
let mut woken = false;
19+
poll_fn(|cx| {
20+
if woken {
21+
Poll::Ready(())
22+
} else {
23+
match i {
24+
0 => register(cx, stream, Interest::Read).unwrap(),
25+
1 => register(cx, stream, Interest::Write).unwrap(),
26+
2 => register(cx, stream, Interest::Both).unwrap(),
27+
_ => unreachable!(),
28+
}
29+
30+
unregister(stream).unwrap();
31+
woken = true;
32+
33+
Poll::Pending
34+
}
35+
}).await;
36+
37+
t.await.unwrap();
38+
unsafe { TcpStream::from_raw_fd(stream); }
39+
}
40+
});
41+
}

0 commit comments

Comments
 (0)