-
Notifications
You must be signed in to change notification settings - Fork 1.3k
/
Copy pathgrpc.go
242 lines (208 loc) · 7.98 KB
/
grpc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
// Copyright (c) 2021 Gitpod GmbH. All rights reserved.
// Licensed under the GNU Affero General Public License (AGPL).
// See License-AGPL.txt in the project root for license information.
package grpc
import (
"context"
"crypto/tls"
"crypto/x509"
"os"
"path/filepath"
"runtime/debug"
"time"
"github.com/gitpod-io/gitpod/common-go/log"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"golang.org/x/xerrors"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/status"
)
// maxMsgSize is the receive-size limit used by the default client and server
// options in this file: 16MB, where the grpc library default is 4MB.
const maxMsgSize = 16 << 20
// defaultClientOptionsConfig holds package-level settings that
// DefaultClientOptions consults. Metrics is nil until ClientMetrics has been
// called; the field is read and written without any synchronization.
var defaultClientOptionsConfig = struct {
	Metrics *grpc_prometheus.ClientMetrics
}{}
// ClientMetrics creates a new set of client-side gRPC metrics, stores it so
// that DefaultClientOptions adds the matching interceptors, and returns it as
// a prometheus.Collector. Every call replaces the previously stored set.
func ClientMetrics() prometheus.Collector {
	metrics := grpc_prometheus.NewClientMetrics()
	defaultClientOptionsConfig.Metrics = metrics
	return metrics
}
// DefaultClientOptions returns the default grpc client connection options.
//
// The options install OpenTracing interceptors (followed by the Prometheus
// interceptors when ClientMetrics has been called), make dialing blocking,
// cap the reconnect backoff at five seconds, enable client keepalive pings
// and raise the receive message size limit to maxMsgSize.
func DefaultClientOptions() []grpc.DialOption {
	// Tracing always comes first in both chains.
	unaryChain := []grpc.UnaryClientInterceptor{
		grpc_opentracing.UnaryClientInterceptor(grpc_opentracing.WithTracer(opentracing.GlobalTracer())),
	}
	streamChain := []grpc.StreamClientInterceptor{
		grpc_opentracing.StreamClientInterceptor(grpc_opentracing.WithTracer(opentracing.GlobalTracer())),
	}

	// Metrics are optional: they are only present once ClientMetrics was called.
	if metrics := defaultClientOptionsConfig.Metrics; metrics != nil {
		unaryChain = append(unaryChain, metrics.UnaryClientInterceptor())
		streamChain = append(streamChain, metrics.StreamClientInterceptor())
	}

	// Start from the library's default backoff and only lower the maximum delay.
	reconnectBackoff := backoff.DefaultConfig
	reconnectBackoff.MaxDelay = 5 * time.Second

	return []grpc.DialOption{
		grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(unaryChain...)),
		grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(streamChain...)),
		grpc.WithBlock(),
		grpc.WithConnectParams(grpc.ConnectParams{
			Backoff: reconnectBackoff,
		}),
		grpc.WithKeepaliveParams(keepalive.ClientParameters{
			Time:                10 * time.Second,
			Timeout:             time.Second,
			PermitWithoutStream: true,
		}),
		grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)),
	}
}
// DefaultServerOptions returns the default set of ServerOptions for internal
// components, without any additional interceptors.
func DefaultServerOptions() []grpc.ServerOption {
	var (
		noStream []grpc.StreamServerInterceptor
		noUnary  []grpc.UnaryServerInterceptor
	)
	return ServerOptionsWithInterceptors(noStream, noUnary)
}
// ServerOptionsWithInterceptors returns the default set of ServerOptions for internal components,
// extended with additional interceptors.
//
// The interceptors passed in are placed first in their chain. The OpenTracing interceptor
// (which skips health-check calls) and the panic-recovery interceptor are appended after them,
// in that order, so recovery is always the last element of the chain.
// The slices passed in are never modified.
func ServerOptionsWithInterceptors(stream []grpc.StreamServerInterceptor, unary []grpc.UnaryServerInterceptor) []grpc.ServerOption {
	tracingFilterFunc := grpc_opentracing.WithFilterFunc(func(ctx context.Context, fullMethodName string) bool {
		return fullMethodName != "/grpc.health.v1.Health/Check"
	})

	// Shared by the stream and unary recovery interceptors: log the panic together with its
	// stack trace and report it to the client as an Internal error.
	// %v (rather than %s) keeps the message readable when the panic value is neither a string
	// nor an error.
	recoveryHandler := grpc_recovery.WithRecoveryHandlerContext(
		func(ctx context.Context, p interface{}) error {
			log.WithField("stack", string(debug.Stack())).Errorf("[PANIC] %v", p)
			return status.Errorf(codes.Internal, "%v", p)
		},
	)

	// Build the chains in fresh slices. Appending directly to the parameters could write into
	// the caller's backing array when it has spare capacity.
	streamChain := make([]grpc.StreamServerInterceptor, 0, len(stream)+2)
	streamChain = append(streamChain, stream...)
	streamChain = append(streamChain,
		grpc_opentracing.StreamServerInterceptor(tracingFilterFunc),
		grpc_recovery.StreamServerInterceptor(recoveryHandler), // must be last, to be executed first after the rpc handler, we want upstream interceptors to have a meaningful response to work with
	)

	unaryChain := make([]grpc.UnaryServerInterceptor, 0, len(unary)+2)
	unaryChain = append(unaryChain, unary...)
	unaryChain = append(unaryChain,
		grpc_opentracing.UnaryServerInterceptor(tracingFilterFunc),
		grpc_recovery.UnaryServerInterceptor(recoveryHandler), // must be last, to be executed first after the rpc handler, we want upstream interceptors to have a meaningful response to work with
	)

	return []grpc.ServerOption{
		// terminate the connection if the client pings more than once every 10 seconds
		grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
			MinTime:             10 * time.Second,
			PermitWithoutStream: true,
		}),
		grpc.MaxRecvMsgSize(maxMsgSize),
		// We don't know how good our clients are at closing connections. If they don't close them properly
		// we'll be leaking goroutines left and right. Closing idle connections should prevent that.
		grpc.KeepaliveParams(keepalive.ServerParameters{
			MaxConnectionIdle: 30 * time.Minute,
		}),
		grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(streamChain...)),
		grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(unaryChain...)),
	}
}
func SetupLogging() {
grpclog.SetLoggerV2(grpclog.NewLoggerV2(
log.WithField("component", "grpc").WriterLevel(logrus.DebugLevel),
log.WithField("component", "grpc").WriterLevel(logrus.WarnLevel),
log.WithField("component", "grpc").WriterLevel(logrus.ErrorLevel),
))
}
type (
	// TLSConfigOption configures a TLSConfig
	TLSConfigOption func(*tlsConfigOptions) error

	// tlsConfigOptions collects the settings gathered from TLSConfigOptions
	// before ClientAuthTLSConfig turns them into a tls.Config.
	tlsConfigOptions struct {
		// ClientAuth is the client authentication policy to apply.
		ClientAuth tls.ClientAuthType
		// ServerName is copied to the ServerName of the resulting tls.Config.
		ServerName string
		// RootCAs selects whether the CA pool is set as RootCAs.
		RootCAs bool
		// ClientCAs selects whether the CA pool is set as ClientCAs.
		ClientCAs bool
	}
)
// WithClientAuth returns an option that sets the policy for TLS client
// authentication. The returned option never fails.
func WithClientAuth(authType tls.ClientAuthType) TLSConfigOption {
	setClientAuth := func(cfg *tlsConfigOptions) error {
		cfg.ClientAuth = authType
		return nil
	}
	return setClientAuth
}
// WithServerName returns an option that sets the ServerName used to verify
// the hostname. The returned option never fails.
func WithServerName(serverName string) TLSConfigOption {
	setServerName := func(cfg *tlsConfigOptions) error {
		cfg.ServerName = serverName
		return nil
	}
	return setServerName
}
// WithSetRootCAs returns an option that controls whether ClientAuthTLSConfig
// uses the certificate authority pool as the RootCAs of the resulting
// tls.Config. The returned option never fails.
func WithSetRootCAs(rootCAs bool) TLSConfigOption {
	return TLSConfigOption(func(cfg *tlsConfigOptions) error {
		cfg.RootCAs = rootCAs
		return nil
	})
}
// WithSetClientCAs returns an option that controls whether ClientAuthTLSConfig
// uses the certificate authority pool as the ClientCAs of the resulting
// tls.Config. The returned option never fails.
func WithSetClientCAs(clientCAs bool) TLSConfigOption {
	return TLSConfigOption(func(cfg *tlsConfigOptions) error {
		cfg.ClientCAs = clientCAs
		return nil
	})
}
// ClientAuthTLSConfig builds a tls.Config from a certificate authority file, a
// certificate file and a private key file.
//
// authority, certificate and privateKey are file paths to PEM encoded data. If
// the TELEPRESENCE_ROOT environment variable is set, all three paths are
// resolved relative to it. The resulting config is pinned to TLS 1.2 with a
// fixed list of cipher suites and advertises "h2" as its only protocol.
//
// The certificate authority pool is only attached when requested through
// WithSetRootCAs and/or WithSetClientCAs. Client authentication is only
// configured when WithClientAuth set something other than tls.NoClientCert.
// Nil options are ignored.
//
// An error is returned when the key pair cannot be loaded, the authority file
// cannot be read or contains no usable certificate, or an option fails.
func ClientAuthTLSConfig(authority, certificate, privateKey string, opts ...TLSConfigOption) (*tls.Config, error) {
	// Telepresence (used for debugging only) requires special paths to load files from
	if root := os.Getenv("TELEPRESENCE_ROOT"); root != "" {
		authority = filepath.Join(root, authority)
		certificate = filepath.Join(root, certificate)
		privateKey = filepath.Join(root, privateKey)
	}

	// Load certs
	tlsCertificate, err := tls.LoadX509KeyPair(certificate, privateKey)
	if err != nil {
		return nil, xerrors.Errorf("cannot load TLS certificate: %w", err)
	}

	// Create a certificate pool from the certificate authority
	certPool := x509.NewCertPool()
	ca, err := os.ReadFile(authority)
	if err != nil {
		return nil, xerrors.Errorf("cannot read ca certificate: %w", err)
	}
	if ok := certPool.AppendCertsFromPEM(ca); !ok {
		return nil, xerrors.Errorf("failed to append ca certs")
	}

	options := tlsConfigOptions{}
	for _, o := range opts {
		if o == nil {
			// Calling a nil option would panic; treat it as a no-op.
			continue
		}
		if err := o(&options); err != nil {
			return nil, err
		}
	}

	tlsConfig := &tls.Config{
		Certificates: []tls.Certificate{tlsCertificate},
		CipherSuites: []uint16{
			tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305,
			tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305,
			tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
			tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
			tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
			tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
		},
		CurvePreferences: []tls.CurveID{tls.X25519, tls.CurveP256},
		MinVersion:       tls.VersionTLS12,
		MaxVersion:       tls.VersionTLS12,
		NextProtos:       []string{"h2"},
	}

	tlsConfig.ServerName = options.ServerName

	if options.ClientAuth != tls.NoClientCert {
		log.WithField("clientAuth", options.ClientAuth).Info("enabling client authentication")
		tlsConfig.ClientAuth = options.ClientAuth
	}

	// The same pool serves as client and/or root CAs, depending on the options.
	if options.ClientCAs {
		tlsConfig.ClientCAs = certPool
	}
	if options.RootCAs {
		tlsConfig.RootCAs = certPool
	}

	return tlsConfig, nil
}