-
Notifications
You must be signed in to change notification settings - Fork 19
/
Copy pathheader.rs
193 lines (168 loc) · 6.48 KB
/
header.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
use core::alloc::Layout;
use core::cell::UnsafeCell;
use core::fmt;
use core::sync::atomic::{AtomicUsize, Ordering};
use core::task::Waker;
use crate::raw::TaskVTable;
use crate::state::*;
use crate::utils::{abort_on_panic, extend};
/// The header of a task.
///
/// This header is stored right at the beginning of every heap-allocated task.
pub(crate) struct Header {
/// Current state of the task.
///
/// Contains flags representing the current state and the reference count.
pub(crate) state: AtomicUsize,
/// The task that is blocked on the `JoinHandle`.
///
/// This waker needs to be woken up once the task completes or is closed.
pub(crate) awaiter: UnsafeCell<Option<Waker>>,
/// The virtual table.
///
/// In addition to the actual waker virtual table, it also contains pointers to several other
/// methods necessary for bookkeeping the heap-allocated task.
pub(crate) vtable: &'static TaskVTable,
}
impl Header {
/// Cancels the task.
///
/// This method will mark the task as closed and notify the awaiter, but it won't reschedule
/// the task if it's not completed.
pub(crate) fn cancel(&self) {
let mut state = self.state.load(Ordering::Acquire);
loop {
// If the task has been completed or closed, it can't be cancelled.
if state & (COMPLETED | CLOSED) != 0 {
break;
}
// Mark the task as closed.
match self.state.compare_exchange_weak(
state,
state | CLOSED,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => {
// Notify the awaiter that the task has been closed.
if state & AWAITER != 0 {
self.notify(None);
}
break;
}
Err(s) => state = s,
}
}
}
/// Notifies the awaiter blocked on this task.
///
/// If the awaiter is the same as the current waker, it will not be notified.
#[inline]
pub(crate) fn notify(&self, current: Option<&Waker>) {
// Mark the awaiter as being notified.
let state = self.state.fetch_or(NOTIFYING, Ordering::AcqRel);
// If the awaiter was not being notified nor registered...
if state & (NOTIFYING | REGISTERING) == 0 {
// Take the waker out.
let waker = unsafe { (*self.awaiter.get()).take() };
// Mark the state as not being notified anymore nor containing an awaiter.
self.state
.fetch_and(!NOTIFYING & !AWAITER, Ordering::Release);
if let Some(w) = waker {
// We need a safeguard against panics because waking can panic.
abort_on_panic(|| match current {
None => w.wake(),
Some(c) if !w.will_wake(c) => w.wake(),
Some(_) => {}
});
}
}
}
/// Registers a new awaiter blocked on this task.
///
/// This method is called when `JoinHandle` is polled and the task has not completed.
#[inline]
pub(crate) fn register(&self, waker: &Waker) {
// Load the state and synchronize with it.
let mut state = self.state.fetch_or(0, Ordering::Acquire);
loop {
// There can't be two concurrent registrations because `JoinHandle` can only be polled
// by a unique pinned reference.
debug_assert!(state & REGISTERING == 0);
// If the state is being notified at this moment, just wake and return without
// registering.
if state & NOTIFYING != 0 {
waker.wake_by_ref();
return;
}
// Mark the state to let other threads know we're registering a new awaiter.
match self.state.compare_exchange_weak(
state,
state | REGISTERING,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => {
state |= REGISTERING;
break;
}
Err(s) => state = s,
}
}
// Put the waker into the awaiter field.
unsafe {
abort_on_panic(|| (*self.awaiter.get()) = Some(waker.clone()));
}
// This variable will contain the newly registered waker if a notification comes in before
// we complete registration.
let mut waker = None;
loop {
// If there was a notification, take the waker out of the awaiter field.
if state & NOTIFYING != 0 {
if let Some(w) = unsafe { (*self.awaiter.get()).take() } {
waker = Some(w);
}
}
// The new state is not being notified nor registered, but there might or might not be
// an awaiter depending on whether there was a concurrent notification.
let new = if waker.is_none() {
(state & !NOTIFYING & !REGISTERING) | AWAITER
} else {
state & !NOTIFYING & !REGISTERING & !AWAITER
};
match self
.state
.compare_exchange_weak(state, new, Ordering::AcqRel, Ordering::Acquire)
{
Ok(_) => break,
Err(s) => state = s,
}
}
// If there was a notification during registration, wake the awaiter now.
if let Some(w) = waker {
abort_on_panic(|| w.wake());
}
}
/// Returns the offset at which the tag of type `T` is stored.
#[inline]
pub(crate) fn offset_tag<T>() -> usize {
let layout_header = Layout::new::<Header>();
let layout_t = Layout::new::<T>();
let (_, offset_t) = extend(layout_header, layout_t);
offset_t
}
}
impl fmt::Debug for Header {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let state = self.state.load(Ordering::SeqCst);
f.debug_struct("Header")
.field("scheduled", &(state & SCHEDULED != 0))
.field("running", &(state & RUNNING != 0))
.field("completed", &(state & COMPLETED != 0))
.field("closed", &(state & CLOSED != 0))
.field("awaiter", &(state & AWAITER != 0))
.field("handle", &(state & HANDLE != 0))
.field("ref_count", &(state / REFERENCE))
.finish()
}
}