-
Notifications
You must be signed in to change notification settings - Fork 653
Improve thread notify #597
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,114 @@ | ||
#![feature(test)] | ||
|
||
extern crate futures; | ||
extern crate test; | ||
|
||
use futures::{Future, Poll, Async}; | ||
use futures::task::{self, Task}; | ||
|
||
use test::Bencher; | ||
|
||
#[bench] | ||
fn thread_yield_single_thread_one_wait(b: &mut Bencher) { | ||
const NUM: usize = 10_000; | ||
|
||
struct Yield { | ||
rem: usize, | ||
} | ||
|
||
impl Future for Yield { | ||
type Item = (); | ||
type Error = (); | ||
|
||
fn poll(&mut self) -> Poll<(), ()> { | ||
if self.rem == 0 { | ||
Ok(Async::Ready(())) | ||
} else { | ||
self.rem -= 1; | ||
task::current().notify(); | ||
Ok(Async::NotReady) | ||
} | ||
} | ||
} | ||
|
||
b.iter(|| { | ||
let y = Yield { rem: NUM }; | ||
y.wait().unwrap(); | ||
}); | ||
} | ||
|
||
#[bench] | ||
fn thread_yield_single_thread_many_wait(b: &mut Bencher) { | ||
const NUM: usize = 10_000; | ||
|
||
struct Yield { | ||
rem: usize, | ||
} | ||
|
||
impl Future for Yield { | ||
type Item = (); | ||
type Error = (); | ||
|
||
fn poll(&mut self) -> Poll<(), ()> { | ||
if self.rem == 0 { | ||
Ok(Async::Ready(())) | ||
} else { | ||
self.rem -= 1; | ||
task::current().notify(); | ||
Ok(Async::NotReady) | ||
} | ||
} | ||
} | ||
|
||
b.iter(|| { | ||
for _ in 0..NUM { | ||
let y = Yield { rem: 1 }; | ||
y.wait().unwrap(); | ||
} | ||
}); | ||
} | ||
|
||
#[bench] | ||
fn thread_yield_multi_thread(b: &mut Bencher) { | ||
use std::sync::mpsc; | ||
use std::thread; | ||
|
||
const NUM: usize = 1_000; | ||
|
||
let (tx, rx) = mpsc::sync_channel::<Task>(10_000); | ||
|
||
struct Yield { | ||
rem: usize, | ||
tx: mpsc::SyncSender<Task>, | ||
} | ||
|
||
impl Future for Yield { | ||
type Item = (); | ||
type Error = (); | ||
|
||
fn poll(&mut self) -> Poll<(), ()> { | ||
if self.rem == 0 { | ||
Ok(Async::Ready(())) | ||
} else { | ||
self.rem -= 1; | ||
self.tx.send(task::current()).unwrap(); | ||
Ok(Async::NotReady) | ||
} | ||
} | ||
} | ||
|
||
thread::spawn(move || { | ||
while let Ok(task) = rx.recv() { | ||
task.notify(); | ||
} | ||
}); | ||
|
||
b.iter(move || { | ||
let y = Yield { | ||
rem: NUM, | ||
tx: tx.clone(), | ||
}; | ||
|
||
y.wait().unwrap(); | ||
}); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,9 +5,8 @@ use std::fmt; | |
use std::marker::PhantomData; | ||
use std::mem; | ||
use std::ptr; | ||
use std::sync::{Arc, Once, ONCE_INIT}; | ||
use std::sync::atomic::{AtomicBool, Ordering}; | ||
use std::thread; | ||
use std::sync::{Arc, Mutex, Condvar, Once, ONCE_INIT}; | ||
use std::sync::atomic::{AtomicUsize, Ordering}; | ||
|
||
use {Future, Stream, Sink, Poll, Async, StartSend, AsyncSink}; | ||
use super::core; | ||
|
@@ -238,14 +237,15 @@ impl<F: Future> Spawn<F> { | |
/// to complete. When a future cannot make progress it will use | ||
/// `thread::park` to block the current thread. | ||
pub fn wait_future(&mut self) -> Result<F::Item, F::Error> { | ||
let unpark = Arc::new(ThreadUnpark::new(thread::current())); | ||
ThreadNotify::with_current(|notify| { | ||
|
||
loop { | ||
match self.poll_future_notify(&unpark, 0)? { | ||
Async::NotReady => unpark.park(), | ||
Async::Ready(e) => return Ok(e), | ||
loop { | ||
match self.poll_future_notify(notify, 0)? { | ||
Async::NotReady => notify.park(), | ||
Async::Ready(e) => return Ok(e), | ||
} | ||
} | ||
} | ||
}) | ||
} | ||
|
||
/// A specialized function to request running a future to completion on the | ||
|
@@ -296,15 +296,17 @@ impl<S: Stream> Spawn<S> { | |
/// Like `wait_future`, except only waits for the next element to arrive on | ||
/// the underlying stream. | ||
pub fn wait_stream(&mut self) -> Option<Result<S::Item, S::Error>> { | ||
let unpark = Arc::new(ThreadUnpark::new(thread::current())); | ||
loop { | ||
match self.poll_stream_notify(&unpark, 0) { | ||
Ok(Async::NotReady) => unpark.park(), | ||
Ok(Async::Ready(Some(e))) => return Some(Ok(e)), | ||
Ok(Async::Ready(None)) => return None, | ||
Err(e) => return Some(Err(e)), | ||
ThreadNotify::with_current(|notify| { | ||
|
||
loop { | ||
match self.poll_stream_notify(notify, 0) { | ||
Ok(Async::NotReady) => notify.park(), | ||
Ok(Async::Ready(Some(e))) => return Some(Ok(e)), | ||
Ok(Async::Ready(None)) => return None, | ||
Err(e) => return Some(Err(e)), | ||
} | ||
} | ||
} | ||
}) | ||
} | ||
} | ||
|
||
|
@@ -340,14 +342,16 @@ impl<S: Sink> Spawn<S> { | |
/// be blocked until it's able to send the value. | ||
pub fn wait_send(&mut self, mut value: S::SinkItem) | ||
-> Result<(), S::SinkError> { | ||
let notify = Arc::new(ThreadUnpark::new(thread::current())); | ||
loop { | ||
value = match self.start_send_notify(value, ¬ify, 0)? { | ||
AsyncSink::NotReady(v) => v, | ||
AsyncSink::Ready => return Ok(()), | ||
}; | ||
notify.park(); | ||
} | ||
ThreadNotify::with_current(|notify| { | ||
|
||
loop { | ||
value = match self.start_send_notify(value, notify, 0)? { | ||
AsyncSink::NotReady(v) => v, | ||
AsyncSink::Ready => return Ok(()), | ||
}; | ||
notify.park(); | ||
} | ||
}) | ||
} | ||
|
||
/// Blocks the current thread until it's able to flush this sink. | ||
|
@@ -359,13 +363,15 @@ impl<S: Sink> Spawn<S> { | |
/// The thread will be blocked until `poll_complete` returns that it's | ||
/// ready. | ||
pub fn wait_flush(&mut self) -> Result<(), S::SinkError> { | ||
let notify = Arc::new(ThreadUnpark::new(thread::current())); | ||
loop { | ||
if self.poll_flush_notify(¬ify, 0)?.is_ready() { | ||
return Ok(()) | ||
ThreadNotify::with_current(|notify| { | ||
|
||
loop { | ||
if self.poll_flush_notify(notify, 0)?.is_ready() { | ||
return Ok(()) | ||
} | ||
notify.park(); | ||
} | ||
notify.park(); | ||
} | ||
}) | ||
} | ||
|
||
/// Blocks the current thread until it's able to close this sink. | ||
|
@@ -374,13 +380,15 @@ impl<S: Sink> Spawn<S> { | |
/// is not ready to be close yet, then the current thread will be blocked | ||
/// until it's closed. | ||
pub fn wait_close(&mut self) -> Result<(), S::SinkError> { | ||
let notify = Arc::new(ThreadUnpark::new(thread::current())); | ||
loop { | ||
if self.close_notify(¬ify, 0)?.is_ready() { | ||
return Ok(()) | ||
ThreadNotify::with_current(|notify| { | ||
|
||
loop { | ||
if self.close_notify(notify, 0)?.is_ready() { | ||
return Ok(()) | ||
} | ||
notify.park(); | ||
} | ||
notify.park(); | ||
} | ||
}) | ||
} | ||
} | ||
|
||
|
@@ -474,32 +482,91 @@ impl Unpark for RunInner { | |
} | ||
} | ||
|
||
// ===== ThreadUnpark ===== | ||
// ===== ThreadNotify ===== | ||
|
||
struct ThreadUnpark { | ||
thread: thread::Thread, | ||
ready: AtomicBool, | ||
struct ThreadNotify { | ||
state: AtomicUsize, | ||
mutex: Mutex<()>, | ||
condvar: Condvar, | ||
} | ||
|
||
impl ThreadUnpark { | ||
fn new(thread: thread::Thread) -> ThreadUnpark { | ||
ThreadUnpark { | ||
thread: thread, | ||
ready: AtomicBool::new(false), | ||
} | ||
const IDLE: usize = 0; | ||
const NOTIFY: usize = 1; | ||
const SLEEP: usize = 2; | ||
|
||
thread_local! { | ||
static CURRENT_THREAD_NOTIFY: Arc<ThreadNotify> = Arc::new(ThreadNotify { | ||
state: AtomicUsize::new(IDLE), | ||
mutex: Mutex::new(()), | ||
condvar: Condvar::new(), | ||
}); | ||
} | ||
|
||
impl ThreadNotify { | ||
fn with_current<F, R>(f: F) -> R | ||
where F: FnOnce(&Arc<ThreadNotify>) -> R, | ||
{ | ||
CURRENT_THREAD_NOTIFY.with(|notify| f(notify)) | ||
} | ||
|
||
fn park(&self) { | ||
if !self.ready.swap(false, Ordering::SeqCst) { | ||
thread::park(); | ||
// If currently notified, then we skip sleeping. This is checked outside | ||
// of the lock to avoid acquiring a mutex if not necessary. | ||
match self.state.compare_and_swap(NOTIFY, IDLE, Ordering::SeqCst) { | ||
NOTIFY => return, | ||
IDLE => {}, | ||
_ => unreachable!(), | ||
} | ||
|
||
// The state is currently idle, so obtain the lock and then try to | ||
// transition to a sleeping state. | ||
let mut m = self.mutex.lock().unwrap(); | ||
|
||
// Transition to sleeping | ||
match self.state.compare_and_swap(IDLE, SLEEP, Ordering::SeqCst) { | ||
NOTIFY => { | ||
// Notified before we could sleep, consume the notification and | ||
// exit | ||
self.state.store(IDLE, Ordering::SeqCst); | ||
return; | ||
} | ||
IDLE => {}, | ||
_ => unreachable!(), | ||
} | ||
|
||
// Loop until we've been notified | ||
loop { | ||
m = self.condvar.wait(m).unwrap(); | ||
|
||
// Transition back to idle, loop otherwise | ||
if NOTIFY == self.state.compare_and_swap(NOTIFY, IDLE, Ordering::SeqCst) { | ||
return; | ||
} | ||
} | ||
} | ||
} | ||
|
||
impl Notify for ThreadUnpark { | ||
impl Notify for ThreadNotify { | ||
fn notify(&self, _unpark_id: usize) { | ||
self.ready.store(true, Ordering::SeqCst); | ||
self.thread.unpark() | ||
// First, try transitioning from IDLE -> NOTIFY, this does not require a | ||
// lock. | ||
match self.state.compare_and_swap(IDLE, NOTIFY, Ordering::SeqCst) { | ||
IDLE | NOTIFY => return, | ||
SLEEP => {} | ||
_ => unreachable!(), | ||
} | ||
|
||
// The other half is sleeping, this requires a lock | ||
let _m = self.mutex.lock().unwrap(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would it be worthwhile to have a separate let _nm = match self.notifier_mutex.try_lock() {
Ok(g) => g,
Err(e) => match e {
TryLockResult::Poisoned(e) => panic!(e),
TryLockResult::WouldBlock => { return ; }
}
}
let _m = self.mutex.lock().unwrap();
... which would avoid simultaneous notifications sitting on a mutex to notify a thread that will only need the first (and would also help avoid [not eliminate] the scenario where the first notification woke the sleeping thread, which then consumes all events, and then gets falsely notified by the other notifications that hadn't hit yet). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Eh nvm - this would still require keeping one notifier on deck, which |
||
|
||
// Transition from SLEEP -> NOTIFY | ||
match self.state.compare_and_swap(SLEEP, NOTIFY, Ordering::SeqCst) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this cas would just need to be a store with notifier_mutex. [edit: nvm - it still needs to be a compare_and_swap, b/c a simultaneous notification that is slow to the try_lock could still fall in here after the parked thread awakens and swaps to idle] |
||
SLEEP => {} | ||
_ => return, | ||
} | ||
|
||
// Wakeup the sleeper | ||
self.condvar.notify_one(); | ||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why would this ever not be
NOTIFY
? Can this be just an atomic store and return?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
condvars can wakeup spuriously
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah right, I forgot that actual OS condvars can do that - Go's sync.Cond does not. Apologies!