@@ -388,8 +388,18 @@ func goparkunlock(lock *mutex, reason waitReason, traceEv byte, traceskip int) {
 }
 
 func goready(gp *g, traceskip int) {
+	callergp := getg()
+	now := nanotime()
+	needWakep := readyUpdateAndPredictNeedWakep(callergp, now)
+	systemstack(func() {
+		ready(gp, traceskip, true, now, needWakep)
+	})
+}
+
+func goreadyNoWakep(gp *g, traceskip int) {
+	now := nanotime()
 	systemstack(func() {
-		ready(gp, traceskip, true)
+		ready(gp, traceskip, true, now, false /* needWakep */)
 	})
 }
 
@@ -858,8 +868,42 @@ func fastrandinit() {
 	getRandomData(s)
 }
 
+func parkUpdateNeedWakep(gp *g) {
+	gp.firstReady = 0
+	if gp.lastReady != 0 {
+		gp.runnextSwitchHistory = nanotime()-gp.lastReady <= runnextSwitchNS
+		gp.lastReady = 0
+	}
+}
+
+// Returns true if gp, calling goready() or newproc() at now, is predicted
+// *not* to park within the next runnextSwitchNS, such that a spinning P is
+// needed to run the new G.
+// Incorrectly returning true (causing a spinning P to be uselessly woken)
+// wastes cycles but is harmless.
+// Incorrectly returning false is handled by sysmon (in retake()).
+func readyUpdateAndPredictNeedWakep(gp *g, now int64) bool {
+	pp := gp.m.p.ptr()
+
+	// Update predictor state.
+	gp.lastReady = now
+	if now-gp.firstReady > runnextSwitchNS {
+		if gp.firstReady != 0 {
+			gp.runnextSwitchHistory = false
+		}
+		gp.firstReady = now
+	}
+
+	// If the caller's runqueue is non-empty, predict that we need wakep();
+	// even if gp parks, there's no guarantee that following Gs will.
+	if !runqempty(pp) {
+		return true
+	}
+	return !gp.runnextSwitchHistory
+}
+
 // Mark gp ready to run.
-func ready(gp *g, traceskip int, next bool) {
+func ready(gp *g, traceskip int, next bool, now int64, needWakep bool) {
 	if trace.enabled {
 		traceGoUnpark(gp, traceskip)
 	}
@@ -875,8 +919,10 @@ func ready(gp *g, traceskip int, next bool) {
 
 	// status is Gwaiting or Gscanwaiting, make Grunnable and put on runq
 	casgstatus(gp, _Gwaiting, _Grunnable)
-	runqput(mp.p.ptr(), gp, next)
-	wakep()
+	runqput(mp.p.ptr(), gp, next, now)
+	if needWakep {
+		wakep()
+	}
 	releasem(mp)
 }
 
@@ -2715,7 +2761,7 @@ top:
 	// Wake up the finalizer G.
 	if fingStatus.Load()&(fingWait|fingWake) == fingWait|fingWake {
 		if gp := wakefing(); gp != nil {
-			ready(gp, 0, true)
+			ready(gp, 0, true, now, true)
 		}
 	}
 	if *cgo_yield != nil {
@@ -3027,7 +3073,7 @@ func pollWork() bool {
 func stealWork(now int64) (gp *g, inheritTime bool, rnow, pollUntil int64, newWork bool) {
 	pp := getg().m.p.ptr()
 
-	ranTimer := false
+	ranTimerOrRunnextPending := false
 
 	const stealTries = 4
 	for i := 0; i < stealTries; i++ {
@@ -3072,16 +3118,21 @@ func stealWork(now int64) (gp *g, inheritTime bool, rnow, pollUntil int64, newWo
 					// stolen G's. So check now if there
 					// is a local G to run.
 					if gp, inheritTime := runqget(pp); gp != nil {
-						return gp, inheritTime, now, pollUntil, ranTimer
+						return gp, inheritTime, now, pollUntil, ranTimerOrRunnextPending
 					}
-					ranTimer = true
+					ranTimerOrRunnextPending = true
 				}
 			}
 
 			// Don't bother to attempt to steal if p2 is idle.
 			if !idlepMask.read(enum.position()) {
-				if gp := runqsteal(pp, p2, stealTimersOrRunNextG); gp != nil {
-					return gp, false, now, pollUntil, ranTimer
+				tnow, gp, runnextPending := runqsteal(pp, p2, stealTimersOrRunNextG, now)
+				now = tnow
+				if runnextPending {
+					ranTimerOrRunnextPending = true
+				}
+				if gp != nil {
+					return gp, false, now, pollUntil, ranTimerOrRunnextPending
 				}
 			}
 		}
@@ -3090,7 +3141,7 @@ func stealWork(now int64) (gp *g, inheritTime bool, rnow, pollUntil int64, newWo
 	// No goroutines found to steal. Regardless, running a timer may have
 	// made some goroutine ready that we missed. Indicate the next timer to
 	// wait for.
-	return nil, false, now, pollUntil, ranTimer
+	return nil, false, now, pollUntil, ranTimerOrRunnextPending
 }
 
 // Check all Ps for a runnable G to steal.
@@ -3474,6 +3525,8 @@ func parkunlock_c(gp *g, lock unsafe.Pointer) bool {
 
 // park continuation on g0.
 func park_m(gp *g) {
+	parkUpdateNeedWakep(gp)
+
 	mp := getg().m
 
 	if trace.enabled {
@@ -3597,7 +3650,7 @@ func goyield_m(gp *g) {
 	pp := gp.m.p.ptr()
 	casgstatus(gp, _Grunning, _Grunnable)
 	dropg()
-	runqput(pp, gp, false)
+	runqput(pp, gp, false, 0)
 	schedule()
 }
 
@@ -4232,11 +4285,13 @@ func newproc(fn *funcval) {
 	pc := getcallerpc()
 	systemstack(func() {
 		newg := newproc1(fn, gp, pc)
+		now := nanotime()
+		needWakep := readyUpdateAndPredictNeedWakep(gp, now)
 
 		pp := getg().m.p.ptr()
-		runqput(pp, newg, true)
+		runqput(pp, newg, true, now)
 
-		if mainStarted {
+		if mainStarted && needWakep {
 			wakep()
 		}
 	})
@@ -5389,6 +5444,7 @@ func sysmon() {
 		}
 		// retake P's blocked in syscalls
 		// and preempt long running G's
+		// and start P's if we mispredicted that wakep() was unnecessary
 		if retake(now) != 0 {
 			idle = 0
 		} else {
@@ -5424,6 +5480,7 @@ const forcePreemptNS = 10 * 1000 * 1000 // 10ms
 
 func retake(now int64) uint32 {
 	n := 0
+	needWakep := false
 	// Prevent allp slice changes. This lock will be completely
 	// uncontended unless we're already stopping the world.
 	lock(&allpLock)
@@ -5437,6 +5494,9 @@ func retake(now int64) uint32 {
 			// allp but not yet created new Ps.
 			continue
 		}
+		if runqstealable(pp, now) && sched.nmspinning.Load() == 0 {
+			needWakep = true
+		}
 		pd := &pp.sysmontick
 		s := pp.status
 		sysretake := false
@@ -5488,6 +5548,10 @@ func retake(now int64) uint32 {
 		}
 	}
 	unlock(&allpLock)
+	if needWakep {
+		wakep()
+		n++
+	}
 	return uint32(n)
 }
 
@@ -5766,7 +5830,7 @@ func globrunqget(pp *p, max int32) *g {
 	n--
 	for ; n > 0; n-- {
 		gp1 := sched.runq.pop()
-		runqput(pp, gp1, false)
+		runqput(pp, gp1, false, 0)
 	}
 	return gp
 }
@@ -5933,6 +5997,25 @@ func runqempty(pp *p) bool {
 	}
 }
 
+// runqstealable is like !runqempty, but returns false if pp has a G in
+// p.runnext that can't be stolen yet.
+func runqstealable(pp *p, now int64) bool {
+	for {
+		head := atomic.Load(&pp.runqhead)
+		tail := atomic.Load(&pp.runqtail)
+		runnext := atomic.Loaduintptr((*uintptr)(unsafe.Pointer(&pp.runnext)))
+		if tail == atomic.Load(&pp.runqtail) {
+			if head != tail {
+				return true
+			}
+			if runnext == 0 {
+				return false
+			}
+			return ((guintptr)(runnext)).ptr().runnextSince+runnextSwitchNS < now
+		}
+	}
+}
+
 // To shake out latent assumptions about scheduling order,
 // we introduce some randomness into scheduling decisions
 // when running with the race detector.
@@ -5949,12 +6032,16 @@ const randomizeScheduler = raceenabled
 // If next is true, runqput puts g in the pp.runnext slot.
 // If the run queue is full, runnext puts g on the global queue.
 // Executed only by the owner P.
-func runqput(pp *p, gp *g, next bool) {
+func runqput(pp *p, gp *g, next bool, now int64) {
 	if randomizeScheduler && next && fastrandn(2) == 0 {
 		next = false
 	}
 
 	if next {
+		if now == 0 {
+			now = nanotime()
+		}
+		gp.runnextSince = now
 	retryNext:
 		oldnext := pp.runnext
 		if !pp.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
@@ -6123,11 +6210,22 @@ retry:
 	return
 }
 
+// Don't steal p.runnext if it's been made runnable within the last
+// runnextSwitchNS and the P is running.
+// The important use case here is when the G running on the P ready()s another G
+// and then almost immediately blocks.
+// Giving the P a chance to schedule runnext avoids thrashing Gs between
+// different Ps.
+// On most platforms, sleep timeout granularity is coarser than
+// runnextSwitchNS, so sleeping will result in significant overshoot; instead,
+// stealWork() instructs findRunnable() to spin-wait.
+const runnextSwitchNS = 5e3
+
 // Grabs a batch of goroutines from pp's runnable queue into batch.
 // Batch is a ring buffer starting at batchHead.
 // Returns number of grabbed goroutines.
 // Can be executed by any P.
-func runqgrab(pp *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 {
+func runqgrab(pp *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool, now int64) (rnow int64, n uint32, runnextPending bool) {
 	for {
 		h := atomic.LoadAcq(&pp.runqhead) // load-acquire, synchronize with other consumers
 		t := atomic.LoadAcq(&pp.runqtail) // load-acquire, synchronize with the producer
@@ -6138,33 +6236,22 @@ func runqgrab(pp *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool)
 				// Try to steal from pp.runnext.
 				if next := pp.runnext; next != 0 {
 					if pp.status == _Prunning {
-						// Sleep to ensure that pp isn't about to run the g
-						// we are about to steal.
-						// The important use case here is when the g running
-						// on pp ready()s another g and then almost
-						// immediately blocks. Instead of stealing runnext
-						// in this window, back off to give pp a chance to
-						// schedule runnext. This will avoid thrashing gs
-						// between different Ps.
-						// A sync chan send/recv takes ~50ns as of time of
-						// writing, so 3us gives ~50x overshoot.
-						if GOOS != "windows" && GOOS != "openbsd" && GOOS != "netbsd" {
-							usleep(3)
-						} else {
-							// On some platforms system timer granularity is
-							// 1-15ms, which is way too much for this
-							// optimization. So just yield.
-							osyield()
+						// Enforce runnextSwitchNS.
+						if now == 0 {
+							now = nanotime()
+						}
+						if now <= next.ptr().runnextSince+runnextSwitchNS {
+							return now, 0, true
 						}
 					}
 					if !pp.runnext.cas(next, 0) {
 						continue
 					}
 					batch[batchHead%uint32(len(batch))] = next
-					return 1
+					return now, 1, false
 				}
 			}
-			return 0
+			return now, 0, false
 		}
 		if n > uint32(len(pp.runq)/2) { // read inconsistent h and t
 			continue
@@ -6174,31 +6261,32 @@ func runqgrab(pp *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool)
 			batch[(batchHead+i)%uint32(len(batch))] = g
 		}
 		if atomic.CasRel(&pp.runqhead, h, h+n) { // cas-release, commits consume
-			return n
+			return now, n, false
 		}
 	}
 }
 
 // Steal half of elements from local runnable queue of p2
 // and put onto local runnable queue of p.
 // Returns one of the stolen elements (or nil if failed).
-func runqsteal(pp, p2 *p, stealRunNextG bool) *g {
+func runqsteal(pp, p2 *p, stealRunNextG bool, now int64) (rnow int64, gp *g, runnextPending bool) {
 	t := pp.runqtail
-	n := runqgrab(p2, &pp.runq, t, stealRunNextG)
+	tnow, n, runnextPending := runqgrab(p2, &pp.runq, t, stealRunNextG, now)
+	now = tnow
 	if n == 0 {
-		return nil
+		return now, nil, runnextPending
 	}
 	n--
-	gp := pp.runq[(t+n)%uint32(len(pp.runq))].ptr()
+	gp = pp.runq[(t+n)%uint32(len(pp.runq))].ptr()
 	if n == 0 {
-		return gp
+		return now, gp, runnextPending
 	}
 	h := atomic.LoadAcq(&pp.runqhead) // load-acquire, synchronize with consumers
 	if t-h+n >= uint32(len(pp.runq)) {
		throw("runqsteal: runq overflow")
 	}
 	atomic.StoreRel(&pp.runqtail, t+n) // store-release, makes the item available for consumption
-	return gp
+	return now, gp, runnextPending
 }
 
 // A gQueue is a dequeue of Gs linked through g.schedlink. A G can only
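The patch above replaces runqgrab's usleep(3)/osyield() backoff with an explicit runnextSwitchNS window plus a wakep() prediction keyed off the caller's recent ready-then-park history. As a rough, hedged illustration (not part of the commit; all names below are invented for the example), the standalone program that follows reproduces the kind of workload this targets: an unbuffered channel ping-pong, where every send readies the peer goroutine and the sender parks almost immediately afterwards, which is exactly the case the runnext slot and this switch window try to keep on a single P instead of waking a spinning P per handoff.

// pingpong.go: minimal sketch of the ready-then-park pattern the patch optimizes.
// Not part of the Go runtime change; purely an external workload for illustration.
package main

import (
	"fmt"
	"time"
)

func main() {
	const rounds = 1_000_000
	// Unbuffered channels: a send readies the receiver and the sender then
	// blocks on the reply, i.e. goready() followed by an almost immediate park.
	ping := make(chan struct{})
	pong := make(chan struct{})

	go func() {
		for i := 0; i < rounds; i++ {
			<-ping
			pong <- struct{}{}
		}
	}()

	start := time.Now()
	for i := 0; i < rounds; i++ {
		ping <- struct{}{}
		<-pong
	}
	elapsed := time.Since(start)
	// Each round trip is two send/recv pairs; report the per-pair cost.
	fmt.Printf("%d round trips, ~%v per send/recv pair\n",
		rounds, elapsed/time.Duration(2*rounds))
}

Comparing this program's per-pair latency and involuntary context switches before and after the patch is one way to see whether the prediction keeps the two goroutines on one P.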