Skip to content

Commit a671967

Browse files
authored
backport PRs to v1.41.x (#4810)
* xds: fix parent balancers to handle Idle children (#4801) * transport: fix log spam from Server Authentication Handshake errors (#4798)
1 parent 8c8b55e commit a671967

File tree

6 files changed

+152
-6
lines changed

6 files changed

+152
-6
lines changed

internal/transport/http2_server.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -139,9 +139,11 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
139139
var err error
140140
conn, authInfo, err = config.Credentials.ServerHandshake(rawConn)
141141
if err != nil {
142-
// ErrConnDispatched means that the connection was dispatched away from
143-
// gRPC; those connections should be left open.
144-
if err == credentials.ErrConnDispatched {
142+
// ErrConnDispatched means that the connection was dispatched away
143+
// from gRPC; those connections should be left open. io.EOF means
144+
// the connection was closed before handshaking completed, which can
145+
// happen naturally from probers. Return these errors directly.
146+
if err == credentials.ErrConnDispatched || err == io.EOF {
145147
return nil, err
146148
}
147149
return nil, connectionErrorf(false, err, "ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)

server.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -887,7 +887,12 @@ func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport {
887887
if err != credentials.ErrConnDispatched {
888888
c.Close()
889889
}
890-
channelz.Warning(logger, s.channelzID, "grpc: Server.Serve failed to create ServerTransport: ", err)
890+
// Don't log on ErrConnDispatched and io.EOF to prevent log spam.
891+
if err != credentials.ErrConnDispatched {
892+
if err != io.EOF {
893+
channelz.Warning(logger, s.channelzID, "grpc: Server.Serve failed to create ServerTransport: ", err)
894+
}
895+
}
891896
return nil
892897
}
893898

xds/internal/balancer/clustermanager/balancerstateaggregator.go

+8-1
Original file line numberDiff line numberDiff line change
@@ -183,13 +183,18 @@ func (bsa *balancerStateAggregator) build() balancer.State {
183183
// handling the special connecting after ready, as in UpdateState(). Then a
184184
// function to calculate the aggregated connectivity state as in this
185185
// function.
186-
var readyN, connectingN int
186+
//
187+
// TODO: use balancer.ConnectivityStateEvaluator to calculate the aggregated
188+
// state.
189+
var readyN, connectingN, idleN int
187190
for _, ps := range bsa.idToPickerState {
188191
switch ps.stateToAggregate {
189192
case connectivity.Ready:
190193
readyN++
191194
case connectivity.Connecting:
192195
connectingN++
196+
case connectivity.Idle:
197+
idleN++
193198
}
194199
}
195200
var aggregatedState connectivity.State
@@ -198,6 +203,8 @@ func (bsa *balancerStateAggregator) build() balancer.State {
198203
aggregatedState = connectivity.Ready
199204
case connectingN > 0:
200205
aggregatedState = connectivity.Connecting
206+
case idleN > 0:
207+
aggregatedState = connectivity.Idle
201208
default:
202209
aggregatedState = connectivity.TransientFailure
203210
}

xds/internal/balancer/clustermanager/clustermanager_test.go

+65
Original file line numberDiff line numberDiff line change
@@ -565,3 +565,68 @@ func TestClusterManagerForwardsBalancerBuildOptions(t *testing.T) {
565565
t.Fatal(err2)
566566
}
567567
}
568+
569+
const initIdleBalancerName = "test-init-Idle-balancer"
570+
571+
var errTestInitIdle = fmt.Errorf("init Idle balancer error 0")
572+
573+
func init() {
574+
stub.Register(initIdleBalancerName, stub.BalancerFuncs{
575+
UpdateClientConnState: func(bd *stub.BalancerData, opts balancer.ClientConnState) error {
576+
bd.ClientConn.NewSubConn(opts.ResolverState.Addresses, balancer.NewSubConnOptions{})
577+
return nil
578+
},
579+
UpdateSubConnState: func(bd *stub.BalancerData, sc balancer.SubConn, state balancer.SubConnState) {
580+
err := fmt.Errorf("wrong picker error")
581+
if state.ConnectivityState == connectivity.Idle {
582+
err = errTestInitIdle
583+
}
584+
bd.ClientConn.UpdateState(balancer.State{
585+
ConnectivityState: state.ConnectivityState,
586+
Picker: &testutils.TestConstPicker{Err: err},
587+
})
588+
},
589+
})
590+
}
591+
592+
// TestInitialIdle covers the case that if the child reports Idle, the overall
593+
// state will be Idle.
594+
func TestInitialIdle(t *testing.T) {
595+
cc := testutils.NewTestClientConn(t)
596+
rtb := rtBuilder.Build(cc, balancer.BuildOptions{})
597+
598+
configJSON1 := `{
599+
"children": {
600+
"cds:cluster_1":{ "childPolicy": [{"test-init-Idle-balancer":""}] }
601+
}
602+
}`
603+
604+
config1, err := rtParser.ParseConfig([]byte(configJSON1))
605+
if err != nil {
606+
t.Fatalf("failed to parse balancer config: %v", err)
607+
}
608+
609+
// Send the config, and an address with hierarchy path ["cluster_1"].
610+
wantAddrs := []resolver.Address{
611+
{Addr: testBackendAddrStrs[0], Attributes: nil},
612+
}
613+
if err := rtb.UpdateClientConnState(balancer.ClientConnState{
614+
ResolverState: resolver.State{Addresses: []resolver.Address{
615+
hierarchy.Set(wantAddrs[0], []string{"cds:cluster_1"}),
616+
}},
617+
BalancerConfig: config1,
618+
}); err != nil {
619+
t.Fatalf("failed to update ClientConn state: %v", err)
620+
}
621+
622+
// Verify that a subconn is created with the address, and the hierarchy path
623+
// in the address is cleared.
624+
for range wantAddrs {
625+
sc := <-cc.NewSubConnCh
626+
rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Idle})
627+
}
628+
629+
if state1 := <-cc.NewStateCh; state1 != connectivity.Idle {
630+
t.Fatalf("Received aggregated state: %v, want Idle", state1)
631+
}
632+
}

xds/internal/balancer/weightedtarget/weightedaggregator/aggregator.go

+7-1
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,9 @@ func (wbsa *Aggregator) BuildAndUpdate() {
200200
func (wbsa *Aggregator) build() balancer.State {
201201
wbsa.logger.Infof("Child pickers with config: %+v", wbsa.idToPickerState)
202202
m := wbsa.idToPickerState
203-
var readyN, connectingN int
203+
// TODO: use balancer.ConnectivityStateEvaluator to calculate the aggregated
204+
// state.
205+
var readyN, connectingN, idleN int
204206
readyPickerWithWeights := make([]weightedPickerState, 0, len(m))
205207
for _, ps := range m {
206208
switch ps.stateToAggregate {
@@ -209,6 +211,8 @@ func (wbsa *Aggregator) build() balancer.State {
209211
readyPickerWithWeights = append(readyPickerWithWeights, *ps)
210212
case connectivity.Connecting:
211213
connectingN++
214+
case connectivity.Idle:
215+
idleN++
212216
}
213217
}
214218
var aggregatedState connectivity.State
@@ -217,6 +221,8 @@ func (wbsa *Aggregator) build() balancer.State {
217221
aggregatedState = connectivity.Ready
218222
case connectingN > 0:
219223
aggregatedState = connectivity.Connecting
224+
case idleN > 0:
225+
aggregatedState = connectivity.Idle
220226
default:
221227
aggregatedState = connectivity.TransientFailure
222228
}

xds/internal/balancer/weightedtarget/weightedtarget_test.go

+61
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"google.golang.org/grpc/balancer"
3030
"google.golang.org/grpc/balancer/roundrobin"
3131
"google.golang.org/grpc/connectivity"
32+
"google.golang.org/grpc/internal/balancer/stub"
3233
"google.golang.org/grpc/internal/hierarchy"
3334
"google.golang.org/grpc/resolver"
3435
"google.golang.org/grpc/serviceconfig"
@@ -263,3 +264,63 @@ func subConnFromPicker(p balancer.Picker) func() balancer.SubConn {
263264
return scst.SubConn
264265
}
265266
}
267+
268+
const initIdleBalancerName = "test-init-Idle-balancer"
269+
270+
var errTestInitIdle = fmt.Errorf("init Idle balancer error 0")
271+
272+
func init() {
273+
stub.Register(initIdleBalancerName, stub.BalancerFuncs{
274+
UpdateClientConnState: func(bd *stub.BalancerData, opts balancer.ClientConnState) error {
275+
bd.ClientConn.NewSubConn(opts.ResolverState.Addresses, balancer.NewSubConnOptions{})
276+
return nil
277+
},
278+
UpdateSubConnState: func(bd *stub.BalancerData, sc balancer.SubConn, state balancer.SubConnState) {
279+
err := fmt.Errorf("wrong picker error")
280+
if state.ConnectivityState == connectivity.Idle {
281+
err = errTestInitIdle
282+
}
283+
bd.ClientConn.UpdateState(balancer.State{
284+
ConnectivityState: state.ConnectivityState,
285+
Picker: &testutils.TestConstPicker{Err: err},
286+
})
287+
},
288+
})
289+
}
290+
291+
// TestInitialIdle covers the case that if the child reports Idle, the overall
292+
// state will be Idle.
293+
func TestInitialIdle(t *testing.T) {
294+
cc := testutils.NewTestClientConn(t)
295+
wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
296+
297+
// Start with "cluster_1: round_robin".
298+
config1, err := wtbParser.ParseConfig([]byte(`{"targets":{"cluster_1":{"weight":1,"childPolicy":[{"test-init-Idle-balancer":""}]}}}`))
299+
if err != nil {
300+
t.Fatalf("failed to parse balancer config: %v", err)
301+
}
302+
303+
// Send the config, and an address with hierarchy path ["cluster_1"].
304+
wantAddrs := []resolver.Address{
305+
{Addr: testBackendAddrStrs[0], Attributes: nil},
306+
}
307+
if err := wtb.UpdateClientConnState(balancer.ClientConnState{
308+
ResolverState: resolver.State{Addresses: []resolver.Address{
309+
hierarchy.Set(wantAddrs[0], []string{"cds:cluster_1"}),
310+
}},
311+
BalancerConfig: config1,
312+
}); err != nil {
313+
t.Fatalf("failed to update ClientConn state: %v", err)
314+
}
315+
316+
// Verify that a subconn is created with the address, and the hierarchy path
317+
// in the address is cleared.
318+
for range wantAddrs {
319+
sc := <-cc.NewSubConnCh
320+
wtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Idle})
321+
}
322+
323+
if state1 := <-cc.NewStateCh; state1 != connectivity.Idle {
324+
t.Fatalf("Received aggregated state: %v, want Idle", state1)
325+
}
326+
}

0 commit comments

Comments
 (0)