Skip to content

Add Condvar APIs not susceptible to spurious wake #47970

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Feb 25, 2018
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
209 changes: 207 additions & 2 deletions src/libstd/sync/condvar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use sync::{mutex, MutexGuard, PoisonError};
use sys_common::condvar as sys;
use sys_common::mutex as sys_mutex;
use sys_common::poison::{self, LockResult};
use time::Duration;
use time::{Duration, Instant};

/// A type indicating whether a timed wait on a condition variable returned
/// due to a time out or not.
Expand Down Expand Up @@ -219,6 +219,62 @@ impl Condvar {
}
}

/// Blocks the current thread until this condition variable receives a
/// notification and the required condition is met. Spurious wakeups are
/// ignored and this function will only return once the condition has been
/// met.
///
/// This function will atomically unlock the mutex specified (represented by
/// `guard`) and block the current thread. This means that any calls
/// to [`notify_one`] or [`notify_all`] which happen logically after the
/// mutex is unlocked are candidates to wake this thread up. When this
/// function call returns, the lock specified will have been re-acquired.
///
/// # Errors
///
/// This function will return an error if the mutex being waited on is
/// poisoned when this thread re-acquires the lock. For more information,
/// see information about [poisoning] on the [`Mutex`] type.
///
/// [`notify_one`]: #method.notify_one
/// [`notify_all`]: #method.notify_all
/// [poisoning]: ../sync/struct.Mutex.html#poisoning
/// [`Mutex`]: ../sync/struct.Mutex.html
///
/// # Examples
///
/// ```
/// use std::sync::{Arc, Mutex, Condvar};
/// use std::thread;
///
/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
/// let pair2 = pair.clone();
///
/// thread::spawn(move|| {
/// let &(ref lock, ref cvar) = &*pair2;
/// let mut started = lock.lock().unwrap();
/// *started = true;
/// // We notify the condvar that the value has changed.
/// cvar.notify_one();
/// });
///
/// // Wait for the thread to start up.
/// let &(ref lock, ref cvar) = &*pair;
/// // As long as the value inside the `Mutex` is false, we wait.
/// cvar.wait_until(lock.lock().unwrap(), |started| { started });
/// ```
#[stable(feature = "wait_until", since = "1.24")]
pub fn wait_until<'a, T, F>(&self, mut guard: MutexGuard<'a, T>,
mut condition: F)
-> LockResult<MutexGuard<'a, T>>
where F: FnMut(&mut T) -> bool {
while !condition(&mut *guard) {
guard = self.wait(guard)?;
}
Ok(guard)
}


/// Waits on this condition variable for a notification, timing out after a
/// specified duration.
///
Expand Down Expand Up @@ -293,7 +349,15 @@ impl Condvar {
///
/// Note that the best effort is made to ensure that the time waited is
/// measured with a monotonic clock, and not affected by the changes made to
/// the system time.
/// the system time. This function is susceptible to spurious wakeups.
/// Condition variables normally have a boolean predicate associated with
/// them, and the predicate must always be checked each time this function
/// returns to protect against spurious wakeups. Additionally, it is
/// typically desirable for the time-out to not exceed some duration in
/// spite of spurious wakes, thus the sleep-duration is decremented by the
/// amount slept. Alternatively, use the `wait_timeout_until` method
/// to wait until a condition is met with a total time-out regardless
/// of spurious wakes.
///
/// The returned [`WaitTimeoutResult`] value indicates if the timeout is
/// known to have elapsed.
Expand All @@ -302,6 +366,7 @@ impl Condvar {
/// returns, regardless of whether the timeout elapsed or not.
///
/// [`wait`]: #method.wait
/// [`wait_timeout_until`]: #method.wait_timeout_until
/// [`WaitTimeoutResult`]: struct.WaitTimeoutResult.html
///
/// # Examples
Expand Down Expand Up @@ -353,6 +418,77 @@ impl Condvar {
}
}

/// Waits on this condition variable for a notification, timing out after a
/// specified duration. Spurious wakes will not cause this function to
/// return.
///
/// The semantics of this function are equivalent to [`wait_until`] except
/// that the thread will be blocked for roughly no longer than `dur`. This
/// method should not be used for precise timing due to anomalies such as
/// preemption or platform differences that may not cause the maximum
/// amount of time waited to be precisely `dur`.
///
/// Note that the best effort is made to ensure that the time waited is
/// measured with a monotonic clock, and not affected by the changes made to
/// the system time.
///
/// The returned [`WaitTimeoutResult`] value indicates if the timeout is
/// known to have elapsed without the condition being met.
///
/// Like [`wait_until`], the lock specified will be re-acquired when this
/// function returns, regardless of whether the timeout elapsed or not.
///
/// [`wait_until`]: #method.wait_until
/// [`wait_timeout`]: #method.wait_timeout
/// [`WaitTimeoutResult`]: struct.WaitTimeoutResult.html
///
/// # Examples
///
/// ```
/// use std::sync::{Arc, Mutex, Condvar};
/// use std::thread;
/// use std::time::Duration;
///
/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
/// let pair2 = pair.clone();
///
/// thread::spawn(move|| {
/// let &(ref lock, ref cvar) = &*pair2;
/// let mut started = lock.lock().unwrap();
/// *started = true;
/// // We notify the condvar that the value has changed.
/// cvar.notify_one();
/// });
///
/// // wait for the thread to start up
/// let &(ref lock, ref cvar) = &*pair;
/// let result = cvar.wait_timeout_until(lock, Duration::from_millis(100), |started| {
/// started
/// }).unwrap();
/// if result.1.timed_out() {
/// // timed-out without the condition ever evaluating to true.
/// }
/// // access the locked mutex via result.0
/// ```
#[stable(feature = "wait_timeout_until", since = "1.24")]
pub fn wait_timeout_until<'a, T, F>(&self, mut guard: MutexGuard<'a, T>,
mut dur: Duration, mut condition: F)
-> LockResult<(MutexGuard<'a, T>, WaitTimeoutResult)>
where F: FnMut(&mut T) -> bool {
let timed_out = Duration::new(0, 0);
loop {
if !condition(&mut *guard) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This condition is backwards, right?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whoops. It is. I can fix it or rewrite it as above (not sure which is simpler to read). Which would you prefer?

return Ok((guard, WaitTimeoutResult(false)));
} else if dur == timed_out {
return Ok((guard, WaitTimeoutResult(true)));
}
let wait_timer = Instant::now();
let wait_result = self.wait_timeout(guard, dur)?;
dur = dur.checked_sub(wait_timer.elapsed()).unwrap_or(timed_out);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wait_result already contains the information about if we time out or not. It seems like we should use that rather than doubling the time manipulation work.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately that doesn't work because the semantics of the function are such that we can't return timeout if the condition is satisfied. This particular wait_timeout (since we are doing it in a loop to protect against spurious wakes) may have timed-out with the condition satisfied so we need to check that first.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is the difference between a checking after a timeout and not checking semantically observable?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The semantic difference is whether or not the contract for the function is observed which is that condition == true xor WaitTimeoutResult(true). Since the condition is protected by a mutex, failure to check the condition before exiting would result in a potential race where WaitTimeoutResult(true) is returned even though the condition has been met.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW C++ has this same pattern of checking the result before returning. The main difference is that they don't adjust the wait duration time so you could potentially never wake up if you keep getting spurious wakeups before the timeout.

Copy link
Author

@vlovich vlovich Feb 2, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the C++ standard (at least as described on cppreference & implemented by libstdc++ & libc++) is broken here because they don't account for time slept in their loop, but there might be a simpler way to rewrite this more like the C++ implementation.

EDIT: Nvm. I totally misread the cppreference. They use absolute timestamps for sleeping so the code is simpler.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think

     let timed_out = Duration::new(0, 0);
     while !condition(&mut *guard) {
         let wait_timer = Instant::now();
         let wait_result = self.wait_timeout(guard, dur)?;
         dur = dur.checked_sub(wait_timer.elapsed()).unwrap_or(timed_out);
         guard = wait_result.0;
         if wait_result.1.timed_out() {
             Ok((guard, WaitTimeoutResult(condition(&mut *guard))))
         }
     }
     Ok((guard, WaitTimeoutResult(true)))

is more readable?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about something like

let start = Instant::now();
loop {
    if condition(guard) {
        return Ok((guard, WaitTimeoutResult(true)));
    }
    let timeout = match dur.checked_sub(start.elapsed()) {
        Some(timeout) => timeout,
        None => return Ok((guard, WaitTimeoutResult(false))),
    };
    guard = self.wait_timeout(guard, dur)?.0;
}

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that works.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although I think you have the true/false reversed for the WaitTimeoutResult.

guard = wait_result.0;
}
}

/// Wakes up one blocked thread on this condvar.
///
/// If there is a blocked thread on this condition variable, then it will
Expand Down Expand Up @@ -546,6 +682,29 @@ mod tests {
}
}

#[test]
#[cfg_attr(target_os = "emscripten", ignore)]
fn wait_until() {
let pair = Arc::new((Mutex::new(false), Condvar::new()));
let pair2 = pair.clone();

// Inside of our lock, spawn a new thread, and then wait for it to start.
thread::spawn(move|| {
let &(ref lock, ref cvar) = &*pair2;
let mut started = lock.lock().unwrap();
*started = true;
// We notify the condvar that the value has changed.
cvar.notify_one();
});

// Wait for the thread to start up.
let &(ref lock, ref cvar) = &*pair;
let guard = cvar.wait_until(lock.lock().unwrap(), |started| {
started
});
assert!(*guard);
}

#[test]
#[cfg_attr(target_os = "emscripten", ignore)]
fn wait_timeout_wait() {
Expand All @@ -565,6 +724,52 @@ mod tests {
}
}

#[test]
#[cfg_attr(target_os = "emscripten", ignore)]
fn wait_timeout_until_wait() {
let m = Arc::new(Mutex::new(()));
let c = Arc::new(Condvar::new());

let g = m.lock().unwrap();
let (_g, wait) = c.wait_timeout_until(g, Duration::from_millis(1), || { false }).unwrap();
// no spurious wakeups. ensure it timed-out
assert!(wait.timed_out());
}

#[test]
#[cfg_attr(target_os = "emscripten", ignore)]
fn wait_timeout_until_instant_satisfy() {
let m = Arc::new(Mutex::new(()));
let c = Arc::new(Condvar::new());

let g = m.lock().unwrap();
let (_g, wait) = c.wait_timeout_until(g, Duration::from_millis(0), || { true }).unwrap();
// ensure it didn't time-out even if we were not given any time.
assert!(!wait.timed_out());
}

#[test]
#[cfg_attr(target_os = "emscripten", ignore)]
fn wait_timeout_until_wake() {
let pair = Arc::new((Mutex::new(false), Condvar::new()));
let pair_copy = pair.clone();

let g = m.lock().unwrap();
let t = thread::spawn(move || {
let &(ref lock, ref cvar) = &*pair2;
let mut started = lock.lock().unwrap();
thread::sleep(Duration::from_millis(1));
started = true;
cvar.notify_one();
});
let (g2, wait) = c.wait_timeout_until(g, Duration::from_millis(u64::MAX), |&notified| {
notified
}).unwrap();
// ensure it didn't time-out even if we were not given any time.
assert!(!wait.timed_out());
assert!(*g2);
}

#[test]
#[cfg_attr(target_os = "emscripten", ignore)]
fn wait_timeout_wake() {
Expand Down