Skip to content

Commit 840ca83

Browse files
author
Chao Xu
committed
Only check connection health if the connection read loop has been idle
1 parent f7858bc commit 840ca83

File tree

2 files changed

+176
-37
lines changed

2 files changed

+176
-37
lines changed

http2/transport.go

+44-32
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,23 @@ type Transport struct {
108108
// waiting for their turn.
109109
StrictMaxConcurrentStreams bool
110110

111+
// PingPeriod controls how often pings are sent on idle connections to
112+
// check the liveness of the connection. The connection will be closed
113+
// if response is not received within PingTimeout.
114+
// 0 means no periodic pings. Defaults to 0.
115+
PingPeriod time.Duration
116+
// PingTimeout is the timeout after which the connection will be closed
117+
// if a response to Ping is not received.
118+
// 0 means no periodic pings. Defaults to 0.
119+
PingTimeout time.Duration
120+
// ReadIdleTimeout is the timeout after which the periodic ping for
121+
// connection health check will begin if no frame is received on the
122+
// connection.
123+
// The health check will stop once there is frame received on the
124+
// connection.
125+
// Defaults to 60s.
126+
ReadIdleTimeout time.Duration
127+
111128
// t1, if non-nil, is the standard library Transport using
112129
// this transport. Its settings are used (but not its
113130
// RoundTrip method, etc).
@@ -140,10 +157,6 @@ func ConfigureTransport(t1 *http.Transport) error {
140157

141158
func configureTransport(t1 *http.Transport) (*Transport, error) {
142159
connPool := new(clientConnPool)
143-
// TODO: figure out a way to allow user to configure pingPeriod and
144-
// pingTimeout.
145-
connPool.pingPeriod = 5 * time.Second
146-
connPool.pingTimeout = 1 * time.Second
147160
t2 := &Transport{
148161
ConnPool: noDialClientConnPool{connPool},
149162
t1: t1,
@@ -244,7 +257,7 @@ type ClientConn struct {
244257
wmu sync.Mutex // held while writing; acquire AFTER mu if holding both
245258
werr error // first write error that has occurred
246259

247-
healthCheckCancel chan struct{}
260+
healthCheckStopCh chan struct{}
248261
}
249262

250263
// clientStream is the state for a single HTTP/2 stream. One of these
@@ -680,42 +693,47 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro
680693
return cc, nil
681694
}
682695

683-
func (cc *ClientConn) healthCheck(cancel chan struct{}) {
684-
// TODO: CHAO: configurable
685-
pingPeriod := 15 * time.Second
696+
func (cc *ClientConn) healthCheck(stop chan struct{}) {
697+
pingPeriod := cc.t.PingPeriod
698+
pingTimeout := cc.t.PingTimeout
699+
if pingPeriod == 0 || pingTimeout == 0 {
700+
return
701+
}
686702
ticker := time.NewTicker(pingPeriod)
687703
defer ticker.Stop()
688704
for {
689705
select {
690-
case <-cancel:
706+
case <-stop:
691707
return
692708
case <-ticker.C:
693-
ctx, _ := context.WithTimeout(context.Background(), p.pingTimeout)
709+
ctx, cancel := context.WithTimeout(context.Background(), pingTimeout)
694710
err := cc.Ping(ctx)
711+
cancel()
695712
if err != nil {
696713
cc.closeForLostPing()
697714
cc.t.connPool().MarkDead(cc)
715+
return
698716
}
699717
}
700718
}
701719
}
702720

703721
func (cc *ClientConn) startHealthCheck() {
704-
if cc.healthCheckCancel != nil {
722+
if cc.healthCheckStopCh != nil {
705723
// a health check is already running
706724
return
707725
}
708-
cc.healthCheckCancel = make(chan struct{})
709-
go cc.healthCheck(cc.healthCheckCancel)
726+
cc.healthCheckStopCh = make(chan struct{})
727+
go cc.healthCheck(cc.healthCheckStopCh)
710728
}
711729

712730
func (cc *ClientConn) stopHealthCheck() {
713-
if cc.healthCheckCancel == nil {
731+
if cc.healthCheckStopCh == nil {
714732
// no health check running
715733
return
716734
}
717-
close(cc.healthCheckCancel)
718-
cc.healthCheckCancel = nil
735+
close(cc.healthCheckStopCh)
736+
cc.healthCheckStopCh = nil
719737
}
720738

721739
func (cc *ClientConn) setGoAway(f *GoAwayFrame) {
@@ -1757,25 +1775,16 @@ func (rl *clientConnReadLoop) cleanup() {
17571775
cc.mu.Unlock()
17581776
}
17591777

1760-
func ReadFrameAndProbablyStartOrStopPingLoop() {
1761-
select {
1762-
case <-timer:
1763-
// start ping loop
1764-
case <-read:
1765-
// stop ping loop
1766-
}
1767-
}
1768-
17691778
type frameAndError struct {
17701779
f Frame
17711780
err error
17721781
}
17731782

1774-
func nonBlockingReadFrame(f *Framer) chan frameAndError {
1775-
feCh := make(chan FrameAndError)
1783+
func nonBlockingReadFrame(fr *Framer) chan frameAndError {
1784+
feCh := make(chan frameAndError)
17761785
go func() {
17771786
f, err := fr.ReadFrame()
1778-
feCh <- frameAndError{frame: f, err: err}
1787+
feCh <- frameAndError{f: f, err: err}
17791788
}()
17801789
return feCh
17811790
}
@@ -1788,15 +1797,18 @@ func (rl *clientConnReadLoop) run() error {
17881797
for {
17891798
var fe frameAndError
17901799
feCh := nonBlockingReadFrame(cc.fr)
1791-
// TODO: CHAO: make it configurable
1792-
readIdleTimer := time.NewTimer(15 * time.Second)
1800+
to := cc.t.ReadIdleTimeout
1801+
if to == 0 {
1802+
to = 60 * time.Second
1803+
}
1804+
readIdleTimer := time.NewTimer(to)
17931805
select {
1794-
case fe <- feCh:
1806+
case fe = <-feCh:
17951807
cc.stopHealthCheck()
17961808
readIdleTimer.Stop()
17971809
case <-readIdleTimer.C:
17981810
cc.startHealthCheck()
1799-
fe <- feCh
1811+
fe = <-feCh
18001812
}
18011813
f, err := fe.f, fe.err
18021814
if err != nil {

http2/transport_test.go

+132-5
Original file line numberDiff line numberDiff line change
@@ -3247,11 +3247,9 @@ func TestTransportNoRaceOnRequestObjectAfterRequestComplete(t *testing.T) {
32473247
func TestTransportCloseAfterLostPing(t *testing.T) {
32483248
clientDone := make(chan struct{})
32493249
ct := newClientTester(t)
3250-
connPool := new(clientConnPool)
3251-
connPool.pingPeriod = 1 * time.Second
3252-
connPool.pingTimeout = 100 * time.Millisecond
3253-
connPool.t = ct.tr
3254-
ct.tr.ConnPool = connPool
3250+
ct.tr.PingPeriod = 1 * time.Second
3251+
ct.tr.PingTimeout = 1 * time.Second
3252+
ct.tr.ReadIdleTimeout = 1 * time.Second
32553253
ct.client = func() error {
32563254
defer ct.cc.(*net.TCPConn).CloseWrite()
32573255
defer close(clientDone)
@@ -3270,6 +3268,135 @@ func TestTransportCloseAfterLostPing(t *testing.T) {
32703268
ct.run()
32713269
}
32723270

3271+
func TestTransportPingWhenReading(t *testing.T) {
3272+
testTransportPingWhenReading(t, 50*time.Millisecond, 100*time.Millisecond)
3273+
testTransportPingWhenReading(t, 100*time.Millisecond, 50*time.Millisecond)
3274+
}
3275+
3276+
func testTransportPingWhenReading(t *testing.T, readIdleTimeout, serverResponseInterval time.Duration) {
3277+
var pinged bool
3278+
clientBodyBytes := []byte("hello, this is client")
3279+
clientDone := make(chan struct{})
3280+
ct := newClientTester(t)
3281+
ct.tr.PingPeriod = 10 * time.Millisecond
3282+
ct.tr.PingTimeout = 10 * time.Millisecond
3283+
ct.tr.ReadIdleTimeout = readIdleTimeout
3284+
// guards the ct.fr.Write
3285+
var wmu sync.Mutex
3286+
ct.client = func() error {
3287+
defer ct.cc.(*net.TCPConn).CloseWrite()
3288+
defer close(clientDone)
3289+
3290+
req, err := http.NewRequest("PUT", "https://dummy.tld/", bytes.NewReader(clientBodyBytes))
3291+
if err != nil {
3292+
return err
3293+
}
3294+
res, err := ct.tr.RoundTrip(req)
3295+
if err != nil {
3296+
return fmt.Errorf("RoundTrip: %v", err)
3297+
}
3298+
defer res.Body.Close()
3299+
if res.StatusCode != 200 {
3300+
return fmt.Errorf("status code = %v; want %v", res.StatusCode, 200)
3301+
}
3302+
_, err = ioutil.ReadAll(res.Body)
3303+
return err
3304+
}
3305+
ct.server = func() error {
3306+
ct.greet()
3307+
var buf bytes.Buffer
3308+
enc := hpack.NewEncoder(&buf)
3309+
var dataRecv int
3310+
var closed bool
3311+
for {
3312+
f, err := ct.fr.ReadFrame()
3313+
if err != nil {
3314+
select {
3315+
case <-clientDone:
3316+
// If the client's done, it
3317+
// will have reported any
3318+
// errors on its side.
3319+
return nil
3320+
default:
3321+
return err
3322+
}
3323+
}
3324+
switch f := f.(type) {
3325+
case *WindowUpdateFrame, *SettingsFrame, *HeadersFrame:
3326+
case *DataFrame:
3327+
dataLen := len(f.Data())
3328+
if dataLen > 0 {
3329+
err := func() error {
3330+
wmu.Lock()
3331+
defer wmu.Unlock()
3332+
if dataRecv == 0 {
3333+
enc.WriteField(hpack.HeaderField{Name: ":status", Value: strconv.Itoa(200)})
3334+
ct.fr.WriteHeaders(HeadersFrameParam{
3335+
StreamID: f.StreamID,
3336+
EndHeaders: true,
3337+
EndStream: false,
3338+
BlockFragment: buf.Bytes(),
3339+
})
3340+
}
3341+
if err := ct.fr.WriteWindowUpdate(0, uint32(dataLen)); err != nil {
3342+
return err
3343+
}
3344+
if err := ct.fr.WriteWindowUpdate(f.StreamID, uint32(dataLen)); err != nil {
3345+
return err
3346+
}
3347+
return nil
3348+
}()
3349+
if err != nil {
3350+
return err
3351+
}
3352+
}
3353+
dataRecv += dataLen
3354+
3355+
if !closed && dataRecv == len(clientBodyBytes) {
3356+
closed = true
3357+
go func() {
3358+
for i := 0; i < 10; i++ {
3359+
wmu.Lock()
3360+
if err := ct.fr.WriteData(f.StreamID, false, []byte(fmt.Sprintf("hello, this is server data frame %d", i))); err != nil {
3361+
wmu.Unlock()
3362+
t.Error(err)
3363+
return
3364+
}
3365+
wmu.Unlock()
3366+
time.Sleep(serverResponseInterval)
3367+
}
3368+
wmu.Lock()
3369+
if err := ct.fr.WriteData(f.StreamID, true, []byte("hello, this is last server frame")); err != nil {
3370+
wmu.Unlock()
3371+
t.Error(err)
3372+
return
3373+
}
3374+
wmu.Unlock()
3375+
}()
3376+
}
3377+
case *PingFrame:
3378+
pinged = true
3379+
if serverResponseInterval > readIdleTimeout {
3380+
wmu.Lock()
3381+
if err := ct.fr.WritePing(true, f.Data); err != nil {
3382+
wmu.Unlock()
3383+
return err
3384+
}
3385+
wmu.Unlock()
3386+
} else {
3387+
return fmt.Errorf("Unexpected ping frame: %v", f)
3388+
}
3389+
default:
3390+
return fmt.Errorf("Unexpected client frame %v", f)
3391+
}
3392+
}
3393+
}
3394+
ct.run()
3395+
if serverResponseInterval > readIdleTimeout && !pinged {
3396+
t.Errorf("expect ping")
3397+
}
3398+
}
3399+
32733400
func TestTransportRetryAfterGOAWAY(t *testing.T) {
32743401
var dialer struct {
32753402
sync.Mutex

0 commit comments

Comments
 (0)