@@ -21,6 +21,7 @@ use crate::prelude::*;
21
21
22
22
#[ cfg( any( test, feature = "std" ) ) ]
23
23
use std:: time:: { Duration , Instant } ;
24
+ use tracing:: info;
24
25
25
26
use core:: future:: Future as StdFuture ;
26
27
use core:: task:: { Context , Poll } ;
@@ -55,10 +56,14 @@ impl Notifier {
55
56
}
56
57
}
57
58
59
+ #[ tracing:: instrument( skip_all) ]
58
60
fn propagate_future_state_to_notify_flag ( & self ) -> MutexGuard < ( bool , Option < Arc < Mutex < FutureState > > > ) > {
61
+ info ! ( "Notifier::propagate_future_state_to_notify_flag called" ) ;
59
62
let mut lock = self . notify_pending . lock ( ) . unwrap ( ) ;
60
63
if let Some ( existing_state) = & lock. 1 {
64
+ info ! ( "Got inside if let Some(existing_state)" ) ;
61
65
if existing_state. lock ( ) . unwrap ( ) . callbacks_made {
66
+ info ! ( "Got inside if existing_state.lock().unwrap().callbacks_made" ) ;
62
67
// If the existing `FutureState` has completed and actually made callbacks,
63
68
// consider the notification flag to have been cleared and reset the future state.
64
69
lock. 1 . take ( ) ;
@@ -102,22 +107,34 @@ impl Notifier {
102
107
}
103
108
104
109
/// Wake waiters, tracking that wake needs to occur even if there are currently no waiters.
110
+ #[ tracing:: instrument( skip_all) ]
105
111
pub ( crate ) fn notify ( & self ) {
112
+ info ! ( "Notifier::notify called" ) ;
106
113
let mut lock = self . notify_pending . lock ( ) . unwrap ( ) ;
107
114
if let Some ( future_state) = & lock. 1 {
108
- future_state. lock ( ) . unwrap ( ) . complete ( ) ;
115
+ info ! ( "Got inside if let Some" ) ;
116
+ if future_state. lock ( ) . unwrap ( ) . complete ( ) {
117
+ info ! ( "Got inside if future_state.lock().unwrap().complete()" ) ;
118
+ lock. 1 = None ;
119
+ return ;
120
+ }
109
121
}
122
+ info ! ( "setting notified and waking condvar" ) ;
110
123
lock. 0 = true ;
111
124
mem:: drop ( lock) ;
112
125
self . condvar . notify_all ( ) ;
113
126
}
114
127
115
128
/// Gets a [`Future`] that will get woken up with any waiters
129
+ #[ tracing:: instrument( skip_all) ]
116
130
pub ( crate ) fn get_future ( & self ) -> Future {
131
+ info ! ( "Notifier::get_future called" ) ;
117
132
let mut lock = self . propagate_future_state_to_notify_flag ( ) ;
118
133
if let Some ( existing_state) = & lock. 1 {
134
+ info ! ( "returning existing future" ) ;
119
135
Future { state : Arc :: clone ( & existing_state) }
120
136
} else {
137
+ info ! ( "returning new future, pre-completed: {}" , lock. 0 ) ;
121
138
let state = Arc :: new ( Mutex :: new ( FutureState {
122
139
callbacks : Vec :: new ( ) ,
123
140
complete : lock. 0 ,
@@ -161,12 +178,16 @@ pub(crate) struct FutureState {
161
178
}
162
179
163
180
impl FutureState {
164
- fn complete ( & mut self ) {
181
+ #[ tracing:: instrument( skip_all) ]
182
+ fn complete ( & mut self ) -> bool {
183
+ info ! ( "FutureState::complete called" ) ;
184
+ info ! ( "future completing, calling back... {} callbacks" , self . callbacks. len( ) ) ;
165
185
for ( counts_as_call, callback) in self . callbacks . drain ( ..) {
166
186
callback. call ( ) ;
167
187
self . callbacks_made |= counts_as_call;
168
188
}
169
189
self . complete = true ;
190
+ self . callbacks_made
170
191
}
171
192
}
172
193
@@ -180,7 +201,10 @@ impl Future {
180
201
/// completed, the callback will be called immediately.
181
202
///
182
203
/// (C-not exported) use the bindings-only `register_callback_fn` instead
204
+ #[ tracing:: instrument( skip_all) ]
183
205
pub fn register_callback ( & self , callback : Box < dyn FutureCallback > ) {
206
+ info ! ( "Future::register_callback called" ) ;
207
+ info ! ( "got callback registration" ) ;
184
208
let mut state = self . state . lock ( ) . unwrap ( ) ;
185
209
if state. complete {
186
210
state. callbacks_made = true ;
@@ -205,19 +229,28 @@ impl Future {
205
229
use core:: task:: Waker ;
206
230
struct StdWaker ( pub Waker ) ;
207
231
impl FutureCallback for StdWaker {
208
- fn call ( & self ) { self . 0 . wake_by_ref ( ) }
232
+ #[ tracing:: instrument( skip_all) ]
233
+ fn call ( & self ) {
234
+ info ! ( "StdWaker::call called" ) ;
235
+ info ! ( "futurecallback complete, waking poll'd context" ) ;
236
+ self . 0 . wake_by_ref ( )
237
+ }
209
238
}
210
239
211
240
/// (C-not exported) as Rust Futures aren't usable in language bindings.
212
241
impl < ' a > StdFuture for Future {
213
242
type Output = ( ) ;
214
243
244
+ #[ tracing:: instrument( skip_all) ]
215
245
fn poll ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Self :: Output > {
246
+ info ! ( "Future::poll called" ) ;
216
247
let mut state = self . state . lock ( ) . unwrap ( ) ;
217
248
if state. complete {
249
+ info ! ( "poll'ing future complete! marking called-back" ) ;
218
250
state. callbacks_made = true ;
219
251
Poll :: Ready ( ( ) )
220
252
} else {
253
+ info ! ( "poll'ing future not complete, tracking context to wake it later" ) ;
221
254
let waker = cx. waker ( ) . clone ( ) ;
222
255
state. callbacks . push ( ( false , Box :: new ( StdWaker ( waker) ) ) ) ;
223
256
Poll :: Pending
@@ -469,4 +502,63 @@ mod tests {
469
502
assert_eq ! ( Pin :: new( & mut future) . poll( & mut Context :: from_waker( & waker) ) , Poll :: Ready ( ( ) ) ) ;
470
503
assert ! ( !notifier. wait_timeout( Duration :: from_millis( 1 ) ) ) ;
471
504
}
505
+
506
+ #[ test]
507
+ fn test_poll_post_notify_completes ( ) {
508
+ // Tests that if we have a future state that has completed, and we haven't yet requested a
509
+ // new future, if we get a notify prior to requesting that second future it is generated
510
+ // pre-completed.
511
+ let notifier = Notifier :: new ( ) ;
512
+
513
+ notifier. notify ( ) ;
514
+ let mut future = notifier. get_future ( ) ;
515
+ let ( woken, waker) = create_waker ( ) ;
516
+ assert_eq ! ( Pin :: new( & mut future) . poll( & mut Context :: from_waker( & waker) ) , Poll :: Ready ( ( ) ) ) ;
517
+ assert ! ( !woken. load( Ordering :: SeqCst ) ) ;
518
+
519
+ notifier. notify ( ) ;
520
+ let mut future = notifier. get_future ( ) ;
521
+ let ( woken, waker) = create_waker ( ) ;
522
+ assert_eq ! ( Pin :: new( & mut future) . poll( & mut Context :: from_waker( & waker) ) , Poll :: Ready ( ( ) ) ) ;
523
+ assert ! ( !woken. load( Ordering :: SeqCst ) ) ;
524
+
525
+ let mut future = notifier. get_future ( ) ;
526
+ let ( woken, waker) = create_waker ( ) ;
527
+ assert_eq ! ( Pin :: new( & mut future) . poll( & mut Context :: from_waker( & waker) ) , Poll :: Pending ) ;
528
+ assert ! ( !woken. load( Ordering :: SeqCst ) ) ;
529
+
530
+ notifier. notify ( ) ;
531
+ assert ! ( woken. load( Ordering :: SeqCst ) ) ;
532
+ assert_eq ! ( Pin :: new( & mut future) . poll( & mut Context :: from_waker( & waker) ) , Poll :: Ready ( ( ) ) ) ;
533
+ }
534
+
535
+ #[ test]
536
+ fn test_poll_post_notify_completes_initial_notified ( ) {
537
+ // Identical to the previous test, but the first future completes via a wake rather than an
538
+ // immediate `Poll::Ready`.
539
+ let notifier = Notifier :: new ( ) ;
540
+
541
+ let mut future = notifier. get_future ( ) ;
542
+ let ( woken, waker) = create_waker ( ) ;
543
+ assert_eq ! ( Pin :: new( & mut future) . poll( & mut Context :: from_waker( & waker) ) , Poll :: Pending ) ;
544
+
545
+ notifier. notify ( ) ;
546
+ assert ! ( woken. load( Ordering :: SeqCst ) ) ;
547
+ assert_eq ! ( Pin :: new( & mut future) . poll( & mut Context :: from_waker( & waker) ) , Poll :: Ready ( ( ) ) ) ;
548
+
549
+ notifier. notify ( ) ;
550
+ let mut future = notifier. get_future ( ) ;
551
+ let ( woken, waker) = create_waker ( ) ;
552
+ assert_eq ! ( Pin :: new( & mut future) . poll( & mut Context :: from_waker( & waker) ) , Poll :: Ready ( ( ) ) ) ;
553
+ assert ! ( !woken. load( Ordering :: SeqCst ) ) ;
554
+
555
+ let mut future = notifier. get_future ( ) ;
556
+ let ( woken, waker) = create_waker ( ) ;
557
+ assert_eq ! ( Pin :: new( & mut future) . poll( & mut Context :: from_waker( & waker) ) , Poll :: Pending ) ;
558
+ assert ! ( !woken. load( Ordering :: SeqCst ) ) ;
559
+
560
+ notifier. notify ( ) ;
561
+ assert ! ( woken. load( Ordering :: SeqCst ) ) ;
562
+ assert_eq ! ( Pin :: new( & mut future) . poll( & mut Context :: from_waker( & waker) ) , Poll :: Ready ( ( ) ) ) ;
563
+ }
472
564
}
0 commit comments