@@ -171,6 +171,8 @@ use panic;
171
171
use panicking;
172
172
use str;
173
173
use sync:: { Mutex , Condvar , Arc } ;
174
+ use sync:: atomic:: AtomicUsize ;
175
+ use sync:: atomic:: Ordering :: SeqCst ;
174
176
use sys:: thread as imp;
175
177
use sys_common:: mutex;
176
178
use sys_common:: thread_info;
@@ -694,6 +696,11 @@ pub fn sleep(dur: Duration) {
694
696
imp:: Thread :: sleep ( dur)
695
697
}
696
698
699
+ // constants for park/unpark
700
+ const EMPTY : usize = 0 ;
701
+ const PARKED : usize = 1 ;
702
+ const NOTIFIED : usize = 2 ;
703
+
697
704
/// Blocks unless or until the current thread's token is made available.
698
705
///
699
706
/// A call to `park` does not guarantee that the thread will remain parked
@@ -771,11 +778,27 @@ pub fn sleep(dur: Duration) {
771
778
#[ stable( feature = "rust1" , since = "1.0.0" ) ]
772
779
pub fn park ( ) {
773
780
let thread = current ( ) ;
774
- let mut guard = thread. inner . lock . lock ( ) . unwrap ( ) ;
775
- while !* guard {
776
- guard = thread. inner . cvar . wait ( guard) . unwrap ( ) ;
781
+
782
+ // If we were previously notified then we consume this notification and
783
+ // return quickly.
784
+ if thread. inner . state . compare_exchange ( NOTIFIED , EMPTY , SeqCst , SeqCst ) . is_ok ( ) {
785
+ return
786
+ }
787
+
788
+ // Otherwise we need to coordinate going to sleep
789
+ let mut m = thread. inner . lock . lock ( ) . unwrap ( ) ;
790
+ match thread. inner . state . compare_exchange ( EMPTY , PARKED , SeqCst , SeqCst ) {
791
+ Ok ( _) => { }
792
+ Err ( NOTIFIED ) => return , // notified after we locked
793
+ Err ( _) => panic ! ( "inconsistent park state" ) ,
794
+ }
795
+ loop {
796
+ m = thread. inner . cvar . wait ( m) . unwrap ( ) ;
797
+ match thread. inner . state . compare_exchange ( NOTIFIED , EMPTY , SeqCst , SeqCst ) {
798
+ Ok ( _) => return , // got a notification
799
+ Err ( _) => { } // spurious wakeup, go back to sleep
800
+ }
777
801
}
778
- * guard = false ;
779
802
}
780
803
781
804
/// Use [`park_timeout`].
@@ -842,12 +865,30 @@ pub fn park_timeout_ms(ms: u32) {
842
865
#[ stable( feature = "park_timeout" , since = "1.4.0" ) ]
843
866
pub fn park_timeout ( dur : Duration ) {
844
867
let thread = current ( ) ;
845
- let mut guard = thread. inner . lock . lock ( ) . unwrap ( ) ;
846
- if !* guard {
847
- let ( g, _) = thread. inner . cvar . wait_timeout ( guard, dur) . unwrap ( ) ;
848
- guard = g;
868
+
869
+ // Like `park` above we have a fast path for an already-notified thread, and
870
+ // afterwards we start coordinating for a sleep.
871
+ // return quickly.
872
+ if thread. inner . state . compare_exchange ( NOTIFIED , EMPTY , SeqCst , SeqCst ) . is_ok ( ) {
873
+ return
874
+ }
875
+ let m = thread. inner . lock . lock ( ) . unwrap ( ) ;
876
+ match thread. inner . state . compare_exchange ( EMPTY , PARKED , SeqCst , SeqCst ) {
877
+ Ok ( _) => { }
878
+ Err ( NOTIFIED ) => return , // notified after we locked
879
+ Err ( _) => panic ! ( "inconsistent park_timeout state" ) ,
880
+ }
881
+
882
+ // Wait with a timeout, and if we spuriously wake up or otherwise wake up
883
+ // from a notification we just want to unconditionally set the state back to
884
+ // empty, either consuming a notification or un-flagging ourselves as
885
+ // parked.
886
+ let ( _m, _result) = thread. inner . cvar . wait_timeout ( m, dur) . unwrap ( ) ;
887
+ match thread. inner . state . swap ( EMPTY , SeqCst ) {
888
+ NOTIFIED => { } // got a notification, hurray!
889
+ PARKED => { } // no notification, alas
890
+ n => panic ! ( "inconsistent park_timeout state: {}" , n) ,
849
891
}
850
- * guard = false ;
851
892
}
852
893
853
894
////////////////////////////////////////////////////////////////////////////////
@@ -914,7 +955,10 @@ impl ThreadId {
914
955
struct Inner {
915
956
name : Option < CString > , // Guaranteed to be UTF-8
916
957
id : ThreadId ,
917
- lock : Mutex < bool > , // true when there is a buffered unpark
958
+
959
+ // state for thread park/unpark
960
+ state : AtomicUsize ,
961
+ lock : Mutex < ( ) > ,
918
962
cvar : Condvar ,
919
963
}
920
964
@@ -958,7 +1002,8 @@ impl Thread {
958
1002
inner : Arc :: new ( Inner {
959
1003
name : cname,
960
1004
id : ThreadId :: new ( ) ,
961
- lock : Mutex :: new ( false ) ,
1005
+ state : AtomicUsize :: new ( EMPTY ) ,
1006
+ lock : Mutex :: new ( ( ) ) ,
962
1007
cvar : Condvar :: new ( ) ,
963
1008
} )
964
1009
}
@@ -998,10 +1043,22 @@ impl Thread {
998
1043
/// [park]: fn.park.html
999
1044
#[ stable( feature = "rust1" , since = "1.0.0" ) ]
1000
1045
pub fn unpark ( & self ) {
1001
- let mut guard = self . inner . lock . lock ( ) . unwrap ( ) ;
1002
- if !* guard {
1003
- * guard = true ;
1004
- self . inner . cvar . notify_one ( ) ;
1046
+ loop {
1047
+ match self . inner . state . compare_exchange ( EMPTY , NOTIFIED , SeqCst , SeqCst ) {
1048
+ Ok ( _) => return , // no one was waiting
1049
+ Err ( NOTIFIED ) => return , // already unparked
1050
+ Err ( PARKED ) => { } // gotta go wake someone up
1051
+ _ => panic ! ( "inconsistent state in unpark" ) ,
1052
+ }
1053
+
1054
+ // Coordinate wakeup through the mutex and a condvar notification
1055
+ let _lock = self . inner . lock . lock ( ) . unwrap ( ) ;
1056
+ match self . inner . state . compare_exchange ( PARKED , NOTIFIED , SeqCst , SeqCst ) {
1057
+ Ok ( _) => return self . inner . cvar . notify_one ( ) ,
1058
+ Err ( NOTIFIED ) => return , // a different thread unparked
1059
+ Err ( EMPTY ) => { } // parked thread went away, try again
1060
+ _ => panic ! ( "inconsistent state in unpark" ) ,
1061
+ }
1005
1062
}
1006
1063
}
1007
1064
0 commit comments