Skip to content

Commit c3c5f2a

Browse files
authored
Merge pull request async-rs#64 from smol-rs/taiki-e/named
Add Readable and Writable futures
2 parents 68f9d40 + 9f7b38e commit c3c5f2a

File tree

5 files changed

+157
-124
lines changed

5 files changed

+157
-124
lines changed

Diff for: .github/workflows/build-and-test.yaml

+26-48
Original file line numberDiff line numberDiff line change
@@ -13,71 +13,49 @@ jobs:
1313
fail-fast: false
1414
matrix:
1515
os: [ubuntu-latest, windows-latest, macos-latest]
16-
rust: [nightly, beta, stable, 1.39.0]
16+
rust: [nightly, beta, stable]
1717
steps:
1818
- uses: actions/checkout@v2
19-
20-
- name: Install latest ${{ matrix.rust }}
21-
uses: actions-rs/toolchain@v1
22-
with:
23-
toolchain: ${{ matrix.rust }}
24-
profile: minimal
25-
override: true
26-
27-
- name: Run basic cargo check
28-
uses: actions-rs/cargo@v1
29-
with:
30-
command: check
31-
args: --all --bins --all-features
32-
33-
- name: Run cargo check
34-
if: startsWith(matrix.rust, '1.39.0') == false
35-
uses: actions-rs/cargo@v1
36-
with:
37-
command: check
38-
args: --all --benches --bins --examples --tests --all-features
39-
19+
- name: Install Rust
20+
# --no-self-update is necessary because the windows environment cannot self-update rustup.exe.
21+
run: rustup update ${{ matrix.rust }} --no-self-update && rustup default ${{ matrix.rust }}
22+
- run: cargo build --all --all-features --all-targets
4023
- name: Run cargo check (without dev-dependencies to catch missing feature flags)
4124
if: startsWith(matrix.rust, 'nightly')
42-
uses: actions-rs/cargo@v1
43-
with:
44-
command: check
45-
args: -Z features=dev_dep
46-
47-
- name: Run cargo test
48-
if: startsWith(matrix.rust, '1.39.0') == false
49-
uses: actions-rs/cargo@v1
50-
with:
51-
command: test
25+
run: cargo check -Z features=dev_dep
26+
- run: cargo test
5227

5328
# Copied from: https://github.com/rust-lang/stacker/pull/19/files
5429
windows_gnu:
5530
runs-on: windows-latest
5631
strategy:
5732
matrix:
58-
rust_toolchain: [nightly]
59-
rust_target:
33+
rust: [nightly]
34+
target:
6035
- x86_64-pc-windows-gnu
6136
steps:
6237
- uses: actions/checkout@v1
63-
- name: Install Rust nightly
64-
uses: actions-rs/toolchain@v1
65-
with:
66-
toolchain: ${{ matrix.rust_toolchain }}
67-
target: ${{ matrix.rust_target }}
68-
default: true
38+
- name: Install Rust
39+
# --no-self-update is necessary because the windows environment cannot self-update rustup.exe.
40+
run: rustup update ${{ matrix.rust }} --no-self-update && rustup default ${{ matrix.rust }}
41+
- run: rustup target add ${{ matrix.target }}
6942
# https://github.com/rust-lang/rust/issues/49078
7043
- name: Fix windows-gnu rust-mingw
7144
run : |
7245
for i in crt2.o dllcrt2.o libmingwex.a libmsvcrt.a ; do
7346
cp -f "/C/ProgramData/Chocolatey/lib/mingw/tools/install/mingw64/x86_64-w64-mingw32/lib/$i" "`rustc --print sysroot`/lib/rustlib/x86_64-pc-windows-gnu/lib"
7447
done
7548
shell: bash
76-
- uses: actions-rs/cargo@v1
77-
with:
78-
command: build
79-
args: --target ${{ matrix.rust_target }} --all --benches --bins --examples --tests --all-features
80-
- uses: actions-rs/cargo@v1
81-
with:
82-
command: test
83-
args: --target ${{ matrix.rust_target }}
49+
- run: cargo build --target ${{ matrix.target }} --all --all-features --all-targets
50+
- run: cargo test --target ${{ matrix.target }}
51+
52+
msrv:
53+
runs-on: ubuntu-latest
54+
strategy:
55+
matrix:
56+
rust: [1.46.0]
57+
steps:
58+
- uses: actions/checkout@v2
59+
- name: Install Rust
60+
run: rustup update ${{ matrix.rust }} && rustup default ${{ matrix.rust }}
61+
- run: cargo build

Diff for: .github/workflows/cross.yaml

+3-7
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,9 @@ jobs:
1515
os: [ubuntu-latest, macos-latest]
1616

1717
steps:
18-
- uses: actions/checkout@master
19-
20-
- name: Install nightly
21-
uses: actions-rs/toolchain@v1
22-
with:
23-
toolchain: nightly
24-
override: true
18+
- uses: actions/checkout@v2
19+
- name: Install Rust
20+
run: rustup update stable
2521

2622
- name: Install cross
2723
run: cargo install cross

Diff for: .github/workflows/lint.yaml

+2-6
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,8 @@ jobs:
1111
runs-on: ubuntu-latest
1212
steps:
1313
- uses: actions/checkout@v2
14-
15-
- uses: actions-rs/toolchain@v1
16-
with:
17-
toolchain: stable
18-
profile: minimal
19-
components: clippy
14+
- name: Install Rust
15+
run: rustup update stable
2016

2117
- uses: actions-rs/clippy-check@v1
2218
with:

Diff for: src/lib.rs

+5-4
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ mod driver;
8585
mod reactor;
8686

8787
pub use driver::block_on;
88+
pub use reactor::{Readable, Writable};
8889

8990
/// Use `Duration::MAX` once `duration_constants` are stabilized.
9091
fn duration_max() -> Duration {
@@ -686,8 +687,8 @@ impl<T> Async<T> {
686687
/// listener.readable().await?;
687688
/// # std::io::Result::Ok(()) });
688689
/// ```
689-
pub async fn readable(&self) -> io::Result<()> {
690-
self.source.readable().await
690+
pub fn readable(&self) -> Readable {
691+
self.source.readable()
691692
}
692693

693694
/// Waits until the I/O handle is writable.
@@ -708,8 +709,8 @@ impl<T> Async<T> {
708709
/// stream.writable().await?;
709710
/// # std::io::Result::Ok(()) });
710711
/// ```
711-
pub async fn writable(&self) -> io::Result<()> {
712-
self.source.writable().await
712+
pub fn writable(&self) -> Writable {
713+
self.source.writable()
713714
}
714715

715716
/// Polls the I/O handle for readability.

Diff for: src/reactor.rs

+121-59
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,20 @@
11
use std::collections::BTreeMap;
2+
use std::future::Future;
23
use std::io;
34
use std::mem;
45
#[cfg(unix)]
56
use std::os::unix::io::RawFd;
67
#[cfg(windows)]
78
use std::os::windows::io::RawSocket;
89
use std::panic;
10+
use std::pin::Pin;
911
use std::sync::atomic::{AtomicUsize, Ordering};
1012
use std::sync::{Arc, Mutex, MutexGuard};
1113
use std::task::{Context, Poll, Waker};
1214
use std::time::{Duration, Instant};
1315

1416
use concurrent_queue::ConcurrentQueue;
15-
use futures_lite::future;
17+
use futures_lite::ready;
1618
use once_cell::sync::Lazy;
1719
use polling::{Event, Poller};
1820
use slab::Slab;
@@ -441,78 +443,138 @@ impl Source {
441443
}
442444

443445
/// Waits until the I/O source is readable.
444-
pub(crate) async fn readable(&self) -> io::Result<()> {
445-
self.ready(READ).await?;
446-
log::trace!("readable: fd={}", self.raw);
447-
Ok(())
446+
pub(crate) fn readable(self: &Arc<Self>) -> Readable {
447+
Readable(self.ready(READ))
448448
}
449449

450450
/// Waits until the I/O source is writable.
451-
pub(crate) async fn writable(&self) -> io::Result<()> {
452-
self.ready(WRITE).await?;
453-
log::trace!("writable: fd={}", self.raw);
454-
Ok(())
451+
pub(crate) fn writable(self: &Arc<Self>) -> Writable {
452+
Writable(self.ready(WRITE))
455453
}
456454

457455
/// Waits until the I/O source is readable or writable.
458-
async fn ready(&self, dir: usize) -> io::Result<()> {
459-
let mut ticks = None;
460-
let mut index = None;
461-
let mut _guard = None;
462-
463-
future::poll_fn(|cx| {
464-
let mut state = self.state.lock().unwrap();
465-
466-
// Check if the reactor has delivered an event.
467-
if let Some((a, b)) = ticks {
468-
// If `state[dir].tick` has changed to a value other than the old reactor tick,
469-
// that means a newer reactor tick has delivered an event.
470-
if state[dir].tick != a && state[dir].tick != b {
471-
return Poll::Ready(Ok(()));
472-
}
456+
fn ready(self: &Arc<Self>, dir: usize) -> Ready {
457+
Ready {
458+
source: self.clone(),
459+
dir,
460+
ticks: None,
461+
index: None,
462+
_guard: None,
463+
}
464+
}
465+
}
466+
467+
/// Future for [`Async::readable`](crate::Async::readable).
468+
#[derive(Debug)]
469+
#[must_use = "futures do nothing unless you `.await` or poll them"]
470+
pub struct Readable(Ready);
471+
472+
impl Future for Readable {
473+
type Output = io::Result<()>;
474+
475+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
476+
ready!(Pin::new(&mut self.0).poll(cx))?;
477+
log::trace!("readable: fd={}", self.0.source.raw);
478+
Poll::Ready(Ok(()))
479+
}
480+
}
481+
482+
/// Future for [`Async::writable`](crate::Async::writable).
483+
#[derive(Debug)]
484+
#[must_use = "futures do nothing unless you `.await` or poll them"]
485+
pub struct Writable(Ready);
486+
487+
impl Future for Writable {
488+
type Output = io::Result<()>;
489+
490+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
491+
ready!(Pin::new(&mut self.0).poll(cx))?;
492+
log::trace!("writable: fd={}", self.0.source.raw);
493+
Poll::Ready(Ok(()))
494+
}
495+
}
496+
497+
#[derive(Debug)]
498+
struct Ready {
499+
source: Arc<Source>,
500+
dir: usize,
501+
ticks: Option<(usize, usize)>,
502+
index: Option<usize>,
503+
_guard: Option<RemoveOnDrop>,
504+
}
505+
506+
impl Future for Ready {
507+
type Output = io::Result<()>;
508+
509+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
510+
let Self {
511+
source,
512+
dir,
513+
ticks,
514+
index,
515+
_guard,
516+
} = &mut *self;
517+
518+
let mut state = source.state.lock().unwrap();
519+
520+
// Check if the reactor has delivered an event.
521+
if let Some((a, b)) = *ticks {
522+
// If `state[dir].tick` has changed to a value other than the old reactor tick,
523+
// that means a newer reactor tick has delivered an event.
524+
if state[*dir].tick != a && state[*dir].tick != b {
525+
return Poll::Ready(Ok(()));
473526
}
527+
}
474528

475-
let was_empty = state[dir].is_empty();
476-
477-
// Register the current task's waker.
478-
let i = match index {
479-
Some(i) => i,
480-
None => {
481-
let i = state[dir].wakers.insert(None);
482-
_guard = Some(CallOnDrop(move || {
483-
let mut state = self.state.lock().unwrap();
484-
state[dir].wakers.remove(i);
485-
}));
486-
index = Some(i);
487-
ticks = Some((Reactor::get().ticker(), state[dir].tick));
488-
i
489-
}
490-
};
491-
state[dir].wakers[i] = Some(cx.waker().clone());
492-
493-
// Update interest in this I/O handle.
494-
if was_empty {
495-
Reactor::get().poller.modify(
496-
self.raw,
497-
Event {
498-
key: self.key,
499-
readable: !state[READ].is_empty(),
500-
writable: !state[WRITE].is_empty(),
501-
},
502-
)?;
529+
let was_empty = state[*dir].is_empty();
530+
531+
// Register the current task's waker.
532+
let i = match *index {
533+
Some(i) => i,
534+
None => {
535+
let i = state[*dir].wakers.insert(None);
536+
*_guard = Some(RemoveOnDrop {
537+
source: source.clone(),
538+
dir: *dir,
539+
key: i,
540+
});
541+
*index = Some(i);
542+
*ticks = Some((Reactor::get().ticker(), state[*dir].tick));
543+
i
503544
}
545+
};
546+
state[*dir].wakers[i] = Some(cx.waker().clone());
504547

505-
Poll::Pending
506-
})
507-
.await
548+
// Update interest in this I/O handle.
549+
if was_empty {
550+
Reactor::get().poller.modify(
551+
source.raw,
552+
Event {
553+
key: source.key,
554+
readable: !state[READ].is_empty(),
555+
writable: !state[WRITE].is_empty(),
556+
},
557+
)?;
558+
}
559+
560+
Poll::Pending
508561
}
509562
}
510563

511-
/// Runs a closure when dropped.
512-
struct CallOnDrop<F: Fn()>(F);
564+
/// Remove waker when dropped.
565+
#[derive(Debug)]
566+
struct RemoveOnDrop {
567+
source: Arc<Source>,
568+
dir: usize,
569+
key: usize,
570+
}
513571

514-
impl<F: Fn()> Drop for CallOnDrop<F> {
572+
impl Drop for RemoveOnDrop {
515573
fn drop(&mut self) {
516-
(self.0)();
574+
let mut state = self.source.state.lock().unwrap();
575+
let wakers = &mut state[self.dir].wakers;
576+
if wakers.contains(self.key) {
577+
wakers.remove(self.key);
578+
}
517579
}
518580
}

0 commit comments

Comments
 (0)