Skip to content

Commit 5390859

Browse files
committed
[v1.41.x_backport] xds: fix parent balancers to handle Idle children (grpc#4801)
1 parent 39f0ef6 commit 5390859

File tree

4 files changed

+141
-2
lines changed

4 files changed

+141
-2
lines changed

xds/internal/balancer/clustermanager/balancerstateaggregator.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,13 +183,18 @@ func (bsa *balancerStateAggregator) build() balancer.State {
183183
// handling the special connecting after ready, as in UpdateState(). Then a
184184
// function to calculate the aggregated connectivity state as in this
185185
// function.
186-
var readyN, connectingN int
186+
//
187+
// TODO: use balancer.ConnectivityStateEvaluator to calculate the aggregated
188+
// state.
189+
var readyN, connectingN, idleN int
187190
for _, ps := range bsa.idToPickerState {
188191
switch ps.stateToAggregate {
189192
case connectivity.Ready:
190193
readyN++
191194
case connectivity.Connecting:
192195
connectingN++
196+
case connectivity.Idle:
197+
idleN++
193198
}
194199
}
195200
var aggregatedState connectivity.State
@@ -198,6 +203,8 @@ func (bsa *balancerStateAggregator) build() balancer.State {
198203
aggregatedState = connectivity.Ready
199204
case connectingN > 0:
200205
aggregatedState = connectivity.Connecting
206+
case idleN > 0:
207+
aggregatedState = connectivity.Idle
201208
default:
202209
aggregatedState = connectivity.TransientFailure
203210
}

xds/internal/balancer/clustermanager/clustermanager_test.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -565,3 +565,68 @@ func TestClusterManagerForwardsBalancerBuildOptions(t *testing.T) {
565565
t.Fatal(err2)
566566
}
567567
}
568+
569+
const initIdleBalancerName = "test-init-Idle-balancer"
570+
571+
var errTestInitIdle = fmt.Errorf("init Idle balancer error 0")
572+
573+
func init() {
574+
stub.Register(initIdleBalancerName, stub.BalancerFuncs{
575+
UpdateClientConnState: func(bd *stub.BalancerData, opts balancer.ClientConnState) error {
576+
bd.ClientConn.NewSubConn(opts.ResolverState.Addresses, balancer.NewSubConnOptions{})
577+
return nil
578+
},
579+
UpdateSubConnState: func(bd *stub.BalancerData, sc balancer.SubConn, state balancer.SubConnState) {
580+
err := fmt.Errorf("wrong picker error")
581+
if state.ConnectivityState == connectivity.Idle {
582+
err = errTestInitIdle
583+
}
584+
bd.ClientConn.UpdateState(balancer.State{
585+
ConnectivityState: state.ConnectivityState,
586+
Picker: &testutils.TestConstPicker{Err: err},
587+
})
588+
},
589+
})
590+
}
591+
592+
// TestInitialIdle covers the case that if the child reports Idle, the overall
593+
// state will be Idle.
594+
func TestInitialIdle(t *testing.T) {
595+
cc := testutils.NewTestClientConn(t)
596+
rtb := rtBuilder.Build(cc, balancer.BuildOptions{})
597+
598+
configJSON1 := `{
599+
"children": {
600+
"cds:cluster_1":{ "childPolicy": [{"test-init-Idle-balancer":""}] }
601+
}
602+
}`
603+
604+
config1, err := rtParser.ParseConfig([]byte(configJSON1))
605+
if err != nil {
606+
t.Fatalf("failed to parse balancer config: %v", err)
607+
}
608+
609+
// Send the config, and an address with hierarchy path ["cluster_1"].
610+
wantAddrs := []resolver.Address{
611+
{Addr: testBackendAddrStrs[0], Attributes: nil},
612+
}
613+
if err := rtb.UpdateClientConnState(balancer.ClientConnState{
614+
ResolverState: resolver.State{Addresses: []resolver.Address{
615+
hierarchy.Set(wantAddrs[0], []string{"cds:cluster_1"}),
616+
}},
617+
BalancerConfig: config1,
618+
}); err != nil {
619+
t.Fatalf("failed to update ClientConn state: %v", err)
620+
}
621+
622+
// Verify that a subconn is created with the address, and the hierarchy path
623+
// in the address is cleared.
624+
for range wantAddrs {
625+
sc := <-cc.NewSubConnCh
626+
rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Idle})
627+
}
628+
629+
if state1 := <-cc.NewStateCh; state1 != connectivity.Idle {
630+
t.Fatalf("Received aggregated state: %v, want Idle", state1)
631+
}
632+
}

xds/internal/balancer/weightedtarget/weightedaggregator/aggregator.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,9 @@ func (wbsa *Aggregator) BuildAndUpdate() {
200200
func (wbsa *Aggregator) build() balancer.State {
201201
wbsa.logger.Infof("Child pickers with config: %+v", wbsa.idToPickerState)
202202
m := wbsa.idToPickerState
203-
var readyN, connectingN int
203+
// TODO: use balancer.ConnectivityStateEvaluator to calculate the aggregated
204+
// state.
205+
var readyN, connectingN, idleN int
204206
readyPickerWithWeights := make([]weightedPickerState, 0, len(m))
205207
for _, ps := range m {
206208
switch ps.stateToAggregate {
@@ -209,6 +211,8 @@ func (wbsa *Aggregator) build() balancer.State {
209211
readyPickerWithWeights = append(readyPickerWithWeights, *ps)
210212
case connectivity.Connecting:
211213
connectingN++
214+
case connectivity.Idle:
215+
idleN++
212216
}
213217
}
214218
var aggregatedState connectivity.State
@@ -217,6 +221,8 @@ func (wbsa *Aggregator) build() balancer.State {
217221
aggregatedState = connectivity.Ready
218222
case connectingN > 0:
219223
aggregatedState = connectivity.Connecting
224+
case idleN > 0:
225+
aggregatedState = connectivity.Idle
220226
default:
221227
aggregatedState = connectivity.TransientFailure
222228
}

xds/internal/balancer/weightedtarget/weightedtarget_test.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"google.golang.org/grpc/balancer"
3030
"google.golang.org/grpc/balancer/roundrobin"
3131
"google.golang.org/grpc/connectivity"
32+
"google.golang.org/grpc/internal/balancer/stub"
3233
"google.golang.org/grpc/internal/hierarchy"
3334
"google.golang.org/grpc/resolver"
3435
"google.golang.org/grpc/serviceconfig"
@@ -263,3 +264,63 @@ func subConnFromPicker(p balancer.Picker) func() balancer.SubConn {
263264
return scst.SubConn
264265
}
265266
}
267+
268+
const initIdleBalancerName = "test-init-Idle-balancer"
269+
270+
var errTestInitIdle = fmt.Errorf("init Idle balancer error 0")
271+
272+
func init() {
273+
stub.Register(initIdleBalancerName, stub.BalancerFuncs{
274+
UpdateClientConnState: func(bd *stub.BalancerData, opts balancer.ClientConnState) error {
275+
bd.ClientConn.NewSubConn(opts.ResolverState.Addresses, balancer.NewSubConnOptions{})
276+
return nil
277+
},
278+
UpdateSubConnState: func(bd *stub.BalancerData, sc balancer.SubConn, state balancer.SubConnState) {
279+
err := fmt.Errorf("wrong picker error")
280+
if state.ConnectivityState == connectivity.Idle {
281+
err = errTestInitIdle
282+
}
283+
bd.ClientConn.UpdateState(balancer.State{
284+
ConnectivityState: state.ConnectivityState,
285+
Picker: &testutils.TestConstPicker{Err: err},
286+
})
287+
},
288+
})
289+
}
290+
291+
// TestInitialIdle covers the case that if the child reports Idle, the overall
292+
// state will be Idle.
293+
func TestInitialIdle(t *testing.T) {
294+
cc := testutils.NewTestClientConn(t)
295+
wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
296+
297+
// Start with "cluster_1: round_robin".
298+
config1, err := wtbParser.ParseConfig([]byte(`{"targets":{"cluster_1":{"weight":1,"childPolicy":[{"test-init-Idle-balancer":""}]}}}`))
299+
if err != nil {
300+
t.Fatalf("failed to parse balancer config: %v", err)
301+
}
302+
303+
// Send the config, and an address with hierarchy path ["cluster_1"].
304+
wantAddrs := []resolver.Address{
305+
{Addr: testBackendAddrStrs[0], Attributes: nil},
306+
}
307+
if err := wtb.UpdateClientConnState(balancer.ClientConnState{
308+
ResolverState: resolver.State{Addresses: []resolver.Address{
309+
hierarchy.Set(wantAddrs[0], []string{"cds:cluster_1"}),
310+
}},
311+
BalancerConfig: config1,
312+
}); err != nil {
313+
t.Fatalf("failed to update ClientConn state: %v", err)
314+
}
315+
316+
// Verify that a subconn is created with the address, and the hierarchy path
317+
// in the address is cleared.
318+
for range wantAddrs {
319+
sc := <-cc.NewSubConnCh
320+
wtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Idle})
321+
}
322+
323+
if state1 := <-cc.NewStateCh; state1 != connectivity.Idle {
324+
t.Fatalf("Received aggregated state: %v, want Idle", state1)
325+
}
326+
}

0 commit comments

Comments
 (0)