@@ -21,16 +21,26 @@ impl<'a> DeadlineQueue<'a> {
21
21
Self { queue : VecDeque :: with_capacity ( capacity) }
22
22
}
23
23
24
+ /// All calls to [`Instant::now`] go through this wrapper method.
25
+ /// This makes it easier to find all places that read the current time.
26
+ fn now ( & self ) -> Instant {
27
+ Instant :: now ( )
28
+ }
29
+
24
30
pub ( crate ) fn push ( & mut self , id : TestId , test : & ' a CollectedTest ) {
25
- let deadline = Instant :: now ( ) + Duration :: from_secs ( TEST_WARN_TIMEOUT_S ) ;
31
+ let deadline = self . now ( ) + Duration :: from_secs ( TEST_WARN_TIMEOUT_S ) ;
32
+ if let Some ( back) = self . queue . back ( ) {
33
+ assert ! ( back. deadline <= deadline) ;
34
+ }
26
35
self . queue . push_back ( DeadlineEntry { id, test, deadline } ) ;
27
36
}
28
37
29
- /// Equivalent to `rx.read ()`, except that if any test exceeds its deadline
38
+ /// Equivalent to `rx.recv ()`, except that if a test exceeds its deadline
30
39
/// during the wait, the given callback will also be called for that test.
31
40
pub ( crate ) fn read_channel_while_checking_deadlines < T > (
32
41
& mut self ,
33
42
rx : & mpsc:: Receiver < T > ,
43
+ is_running : impl Fn ( TestId ) -> bool ,
34
44
mut on_deadline_passed : impl FnMut ( TestId , & CollectedTest ) ,
35
45
) -> Result < T , RecvError > {
36
46
loop {
@@ -39,18 +49,18 @@ impl<'a> DeadlineQueue<'a> {
39
49
// deadline, so do a normal receive.
40
50
return rx. recv ( ) ;
41
51
} ;
42
- let wait_duration = next_deadline. saturating_duration_since ( Instant :: now ( ) ) ;
52
+ let next_deadline_timeout = next_deadline. saturating_duration_since ( self . now ( ) ) ;
53
+
54
+ let recv_result = rx. recv_timeout ( next_deadline_timeout) ;
55
+ // Process deadlines after every receive attempt, regardless of
56
+ // outcome, so that we don't build up an unbounded backlog of stale
57
+ // entries due to a constant stream of tests finishing.
58
+ self . for_each_entry_past_deadline ( & is_running, & mut on_deadline_passed) ;
43
59
44
- let recv_result = rx. recv_timeout ( wait_duration) ;
45
60
match recv_result {
46
61
Ok ( value) => return Ok ( value) ,
47
- Err ( RecvTimeoutError :: Timeout ) => {
48
- // Notify the callback of tests that have exceeded their
49
- // deadline, then loop and do annother channel read.
50
- for DeadlineEntry { id, test, .. } in self . remove_tests_past_deadline ( ) {
51
- on_deadline_passed ( id, test) ;
52
- }
53
- }
62
+ // Deadlines have already been processed, so loop and do another receive.
63
+ Err ( RecvTimeoutError :: Timeout ) => { }
54
64
Err ( RecvTimeoutError :: Disconnected ) => return Err ( RecvError ) ,
55
65
}
56
66
}
@@ -60,14 +70,28 @@ impl<'a> DeadlineQueue<'a> {
60
70
Some ( self . queue . front ( ) ?. deadline )
61
71
}
62
72
63
- fn remove_tests_past_deadline ( & mut self ) -> Vec < DeadlineEntry < ' a > > {
64
- let now = Instant :: now ( ) ;
65
- let mut timed_out = vec ! [ ] ;
66
- while let Some ( deadline_entry) = pop_front_if ( & mut self . queue , |entry| now < entry. deadline )
67
- {
68
- timed_out. push ( deadline_entry) ;
73
+ fn for_each_entry_past_deadline (
74
+ & mut self ,
75
+ is_running : impl Fn ( TestId ) -> bool ,
76
+ mut on_deadline_passed : impl FnMut ( TestId , & CollectedTest ) ,
77
+ ) {
78
+ let now = self . now ( ) ;
79
+
80
+ // Clear out entries that are past their deadline, but only invoke the
81
+ // callback for tests that are still considered running.
82
+ while let Some ( entry) = pop_front_if ( & mut self . queue , |entry| entry. deadline <= now) {
83
+ if is_running ( entry. id ) {
84
+ on_deadline_passed ( entry. id , entry. test ) ;
85
+ }
86
+ }
87
+
88
+ // Also clear out any leading entries that are no longer running, even
89
+ // if their deadline hasn't been reached.
90
+ while let Some ( _) = pop_front_if ( & mut self . queue , |entry| !is_running ( entry. id ) ) { }
91
+
92
+ if let Some ( front) = self . queue . front ( ) {
93
+ assert ! ( now < front. deadline) ;
69
94
}
70
- timed_out
71
95
}
72
96
}
73
97
0 commit comments