diff --git a/benches/mutex.rs b/benches/mutex.rs
new file mode 100644
index 000000000..810e41a38
--- /dev/null
+++ b/benches/mutex.rs
@@ -0,0 +1,66 @@
+#![feature(test)]
+
+extern crate test;
+
+use async_std::future::Future;
+use async_std::sync::{Arc, Mutex};
+use async_std::task;
+use futures::task::noop_waker;
+use test::Bencher;
+
+async fn test(task: usize, iter: usize) {
+    let mutex = Arc::new(Mutex::new(()));
+    let mut vec = Vec::new();
+    for _ in 0..task {
+        let mutex_clone = mutex.clone();
+        let handle = async_std::task::spawn(async move {
+            for _ in 0..iter {
+                let _ = mutex_clone.lock().await;
+            }
+        });
+        vec.push(handle);
+    }
+    for i in vec {
+        i.await
+    }
+}
+
+#[bench]
+fn mutex_contention(b: &mut Bencher) {
+    b.iter(|| task::block_on(test(10, 1000)));
+}
+
+#[bench]
+fn mutex_no_contention(b: &mut Bencher) {
+    b.iter(|| task::block_on(test(1, 10000)));
+}
+
+#[bench]
+fn mutex_unused(b: &mut Bencher) {
+    b.iter(|| Mutex::new(()));
+}
+
+#[bench]
+fn mutex_mimick_contention(b: &mut Bencher) {
+    let noop_waker = noop_waker();
+    let mut context = task::Context::from_waker(&noop_waker);
+
+    b.iter(|| {
+        let mutex = Mutex::new(());
+        let mut vec = Vec::with_capacity(10);
+
+        // Mimic 10 tasks concurrently trying to acquire the lock.
+        for _ in 0..10 {
+            let mut lock_future = Box::pin(mutex.lock());
+            let poll_result = lock_future.as_mut().poll(&mut context);
+            vec.push((lock_future, poll_result));
+        }
+
+        // Go through all 10 tasks, acquiring and then releasing the lock in turn.
+        for (mut future, mut poll) in vec {
+            while let task::Poll::Pending = poll {
+                poll = future.as_mut().poll(&mut context);
+            }
+        }
+    });
+}
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index be74d8f7b..983063145 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -37,6 +37,7 @@ pub use rwlock::{RwLock, RwLockReadGuard, RwLockWriteGuard};
 
 mod mutex;
 mod rwlock;
+mod waker_list;
 
 cfg_unstable! {
     pub use barrier::{Barrier, BarrierWaitResult};
diff --git a/src/sync/mutex.rs b/src/sync/mutex.rs
index cd7a3577f..cc370d696 100644
--- a/src/sync/mutex.rs
+++ b/src/sync/mutex.rs
@@ -4,10 +4,11 @@ use std::ops::{Deref, DerefMut};
 use std::pin::Pin;
 use std::sync::atomic::{AtomicUsize, Ordering};
 
-use slab::Slab;
-
 use crate::future::Future;
-use crate::task::{Context, Poll, Waker};
+use crate::task::{Context, Poll};
+
+use super::waker_list::{WakerList, WakerListLock};
+use std::num::NonZeroUsize;
 
 /// Set if the mutex is locked.
 const LOCK: usize = 1;
@@ -15,6 +16,171 @@ const LOCK: usize = 1;
 /// Set if there are tasks blocked on the mutex.
 const BLOCKED: usize = 1 << 1;
 
+struct RawMutex {
+    state: AtomicUsize,
+    blocked: WakerListLock,
+}
+
+unsafe impl Send for RawMutex {}
+unsafe impl Sync for RawMutex {}
+
+impl RawMutex {
+    /// Creates a new raw mutex.
+    #[inline]
+    pub fn new() -> RawMutex {
+        RawMutex {
+            state: AtomicUsize::new(0),
+            blocked: WakerListLock::new(WakerList::new()),
+        }
+    }
+
+    /// Acquires the lock.
+    ///
+    /// We don't use an `async fn` here for performance reasons.
+    #[inline]
+    pub fn lock(&self) -> RawLockFuture<'_> {
+        RawLockFuture {
+            mutex: self,
+            opt_key: None,
+        }
+    }
+
+    /// Attempts to acquire the lock.
+    #[inline]
+    pub fn try_lock(&self) -> bool {
+        self.state.fetch_or(LOCK, Ordering::Acquire) & LOCK == 0
+    }
+
+    #[cold]
+    fn unlock_slow(&self) {
+        let mut blocked = self.blocked.lock();
+        blocked.wake_one_weak();
+    }
+
+    /// Unlocks this mutex.
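+    ///
+    /// Clears the `LOCK` bit and, if the `BLOCKED` bit is set, takes the slow path
+    /// to wake one of the blocked tasks.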
+    #[inline]
+    pub fn unlock(&self) {
+        let state = self.state.fetch_and(!LOCK, Ordering::Release);
+
+        // If there are any blocked tasks, wake one of them up.
+        if state & BLOCKED != 0 {
+            self.unlock_slow();
+        }
+    }
+}
+
+struct RawLockFuture<'a> {
+    mutex: &'a RawMutex,
+    /// `None` indicates that the future hasn't been polled yet, or that it has already
+    /// returned `Ready`. `RawLockFuture` does not distinguish between these two states.
+    opt_key: Option<NonZeroUsize>,
+}
+
+impl<'a> RawLockFuture<'a> {
+    /// Removes the waker registration. This should be called upon successful acquisition
+    /// of the lock, or when the future is dropped without having acquired the lock.
+    #[cold]
+    fn deregister_waker(&mut self, acquired: bool) {
+        if let Some(key) = self.opt_key.take() {
+            let mut blocked = self.mutex.blocked.lock();
+            let opt_waker = unsafe { blocked.remove(key) };
+
+            if opt_waker.is_none() && !acquired {
+                // We were awoken but didn't acquire the lock. Wake up another task.
+                blocked.wake_one_weak();
+            }
+
+            if blocked.is_empty() {
+                self.mutex.state.fetch_and(!BLOCKED, Ordering::Relaxed);
+            }
+        }
+    }
+
+    /// The cold path where the first poll of the lock future blocks on the mutex.
+    #[cold]
+    fn poll_would_block(&mut self, cx: &mut Context<'_>) -> Poll<()> {
+        let mut blocked = self.mutex.blocked.lock();
+
+        // Try locking again because it's possible the mutex got unlocked before
+        // we acquired the lock on `blocked`.
+        let state = self.mutex.state.fetch_or(LOCK | BLOCKED, Ordering::Relaxed);
+        if state & LOCK == 0 {
+            return Poll::Ready(());
+        }
+
+        // Register the current task.
+        // Insert a new entry into the list of blocked tasks.
+        let w = cx.waker().clone();
+        let key = blocked.insert(Some(w));
+        self.opt_key = Some(key);
+
+        Poll::Pending
+    }
+
+    /// The cold path where we poll a future that is already registered as blocked.
+    #[cold]
+    fn poll_blocked(&mut self, cx: &mut Context<'_>) -> Poll<()> {
+        if self.mutex.try_lock() {
+            self.deregister_waker(true);
+            Poll::Ready(())
+        } else {
+            let mut blocked = self.mutex.blocked.lock();
+
+            // Try locking again because it's possible the mutex got unlocked before
+            // we acquired the lock on `blocked`. On this path we know `BLOCKED` is
+            // already set, so don't bother to set it again.
+            if self.mutex.try_lock() {
+                std::mem::drop(blocked);
+                self.deregister_waker(true);
+                return Poll::Ready(());
+            }
+
+            // There is already an entry in the list of blocked tasks. Just
+            // reset the waker if it was removed.
+            let opt_waker = unsafe { blocked.get(self.opt_key.unwrap()) };
+            if opt_waker.is_none() {
+                let w = cx.waker().clone();
+                *opt_waker = Some(w);
+            }
+
+            Poll::Pending
+        }
+    }
+
+    /// The cold path of drop. Only hit when locking is cancelled.
+    #[cold]
+    fn drop_slow(&mut self) {
+        self.deregister_waker(false);
+    }
+}
+
+impl<'a> Future for RawLockFuture<'a> {
+    type Output = ();
+
+    #[inline]
+    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        match self.opt_key {
+            None => {
+                if self.mutex.try_lock() {
+                    Poll::Ready(())
+                } else {
+                    self.poll_would_block(cx)
+                }
+            }
+            Some(_) => self.poll_blocked(cx),
+        }
+    }
+}
+
+impl Drop for RawLockFuture<'_> {
+    #[inline]
+    fn drop(&mut self) {
+        if self.opt_key.is_some() {
+            // This cold path is only reached when the future is dropped before the lock
+            // was acquired, i.e. when locking is cancelled.
+            self.drop_slow();
+        }
+    }
+}
+
 /// A mutual exclusion primitive for protecting shared data.
 ///
 /// This type is an async version of [`std::sync::Mutex`].
@@ -49,8 +215,7 @@ const BLOCKED: usize = 1 << 1;
 /// # })
 /// ```
 pub struct Mutex<T> {
-    state: AtomicUsize,
-    blocked: std::sync::Mutex<Slab<Option<Waker>>>,
+    mutex: RawMutex,
     value: UnsafeCell<T>,
 }
 
@@ -67,10 +232,10 @@ impl<T> Mutex<T> {
     ///
     /// let mutex = Mutex::new(0);
     /// ```
+    #[inline]
     pub fn new(t: T) -> Mutex<T> {
         Mutex {
-            state: AtomicUsize::new(0),
-            blocked: std::sync::Mutex::new(Slab::new()),
+            mutex: RawMutex::new(),
             value: UnsafeCell::new(t),
         }
     }
@@ -102,88 +267,8 @@ impl<T> Mutex<T> {
     /// # })
     /// ```
     pub async fn lock(&self) -> MutexGuard<'_, T> {
-        pub struct LockFuture<'a, T> {
-            mutex: &'a Mutex<T>,
-            opt_key: Option<usize>,
-            acquired: bool,
-        }
-
-        impl<'a, T> Future for LockFuture<'a, T> {
-            type Output = MutexGuard<'a, T>;
-
-            fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
-                match self.mutex.try_lock() {
-                    Some(guard) => {
-                        self.acquired = true;
-                        Poll::Ready(guard)
-                    }
-                    None => {
-                        let mut blocked = self.mutex.blocked.lock().unwrap();
-
-                        // Register the current task.
-                        match self.opt_key {
-                            None => {
-                                // Insert a new entry into the list of blocked tasks.
-                                let w = cx.waker().clone();
-                                let key = blocked.insert(Some(w));
-                                self.opt_key = Some(key);
-
-                                if blocked.len() == 1 {
-                                    self.mutex.state.fetch_or(BLOCKED, Ordering::Relaxed);
-                                }
-                            }
-                            Some(key) => {
-                                // There is already an entry in the list of blocked tasks. Just
-                                // reset the waker if it was removed.
-                                if blocked[key].is_none() {
-                                    let w = cx.waker().clone();
-                                    blocked[key] = Some(w);
-                                }
-                            }
-                        }
-
-                        // Try locking again because it's possible the mutex got unlocked just
-                        // before the current task was registered as a blocked task.
-                        match self.mutex.try_lock() {
-                            Some(guard) => {
-                                self.acquired = true;
-                                Poll::Ready(guard)
-                            }
-                            None => Poll::Pending,
-                        }
-                    }
-                }
-            }
-        }
-
-        impl<T> Drop for LockFuture<'_, T> {
-            fn drop(&mut self) {
-                if let Some(key) = self.opt_key {
-                    let mut blocked = self.mutex.blocked.lock().unwrap();
-                    let opt_waker = blocked.remove(key);
-
-                    if opt_waker.is_none() && !self.acquired {
-                        // We were awoken but didn't acquire the lock. Wake up another task.
-                        if let Some((_, opt_waker)) = blocked.iter_mut().next() {
-                            if let Some(w) = opt_waker.take() {
-                                w.wake();
-                            }
-                        }
-                    }
-
-                    if blocked.is_empty() {
-                        self.mutex.state.fetch_and(!BLOCKED, Ordering::Relaxed);
-                    }
-                }
-            }
-        }
-
-        LockFuture {
-            mutex: self,
-            opt_key: None,
-            acquired: false,
-        }
-        .await
+        self.mutex.lock().await;
+        MutexGuard(self)
     }
 
     /// Attempts to acquire the lock.
@@ -219,8 +304,9 @@ impl<T> Mutex<T> {
     /// #
     /// # })
     /// ```
+    #[inline]
     pub fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
-        if self.state.fetch_or(LOCK, Ordering::Acquire) & LOCK == 0 {
+        if self.mutex.try_lock() {
             Some(MutexGuard(self))
         } else {
             None
@@ -237,6 +323,7 @@ impl<T> Mutex<T> {
     /// let mutex = Mutex::new(10);
     /// assert_eq!(mutex.into_inner(), 10);
     /// ```
+    #[inline]
     pub fn into_inner(self) -> T {
         self.value.into_inner()
     }
@@ -259,6 +346,7 @@ impl<T> Mutex<T> {
     /// #
     /// # })
     /// ```
+    #[inline]
     pub fn get_mut(&mut self) -> &mut T {
         unsafe { &mut *self.value.get() }
     }
@@ -284,12 +372,14 @@ impl<T: fmt::Debug> fmt::Debug for Mutex<T> {
 }
 
 impl<T> From<T> for Mutex<T> {
+    #[inline]
     fn from(val: T) -> Mutex<T> {
         Mutex::new(val)
     }
 }
 
 impl<T: Default> Default for Mutex<T> {
+    #[inline]
     fn default() -> Mutex<T> {
         Mutex::new(Default::default())
     }
@@ -302,20 +392,9 @@ unsafe impl<T: Send> Send for MutexGuard<'_, T> {}
 unsafe impl<T: Sync> Sync for MutexGuard<'_, T> {}
 
 impl<T> Drop for MutexGuard<'_, T> {
+    #[inline]
     fn drop(&mut self) {
-        let state = self.0.state.fetch_and(!LOCK, Ordering::AcqRel);
-
-        // If there are any blocked tasks, wake one of them up.
-        if state & BLOCKED != 0 {
-            let mut blocked = self.0.blocked.lock().unwrap();
-
-            if let Some((_, opt_waker)) = blocked.iter_mut().next() {
-                // If there is no waker in this entry, that means it was already woken.
-                if let Some(w) = opt_waker.take() {
-                    w.wake();
-                }
-            }
-        }
+        self.0.mutex.unlock();
     }
 }
 
@@ -334,12 +413,14 @@ impl<T: fmt::Display> fmt::Display for MutexGuard<'_, T> {
 impl<T> Deref for MutexGuard<'_, T> {
     type Target = T;
 
+    #[inline]
     fn deref(&self) -> &T {
         unsafe { &*self.0.value.get() }
     }
 }
 
 impl<T> DerefMut for MutexGuard<'_, T> {
+    #[inline]
     fn deref_mut(&mut self) -> &mut T {
         unsafe { &mut *self.0.value.get() }
     }
diff --git a/src/sync/waker_list.rs b/src/sync/waker_list.rs
new file mode 100644
index 000000000..8c31003c1
--- /dev/null
+++ b/src/sync/waker_list.rs
@@ -0,0 +1,198 @@
+use crate::task::Waker;
+
+use crossbeam_utils::Backoff;
+use std::num::NonZeroUsize;
+use std::ops::{Deref, DerefMut};
+use std::sync::atomic::{AtomicUsize, Ordering};
+
+struct WakerNode {
+    /// Previous `WakerNode` in the queue. If this node is the first node, it points to the last node.
+    prev_in_queue: *mut WakerNode,
+    /// Next `WakerNode` in the queue. If this node is the last node, it is null.
+    next_in_queue: *mut WakerNode,
+    waker: Option<Waker>,
+}
+
+pub struct WakerList {
+    head: *mut WakerNode,
+}
+
+unsafe impl Send for WakerList {}
+unsafe impl Sync for WakerList {}
+
+impl WakerList {
+    /// Creates a new, empty `WakerList`.
+    #[inline]
+    pub fn new() -> Self {
+        Self {
+            head: std::ptr::null_mut(),
+        }
+    }
+
+    /// Inserts a waker at the back of the list and returns its key.
+    pub fn insert(&mut self, waker: Option<Waker>) -> NonZeroUsize {
+        let node = Box::into_raw(Box::new(WakerNode {
+            waker,
+            next_in_queue: std::ptr::null_mut(),
+            prev_in_queue: std::ptr::null_mut(),
+        }));
+
+        if self.head.is_null() {
+            unsafe {
+                (*node).prev_in_queue = node;
+            }
+            self.head = node;
+        } else {
+            unsafe {
+                let prev = std::mem::replace(&mut (*self.head).prev_in_queue, node);
+                (*prev).next_in_queue = node;
+                (*node).prev_in_queue = prev;
+            }
+        }
+
+        unsafe { NonZeroUsize::new_unchecked(node as usize) }
+    }
+
+    /// Removes a waker by its key.
+    ///
+    /// # Safety
+    /// This function is unsafe because there is no guarantee that `key` is a key previously
+    /// returned by `insert`, nor that the same key is removed only once.
+    pub unsafe fn remove(&mut self, key: NonZeroUsize) -> Option<Waker> {
+        let node = key.get() as *mut WakerNode;
+        let prev = (*node).prev_in_queue;
+        let next = (*node).next_in_queue;
+
+        // Special handling when removing the first node.
+        if self.head == node {
+            self.head = next;
+        } else {
+            (*prev).next_in_queue = next;
+        }
+
+        // Special handling when removing the last node.
+        if next.is_null() {
+            if !self.head.is_null() {
+                (*self.head).prev_in_queue = prev;
+            }
+        } else {
+            (*next).prev_in_queue = prev;
+        }
+
+        Box::from_raw(node).waker
+    }
+
+    /// Gets a waker by its key.
+    ///
+    /// # Safety
+    /// This function is unsafe because there is no guarantee that `key` is a key previously
+    /// returned by `insert`, nor that the key has not already been removed.
+    pub unsafe fn get(&mut self, key: NonZeroUsize) -> &mut Option<Waker> {
+        &mut (*(key.get() as *mut WakerNode)).waker
+    }
+
+    /// Checks whether this list is empty.
+    pub fn is_empty(&self) -> bool {
+        self.head.is_null()
+    }
+
+    /// Gets an iterator over all wakers.
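+    ///
+    /// The iterator yields a mutable reference to each node's waker slot, starting
+    /// from the head of the list.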
+    pub fn iter_mut(&mut self) -> Iter<'_> {
+        Iter {
+            ptr: self.head,
+            _marker: std::marker::PhantomData,
+        }
+    }
+
+    /// Wakes the first waker in the list and replaces it with `None`. This function is named
+    /// `weak` because nothing happens if the first waker has already been taken.
+    pub fn wake_one_weak(&mut self) {
+        if let Some(opt_waker) = self.iter_mut().next() {
+            if let Some(w) = opt_waker.take() {
+                w.wake();
+            }
+        }
+    }
+}
+
+pub struct Iter<'a> {
+    ptr: *mut WakerNode,
+    _marker: std::marker::PhantomData<&'a ()>,
+}
+
+impl<'a> Iterator for Iter<'a> {
+    type Item = &'a mut Option<Waker>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        if self.ptr.is_null() {
+            return None;
+        }
+        let next = unsafe { (*self.ptr).next_in_queue };
+        let ptr = std::mem::replace(&mut self.ptr, next);
+        Some(unsafe { &mut (*ptr).waker })
+    }
+}
+
+/// This is functionally a spinlock, but it occupies only as much memory as a pointer.
+/// It can also perform better than a plain spinlock because, once the lock is acquired, the
+/// `WakerList` head is manipulated on the stack and thus is not on the same cache line as the
+/// atomic variable itself, relieving some contention.
+pub struct WakerListLock {
+    /// If this value is 1, the lock is held.
+    /// All other values represent a valid `*mut WakerNode`.
+    value: AtomicUsize,
+}
+
+impl WakerListLock {
+    /// Returns a new `WakerListLock` initialized with `value`.
+    #[inline]
+    pub fn new(value: WakerList) -> Self {
+        Self {
+            value: AtomicUsize::new(value.head as usize),
+        }
+    }
+
+    /// Locks the `WakerListLock`.
+    pub fn lock(&self) -> WakerListLockGuard<'_> {
+        let backoff = Backoff::new();
+        loop {
+            let value = self.value.swap(1, Ordering::Acquire);
+            if value != 1 {
+                return WakerListLockGuard {
+                    parent: self,
+                    list: WakerList {
+                        head: value as *mut WakerNode,
+                    },
+                };
+            }
+            backoff.snooze();
+        }
+    }
+}
+
+pub struct WakerListLockGuard<'a> {
+    parent: &'a WakerListLock,
+    list: WakerList,
+}
+
+impl<'a> Drop for WakerListLockGuard<'a> {
+    fn drop(&mut self) {
+        self.parent
+            .value
+            .store(self.list.head as usize, Ordering::Release);
+    }
+}
+
+impl<'a> Deref for WakerListLockGuard<'a> {
+    type Target = WakerList;
+
+    fn deref(&self) -> &WakerList {
+        &self.list
+    }
+}
+
+impl<'a> DerefMut for WakerListLockGuard<'a> {
+    fn deref_mut(&mut self) -> &mut WakerList {
+        &mut self.list
+    }
+}
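For reference, a minimal test-style sketch of how the new `WakerList` is meant to be driven (insert wakers, wake the head, remove by key). `WakerList` is crate-private, so a test like this would have to live inside `src/sync/waker_list.rs`; the test module, test name, and the use of `futures::task::noop_waker` (already used by the benchmarks above) are illustrative assumptions, not part of the patch.

    #[cfg(test)]
    mod tests {
        use super::WakerList;
        use futures::task::noop_waker;

        #[test]
        fn insert_wake_remove() {
            let mut list = WakerList::new();
            assert!(list.is_empty());

            // Each key is the address of the node that was just linked in.
            let a = list.insert(Some(noop_waker()));
            let b = list.insert(Some(noop_waker()));

            // Wakes the head entry and leaves `None` in its slot.
            list.wake_one_weak();
            unsafe {
                assert!(list.get(a).is_none());
                assert!(list.get(b).is_some());
            }

            // Keys must be removed exactly once; `remove` returns the waker left in the slot, if any.
            unsafe {
                assert!(list.remove(a).is_none());
                assert!(list.remove(b).is_some());
            }
            assert!(list.is_empty());
        }
    }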