@@ -581,9 +581,11 @@ type serverConn struct {
581
581
advMaxStreams uint32 // our SETTINGS_MAX_CONCURRENT_STREAMS advertised the client
582
582
curClientStreams uint32 // number of open streams initiated by the client
583
583
curPushedStreams uint32 // number of open streams initiated by server push
584
+ curHandlers uint32 // number of running handler goroutines
584
585
maxClientStreamID uint32 // max ever seen from client (odd), or 0 if there have been no client requests
585
586
maxPushPromiseID uint32 // ID of the last push promise (even), or 0 if there have been no pushes
586
587
streams map [uint32 ]* stream
588
+ unstartedHandlers []unstartedHandler
587
589
initialStreamSendWindowSize int32
588
590
maxFrameSize int32
589
591
peerMaxHeaderListSize uint32 // zero means unknown (default)
@@ -981,6 +983,8 @@ func (sc *serverConn) serve() {
981
983
return
982
984
case gracefulShutdownMsg :
983
985
sc .startGracefulShutdownInternal ()
986
+ case handlerDoneMsg :
987
+ sc .handlerDone ()
984
988
default :
985
989
panic ("unknown timer" )
986
990
}
@@ -1020,6 +1024,7 @@ var (
1020
1024
idleTimerMsg = new (serverMessage )
1021
1025
shutdownTimerMsg = new (serverMessage )
1022
1026
gracefulShutdownMsg = new (serverMessage )
1027
+ handlerDoneMsg = new (serverMessage )
1023
1028
)
1024
1029
1025
1030
func (sc * serverConn ) onSettingsTimer () { sc .sendServeMsg (settingsTimerMsg ) }
@@ -2017,8 +2022,7 @@ func (sc *serverConn) processHeaders(f *MetaHeadersFrame) error {
2017
2022
st .readDeadline = time .AfterFunc (sc .hs .ReadTimeout , st .onReadTimeout )
2018
2023
}
2019
2024
2020
- go sc .runHandler (rw , req , handler )
2021
- return nil
2025
+ return sc .scheduleHandler (id , rw , req , handler )
2022
2026
}
2023
2027
2024
2028
func (sc * serverConn ) upgradeRequest (req * http.Request ) {
@@ -2038,6 +2042,10 @@ func (sc *serverConn) upgradeRequest(req *http.Request) {
2038
2042
sc .conn .SetReadDeadline (time.Time {})
2039
2043
}
2040
2044
2045
+ // This is the first request on the connection,
2046
+ // so start the handler directly rather than going
2047
+ // through scheduleHandler.
2048
+ sc .curHandlers ++
2041
2049
go sc .runHandler (rw , req , sc .handler .ServeHTTP )
2042
2050
}
2043
2051
@@ -2278,8 +2286,62 @@ func (sc *serverConn) newResponseWriter(st *stream, req *http.Request) *response
2278
2286
return & responseWriter {rws : rws }
2279
2287
}
2280
2288
2289
+ type unstartedHandler struct {
2290
+ streamID uint32
2291
+ rw * responseWriter
2292
+ req * http.Request
2293
+ handler func (http.ResponseWriter , * http.Request )
2294
+ }
2295
+
2296
+ // scheduleHandler starts a handler goroutine,
2297
+ // or schedules one to start as soon as an existing handler finishes.
2298
+ func (sc * serverConn ) scheduleHandler (streamID uint32 , rw * responseWriter , req * http.Request , handler func (http.ResponseWriter , * http.Request )) error {
2299
+ sc .serveG .check ()
2300
+ maxHandlers := sc .advMaxStreams
2301
+ if sc .curHandlers < maxHandlers {
2302
+ sc .curHandlers ++
2303
+ go sc .runHandler (rw , req , handler )
2304
+ return nil
2305
+ }
2306
+ if len (sc .unstartedHandlers ) > int (4 * sc .advMaxStreams ) {
2307
+ return sc .countError ("too_many_early_resets" , ConnectionError (ErrCodeEnhanceYourCalm ))
2308
+ }
2309
+ sc .unstartedHandlers = append (sc .unstartedHandlers , unstartedHandler {
2310
+ streamID : streamID ,
2311
+ rw : rw ,
2312
+ req : req ,
2313
+ handler : handler ,
2314
+ })
2315
+ return nil
2316
+ }
2317
+
2318
+ func (sc * serverConn ) handlerDone () {
2319
+ sc .serveG .check ()
2320
+ sc .curHandlers --
2321
+ i := 0
2322
+ maxHandlers := sc .advMaxStreams
2323
+ for ; i < len (sc .unstartedHandlers ); i ++ {
2324
+ u := sc .unstartedHandlers [i ]
2325
+ if sc .streams [u .streamID ] == nil {
2326
+ // This stream was reset before its goroutine had a chance to start.
2327
+ continue
2328
+ }
2329
+ if sc .curHandlers >= maxHandlers {
2330
+ break
2331
+ }
2332
+ sc .curHandlers ++
2333
+ go sc .runHandler (u .rw , u .req , u .handler )
2334
+ sc .unstartedHandlers [i ] = unstartedHandler {} // don't retain references
2335
+ }
2336
+ sc .unstartedHandlers = sc .unstartedHandlers [i :]
2337
+ if len (sc .unstartedHandlers ) == 0 {
2338
+ sc .unstartedHandlers = nil
2339
+ }
2340
+ }
2341
+
2281
2342
// Run on its own goroutine.
2282
2343
func (sc * serverConn ) runHandler (rw * responseWriter , req * http.Request , handler func (http.ResponseWriter , * http.Request )) {
2344
+ defer sc .sendServeMsg (handlerDoneMsg )
2283
2345
didPanic := true
2284
2346
defer func () {
2285
2347
rw .rws .stream .cancelCtx ()
0 commit comments