Skip to content

Commit bc0d6c6

Browse files
author
Chao Xu
committed
http2: connection pool periodically sends ping frame and closes the
connection if the ping is not responded on time. DO NOT SUBMIT Updates golang/go#31643
1 parent aa69164 commit bc0d6c6

File tree

3 files changed

+100
-5
lines changed

3 files changed

+100
-5
lines changed

http2/client_conn_pool.go

+53
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,11 @@
77
package http2
88

99
import (
10+
"context"
1011
"crypto/tls"
1112
"net/http"
1213
"sync"
14+
"time"
1315
)
1416

1517
// ClientConnPool manages a pool of HTTP/2 client connections.
@@ -41,6 +43,16 @@ type clientConnPool struct {
4143
dialing map[string]*dialCall // currently in-flight dials
4244
keys map[*ClientConn][]string
4345
addConnCalls map[string]*addConnCall // in-flight addConnIfNeede calls
46+
47+
// TODO: figure out a way to allow user to configure pingPeriod and
48+
// pingTimeout.
49+
pingPeriod time.Duration // how often pings are sent on idle
50+
// connections. The connection will be closed if response is not
51+
// received within pingTimeout. 0 means no periodic pings.
52+
pingTimeout time.Duration // connection will be force closed if a Ping
53+
// response is not received within pingTimeout.
54+
pingStops map[*ClientConn]chan struct{} // channels to stop the
55+
// periodic Pings.
4456
}
4557

4658
func (p *clientConnPool) GetClientConn(req *http.Request, addr string) (*ClientConn, error) {
@@ -219,13 +231,54 @@ func (p *clientConnPool) addConnLocked(key string, cc *ClientConn) {
219231
if p.keys == nil {
220232
p.keys = make(map[*ClientConn][]string)
221233
}
234+
if p.pingStops == nil {
235+
p.pingStops = make(map[*ClientConn]chan struct{})
236+
}
222237
p.conns[key] = append(p.conns[key], cc)
223238
p.keys[cc] = append(p.keys[cc], key)
239+
if p.pingPeriod != 0 {
240+
p.pingStops[cc] = p.pingConnection(key, cc)
241+
}
242+
}
243+
244+
// TODO: ping all connections at the same tick to save tickers?
245+
func (p *clientConnPool) pingConnection(key string, cc *ClientConn) chan struct{} {
246+
done := make(chan struct{})
247+
go func() {
248+
ticker := time.NewTicker(p.pingPeriod)
249+
defer ticker.Stop()
250+
for {
251+
select {
252+
case <-done:
253+
return
254+
default:
255+
}
256+
257+
select {
258+
case <-done:
259+
return
260+
case <-ticker.C:
261+
ctx, _ := context.WithTimeout(context.Background(), p.pingTimeout)
262+
err := cc.Ping(ctx)
263+
if err != nil {
264+
cc.closeForLostPing()
265+
p.MarkDead(cc)
266+
}
267+
}
268+
}
269+
}()
270+
return done
224271
}
225272

226273
func (p *clientConnPool) MarkDead(cc *ClientConn) {
227274
p.mu.Lock()
228275
defer p.mu.Unlock()
276+
277+
if done, ok := p.pingStops[cc]; ok {
278+
close(done)
279+
delete(p.pingStops, cc)
280+
}
281+
229282
for _, key := range p.keys[cc] {
230283
vv, ok := p.conns[key]
231284
if !ok {

http2/transport.go

+21-5
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,10 @@ func ConfigureTransport(t1 *http.Transport) error {
140140

141141
func configureTransport(t1 *http.Transport) (*Transport, error) {
142142
connPool := new(clientConnPool)
143+
// TODO: figure out a way to allow user to configure pingPeriod and
144+
// pingTimeout.
145+
connPool.pingPeriod = 5 * time.Second
146+
connPool.pingTimeout = 1 * time.Second
143147
t2 := &Transport{
144148
ConnPool: noDialClientConnPool{connPool},
145149
t1: t1,
@@ -834,14 +838,12 @@ func (cc *ClientConn) sendGoAway() error {
834838
return nil
835839
}
836840

837-
// Close closes the client connection immediately.
838-
//
839-
// In-flight requests are interrupted. For a graceful shutdown, use Shutdown instead.
840-
func (cc *ClientConn) Close() error {
841+
// closes the client connection immediately. In-flight requests are interrupted.
842+
// err is sent to streams.
843+
func (cc *ClientConn) closeForError(err error) error {
841844
cc.mu.Lock()
842845
defer cc.cond.Broadcast()
843846
defer cc.mu.Unlock()
844-
err := errors.New("http2: client connection force closed via ClientConn.Close")
845847
for id, cs := range cc.streams {
846848
select {
847849
case cs.resc <- resAndError{err: err}:
@@ -854,6 +856,20 @@ func (cc *ClientConn) Close() error {
854856
return cc.tconn.Close()
855857
}
856858

859+
// Close closes the client connection immediately.
860+
//
861+
// In-flight requests are interrupted. For a graceful shutdown, use Shutdown instead.
862+
func (cc *ClientConn) Close() error {
863+
err := errors.New("http2: client connection force closed via ClientConn.Close")
864+
return cc.closeForError(err)
865+
}
866+
867+
// closes the client connection immediately. In-flight requests are interrupted.
868+
func (cc *ClientConn) closeForLostPing() error {
869+
err := errors.New("http2: client connection force closed because ping frame is not responded")
870+
return cc.closeForError(err)
871+
}
872+
857873
const maxAllocFrameSize = 512 << 10
858874

859875
// frameBuffer returns a scratch buffer suitable for writing DATA frames.

http2/transport_test.go

+26
Original file line numberDiff line numberDiff line change
@@ -3244,6 +3244,32 @@ func TestTransportNoRaceOnRequestObjectAfterRequestComplete(t *testing.T) {
32443244
req.Header = http.Header{}
32453245
}
32463246

3247+
func TestTransportCloseAfterLostPing(t *testing.T) {
3248+
clientDone := make(chan struct{})
3249+
ct := newClientTester(t)
3250+
connPool := new(clientConnPool)
3251+
connPool.pingPeriod = 1 * time.Second
3252+
connPool.pingTimeout = 100 * time.Millisecond
3253+
connPool.t = ct.tr
3254+
ct.tr.ConnPool = connPool
3255+
ct.client = func() error {
3256+
defer ct.cc.(*net.TCPConn).CloseWrite()
3257+
defer close(clientDone)
3258+
req, _ := http.NewRequest("GET", "https://dummy.tld/", nil)
3259+
_, err := ct.tr.RoundTrip(req)
3260+
if err == nil || !strings.Contains(err.Error(), "ping frame is not responded") {
3261+
return fmt.Errorf("expected to get error about \"ping frame is not responded\", got %v", err)
3262+
}
3263+
return nil
3264+
}
3265+
ct.server = func() error {
3266+
ct.greet()
3267+
<-clientDone
3268+
return nil
3269+
}
3270+
ct.run()
3271+
}
3272+
32473273
func TestTransportRetryAfterGOAWAY(t *testing.T) {
32483274
var dialer struct {
32493275
sync.Mutex

0 commit comments

Comments
 (0)