Skip to content

Commit 0ba52f6

Browse files
Chao Xursc
Chao Xu
authored andcommitted
http2: perform connection health check
After the connection has been idle for a while, periodic pings are sent over the connection to check its health. Unhealthy connection is closed and removed from the connection pool. Fixes golang/go#31643 Change-Id: Idbbc9cb2d3e26c39f84a33e945e482d41cd8583c GitHub-Last-Rev: 36607fe GitHub-Pull-Request: #55 Reviewed-on: https://go-review.googlesource.com/c/net/+/198040 Run-TryBot: Andrew Bonventre <[email protected]> TryBot-Result: Gobot Gobot <[email protected]> Reviewed-by: Russ Cox <[email protected]>
1 parent 59133d7 commit 0ba52f6

File tree

2 files changed

+221
-5
lines changed

2 files changed

+221
-5
lines changed

http2/transport.go

+61-5
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,19 @@ type Transport struct {
108108
// waiting for their turn.
109109
StrictMaxConcurrentStreams bool
110110

111+
// ReadIdleTimeout is the timeout after which a health check using ping
112+
// frame will be carried out if no frame is received on the connection.
113+
// Note that a ping response will is considered a received frame, so if
114+
// there is no other traffic on the connection, the health check will
115+
// be performed every ReadIdleTimeout interval.
116+
// If zero, no health check is performed.
117+
ReadIdleTimeout time.Duration
118+
119+
// PingTimeout is the timeout after which the connection will be closed
120+
// if a response to Ping is not received.
121+
// Defaults to 15s.
122+
PingTimeout time.Duration
123+
111124
// t1, if non-nil, is the standard library Transport using
112125
// this transport. Its settings are used (but not its
113126
// RoundTrip method, etc).
@@ -131,6 +144,14 @@ func (t *Transport) disableCompression() bool {
131144
return t.DisableCompression || (t.t1 != nil && t.t1.DisableCompression)
132145
}
133146

147+
func (t *Transport) pingTimeout() time.Duration {
148+
if t.PingTimeout == 0 {
149+
return 15 * time.Second
150+
}
151+
return t.PingTimeout
152+
153+
}
154+
134155
// ConfigureTransport configures a net/http HTTP/1 Transport to use HTTP/2.
135156
// It returns an error if t1 has already been HTTP/2-enabled.
136157
func ConfigureTransport(t1 *http.Transport) error {
@@ -675,6 +696,20 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro
675696
return cc, nil
676697
}
677698

699+
func (cc *ClientConn) healthCheck() {
700+
pingTimeout := cc.t.pingTimeout()
701+
// We don't need to periodically ping in the health check, because the readLoop of ClientConn will
702+
// trigger the healthCheck again if there is no frame received.
703+
ctx, cancel := context.WithTimeout(context.Background(), pingTimeout)
704+
defer cancel()
705+
err := cc.Ping(ctx)
706+
if err != nil {
707+
cc.closeForLostPing()
708+
cc.t.connPool().MarkDead(cc)
709+
return
710+
}
711+
}
712+
678713
func (cc *ClientConn) setGoAway(f *GoAwayFrame) {
679714
cc.mu.Lock()
680715
defer cc.mu.Unlock()
@@ -846,14 +881,12 @@ func (cc *ClientConn) sendGoAway() error {
846881
return nil
847882
}
848883

849-
// Close closes the client connection immediately.
850-
//
851-
// In-flight requests are interrupted. For a graceful shutdown, use Shutdown instead.
852-
func (cc *ClientConn) Close() error {
884+
// closes the client connection immediately. In-flight requests are interrupted.
885+
// err is sent to streams.
886+
func (cc *ClientConn) closeForError(err error) error {
853887
cc.mu.Lock()
854888
defer cc.cond.Broadcast()
855889
defer cc.mu.Unlock()
856-
err := errors.New("http2: client connection force closed via ClientConn.Close")
857890
for id, cs := range cc.streams {
858891
select {
859892
case cs.resc <- resAndError{err: err}:
@@ -866,6 +899,20 @@ func (cc *ClientConn) Close() error {
866899
return cc.tconn.Close()
867900
}
868901

902+
// Close closes the client connection immediately.
903+
//
904+
// In-flight requests are interrupted. For a graceful shutdown, use Shutdown instead.
905+
func (cc *ClientConn) Close() error {
906+
err := errors.New("http2: client connection force closed via ClientConn.Close")
907+
return cc.closeForError(err)
908+
}
909+
910+
// closes the client connection immediately. In-flight requests are interrupted.
911+
func (cc *ClientConn) closeForLostPing() error {
912+
err := errors.New("http2: client connection lost")
913+
return cc.closeForError(err)
914+
}
915+
869916
const maxAllocFrameSize = 512 << 10
870917

871918
// frameBuffer returns a scratch buffer suitable for writing DATA frames.
@@ -1737,8 +1784,17 @@ func (rl *clientConnReadLoop) run() error {
17371784
rl.closeWhenIdle = cc.t.disableKeepAlives() || cc.singleUse
17381785
gotReply := false // ever saw a HEADERS reply
17391786
gotSettings := false
1787+
readIdleTimeout := cc.t.ReadIdleTimeout
1788+
var t *time.Timer
1789+
if readIdleTimeout != 0 {
1790+
t = time.AfterFunc(readIdleTimeout, cc.healthCheck)
1791+
defer t.Stop()
1792+
}
17401793
for {
17411794
f, err := cc.fr.ReadFrame()
1795+
if t != nil {
1796+
t.Reset(readIdleTimeout)
1797+
}
17421798
if err != nil {
17431799
cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err)
17441800
}

http2/transport_test.go

+160
Original file line numberDiff line numberDiff line change
@@ -3309,6 +3309,166 @@ func TestTransportNoRaceOnRequestObjectAfterRequestComplete(t *testing.T) {
33093309
req.Header = http.Header{}
33103310
}
33113311

3312+
func TestTransportCloseAfterLostPing(t *testing.T) {
3313+
clientDone := make(chan struct{})
3314+
ct := newClientTester(t)
3315+
ct.tr.PingTimeout = 1 * time.Second
3316+
ct.tr.ReadIdleTimeout = 1 * time.Second
3317+
ct.client = func() error {
3318+
defer ct.cc.(*net.TCPConn).CloseWrite()
3319+
defer close(clientDone)
3320+
req, _ := http.NewRequest("GET", "https://dummy.tld/", nil)
3321+
_, err := ct.tr.RoundTrip(req)
3322+
if err == nil || !strings.Contains(err.Error(), "client connection lost") {
3323+
return fmt.Errorf("expected to get error about \"connection lost\", got %v", err)
3324+
}
3325+
return nil
3326+
}
3327+
ct.server = func() error {
3328+
ct.greet()
3329+
<-clientDone
3330+
return nil
3331+
}
3332+
ct.run()
3333+
}
3334+
3335+
func TestTransportPingWhenReading(t *testing.T) {
3336+
testCases := []struct {
3337+
name string
3338+
readIdleTimeout time.Duration
3339+
serverResponseInterval time.Duration
3340+
expectedPingCount int
3341+
}{
3342+
{
3343+
name: "two pings in each serverResponseInterval",
3344+
readIdleTimeout: 400 * time.Millisecond,
3345+
serverResponseInterval: 1000 * time.Millisecond,
3346+
expectedPingCount: 4,
3347+
},
3348+
{
3349+
name: "one ping in each serverResponseInterval",
3350+
readIdleTimeout: 700 * time.Millisecond,
3351+
serverResponseInterval: 1000 * time.Millisecond,
3352+
expectedPingCount: 2,
3353+
},
3354+
{
3355+
name: "zero ping in each serverResponseInterval",
3356+
readIdleTimeout: 1000 * time.Millisecond,
3357+
serverResponseInterval: 500 * time.Millisecond,
3358+
expectedPingCount: 0,
3359+
},
3360+
{
3361+
name: "0 readIdleTimeout means no ping",
3362+
readIdleTimeout: 0 * time.Millisecond,
3363+
serverResponseInterval: 500 * time.Millisecond,
3364+
expectedPingCount: 0,
3365+
},
3366+
}
3367+
3368+
for _, tc := range testCases {
3369+
tc := tc // capture range variable
3370+
t.Run(tc.name, func(t *testing.T) {
3371+
t.Parallel()
3372+
testTransportPingWhenReading(t, tc.readIdleTimeout, tc.serverResponseInterval, tc.expectedPingCount)
3373+
})
3374+
}
3375+
}
3376+
3377+
func testTransportPingWhenReading(t *testing.T, readIdleTimeout, serverResponseInterval time.Duration, expectedPingCount int) {
3378+
var pingCount int
3379+
clientDone := make(chan struct{})
3380+
ct := newClientTester(t)
3381+
ct.tr.PingTimeout = 10 * time.Millisecond
3382+
ct.tr.ReadIdleTimeout = readIdleTimeout
3383+
// guards the ct.fr.Write
3384+
var wmu sync.Mutex
3385+
3386+
ct.client = func() error {
3387+
defer ct.cc.(*net.TCPConn).CloseWrite()
3388+
defer close(clientDone)
3389+
req, _ := http.NewRequest("GET", "https://dummy.tld/", nil)
3390+
res, err := ct.tr.RoundTrip(req)
3391+
if err != nil {
3392+
return fmt.Errorf("RoundTrip: %v", err)
3393+
}
3394+
defer res.Body.Close()
3395+
if res.StatusCode != 200 {
3396+
return fmt.Errorf("status code = %v; want %v", res.StatusCode, 200)
3397+
}
3398+
_, err = ioutil.ReadAll(res.Body)
3399+
return err
3400+
}
3401+
3402+
ct.server = func() error {
3403+
ct.greet()
3404+
var buf bytes.Buffer
3405+
enc := hpack.NewEncoder(&buf)
3406+
for {
3407+
f, err := ct.fr.ReadFrame()
3408+
if err != nil {
3409+
select {
3410+
case <-clientDone:
3411+
// If the client's done, it
3412+
// will have reported any
3413+
// errors on its side.
3414+
return nil
3415+
default:
3416+
return err
3417+
}
3418+
}
3419+
switch f := f.(type) {
3420+
case *WindowUpdateFrame, *SettingsFrame:
3421+
case *HeadersFrame:
3422+
if !f.HeadersEnded() {
3423+
return fmt.Errorf("headers should have END_HEADERS be ended: %v", f)
3424+
}
3425+
enc.WriteField(hpack.HeaderField{Name: ":status", Value: strconv.Itoa(200)})
3426+
ct.fr.WriteHeaders(HeadersFrameParam{
3427+
StreamID: f.StreamID,
3428+
EndHeaders: true,
3429+
EndStream: false,
3430+
BlockFragment: buf.Bytes(),
3431+
})
3432+
3433+
go func() {
3434+
for i := 0; i < 2; i++ {
3435+
wmu.Lock()
3436+
if err := ct.fr.WriteData(f.StreamID, false, []byte(fmt.Sprintf("hello, this is server data frame %d", i))); err != nil {
3437+
wmu.Unlock()
3438+
t.Error(err)
3439+
return
3440+
}
3441+
wmu.Unlock()
3442+
time.Sleep(serverResponseInterval)
3443+
}
3444+
wmu.Lock()
3445+
if err := ct.fr.WriteData(f.StreamID, true, []byte("hello, this is last server data frame")); err != nil {
3446+
wmu.Unlock()
3447+
t.Error(err)
3448+
return
3449+
}
3450+
wmu.Unlock()
3451+
}()
3452+
case *PingFrame:
3453+
pingCount++
3454+
wmu.Lock()
3455+
if err := ct.fr.WritePing(true, f.Data); err != nil {
3456+
wmu.Unlock()
3457+
return err
3458+
}
3459+
wmu.Unlock()
3460+
default:
3461+
return fmt.Errorf("Unexpected client frame %v", f)
3462+
}
3463+
}
3464+
}
3465+
ct.run()
3466+
if e, a := expectedPingCount, pingCount; e != a {
3467+
t.Errorf("expected receiving %d pings, got %d pings", e, a)
3468+
3469+
}
3470+
}
3471+
33123472
func TestTransportRetryAfterGOAWAY(t *testing.T) {
33133473
var dialer struct {
33143474
sync.Mutex

0 commit comments

Comments
 (0)