Skip to content

Commit bbd6747

Browse files
skipper: support http2 over cleartext TCP (h2c)
Works around golang/go#26682 by introducing ShutdownListener that tracks active connections and implements graceful shutdown. For #1253 Signed-off-by: Alexander Yastrebov <[email protected]>
1 parent f35389e commit bbd6747

File tree

4 files changed

+233
-25
lines changed

4 files changed

+233
-25
lines changed

config/config.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,7 @@ type Config struct {
245245
ExpectContinueTimeoutBackend time.Duration `yaml:"expect-continue-timeout-backend"`
246246
MaxIdleConnsBackend int `yaml:"max-idle-connection-backend"`
247247
DisableHTTPKeepalives bool `yaml:"disable-http-keepalives"`
248+
EnableHttp2Cleartext bool `yaml:"enable-http2-cleartext"`
248249

249250
// swarm:
250251
EnableSwarm bool `yaml:"enable-swarm"`
@@ -523,6 +524,7 @@ func NewConfig() *Config {
523524
flag.IntVar(&cfg.MaxIdleConnsBackend, "max-idle-connection-backend", 0, "sets the maximum idle connections for all backend connections")
524525
flag.BoolVar(&cfg.DisableHTTPKeepalives, "disable-http-keepalives", false, "forces backend to always create a new connection")
525526
flag.BoolVar(&cfg.KubernetesEnableTLS, "kubernetes-enable-tls", false, "enable using kubnernetes resources to terminate tls")
527+
flag.BoolVar(&cfg.EnableHttp2Cleartext, "enable-http2-cleartext", false, "enables HTTP/2 connections over cleartext TCP")
526528

527529
// Swarm:
528530
flag.BoolVar(&cfg.EnableSwarm, "enable-swarm", false, "enable swarm communication between nodes in a skipper fleet")
@@ -850,6 +852,7 @@ func (c *Config) ToOptions() skipper.Options {
850852
MaxIdleConnsBackend: c.MaxIdleConnsBackend,
851853
DisableHTTPKeepalives: c.DisableHTTPKeepalives,
852854
KubernetesEnableTLS: c.KubernetesEnableTLS,
855+
EnableHttp2Cleartext: c.EnableHttp2Cleartext,
853856

854857
// swarm:
855858
EnableSwarm: c.EnableSwarm,

net/shutdown_listener.go

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package net
2+
3+
import (
4+
"context"
5+
"net"
6+
"sync"
7+
"sync/atomic"
8+
"time"
9+
10+
log "github.com/sirupsen/logrus"
11+
)
12+
13+
type (
14+
ShutdownListener struct {
15+
net.Listener
16+
activeConns atomic.Int64
17+
}
18+
19+
shutdownListenerConn struct {
20+
net.Conn
21+
listener *ShutdownListener
22+
closeOnce sync.Once
23+
}
24+
)
25+
26+
var _ net.Listener = &ShutdownListener{}
27+
28+
func NewShutdownListener(l net.Listener) *ShutdownListener {
29+
return &ShutdownListener{Listener: l}
30+
}
31+
32+
func (l *ShutdownListener) Accept() (net.Conn, error) {
33+
c, err := l.Listener.Accept()
34+
if err != nil {
35+
return nil, err
36+
}
37+
38+
l.registerConn()
39+
40+
return &shutdownListenerConn{Conn: c, listener: l}, nil
41+
}
42+
43+
func (l *ShutdownListener) Close() error {
44+
err := l.Listener.Close()
45+
return err
46+
}
47+
48+
func (l *ShutdownListener) Shutdown(ctx context.Context) error {
49+
ticker := time.NewTicker(500 * time.Millisecond)
50+
defer ticker.Stop()
51+
for {
52+
n := l.activeConns.Load()
53+
log.Debugf("ShutdownListener Shutdown: %d connections", n)
54+
if n == 0 {
55+
return nil
56+
}
57+
select {
58+
case <-ctx.Done():
59+
return ctx.Err()
60+
case <-ticker.C:
61+
}
62+
}
63+
}
64+
65+
func (c *shutdownListenerConn) Close() error {
66+
err := c.Conn.Close()
67+
68+
c.closeOnce.Do(func() { c.listener.unregisterConn() })
69+
70+
return err
71+
}
72+
73+
func (l *ShutdownListener) registerConn() {
74+
n := l.activeConns.Add(1)
75+
log.Debugf("ShutdownListener registerConn: %d connections", n)
76+
}
77+
78+
func (l *ShutdownListener) unregisterConn() {
79+
n := l.activeConns.Add(-1)
80+
log.Debugf("ShutdownListener unregisterConn: %d connections", n)
81+
}

skipper.go

Lines changed: 41 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import (
2222
ot "github.com/opentracing/opentracing-go"
2323
"github.com/prometheus/client_golang/prometheus"
2424
log "github.com/sirupsen/logrus"
25+
"golang.org/x/net/http2"
26+
"golang.org/x/net/http2/h2c"
2527

2628
"github.com/zalando/skipper/circuit"
2729
"github.com/zalando/skipper/dataclients/kubernetes"
@@ -42,7 +44,7 @@ import (
4244
"github.com/zalando/skipper/loadbalancer"
4345
"github.com/zalando/skipper/logging"
4446
"github.com/zalando/skipper/metrics"
45-
skpnet "github.com/zalando/skipper/net"
47+
snet "github.com/zalando/skipper/net"
4648
pauth "github.com/zalando/skipper/predicates/auth"
4749
"github.com/zalando/skipper/predicates/content"
4850
"github.com/zalando/skipper/predicates/cookie"
@@ -381,6 +383,9 @@ type Options struct {
381383
// a backend to always create a new connection.
382384
DisableHTTPKeepalives bool
383385

386+
// EnableHttp2Cleartext enables HTTP/2 connections over cleartext TCP.
387+
EnableHttp2Cleartext bool
388+
384389
// Flag indicating to ignore trailing slashes in paths during route
385390
// lookup.
386391
IgnoreTrailingSlash bool
@@ -1233,11 +1238,31 @@ func listenAndServeQuit(
12331238
}
12341239
}
12351240

1241+
if o.EnableHttp2Cleartext {
1242+
if serveTLS {
1243+
return fmt.Errorf("HTTP/2 connections over cleartext TCP are not supported when TLS is enabled")
1244+
}
1245+
1246+
h2srv := &http2.Server{}
1247+
srv.Handler = h2c.NewHandler(srv.Handler, h2srv)
1248+
1249+
// Work around https://github.com/golang/go/issues/26682
1250+
// http2.ConfigureServer registers unexported h2srv graceful shutdown handler on srv shutdown -
1251+
// it calls srv.RegisterOnShutdown(h2srv.state.startGracefulShutdown).
1252+
// h2srv graceful shutdown handler sends GOAWAY frame to all connections and closes them after predefined delay.
1253+
//
1254+
// srv.Shutdown() runs h2srv shutdown handler in a goroutine so a special snet.ShutdownListener
1255+
// waits until all connections are closed.
1256+
http2.ConfigureServer(srv, h2srv)
1257+
}
1258+
12361259
log.Infof("Listen on %v", address)
12371260

1238-
l, err := listen(o, address, mtr)
1239-
if err != nil {
1261+
var listener *snet.ShutdownListener
1262+
if l, err := listen(o, address, mtr); err != nil {
12401263
return err
1264+
} else {
1265+
listener = snet.NewShutdownListener(l)
12411266
}
12421267

12431268
// making idleConnsCH and sigs optional parameters is required to be able to tear down a server
@@ -1258,10 +1283,16 @@ func listenAndServeQuit(
12581283
log.Infof("Got shutdown signal, wait %v for health check", o.WaitForHealthcheckInterval)
12591284
time.Sleep(o.WaitForHealthcheckInterval)
12601285

1261-
log.Info("Start shutdown")
1286+
log.Info("Start server shutdown")
12621287
if err := srv.Shutdown(context.Background()); err != nil {
1263-
log.Errorf("Failed to graceful shutdown: %v", err)
1288+
log.Errorf("Failed to gracefully shutdown: %v", err)
1289+
}
1290+
1291+
log.Info("Start listener shutdown")
1292+
if err := listener.Shutdown(context.Background()); err != nil {
1293+
log.Errorf("Failed to gracefully shutdown listener: %v", err)
12641294
}
1295+
12651296
close(idleConnsCH)
12661297
}()
12671298

@@ -1281,20 +1312,21 @@ func listenAndServeQuit(
12811312
}()
12821313
}
12831314

1284-
if err := srv.ServeTLS(l, "", ""); err != http.ErrServerClosed {
1315+
if err := srv.ServeTLS(listener, "", ""); err != http.ErrServerClosed {
12851316
log.Errorf("ServeTLS failed: %v", err)
12861317
return err
12871318
}
12881319
} else {
12891320
log.Infof("TLS settings not found, defaulting to HTTP")
12901321

1291-
if err := srv.Serve(l); err != http.ErrServerClosed {
1322+
if err := srv.Serve(listener); err != http.ErrServerClosed {
12921323
log.Errorf("Serve failed: %v", err)
12931324
return err
12941325
}
12951326
}
12961327

12971328
<-idleConnsCH
1329+
12981330
log.Infof("done.")
12991331
return nil
13001332
}
@@ -1580,13 +1612,13 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error {
15801612
}
15811613

15821614
var swarmer ratelimit.Swarmer
1583-
var redisOptions *skpnet.RedisOptions
1615+
var redisOptions *snet.RedisOptions
15841616
log.Infof("enable swarm: %v", o.EnableSwarm)
15851617
if o.EnableSwarm {
15861618
if len(o.SwarmRedisURLs) > 0 || o.KubernetesRedisServiceName != "" || o.SwarmRedisEndpointsRemoteURL != "" {
15871619
log.Infof("Redis based swarm with %d shards", len(o.SwarmRedisURLs))
15881620

1589-
redisOptions = &skpnet.RedisOptions{
1621+
redisOptions = &snet.RedisOptions{
15901622
Addrs: o.SwarmRedisURLs,
15911623
Password: o.SwarmRedisPassword,
15921624
HashAlgorithm: o.SwarmRedisHashAlgorithm,

0 commit comments

Comments
 (0)