Skip to content

Commit fb8d9f4

Browse files
committed
auto merge of #21132 : sfackler/rust/wait_timeout, r=alexcrichton
**The implementation is a direct adaptation of libcxx's condition_variable implementation.** I also added a wait_timeout_with method, which matches the second overload in C++'s condition_variable. The implementation right now is kind of dumb but it works. There is an outstanding issue with it: as is it doesn't support the use case where a user doesn't care about poisoning and wants to continue through poison. r? @alexcrichton @aturon
2 parents ed530d7 + 08f6380 commit fb8d9f4

File tree

8 files changed

+343
-97
lines changed

8 files changed

+343
-97
lines changed

src/libstd/sync/condvar.rs

+113-10
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use prelude::v1::*;
1212

1313
use sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
1414
use sync::poison::{self, LockResult};
15+
use sys::time::SteadyTime;
1516
use sys_common::condvar as sys;
1617
use sys_common::mutex as sys_mutex;
1718
use time::Duration;
@@ -153,20 +154,34 @@ impl Condvar {
153154
///
154155
/// Like `wait`, the lock specified will be re-acquired when this function
155156
/// returns, regardless of whether the timeout elapsed or not.
156-
// Note that this method is *not* public, and this is quite intentional
157-
// because we're not quite sure about the semantics of relative vs absolute
158-
// durations or how the timing guarantees play into what the system APIs
159-
// provide. There are also additional concerns about the unix-specific
160-
// implementation which may need to be addressed.
161-
#[allow(dead_code)]
162-
fn wait_timeout<'a, T>(&self, guard: MutexGuard<'a, T>, dur: Duration)
157+
#[unstable]
158+
pub fn wait_timeout<'a, T>(&self, guard: MutexGuard<'a, T>, dur: Duration)
163159
-> LockResult<(MutexGuard<'a, T>, bool)> {
164160
unsafe {
165161
let me: &'static Condvar = &*(self as *const _);
166162
me.inner.wait_timeout(guard, dur)
167163
}
168164
}
169165

166+
/// Wait on this condition variable for a notification, timing out after a
167+
/// specified duration.
168+
///
169+
/// The semantics of this function are equivalent to `wait_timeout` except
170+
/// that the implementation will repeatedly wait while the duration has not
171+
/// passed and the provided function returns `false`.
172+
#[unstable]
173+
pub fn wait_timeout_with<'a, T, F>(&self,
174+
guard: MutexGuard<'a, T>,
175+
dur: Duration,
176+
f: F)
177+
-> LockResult<(MutexGuard<'a, T>, bool)>
178+
where F: FnMut(LockResult<&mut T>) -> bool {
179+
unsafe {
180+
let me: &'static Condvar = &*(self as *const _);
181+
me.inner.wait_timeout_with(guard, dur, f)
182+
}
183+
}
184+
170185
/// Wake up one blocked thread on this condvar.
171186
///
172187
/// If there is a blocked thread on this condition variable, then it will
@@ -220,9 +235,9 @@ impl StaticCondvar {
220235
/// specified duration.
221236
///
222237
/// See `Condvar::wait_timeout`.
223-
#[allow(dead_code)] // may want to stabilize this later, see wait_timeout above
224-
fn wait_timeout<'a, T>(&'static self, guard: MutexGuard<'a, T>, dur: Duration)
225-
-> LockResult<(MutexGuard<'a, T>, bool)> {
238+
#[unstable = "may be merged with Condvar in the future"]
239+
pub fn wait_timeout<'a, T>(&'static self, guard: MutexGuard<'a, T>, dur: Duration)
240+
-> LockResult<(MutexGuard<'a, T>, bool)> {
226241
let (poisoned, success) = unsafe {
227242
let lock = mutex::guard_lock(&guard);
228243
self.verify(lock);
@@ -236,6 +251,50 @@ impl StaticCondvar {
236251
}
237252
}
238253

254+
/// Wait on this condition variable for a notification, timing out after a
255+
/// specified duration.
256+
///
257+
/// The implementation will repeatedly wait while the duration has not
258+
/// passed and the function returns `false`.
259+
///
260+
/// See `Condvar::wait_timeout_with`.
261+
#[unstable = "may be merged with Condvar in the future"]
262+
pub fn wait_timeout_with<'a, T, F>(&'static self,
263+
guard: MutexGuard<'a, T>,
264+
dur: Duration,
265+
mut f: F)
266+
-> LockResult<(MutexGuard<'a, T>, bool)>
267+
where F: FnMut(LockResult<&mut T>) -> bool {
268+
// This could be made more efficient by pushing the implementation into sys::condvar
269+
let start = SteadyTime::now();
270+
let mut guard_result: LockResult<MutexGuard<'a, T>> = Ok(guard);
271+
while !f(guard_result
272+
.as_mut()
273+
.map(|g| &mut **g)
274+
.map_err(|e| poison::new_poison_error(&mut **e.get_mut()))) {
275+
let now = SteadyTime::now();
276+
let consumed = &now - &start;
277+
let guard = guard_result.unwrap_or_else(|e| e.into_inner());
278+
let (new_guard_result, no_timeout) = match self.wait_timeout(guard, dur - consumed) {
279+
Ok((new_guard, no_timeout)) => (Ok(new_guard), no_timeout),
280+
Err(err) => {
281+
let (new_guard, no_timeout) = err.into_inner();
282+
(Err(poison::new_poison_error(new_guard)), no_timeout)
283+
}
284+
};
285+
guard_result = new_guard_result;
286+
if !no_timeout {
287+
let result = f(guard_result
288+
.as_mut()
289+
.map(|g| &mut **g)
290+
.map_err(|e| poison::new_poison_error(&mut **e.get_mut())));
291+
return poison::map_result(guard_result, |g| (g, result));
292+
}
293+
}
294+
295+
poison::map_result(guard_result, |g| (g, true))
296+
}
297+
239298
/// Wake up one blocked thread on this condvar.
240299
///
241300
/// See `Condvar::notify_one`.
@@ -285,6 +344,7 @@ mod tests {
285344
use super::{StaticCondvar, CONDVAR_INIT};
286345
use sync::mpsc::channel;
287346
use sync::{StaticMutex, MUTEX_INIT, Condvar, Mutex, Arc};
347+
use sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering};
288348
use thread::Thread;
289349
use time::Duration;
290350

@@ -372,6 +432,49 @@ mod tests {
372432
unsafe { C.destroy(); M.destroy(); }
373433
}
374434

435+
#[test]
436+
fn wait_timeout_with() {
437+
static C: StaticCondvar = CONDVAR_INIT;
438+
static M: StaticMutex = MUTEX_INIT;
439+
static S: AtomicUsize = ATOMIC_USIZE_INIT;
440+
441+
let g = M.lock().unwrap();
442+
let (g, success) = C.wait_timeout_with(g, Duration::nanoseconds(1000), |_| false).unwrap();
443+
assert!(!success);
444+
445+
let (tx, rx) = channel();
446+
let _t = Thread::scoped(move || {
447+
rx.recv().unwrap();
448+
let g = M.lock().unwrap();
449+
S.store(1, Ordering::SeqCst);
450+
C.notify_one();
451+
drop(g);
452+
453+
rx.recv().unwrap();
454+
let g = M.lock().unwrap();
455+
S.store(2, Ordering::SeqCst);
456+
C.notify_one();
457+
drop(g);
458+
459+
rx.recv().unwrap();
460+
let _g = M.lock().unwrap();
461+
S.store(3, Ordering::SeqCst);
462+
C.notify_one();
463+
});
464+
465+
let mut state = 0;
466+
let (_g, success) = C.wait_timeout_with(g, Duration::days(1), |_| {
467+
assert_eq!(state, S.load(Ordering::SeqCst));
468+
tx.send(()).unwrap();
469+
state += 1;
470+
match state {
471+
1|2 => false,
472+
_ => true,
473+
}
474+
}).unwrap();
475+
assert!(success);
476+
}
477+
375478
#[test]
376479
#[should_fail]
377480
fn two_mutexes() {

src/libstd/sync/poison.rs

+16-1
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,23 @@ impl<T> fmt::Show for PoisonError<T> {
9999
impl<T> PoisonError<T> {
100100
/// Consumes this error indicating that a lock is poisoned, returning the
101101
/// underlying guard to allow access regardless.
102-
#[stable]
102+
#[deprecated="renamed to into_inner"]
103103
pub fn into_guard(self) -> T { self.guard }
104+
105+
/// Consumes this error indicating that a lock is poisoned, returning the
106+
/// underlying guard to allow access regardless.
107+
#[unstable]
108+
pub fn into_inner(self) -> T { self.guard }
109+
110+
/// Reaches into this error indicating that a lock is poisoned, returning a
111+
/// reference to the underlying guard to allow access regardless.
112+
#[unstable]
113+
pub fn get_ref(&self) -> &T { &self.guard }
114+
115+
/// Reaches into this error indicating that a lock is poisoned, returning a
116+
/// mutable reference to the underlying guard to allow access regardless.
117+
#[unstable]
118+
pub fn get_mut(&mut self) -> &mut T { &mut self.guard }
104119
}
105120

106121
impl<T> FromError<PoisonError<T>> for TryLockError<T> {

src/libstd/sys/unix/condvar.rs

+36-20
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,12 @@
1010

1111
use cell::UnsafeCell;
1212
use libc;
13+
use std::option::Option::{Some, None};
1314
use sys::mutex::{self, Mutex};
15+
use sys::time;
1416
use sys::sync as ffi;
1517
use time::Duration;
18+
use num::{Int, NumCast};
1619

1720
pub struct Condvar { inner: UnsafeCell<ffi::pthread_cond_t> }
1821

@@ -46,33 +49,46 @@ impl Condvar {
4649
debug_assert_eq!(r, 0);
4750
}
4851

52+
// This implementation is modeled after libcxx's condition_variable
53+
// https://github.com/llvm-mirror/libcxx/blob/release_35/src/condition_variable.cpp#L46
54+
// https://github.com/llvm-mirror/libcxx/blob/release_35/include/__mutex_base#L367
4955
pub unsafe fn wait_timeout(&self, mutex: &Mutex, dur: Duration) -> bool {
50-
assert!(dur >= Duration::nanoseconds(0));
56+
if dur <= Duration::zero() {
57+
return false;
58+
}
5159

52-
// First, figure out what time it currently is
53-
let mut tv = libc::timeval { tv_sec: 0, tv_usec: 0 };
54-
let r = ffi::gettimeofday(&mut tv, 0 as *mut _);
60+
// First, figure out what time it currently is, in both system and stable time.
61+
// pthread_cond_timedwait uses system time, but we want to report timeout based on stable
62+
// time.
63+
let mut sys_now = libc::timeval { tv_sec: 0, tv_usec: 0 };
64+
let stable_now = time::SteadyTime::now();
65+
let r = ffi::gettimeofday(&mut sys_now, 0 as *mut _);
5566
debug_assert_eq!(r, 0);
5667

57-
// Offset that time with the specified duration
58-
let abs = Duration::seconds(tv.tv_sec as i64) +
59-
Duration::microseconds(tv.tv_usec as i64) +
60-
dur;
61-
let ns = abs.num_nanoseconds().unwrap() as u64;
62-
let timeout = libc::timespec {
63-
tv_sec: (ns / 1000000000) as libc::time_t,
64-
tv_nsec: (ns % 1000000000) as libc::c_long,
68+
let seconds = NumCast::from(dur.num_seconds());
69+
let timeout = match seconds.and_then(|s| sys_now.tv_sec.checked_add(s)) {
70+
Some(sec) => {
71+
libc::timespec {
72+
tv_sec: sec,
73+
tv_nsec: (dur - Duration::seconds(dur.num_seconds()))
74+
.num_nanoseconds().unwrap() as libc::c_long,
75+
}
76+
}
77+
None => {
78+
libc::timespec {
79+
tv_sec: Int::max_value(),
80+
tv_nsec: 1_000_000_000 - 1,
81+
}
82+
}
6583
};
6684

6785
// And wait!
68-
let r = ffi::pthread_cond_timedwait(self.inner.get(), mutex::raw(mutex),
69-
&timeout);
70-
if r != 0 {
71-
debug_assert_eq!(r as int, libc::ETIMEDOUT as int);
72-
false
73-
} else {
74-
true
75-
}
86+
let r = ffi::pthread_cond_timedwait(self.inner.get(), mutex::raw(mutex), &timeout);
87+
debug_assert!(r == libc::ETIMEDOUT || r == 0);
88+
89+
// ETIMEDOUT is not a totally reliable method of determining timeout due to clock shifts,
90+
// so do the check ourselves
91+
&time::SteadyTime::now() - &stable_now < dur
7692
}
7793

7894
#[inline]

src/libstd/sys/unix/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ pub mod sync;
5252
pub mod tcp;
5353
pub mod thread;
5454
pub mod thread_local;
55+
pub mod time;
5556
pub mod timer;
5657
pub mod tty;
5758
pub mod udp;

0 commit comments

Comments
 (0)