Skip to content

Commit b1b2424

Browse files
authored
rls:Fix throttling in route lookup (b/262779100) (#9874) (#9879)
* Correct value being passed to throttler which had been backwards. * Fix flaky test. * Add a test using AdaptiveThrottler with a CachingRlsLBClient. * Address test flakiness.
1 parent 501ca8f commit b1b2424

File tree

4 files changed

+133
-9
lines changed

4 files changed

+133
-9
lines changed

interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -414,7 +414,7 @@ private void scheduleNextChunk() {
414414

415415
// Schedule the next response chunk if there is one.
416416
Chunk nextChunk = chunks.peek();
417-
if (nextChunk != null) {
417+
if (nextChunk != null && !executor.isShutdown()) {
418418
scheduled = true;
419419
// TODO(ejona): cancel future if RPC is cancelled
420420
Future<?> unused = executor.schedule(new LogExceptionRunnable(dispatchTask),

interop-testing/src/test/java/io/grpc/testing/integration/NettyFlowControlTest.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -147,10 +147,11 @@ private void doTest(int bandwidth, int latency) throws InterruptedException {
147147
// deal with cases that either don't cause a window update or hit max window
148148
expectedWindow = Math.min(MAX_WINDOW, Math.max(expectedWindow, REGULAR_WINDOW));
149149

150-
// Range looks large, but this allows for only one extra/missed window update
150+
// Range looks large, but this allows for only one extra/missed window update plus
151+
// bdpPing variations.
151152
// (one extra update causes a 2x difference and one missed update causes a .5x difference)
152153
assertTrue("Window was " + lastWindow + " expecting " + expectedWindow,
153-
lastWindow < 2 * expectedWindow);
154+
lastWindow < 2.2 * expectedWindow);
154155
assertTrue("Window was " + lastWindow + " expecting " + expectedWindow,
155156
expectedWindow < 2 * lastWindow);
156157
}
@@ -194,6 +195,7 @@ private static class TestStreamObserver implements StreamObserver<StreamingOutpu
194195
final CountDownLatch latch = new CountDownLatch(1);
195196
final long expectedWindow;
196197
int lastWindow;
198+
boolean wasCompleted;
197199

198200
public TestStreamObserver(
199201
AtomicReference<GrpcHttp2ConnectionHandler> grpcHandlerRef, long window) {
@@ -206,9 +208,18 @@ public TestStreamObserver(
206208
public void onNext(StreamingOutputCallResponse value) {
207209
GrpcHttp2ConnectionHandler grpcHandler = grpcHandlerRef.get();
208210
Http2Stream connectionStream = grpcHandler.connection().connectionStream();
209-
lastWindow = grpcHandler.decoder().flowController().initialWindowSize(connectionStream);
210-
if (lastWindow >= expectedWindow) {
211-
onCompleted();
211+
int curWindow = grpcHandler.decoder().flowController().initialWindowSize(connectionStream);
212+
synchronized (this) {
213+
if (curWindow >= expectedWindow) {
214+
if (wasCompleted) {
215+
return;
216+
}
217+
wasCompleted = true;
218+
lastWindow = curWindow;
219+
onCompleted();
220+
} else if (!wasCompleted) {
221+
lastWindow = curWindow;
222+
}
212223
}
213224
}
214225

rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -218,13 +218,13 @@ public void onNext(io.grpc.lookup.v1.RouteLookupResponse value) {
218218
public void onError(Throwable t) {
219219
logger.log(ChannelLogLevel.DEBUG, "Error looking up route:", t);
220220
response.setException(t);
221-
throttler.registerBackendResponse(false);
221+
throttler.registerBackendResponse(true);
222222
helper.propagateRlsError();
223223
}
224224

225225
@Override
226226
public void onCompleted() {
227-
throttler.registerBackendResponse(true);
227+
throttler.registerBackendResponse(false);
228228
}
229229
});
230230
return response;

rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java

Lines changed: 114 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ public void uncaughtException(Thread t, Throwable e) {
151151
private String rlsChannelOverriddenAuthority;
152152

153153
private void setUpRlsLbClient() {
154+
fakeThrottler.resetCounts();
154155
rlsLbClient =
155156
CachingRlsLbClient.newBuilder()
156157
.setBackoffProvider(fakeBackoffProvider)
@@ -362,6 +363,8 @@ public void get_updatesLbState() throws Exception {
362363
assertThat(pickResult.getStatus().isOk()).isTrue();
363364
assertThat(pickResult.getSubchannel()).isNotNull();
364365
assertThat(headers.get(RLS_DATA_KEY)).isEqualTo("header-rls-data-value");
366+
assertThat(fakeThrottler.getNumThrottled()).isEqualTo(0);
367+
assertThat(fakeThrottler.getNumUnthrottled()).isEqualTo(1);
365368

366369
// move backoff further back to only test error behavior
367370
fakeBackoffProvider.nextPolicy = createBackoffPolicy(100, TimeUnit.MILLISECONDS);
@@ -388,6 +391,97 @@ public void get_updatesLbState() throws Exception {
388391
CallOptions.DEFAULT));
389392
assertThat(pickResult.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE);
390393
assertThat(pickResult.getStatus().getDescription()).contains("fallback not available");
394+
assertThat(fakeThrottler.getNumThrottled()).isEqualTo(1);
395+
assertThat(fakeThrottler.getNumUnthrottled()).isEqualTo(1);
396+
}
397+
398+
@Test
399+
public void get_withAdaptiveThrottler() throws Exception {
400+
AdaptiveThrottler adaptiveThrottler =
401+
new AdaptiveThrottler.Builder()
402+
.setHistorySeconds(1)
403+
.setRatioForAccepts(1.0f)
404+
.setRequestsPadding(1)
405+
.setTicker(fakeClock.getTicker())
406+
.build();
407+
408+
this.rlsLbClient =
409+
CachingRlsLbClient.newBuilder()
410+
.setBackoffProvider(fakeBackoffProvider)
411+
.setResolvedAddressesFactory(resolvedAddressFactory)
412+
.setEvictionListener(evictionListener)
413+
.setHelper(helper)
414+
.setLbPolicyConfig(lbPolicyConfiguration)
415+
.setThrottler(adaptiveThrottler)
416+
.setTicker(fakeClock.getTicker())
417+
.build();
418+
InOrder inOrder = inOrder(helper);
419+
RouteLookupRequest routeLookupRequest = RouteLookupRequest.create(ImmutableMap.of(
420+
"server", "bigtable.googleapis.com", "service-key", "service1", "method-key", "create"));
421+
rlsServerImpl.setLookupTable(
422+
ImmutableMap.of(
423+
routeLookupRequest,
424+
RouteLookupResponse.create(
425+
ImmutableList.of("primary.cloudbigtable.googleapis.com"),
426+
"header-rls-data-value")));
427+
428+
// valid channel
429+
CachedRouteLookupResponse resp = getInSyncContext(routeLookupRequest);
430+
assertThat(resp.isPending()).isTrue();
431+
fakeClock.forwardTime(SERVER_LATENCY_MILLIS, TimeUnit.MILLISECONDS);
432+
433+
resp = getInSyncContext(routeLookupRequest);
434+
assertThat(resp.hasData()).isTrue();
435+
436+
ArgumentCaptor<SubchannelPicker> pickerCaptor = ArgumentCaptor.forClass(SubchannelPicker.class);
437+
ArgumentCaptor<ConnectivityState> stateCaptor =
438+
ArgumentCaptor.forClass(ConnectivityState.class);
439+
inOrder.verify(helper, times(2))
440+
.updateBalancingState(stateCaptor.capture(), pickerCaptor.capture());
441+
442+
Metadata headers = new Metadata();
443+
PickResult pickResult = pickerCaptor.getValue().pickSubchannel(
444+
new PickSubchannelArgsImpl(
445+
TestMethodDescriptors.voidMethod().toBuilder().setFullMethodName("service1/create")
446+
.build(),
447+
headers,
448+
CallOptions.DEFAULT));
449+
assertThat(pickResult.getSubchannel()).isNotNull();
450+
assertThat(headers.get(RLS_DATA_KEY)).isEqualTo("header-rls-data-value");
451+
452+
// move backoff further back to only test error behavior
453+
fakeBackoffProvider.nextPolicy = createBackoffPolicy(100, TimeUnit.MILLISECONDS);
454+
// try to get invalid
455+
RouteLookupRequest invalidRouteLookupRequest =
456+
RouteLookupRequest.create(ImmutableMap.<String, String>of());
457+
CachedRouteLookupResponse errorResp = getInSyncContext(invalidRouteLookupRequest);
458+
assertThat(errorResp.isPending()).isTrue();
459+
fakeClock.forwardTime(SERVER_LATENCY_MILLIS, TimeUnit.MILLISECONDS);
460+
461+
errorResp = getInSyncContext(invalidRouteLookupRequest);
462+
assertThat(errorResp.hasError()).isTrue();
463+
464+
// Channel is still READY because the subchannel for method /service1/create is still READY.
465+
// Method /doesn/exists will use fallback child balancer and fail immediately.
466+
inOrder.verify(helper)
467+
.updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture());
468+
PickSubchannelArgsImpl invalidArgs = getInvalidArgs(headers);
469+
pickResult = pickerCaptor.getValue().pickSubchannel(invalidArgs);
470+
assertThat(pickResult.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE);
471+
assertThat(pickResult.getStatus().getDescription()).contains("fallback not available");
472+
long time = fakeClock.getTicker().read();
473+
assertThat(adaptiveThrottler.requestStat.get(time)).isEqualTo(2L);
474+
assertThat(adaptiveThrottler.throttledStat.get(time)).isEqualTo(1L);
475+
}
476+
477+
private PickSubchannelArgsImpl getInvalidArgs(Metadata headers) {
478+
PickSubchannelArgsImpl invalidArgs = new PickSubchannelArgsImpl(
479+
TestMethodDescriptors.voidMethod().toBuilder()
480+
.setFullMethodName("doesn/exists")
481+
.build(),
482+
headers,
483+
CallOptions.DEFAULT);
484+
return invalidArgs;
391485
}
392486

393487
@Test
@@ -755,6 +849,8 @@ public ChannelLogger getChannelLogger() {
755849
}
756850

757851
private static final class FakeThrottler implements Throttler {
852+
int numUnthrottled;
853+
int numThrottled;
758854

759855
private boolean nextResult = false;
760856

@@ -765,7 +861,24 @@ public boolean shouldThrottle() {
765861

766862
@Override
767863
public void registerBackendResponse(boolean throttled) {
768-
// no-op
864+
if (throttled) {
865+
numThrottled++;
866+
} else {
867+
numUnthrottled++;
868+
}
869+
}
870+
871+
public int getNumUnthrottled() {
872+
return numUnthrottled;
873+
}
874+
875+
public int getNumThrottled() {
876+
return numThrottled;
877+
}
878+
879+
public void resetCounts() {
880+
numThrottled = 0;
881+
numUnthrottled = 0;
769882
}
770883
}
771884
}

0 commit comments

Comments
 (0)