Skip to content

Commit 0467e47

Browse files
arvindbr8zasweq
andauthored
balancer/leastrequest: Cache atomic load and also add concurrent rpc test (#6607)
Co-authored-by: Zach Reyes <[email protected]>
1 parent 5d1c0ae commit 0467e47

File tree

2 files changed

+59
-5
lines changed

2 files changed

+59
-5
lines changed

balancer/leastrequest/balancer_test.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"encoding/json"
2323
"fmt"
2424
"strings"
25+
"sync"
2526
"testing"
2627
"time"
2728

@@ -455,3 +456,57 @@ func (s) TestLeastRequestPersistsCounts(t *testing.T) {
455456
t.Fatalf("addr count (-got:, +want): %v", diff)
456457
}
457458
}
459+
460+
// TestConcurrentRPCs tests concurrent RPCs on the least request balancer. It
461+
// configures a channel with a least request balancer as the top level balancer,
462+
// and makes 100 RPCs asynchronously. This makes sure no race conditions happen
463+
// in this scenario.
464+
func (s) TestConcurrentRPCs(t *testing.T) {
465+
addresses := setupBackends(t)
466+
467+
mr := manual.NewBuilderWithScheme("lr-e2e")
468+
defer mr.Close()
469+
470+
// Configure least request as top level balancer of channel.
471+
lrscJSON := `
472+
{
473+
"loadBalancingConfig": [
474+
{
475+
"least_request_experimental": {
476+
"choiceCount": 2
477+
}
478+
}
479+
]
480+
}`
481+
sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(lrscJSON)
482+
firstTwoAddresses := []resolver.Address{
483+
{Addr: addresses[0]},
484+
{Addr: addresses[1]},
485+
}
486+
mr.InitialState(resolver.State{
487+
Addresses: firstTwoAddresses,
488+
ServiceConfig: sc,
489+
})
490+
491+
cc, err := grpc.Dial(mr.Scheme()+":///", grpc.WithResolvers(mr), grpc.WithTransportCredentials(insecure.NewCredentials()))
492+
if err != nil {
493+
t.Fatalf("grpc.Dial() failed: %v", err)
494+
}
495+
defer cc.Close()
496+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
497+
defer cancel()
498+
testServiceClient := testgrpc.NewTestServiceClient(cc)
499+
500+
var wg sync.WaitGroup
501+
for i := 0; i < 100; i++ {
502+
wg.Add(1)
503+
go func() {
504+
defer wg.Done()
505+
for j := 0; j < 5; j++ {
506+
testServiceClient.EmptyCall(ctx, &testpb.Empty{})
507+
}
508+
}()
509+
}
510+
wg.Wait()
511+
512+
}

balancer/leastrequest/leastrequest.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -155,15 +155,14 @@ type picker struct {
155155

156156
func (p *picker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
157157
var pickedSC *scWithRPCCount
158+
var pickedSCNumRPCs int32
158159
for i := 0; i < int(p.choiceCount); i++ {
159160
index := grpcranduint32() % uint32(len(p.subConns))
160161
sc := p.subConns[index]
161-
if pickedSC == nil {
162-
pickedSC = &sc
163-
continue
164-
}
165-
if sc.numRPCs.Load() < pickedSC.numRPCs.Load() {
162+
n := sc.numRPCs.Load()
163+
if pickedSC == nil || n < pickedSCNumRPCs {
166164
pickedSC = &sc
165+
pickedSCNumRPCs = n
167166
}
168167
}
169168
// "The counter for a subchannel should be atomically incremented by one

0 commit comments

Comments
 (0)