Skip to content

Commit c1cb916

Browse files
committed
Implement async_std::sync::Condvar
Part of async-rs#217
1 parent ec23632 commit c1cb916

File tree

3 files changed

+386
-0
lines changed

3 files changed

+386
-0
lines changed

Diff for: src/sync/condvar.rs

+380
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,380 @@
1+
use std::pin::Pin;
2+
use std::sync::atomic::{AtomicBool, Ordering};
3+
use std::time::Duration;
4+
5+
use futures_timer::Delay;
6+
use slab::Slab;
7+
8+
use super::mutex::{guard_lock, MutexGuard};
9+
use crate::future::Future;
10+
use crate::task::{Context, Poll, Waker};
11+
12+
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
13+
pub struct WaitTimeoutResult(bool);
14+
15+
/// A type indicating whether a timed wait on a condition variable returned due to a time out or
16+
/// not
17+
impl WaitTimeoutResult {
18+
/// Returns `true` if the wait was known to have timed out.
19+
pub fn timed_out(&self) -> bool {
20+
self.0
21+
}
22+
}
23+
24+
/// A Condition Variable
25+
///
26+
/// This type is an async version of [`std::sync::Mutex`].
27+
///
28+
/// [`std::sync::Condvar`]: https://doc.rust-lang.org/std/sync/struct.Condvar.html
29+
///
30+
/// # Examples
31+
///
32+
/// ```
33+
/// # fn main() { async_std::task::block_on(async {
34+
/// #
35+
/// use std::sync::Arc;
36+
///
37+
/// use async_std::sync::{Mutex, Condvar};
38+
/// use async_std::task;
39+
///
40+
/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
41+
/// let pair2 = pair.clone();
42+
///
43+
/// // Inside of our lock, spawn a new thread, and then wait for it to start.
44+
/// task::spawn(async move {
45+
/// let (lock, cvar) = &*pair2;
46+
/// let mut started = lock.lock().await;
47+
/// *started = true;
48+
/// // We notify the condvar that the value has changed.
49+
/// cvar.notify_one();
50+
/// });
51+
///
52+
/// // Wait for the thread to start up.
53+
/// let (lock, cvar) = &*pair;
54+
/// let mut started = lock.lock().await;
55+
/// while !*started {
56+
/// started = cvar.wait(started).await;
57+
/// }
58+
///
59+
/// # }) }
60+
/// ```
61+
#[derive(Debug)]
62+
pub struct Condvar {
63+
has_blocked: AtomicBool,
64+
blocked: std::sync::Mutex<Slab<Option<Waker>>>,
65+
}
66+
67+
impl Condvar {
68+
/// Creates a new condition variable
69+
///
70+
/// # Examples
71+
///
72+
/// ```
73+
/// use async_std::sync::Condvar;
74+
///
75+
/// let cvar = Condvar::new();
76+
/// ```
77+
pub fn new() -> Self {
78+
Condvar {
79+
has_blocked: AtomicBool::new(false),
80+
blocked: std::sync::Mutex::new(Slab::new()),
81+
}
82+
}
83+
84+
/// Blocks the current task until this condition variable receives a notification.
85+
///
86+
/// Unlike the std equivalent, this does not check that a single mutex is used at runtime.
87+
/// However, as a best practice avoid using with multiple mutexes.
88+
///
89+
/// # Examples
90+
///
91+
/// ```
92+
/// # fn main() { async_std::task::block_on(async {
93+
/// use std::sync::Arc;
94+
///
95+
/// use async_std::sync::{Mutex, Condvar};
96+
/// use async_std::task;
97+
///
98+
/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
99+
/// let pair2 = pair.clone();
100+
///
101+
/// task::spawn(async move {
102+
/// let (lock, cvar) = &*pair2;
103+
/// let mut started = lock.lock().await;
104+
/// *started = true;
105+
/// // We notify the condvar that the value has changed.
106+
/// cvar.notify_one();
107+
/// });
108+
///
109+
/// // Wait for the thread to start up.
110+
/// let (lock, cvar) = &*pair;
111+
/// let mut started = lock.lock().await;
112+
/// while !*started {
113+
/// started = cvar.wait(started).await;
114+
/// }
115+
/// # }) }
116+
/// ```
117+
pub async fn wait<'a, T>(&self, guard: MutexGuard<'a, T>) -> MutexGuard<'a, T> {
118+
let mutex = guard_lock(&guard);
119+
120+
self.await_notify(guard).await;
121+
122+
mutex.lock().await
123+
}
124+
125+
fn await_notify<'a, T>(&self, guard: MutexGuard<'a, T>) -> AwaitNotify<'_, 'a, T> {
126+
AwaitNotify {
127+
cond: self,
128+
guard: Some(guard),
129+
key: None,
130+
}
131+
}
132+
133+
/// Blocks the current taks until this condition variable receives a notification and the
134+
/// required condition is met. Spurious wakeups are ignored and this function will only
135+
/// return once the condition has been met.
136+
///
137+
/// # Examples
138+
///
139+
/// ```
140+
/// # fn main() { async_std::task::block_on(async {
141+
/// #
142+
/// use std::sync::Arc;
143+
///
144+
/// use async_std::sync::{Mutex, Condvar};
145+
/// use async_std::task;
146+
///
147+
/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
148+
/// let pair2 = pair.clone();
149+
///
150+
/// task::spawn(async move {
151+
/// let (lock, cvar) = &*pair2;
152+
/// let mut started = lock.lock().await;
153+
/// *started = true;
154+
/// // We notify the condvar that the value has changed.
155+
/// cvar.notify_one();
156+
/// });
157+
///
158+
/// // Wait for the thread to start up.
159+
/// let (lock, cvar) = &*pair;
160+
/// // As long as the value inside the `Mutex<bool>` is `false`, we wait.
161+
/// let _guard = cvar.wait_until(lock.lock().await, |started| { *started }).await;
162+
/// #
163+
/// # }) }
164+
/// ```
165+
#[cfg(feature = "unstable")]
166+
pub async fn wait_until<'a, T, F>(
167+
&self,
168+
mut guard: MutexGuard<'a, T>,
169+
mut condition: F,
170+
) -> MutexGuard<'a, T>
171+
where
172+
F: FnMut(&mut T) -> bool,
173+
{
174+
while !condition(&mut *guard) {
175+
guard = self.wait(guard).await;
176+
}
177+
guard
178+
}
179+
180+
/// Waits on this condition variable for a notification, timing out after a specified duration.
181+
///
182+
/// # Examples
183+
///
184+
/// ```
185+
/// # fn main() { async_std::task::block_on(async {
186+
/// #
187+
/// use std::sync::Arc;
188+
/// use std::time::Duration;
189+
///
190+
/// use async_std::sync::{Mutex, Condvar};
191+
/// use async_std::task;
192+
///
193+
/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
194+
/// let pair2 = pair.clone();
195+
///
196+
/// task::spawn(async move {
197+
/// let (lock, cvar) = &*pair2;
198+
/// let mut started = lock.lock().await;
199+
/// *started = true;
200+
/// // We notify the condvar that the value has changed.
201+
/// cvar.notify_one();
202+
/// });
203+
///
204+
/// // wait for the thread to start up
205+
/// let (lock, cvar) = &*pair;
206+
/// let mut started = lock.lock().await;
207+
/// loop {
208+
/// let result = cvar.wait_timeout(started, Duration::from_millis(10)).await;
209+
/// started = result.0;
210+
/// if *started == true {
211+
/// // We received the notification and the value has been updated, we can leave.
212+
/// break
213+
/// }
214+
/// }
215+
/// #
216+
/// # }) }
217+
/// ```
218+
pub async fn wait_timeout<'a, T>(
219+
&self,
220+
guard: MutexGuard<'a, T>,
221+
dur: Duration,
222+
) -> (MutexGuard<'a, T>, WaitTimeoutResult) {
223+
let mutex = guard_lock(&guard);
224+
let timeout_result = TimeoutWaitFuture {
225+
await_notify: self.await_notify(guard),
226+
delay: Delay::new(dur),
227+
}
228+
.await;
229+
230+
(mutex.lock().await, timeout_result)
231+
}
232+
233+
/// Wakes up one blocked task on this condvar.
234+
///
235+
/// # Examples
236+
///
237+
/// ```
238+
/// # fn main() { async_std::task::block_on(async {
239+
/// use std::sync::Arc;
240+
///
241+
/// use async_std::sync::{Mutex, Condvar};
242+
/// use async_std::task;
243+
///
244+
/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
245+
/// let pair2 = pair.clone();
246+
///
247+
/// task::spawn(async move {
248+
/// let (lock, cvar) = &*pair2;
249+
/// let mut started = lock.lock().await;
250+
/// *started = true;
251+
/// // We notify the condvar that the value has changed.
252+
/// cvar.notify_one();
253+
/// });
254+
///
255+
/// // Wait for the thread to start up.
256+
/// let (lock, cvar) = &*pair;
257+
/// let mut started = lock.lock().await;
258+
/// while !*started {
259+
/// started = cvar.wait(started).await;
260+
/// }
261+
/// # }) }
262+
/// ```
263+
pub fn notify_one(&self) {
264+
if self.has_blocked.load(Ordering::Acquire) {
265+
let mut blocked = self.blocked.lock().unwrap();
266+
if let Some((_, opt_waker)) = blocked.iter_mut().next() {
267+
if let Some(w) = opt_waker.take() {
268+
w.wake();
269+
}
270+
}
271+
}
272+
}
273+
274+
/// Wakes up all blocked tasks on this condvar.
275+
///
276+
/// # Examples
277+
/// ```
278+
/// # fn main() { async_std::task::block_on(async {
279+
/// #
280+
/// use std::sync::Arc;
281+
///
282+
/// use async_std::sync::{Mutex, Condvar};
283+
/// use async_std::task;
284+
///
285+
/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
286+
/// let pair2 = pair.clone();
287+
///
288+
/// task::spawn(async move {
289+
/// let (lock, cvar) = &*pair2;
290+
/// let mut started = lock.lock().await;
291+
/// *started = true;
292+
/// // We notify the condvar that the value has changed.
293+
/// cvar.notify_all();
294+
/// });
295+
///
296+
/// // Wait for the thread to start up.
297+
/// let (lock, cvar) = &*pair;
298+
/// let mut started = lock.lock().await;
299+
/// // As long as the value inside the `Mutex<bool>` is `false`, we wait.
300+
/// while !*started {
301+
/// started = cvar.wait(started).await;
302+
/// }
303+
/// #
304+
/// # }) }
305+
/// ```
306+
pub fn notify_all(&self) {
307+
if self.has_blocked.load(Ordering::Acquire) {
308+
let mut blocked = self.blocked.lock().unwrap();
309+
for (_, opt_waker) in blocked.iter_mut() {
310+
if let Some(w) = opt_waker.take() {
311+
w.wake();
312+
}
313+
}
314+
}
315+
}
316+
}
317+
318+
struct AwaitNotify<'a, 'b, T> {
319+
cond: &'a Condvar,
320+
guard: Option<MutexGuard<'b, T>>,
321+
key: Option<usize>,
322+
}
323+
324+
impl<'a, 'b, T> Future for AwaitNotify<'a, 'b, T> {
325+
type Output = ();
326+
327+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
328+
match self.guard.take() {
329+
Some(_) => {
330+
let mut blocked = self.cond.blocked.lock().unwrap();
331+
let w = cx.waker().clone();
332+
self.key = Some(blocked.insert(Some(w)));
333+
334+
if blocked.len() == 1 {
335+
self.cond.has_blocked.store(true, Ordering::Relaxed);
336+
}
337+
// the guard is dropped when we return, which frees the lock
338+
Poll::Pending
339+
}
340+
None => Poll::Ready(()),
341+
}
342+
}
343+
}
344+
345+
impl<'a, 'b, T> Drop for AwaitNotify<'a, 'b, T> {
346+
fn drop(&mut self) {
347+
if let Some(key) = self.key {
348+
let mut blocked = self.cond.blocked.lock().unwrap();
349+
blocked.remove(key);
350+
351+
if blocked.is_empty() {
352+
self.cond.has_blocked.store(false, Ordering::Relaxed);
353+
}
354+
}
355+
}
356+
}
357+
358+
struct TimeoutWaitFuture<'a, 'b, T> {
359+
await_notify: AwaitNotify<'a, 'b, T>,
360+
delay: Delay,
361+
}
362+
363+
impl<'a, 'b, T> TimeoutWaitFuture<'a, 'b, T> {
364+
pin_utils::unsafe_pinned!(await_notify: AwaitNotify<'a, 'b, T>);
365+
pin_utils::unsafe_pinned!(delay: Delay);
366+
}
367+
368+
impl<'a, 'b, T> Future for TimeoutWaitFuture<'a, 'b, T> {
369+
type Output = WaitTimeoutResult;
370+
371+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
372+
match self.as_mut().await_notify().poll(cx) {
373+
Poll::Ready(_) => Poll::Ready(WaitTimeoutResult(false)),
374+
Poll::Pending => match self.delay().poll(cx) {
375+
Poll::Ready(_) => Poll::Ready(WaitTimeoutResult(true)),
376+
Poll::Pending => Poll::Pending,
377+
},
378+
}
379+
}
380+
}

Diff for: src/sync/mod.rs

+2
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,11 @@
3232
#[doc(inline)]
3333
pub use std::sync::{Arc, Weak};
3434

35+
pub use condvar::Condvar;
3536
pub use mutex::{Mutex, MutexGuard};
3637
pub use rwlock::{RwLock, RwLockReadGuard, RwLockWriteGuard};
3738

39+
mod condvar;
3840
mod mutex;
3941
mod rwlock;
4042

0 commit comments

Comments
 (0)