From f3ed6effde8c3fede35e78f217364a1585c04286 Mon Sep 17 00:00:00 2001 From: AgraVator Date: Tue, 28 Jan 2025 18:33:23 +0530 Subject: [PATCH 1/5] core: updates the backoff range being used from [0, 1] to [0.8, 1.2] as per the A6 redefinition --- .../io/grpc/internal/RetriableStream.java | 3 +- .../grpc/internal/ManagedChannelImplTest.java | 4 +- .../io/grpc/internal/RetriableStreamTest.java | 89 ++++++++++--------- .../grpc/testing/integration/RetryTest.java | 10 +-- 4 files changed, 57 insertions(+), 49 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/RetriableStream.java b/core/src/main/java/io/grpc/internal/RetriableStream.java index 85d7bc86584..e895e9a3743 100644 --- a/core/src/main/java/io/grpc/internal/RetriableStream.java +++ b/core/src/main/java/io/grpc/internal/RetriableStream.java @@ -1066,7 +1066,8 @@ private RetryPlan makeRetryDecision(Status status, Metadata trailer) { if (pushbackMillis == null) { if (isRetryableStatusCode) { shouldRetry = true; - backoffNanos = (long) (nextBackoffIntervalNanos * random.nextDouble()); + // Apply jitter by multiplying with a random factor between 0.8 and 1.2 + backoffNanos = (long) (nextBackoffIntervalNanos * (0.8 + random.nextDouble() * 0.4)); nextBackoffIntervalNanos = Math.min( (long) (nextBackoffIntervalNanos * retryPolicy.backoffMultiplier), retryPolicy.maxBackoffNanos); diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java index 98900cecf2b..21ccf1095df 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java @@ -3875,7 +3875,7 @@ public double nextDouble() { Status.UNAVAILABLE, PROCESSED, new Metadata()); // in backoff - timer.forwardTime(5, TimeUnit.SECONDS); + timer.forwardTime(6, TimeUnit.SECONDS); assertThat(timer.getPendingTasks()).hasSize(1); verify(mockStream2, never()).start(any(ClientStreamListener.class)); @@ 
-3894,7 +3894,7 @@ public double nextDouble() { assertEquals("Channel shutdown invoked", statusCaptor.getValue().getDescription()); // backoff ends - timer.forwardTime(5, TimeUnit.SECONDS); + timer.forwardTime(6, TimeUnit.SECONDS); assertThat(timer.getPendingTasks()).isEmpty(); verify(mockStream2).start(streamListenerCaptor.capture()); verify(mockLoadBalancer, never()).shutdown(); diff --git a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java index 658ed70a135..23c3155279f 100644 --- a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java +++ b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java @@ -147,6 +147,27 @@ public double nextDouble() { private final ChannelBufferMeter channelBufferUsed = new ChannelBufferMeter(); private final FakeClock fakeClock = new FakeClock(); + private static double calculateJitterFactor() { + return (0.8 + FAKE_RANDOM * 0.4); + } + + private static long calculateInitialBackoff() { + return (long) (INITIAL_BACKOFF_IN_SECONDS * calculateJitterFactor()); + } + + private static long calculateBackoff() { + return (long) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * calculateJitterFactor()); + } + + private static long calculateBackoffSquared() { + return (long) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * BACKOFF_MULTIPLIER + * calculateJitterFactor()); + } + + private static long calculateMaxBackoff() { + return (long) (MAX_BACKOFF_IN_SECONDS * calculateJitterFactor()); + } + private final class RecordedRetriableStream extends RetriableStream { RecordedRetriableStream(MethodDescriptor method, Metadata headers, ChannelBufferMeter channelBufferUsed, long perRpcBufferLimit, long channelBufferLimit, @@ -307,7 +328,7 @@ public Void answer(InvocationOnMock in) { retriableStream.sendMessage("msg1 during backoff1"); retriableStream.sendMessage("msg2 during backoff1"); - fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM) - 
1L, TimeUnit.SECONDS); + fakeClock.forwardTime(calculateInitialBackoff() - 1L, TimeUnit.SECONDS); inOrder.verifyNoMoreInteractions(); assertEquals(1, fakeClock.numPendingTasks()); fakeClock.forwardTime(1L, TimeUnit.SECONDS); @@ -364,9 +385,7 @@ public Void answer(InvocationOnMock in) { retriableStream.sendMessage("msg2 during backoff2"); retriableStream.sendMessage("msg3 during backoff2"); - fakeClock.forwardTime( - (long) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * FAKE_RANDOM) - 1L, - TimeUnit.SECONDS); + fakeClock.forwardTime(calculateBackoff() - 1L, TimeUnit.SECONDS); inOrder.verifyNoMoreInteractions(); assertEquals(1, fakeClock.numPendingTasks()); fakeClock.forwardTime(1L, TimeUnit.SECONDS); @@ -459,7 +478,7 @@ public void retry_headersRead_cancel() { sublistenerCaptor1.getValue().closed( Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); assertEquals(1, fakeClock.numPendingTasks()); - fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS); + fakeClock.forwardTime(calculateInitialBackoff(), TimeUnit.SECONDS); ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); @@ -518,7 +537,7 @@ public void retry_headersRead_closed() { doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1); sublistenerCaptor1.getValue().closed( Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); - fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS); + fakeClock.forwardTime(calculateInitialBackoff(), TimeUnit.SECONDS); ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); @@ -584,7 +603,7 @@ public void retry_cancel_closed() { doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1); sublistenerCaptor1.getValue().closed( Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); - fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS); 
+ fakeClock.forwardTime(calculateInitialBackoff(), TimeUnit.SECONDS); ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); @@ -687,7 +706,7 @@ public void retry_unretriableClosed_cancel() { doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1); sublistenerCaptor1.getValue().closed( Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); - fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS); + fakeClock.forwardTime(calculateInitialBackoff(), TimeUnit.SECONDS); ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); @@ -821,7 +840,7 @@ public boolean isReady() { // send more requests during backoff retriableStream.request(789); - fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS); + fakeClock.forwardTime(calculateInitialBackoff(), TimeUnit.SECONDS); inOrder.verify(mockStream2).start(sublistenerCaptor2.get()); inOrder.verify(mockStream2).request(3); @@ -875,7 +894,7 @@ public void request(int numMessages) { doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1); sublistenerCaptor1.getValue().closed( Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); - fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS); + fakeClock.forwardTime(calculateInitialBackoff(), TimeUnit.SECONDS); inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); inOrder.verify(mockStream2).request(3); @@ -920,7 +939,7 @@ public void start(ClientStreamListener listener) { doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1); sublistenerCaptor1.getValue().closed( Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); - fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS); + fakeClock.forwardTime(calculateInitialBackoff(), TimeUnit.SECONDS); 
inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); inOrder.verify(retriableStreamRecorder).postCommit(); @@ -1028,7 +1047,7 @@ public boolean isReady() { retriableStream.request(789); readiness.add(retriableStream.isReady()); // expected false b/c in backoff - fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS); + fakeClock.forwardTime(calculateInitialBackoff(), TimeUnit.SECONDS); verify(mockStream2).start(any(ClientStreamListener.class)); readiness.add(retriableStream.isReady()); // expected true @@ -1110,7 +1129,7 @@ public void addPrevRetryAttemptsToRespHeaders() { doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1); sublistenerCaptor1.getValue().closed( Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); - fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS); + fakeClock.forwardTime(calculateInitialBackoff(), TimeUnit.SECONDS); ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); @@ -1160,13 +1179,12 @@ public void start(ClientStreamListener listener) { listener1.closed( Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); assertEquals(1, fakeClock.numPendingTasks()); - fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS); + fakeClock.forwardTime(calculateInitialBackoff(), TimeUnit.SECONDS); assertEquals(1, fakeClock.numPendingTasks()); // send requests during backoff retriableStream.request(3); - fakeClock.forwardTime( - (long) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * FAKE_RANDOM), TimeUnit.SECONDS); + fakeClock.forwardTime(calculateBackoff(), TimeUnit.SECONDS); retriableStream.request(1); verify(mockStream1, never()).request(anyInt()); @@ -1207,7 +1225,7 @@ public void start(ClientStreamListener listener) { // retry listener1.closed( Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); - fakeClock.forwardTime((long) 
(INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS); + fakeClock.forwardTime(calculateInitialBackoff(), TimeUnit.SECONDS); verify(mockStream2).start(any(ClientStreamListener.class)); verify(retriableStreamRecorder).postCommit(); @@ -1260,7 +1278,7 @@ public void perRpcBufferLimitExceededDuringBackoff() { bufferSizeTracer.outboundWireSize(2); verify(retriableStreamRecorder, never()).postCommit(); - fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS); + fakeClock.forwardTime(calculateInitialBackoff(), TimeUnit.SECONDS); verify(mockStream2).start(any(ClientStreamListener.class)); verify(mockStream2).isReady(); @@ -1332,7 +1350,7 @@ public void expBackoff_maxBackoff_maxRetryAttempts() { sublistenerCaptor1.getValue().closed( Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); assertEquals(1, fakeClock.numPendingTasks()); - fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM) - 1L, TimeUnit.SECONDS); + fakeClock.forwardTime(calculateInitialBackoff() - 1L, TimeUnit.SECONDS); assertEquals(1, fakeClock.numPendingTasks()); fakeClock.forwardTime(1L, TimeUnit.SECONDS); assertEquals(0, fakeClock.numPendingTasks()); @@ -1347,9 +1365,7 @@ public void expBackoff_maxBackoff_maxRetryAttempts() { sublistenerCaptor2.getValue().closed( Status.fromCode(RETRIABLE_STATUS_CODE_2), PROCESSED, new Metadata()); assertEquals(1, fakeClock.numPendingTasks()); - fakeClock.forwardTime( - (long) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * FAKE_RANDOM) - 1L, - TimeUnit.SECONDS); + fakeClock.forwardTime(calculateBackoff() - 1L, TimeUnit.SECONDS); assertEquals(1, fakeClock.numPendingTasks()); fakeClock.forwardTime(1L, TimeUnit.SECONDS); assertEquals(0, fakeClock.numPendingTasks()); @@ -1364,10 +1380,7 @@ public void expBackoff_maxBackoff_maxRetryAttempts() { sublistenerCaptor3.getValue().closed( Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); assertEquals(1, fakeClock.numPendingTasks()); 
- fakeClock.forwardTime( - (long) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * BACKOFF_MULTIPLIER * FAKE_RANDOM) - - 1L, - TimeUnit.SECONDS); + fakeClock.forwardTime(calculateBackoffSquared() - 1L, TimeUnit.SECONDS); assertEquals(1, fakeClock.numPendingTasks()); fakeClock.forwardTime(1L, TimeUnit.SECONDS); assertEquals(0, fakeClock.numPendingTasks()); @@ -1382,7 +1395,7 @@ public void expBackoff_maxBackoff_maxRetryAttempts() { sublistenerCaptor4.getValue().closed( Status.fromCode(RETRIABLE_STATUS_CODE_2), PROCESSED, new Metadata()); assertEquals(1, fakeClock.numPendingTasks()); - fakeClock.forwardTime((long) (MAX_BACKOFF_IN_SECONDS * FAKE_RANDOM) - 1L, TimeUnit.SECONDS); + fakeClock.forwardTime(calculateMaxBackoff() - 1L, TimeUnit.SECONDS); assertEquals(1, fakeClock.numPendingTasks()); fakeClock.forwardTime(1L, TimeUnit.SECONDS); assertEquals(0, fakeClock.numPendingTasks()); @@ -1397,7 +1410,7 @@ public void expBackoff_maxBackoff_maxRetryAttempts() { sublistenerCaptor5.getValue().closed( Status.fromCode(RETRIABLE_STATUS_CODE_2), PROCESSED, new Metadata()); assertEquals(1, fakeClock.numPendingTasks()); - fakeClock.forwardTime((long) (MAX_BACKOFF_IN_SECONDS * FAKE_RANDOM) - 1L, TimeUnit.SECONDS); + fakeClock.forwardTime(calculateMaxBackoff() - 1L, TimeUnit.SECONDS); assertEquals(1, fakeClock.numPendingTasks()); fakeClock.forwardTime(1L, TimeUnit.SECONDS); assertEquals(0, fakeClock.numPendingTasks()); @@ -1480,7 +1493,7 @@ public void pushback() { sublistenerCaptor3.getValue().closed( Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); assertEquals(1, fakeClock.numPendingTasks()); - fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM) - 1L, TimeUnit.SECONDS); + fakeClock.forwardTime(calculateInitialBackoff() - 1L, TimeUnit.SECONDS); assertEquals(1, fakeClock.numPendingTasks()); fakeClock.forwardTime(1L, TimeUnit.SECONDS); assertEquals(0, fakeClock.numPendingTasks()); @@ -1495,9 +1508,7 @@ public void pushback() { 
sublistenerCaptor4.getValue().closed( Status.fromCode(RETRIABLE_STATUS_CODE_2), PROCESSED, new Metadata()); assertEquals(1, fakeClock.numPendingTasks()); - fakeClock.forwardTime( - (long) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * FAKE_RANDOM) - 1L, - TimeUnit.SECONDS); + fakeClock.forwardTime(calculateBackoff() - 1L, TimeUnit.SECONDS); assertEquals(1, fakeClock.numPendingTasks()); fakeClock.forwardTime(1L, TimeUnit.SECONDS); assertEquals(0, fakeClock.numPendingTasks()); @@ -1512,10 +1523,7 @@ public void pushback() { sublistenerCaptor5.getValue().closed( Status.fromCode(RETRIABLE_STATUS_CODE_2), PROCESSED, new Metadata()); assertEquals(1, fakeClock.numPendingTasks()); - fakeClock.forwardTime( - (long) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * BACKOFF_MULTIPLIER * FAKE_RANDOM) - - 1L, - TimeUnit.SECONDS); + fakeClock.forwardTime(calculateBackoffSquared() - 1L, TimeUnit.SECONDS); assertEquals(1, fakeClock.numPendingTasks()); fakeClock.forwardTime(1L, TimeUnit.SECONDS); assertEquals(0, fakeClock.numPendingTasks()); @@ -1804,7 +1812,7 @@ public void transparentRetry_onlyOnceOnRefused() { .closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), REFUSED, new Metadata()); assertEquals(1, fakeClock.numPendingTasks()); - fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS); + fakeClock.forwardTime(calculateInitialBackoff(), TimeUnit.SECONDS); inOrder.verify(retriableStreamRecorder).newSubstream(1); ArgumentCaptor sublistenerCaptor3 = ArgumentCaptor.forClass(ClientStreamListener.class); @@ -1907,7 +1915,7 @@ public void normalRetry_thenNoTransparentRetry_butNormalRetry() { .closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); assertEquals(1, fakeClock.numPendingTasks()); - fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS); + fakeClock.forwardTime(calculateInitialBackoff(), TimeUnit.SECONDS); inOrder.verify(retriableStreamRecorder).newSubstream(1); ArgumentCaptor 
sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); @@ -1923,8 +1931,7 @@ public void normalRetry_thenNoTransparentRetry_butNormalRetry() { .closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), REFUSED, new Metadata()); assertEquals(1, fakeClock.numPendingTasks()); - fakeClock.forwardTime( - (long) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * FAKE_RANDOM), TimeUnit.SECONDS); + fakeClock.forwardTime(calculateBackoff(), TimeUnit.SECONDS); inOrder.verify(retriableStreamRecorder).newSubstream(2); ArgumentCaptor sublistenerCaptor3 = ArgumentCaptor.forClass(ClientStreamListener.class); @@ -1960,7 +1967,7 @@ public void normalRetry_thenNoTransparentRetry_andNoMoreRetry() { .closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); assertEquals(1, fakeClock.numPendingTasks()); - fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS); + fakeClock.forwardTime(calculateInitialBackoff(), TimeUnit.SECONDS); inOrder.verify(retriableStreamRecorder).newSubstream(1); ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/RetryTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/RetryTest.java index edd2a57ab9d..669ce1c69db 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/RetryTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/RetryTest.java @@ -303,7 +303,7 @@ public void retryUntilBufferLimitExceeded() throws Exception { serverCall.close( Status.UNAVAILABLE.withDescription("original attempt failed"), new Metadata()); - elapseBackoff(10, SECONDS); + elapseBackoff(12, SECONDS); // 2nd attempt received serverCall = serverCalls.poll(5, SECONDS); serverCall.request(2); @@ -348,7 +348,7 @@ public void statsRecorded() throws Exception { Status.UNAVAILABLE.withDescription("original attempt failed"), new Metadata()); 
assertRpcStatusRecorded(Status.Code.UNAVAILABLE, 1000, 1); - elapseBackoff(10, SECONDS); + elapseBackoff(12, SECONDS); assertRpcStartedRecorded(); assertOutboundMessageRecorded(); serverCall = serverCalls.poll(5, SECONDS); @@ -366,7 +366,7 @@ public void statsRecorded() throws Exception { call.request(1); assertInboundMessageRecorded(); assertInboundWireSizeRecorded(1); - assertRpcStatusRecorded(Status.Code.OK, 12000, 2); + assertRpcStatusRecorded(Status.Code.OK, 14000, 2); assertRetryStatsRecorded(1, 0, 0); } @@ -418,7 +418,7 @@ public void streamClosed(Status status) { Status.UNAVAILABLE.withDescription("original attempt failed"), new Metadata()); assertRpcStatusRecorded(Code.UNAVAILABLE, 5000, 1); - elapseBackoff(10, SECONDS); + elapseBackoff(12, SECONDS); assertRpcStartedRecorded(); assertOutboundMessageRecorded(); serverCall = serverCalls.poll(5, SECONDS); @@ -431,7 +431,7 @@ public void streamClosed(Status status) { streamClosedLatch.countDown(); // The call listener is closed. verify(mockCallListener, timeout(5000)).onClose(any(Status.class), any(Metadata.class)); - assertRpcStatusRecorded(Code.CANCELLED, 17_000, 1); + assertRpcStatusRecorded(Code.CANCELLED, 19_000, 1); assertRetryStatsRecorded(1, 0, 0); } From 29824e31884b8a71c13db1817121473cab4847f3 Mon Sep 17 00:00:00 2001 From: AgraVator Date: Thu, 30 Jan 2025 14:27:06 +0530 Subject: [PATCH 2/5] adds a flag for experimental jitter --- .../io/grpc/internal/RetriableStream.java | 13 +++- .../io/grpc/internal/RetriableStreamTest.java | 72 ++++++++----------- 2 files changed, 42 insertions(+), 43 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/RetriableStream.java b/core/src/main/java/io/grpc/internal/RetriableStream.java index e895e9a3743..0cc2848ffbd 100644 --- a/core/src/main/java/io/grpc/internal/RetriableStream.java +++ b/core/src/main/java/io/grpc/internal/RetriableStream.java @@ -846,6 +846,16 @@ public void run() { } } + private static boolean isExperimentalRetryJitterEnabled() { + 
return GrpcUtil.getFlag("GRPC_EXPERIMENTAL_RETRY_JITTER", true); + } + + public static long intervalWithJitter(long intervalNanos) { + double inverseJitterFactor = isExperimentalRetryJitterEnabled() + ? 0.8 * random.nextDouble() + 0.4 : random.nextDouble(); + return (long) (intervalNanos * inverseJitterFactor); + } + private static final class SavedCloseMasterListenerReason { private final Status status; private final RpcProgress progress; @@ -1066,8 +1076,7 @@ private RetryPlan makeRetryDecision(Status status, Metadata trailer) { if (pushbackMillis == null) { if (isRetryableStatusCode) { shouldRetry = true; - // Apply jitter by multiplying with a random factor between 0.8 and 1.2 - backoffNanos = (long) (nextBackoffIntervalNanos * (0.8 + random.nextDouble() * 0.4)); + backoffNanos = intervalWithJitter(nextBackoffIntervalNanos); nextBackoffIntervalNanos = Math.min( (long) (nextBackoffIntervalNanos * retryPolicy.backoffMultiplier), retryPolicy.maxBackoffNanos); diff --git a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java index 23c3155279f..9b1ec343bb7 100644 --- a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java +++ b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java @@ -147,25 +147,15 @@ public double nextDouble() { private final ChannelBufferMeter channelBufferUsed = new ChannelBufferMeter(); private final FakeClock fakeClock = new FakeClock(); - private static double calculateJitterFactor() { - return (0.8 + FAKE_RANDOM * 0.4); - } - - private static long calculateInitialBackoff() { - return (long) (INITIAL_BACKOFF_IN_SECONDS * calculateJitterFactor()); - } - - private static long calculateBackoff() { - return (long) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * calculateJitterFactor()); - } - - private static long calculateBackoffSquared() { - return (long) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * BACKOFF_MULTIPLIER - * calculateJitterFactor()); + 
private static long calculateBackoffWithRetries(int retryCount) { + // Calculate the exponential backoff delay with jitter + double exponent = retryCount > 0 ? Math.pow(BACKOFF_MULTIPLIER, retryCount) : 1; + long delay = (long) (INITIAL_BACKOFF_IN_SECONDS * exponent); + return RetriableStream.intervalWithJitter(delay); } private static long calculateMaxBackoff() { - return (long) (MAX_BACKOFF_IN_SECONDS * calculateJitterFactor()); + return RetriableStream.intervalWithJitter(MAX_BACKOFF_IN_SECONDS); } private final class RecordedRetriableStream extends RetriableStream { @@ -328,7 +318,7 @@ public Void answer(InvocationOnMock in) { retriableStream.sendMessage("msg1 during backoff1"); retriableStream.sendMessage("msg2 during backoff1"); - fakeClock.forwardTime(calculateInitialBackoff() - 1L, TimeUnit.SECONDS); + fakeClock.forwardTime(calculateBackoffWithRetries(0) - 1L, TimeUnit.SECONDS); inOrder.verifyNoMoreInteractions(); assertEquals(1, fakeClock.numPendingTasks()); fakeClock.forwardTime(1L, TimeUnit.SECONDS); @@ -385,7 +375,7 @@ public Void answer(InvocationOnMock in) { retriableStream.sendMessage("msg2 during backoff2"); retriableStream.sendMessage("msg3 during backoff2"); - fakeClock.forwardTime(calculateBackoff() - 1L, TimeUnit.SECONDS); + fakeClock.forwardTime(calculateBackoffWithRetries(1) - 1L, TimeUnit.SECONDS); inOrder.verifyNoMoreInteractions(); assertEquals(1, fakeClock.numPendingTasks()); fakeClock.forwardTime(1L, TimeUnit.SECONDS); @@ -478,7 +468,7 @@ public void retry_headersRead_cancel() { sublistenerCaptor1.getValue().closed( Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); assertEquals(1, fakeClock.numPendingTasks()); - fakeClock.forwardTime(calculateInitialBackoff(), TimeUnit.SECONDS); + fakeClock.forwardTime(calculateBackoffWithRetries(0), TimeUnit.SECONDS); ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); @@ -537,7 +527,7 @@ public void retry_headersRead_closed() { 
doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1); sublistenerCaptor1.getValue().closed( Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); - fakeClock.forwardTime(calculateInitialBackoff(), TimeUnit.SECONDS); + fakeClock.forwardTime(calculateBackoffWithRetries(0), TimeUnit.SECONDS); ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); @@ -603,7 +593,7 @@ public void retry_cancel_closed() { doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1); sublistenerCaptor1.getValue().closed( Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); - fakeClock.forwardTime(calculateInitialBackoff(), TimeUnit.SECONDS); + fakeClock.forwardTime(calculateBackoffWithRetries(0), TimeUnit.SECONDS); ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); @@ -706,7 +696,7 @@ public void retry_unretriableClosed_cancel() { doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1); sublistenerCaptor1.getValue().closed( Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); - fakeClock.forwardTime(calculateInitialBackoff(), TimeUnit.SECONDS); + fakeClock.forwardTime(calculateBackoffWithRetries(0), TimeUnit.SECONDS); ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); @@ -840,7 +830,7 @@ public boolean isReady() { // send more requests during backoff retriableStream.request(789); - fakeClock.forwardTime(calculateInitialBackoff(), TimeUnit.SECONDS); + fakeClock.forwardTime(calculateBackoffWithRetries(0), TimeUnit.SECONDS); inOrder.verify(mockStream2).start(sublistenerCaptor2.get()); inOrder.verify(mockStream2).request(3); @@ -894,7 +884,7 @@ public void request(int numMessages) { doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1); sublistenerCaptor1.getValue().closed( Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); - 
fakeClock.forwardTime(calculateInitialBackoff(), TimeUnit.SECONDS); + fakeClock.forwardTime(calculateBackoffWithRetries(0), TimeUnit.SECONDS); inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); inOrder.verify(mockStream2).request(3); @@ -939,7 +929,7 @@ public void start(ClientStreamListener listener) { doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1); sublistenerCaptor1.getValue().closed( Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); - fakeClock.forwardTime(calculateInitialBackoff(), TimeUnit.SECONDS); + fakeClock.forwardTime(calculateBackoffWithRetries(0), TimeUnit.SECONDS); inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); inOrder.verify(retriableStreamRecorder).postCommit(); @@ -1047,7 +1037,7 @@ public boolean isReady() { retriableStream.request(789); readiness.add(retriableStream.isReady()); // expected false b/c in backoff - fakeClock.forwardTime(calculateInitialBackoff(), TimeUnit.SECONDS); + fakeClock.forwardTime(calculateBackoffWithRetries(0), TimeUnit.SECONDS); verify(mockStream2).start(any(ClientStreamListener.class)); readiness.add(retriableStream.isReady()); // expected true @@ -1129,7 +1119,7 @@ public void addPrevRetryAttemptsToRespHeaders() { doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1); sublistenerCaptor1.getValue().closed( Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); - fakeClock.forwardTime(calculateInitialBackoff(), TimeUnit.SECONDS); + fakeClock.forwardTime(calculateBackoffWithRetries(0), TimeUnit.SECONDS); ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); @@ -1179,12 +1169,12 @@ public void start(ClientStreamListener listener) { listener1.closed( Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); assertEquals(1, fakeClock.numPendingTasks()); - fakeClock.forwardTime(calculateInitialBackoff(), TimeUnit.SECONDS); + fakeClock.forwardTime(calculateBackoffWithRetries(0), 
TimeUnit.SECONDS); assertEquals(1, fakeClock.numPendingTasks()); // send requests during backoff retriableStream.request(3); - fakeClock.forwardTime(calculateBackoff(), TimeUnit.SECONDS); + fakeClock.forwardTime(calculateBackoffWithRetries(1), TimeUnit.SECONDS); retriableStream.request(1); verify(mockStream1, never()).request(anyInt()); @@ -1225,7 +1215,7 @@ public void start(ClientStreamListener listener) { // retry listener1.closed( Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); - fakeClock.forwardTime(calculateInitialBackoff(), TimeUnit.SECONDS); + fakeClock.forwardTime(calculateBackoffWithRetries(0), TimeUnit.SECONDS); verify(mockStream2).start(any(ClientStreamListener.class)); verify(retriableStreamRecorder).postCommit(); @@ -1278,7 +1268,7 @@ public void perRpcBufferLimitExceededDuringBackoff() { bufferSizeTracer.outboundWireSize(2); verify(retriableStreamRecorder, never()).postCommit(); - fakeClock.forwardTime(calculateInitialBackoff(), TimeUnit.SECONDS); + fakeClock.forwardTime(calculateBackoffWithRetries(0), TimeUnit.SECONDS); verify(mockStream2).start(any(ClientStreamListener.class)); verify(mockStream2).isReady(); @@ -1350,7 +1340,7 @@ public void expBackoff_maxBackoff_maxRetryAttempts() { sublistenerCaptor1.getValue().closed( Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); assertEquals(1, fakeClock.numPendingTasks()); - fakeClock.forwardTime(calculateInitialBackoff() - 1L, TimeUnit.SECONDS); + fakeClock.forwardTime(calculateBackoffWithRetries(0) - 1L, TimeUnit.SECONDS); assertEquals(1, fakeClock.numPendingTasks()); fakeClock.forwardTime(1L, TimeUnit.SECONDS); assertEquals(0, fakeClock.numPendingTasks()); @@ -1365,7 +1355,7 @@ public void expBackoff_maxBackoff_maxRetryAttempts() { sublistenerCaptor2.getValue().closed( Status.fromCode(RETRIABLE_STATUS_CODE_2), PROCESSED, new Metadata()); assertEquals(1, fakeClock.numPendingTasks()); - fakeClock.forwardTime(calculateBackoff() - 1L, TimeUnit.SECONDS); + 
fakeClock.forwardTime(calculateBackoffWithRetries(1) - 1L, TimeUnit.SECONDS); assertEquals(1, fakeClock.numPendingTasks()); fakeClock.forwardTime(1L, TimeUnit.SECONDS); assertEquals(0, fakeClock.numPendingTasks()); @@ -1380,7 +1370,7 @@ public void expBackoff_maxBackoff_maxRetryAttempts() { sublistenerCaptor3.getValue().closed( Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); assertEquals(1, fakeClock.numPendingTasks()); - fakeClock.forwardTime(calculateBackoffSquared() - 1L, TimeUnit.SECONDS); + fakeClock.forwardTime(calculateBackoffWithRetries(2) - 1L, TimeUnit.SECONDS); assertEquals(1, fakeClock.numPendingTasks()); fakeClock.forwardTime(1L, TimeUnit.SECONDS); assertEquals(0, fakeClock.numPendingTasks()); @@ -1493,7 +1483,7 @@ public void pushback() { sublistenerCaptor3.getValue().closed( Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); assertEquals(1, fakeClock.numPendingTasks()); - fakeClock.forwardTime(calculateInitialBackoff() - 1L, TimeUnit.SECONDS); + fakeClock.forwardTime(calculateBackoffWithRetries(0) - 1L, TimeUnit.SECONDS); assertEquals(1, fakeClock.numPendingTasks()); fakeClock.forwardTime(1L, TimeUnit.SECONDS); assertEquals(0, fakeClock.numPendingTasks()); @@ -1508,7 +1498,7 @@ public void pushback() { sublistenerCaptor4.getValue().closed( Status.fromCode(RETRIABLE_STATUS_CODE_2), PROCESSED, new Metadata()); assertEquals(1, fakeClock.numPendingTasks()); - fakeClock.forwardTime(calculateBackoff() - 1L, TimeUnit.SECONDS); + fakeClock.forwardTime(calculateBackoffWithRetries(1) - 1L, TimeUnit.SECONDS); assertEquals(1, fakeClock.numPendingTasks()); fakeClock.forwardTime(1L, TimeUnit.SECONDS); assertEquals(0, fakeClock.numPendingTasks()); @@ -1523,7 +1513,7 @@ public void pushback() { sublistenerCaptor5.getValue().closed( Status.fromCode(RETRIABLE_STATUS_CODE_2), PROCESSED, new Metadata()); assertEquals(1, fakeClock.numPendingTasks()); - fakeClock.forwardTime(calculateBackoffSquared() - 1L, TimeUnit.SECONDS); + 
fakeClock.forwardTime(calculateBackoffWithRetries(2) - 1L, TimeUnit.SECONDS); assertEquals(1, fakeClock.numPendingTasks()); fakeClock.forwardTime(1L, TimeUnit.SECONDS); assertEquals(0, fakeClock.numPendingTasks()); @@ -1812,7 +1802,7 @@ public void transparentRetry_onlyOnceOnRefused() { .closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), REFUSED, new Metadata()); assertEquals(1, fakeClock.numPendingTasks()); - fakeClock.forwardTime(calculateInitialBackoff(), TimeUnit.SECONDS); + fakeClock.forwardTime(calculateBackoffWithRetries(0), TimeUnit.SECONDS); inOrder.verify(retriableStreamRecorder).newSubstream(1); ArgumentCaptor sublistenerCaptor3 = ArgumentCaptor.forClass(ClientStreamListener.class); @@ -1915,7 +1905,7 @@ public void normalRetry_thenNoTransparentRetry_butNormalRetry() { .closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); assertEquals(1, fakeClock.numPendingTasks()); - fakeClock.forwardTime(calculateInitialBackoff(), TimeUnit.SECONDS); + fakeClock.forwardTime(calculateBackoffWithRetries(0), TimeUnit.SECONDS); inOrder.verify(retriableStreamRecorder).newSubstream(1); ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); @@ -1931,7 +1921,7 @@ public void normalRetry_thenNoTransparentRetry_butNormalRetry() { .closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), REFUSED, new Metadata()); assertEquals(1, fakeClock.numPendingTasks()); - fakeClock.forwardTime(calculateBackoff(), TimeUnit.SECONDS); + fakeClock.forwardTime(calculateBackoffWithRetries(1), TimeUnit.SECONDS); inOrder.verify(retriableStreamRecorder).newSubstream(2); ArgumentCaptor sublistenerCaptor3 = ArgumentCaptor.forClass(ClientStreamListener.class); @@ -1967,7 +1957,7 @@ public void normalRetry_thenNoTransparentRetry_andNoMoreRetry() { .closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); assertEquals(1, fakeClock.numPendingTasks()); - fakeClock.forwardTime(calculateInitialBackoff(), TimeUnit.SECONDS); + 
fakeClock.forwardTime(calculateBackoffWithRetries(0), TimeUnit.SECONDS); inOrder.verify(retriableStreamRecorder).newSubstream(1); ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); From c2f5c1c5f04010b3f31b117b227434cd3bc50125 Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Tue, 28 Jan 2025 16:38:32 -0800 Subject: [PATCH 3/5] xds: Allow FaultFilter's interceptor to be reused This is the only usage of PickSubchannelArgs when creating a filter's ClientInterceptor, and a follow-up commit will remove the argument and actually reuse the interceptors. Other filter's interceptors can already be reused. There doesn't seem to be any significant loss of legibility by making FaultFilter a more ordinary interceptor, but the change does cause the ForwardingClientCall to be present when faultDelay is configured, independent of whether the fault delay ends up being triggered. Reusing interceptors will move more state management out of the RPC path which will be more relevant with RLQS. 
--- .../main/java/io/grpc/xds/FaultFilter.java | 154 +++++++++--------- 1 file changed, 81 insertions(+), 73 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/FaultFilter.java b/xds/src/main/java/io/grpc/xds/FaultFilter.java index d46b3d30f5a..b7f7fa9c226 100644 --- a/xds/src/main/java/io/grpc/xds/FaultFilter.java +++ b/xds/src/main/java/io/grpc/xds/FaultFilter.java @@ -190,94 +190,102 @@ public ClientInterceptor buildClientInterceptor( config = overrideConfig; } FaultConfig faultConfig = (FaultConfig) config; - Long delayNanos = null; - Status abortStatus = null; - if (faultConfig.maxActiveFaults() == null - || activeFaultCounter.get() < faultConfig.maxActiveFaults()) { - Metadata headers = args.getHeaders(); - if (faultConfig.faultDelay() != null) { - delayNanos = determineFaultDelayNanos(faultConfig.faultDelay(), headers); - } - if (faultConfig.faultAbort() != null) { - abortStatus = determineFaultAbortStatus(faultConfig.faultAbort(), headers); - } - } - if (delayNanos == null && abortStatus == null) { - return null; - } - final Long finalDelayNanos = delayNanos; - final Status finalAbortStatus = getAbortStatusWithDescription(abortStatus); final class FaultInjectionInterceptor implements ClientInterceptor { @Override public ClientCall interceptCall( final MethodDescriptor method, final CallOptions callOptions, final Channel next) { - Executor callExecutor = callOptions.getExecutor(); - if (callExecutor == null) { // This should never happen in practice because - // ManagedChannelImpl.ConfigSelectingClientCall always provides CallOptions with - // a callExecutor. 
- // TODO(https://github.com/grpc/grpc-java/issues/7868) - callExecutor = MoreExecutors.directExecutor(); + boolean checkFault = false; + if (faultConfig.maxActiveFaults() == null + || activeFaultCounter.get() < faultConfig.maxActiveFaults()) { + checkFault = faultConfig.faultDelay() != null || faultConfig.faultAbort() != null; } - if (finalDelayNanos != null) { - Supplier> callSupplier; - if (finalAbortStatus != null) { - callSupplier = Suppliers.ofInstance( - new FailingClientCall(finalAbortStatus, callExecutor)); - } else { - callSupplier = new Supplier>() { - @Override - public ClientCall get() { - return next.newCall(method, callOptions); - } - }; + if (!checkFault) { + return next.newCall(method, callOptions); + } + final class DeadlineInsightForwardingCall extends ForwardingClientCall { + private ClientCall delegate; + + @Override + protected ClientCall delegate() { + return delegate; } - final DelayInjectedCall delayInjectedCall = new DelayInjectedCall<>( - finalDelayNanos, callExecutor, scheduler, callOptions.getDeadline(), callSupplier); - final class DeadlineInsightForwardingCall extends ForwardingClientCall { - @Override - protected ClientCall delegate() { - return delayInjectedCall; + @Override + public void start(Listener listener, Metadata headers) { + Executor callExecutor = callOptions.getExecutor(); + if (callExecutor == null) { // This should never happen in practice because + // ManagedChannelImpl.ConfigSelectingClientCall always provides CallOptions with + // a callExecutor. 
+ // TODO(https://github.com/grpc/grpc-java/issues/7868) + callExecutor = MoreExecutors.directExecutor(); } - @Override - public void start(Listener listener, Metadata headers) { - Listener finalListener = - new SimpleForwardingClientCallListener(listener) { - @Override - public void onClose(Status status, Metadata trailers) { - if (status.getCode().equals(Code.DEADLINE_EXCEEDED)) { - // TODO(zdapeng:) check effective deadline locally, and - // do the following only if the local deadline is exceeded. - // (If the server sends DEADLINE_EXCEEDED for its own deadline, then the - // injected delay does not contribute to the error, because the request is - // only sent out after the delay. There could be a race between local and - // remote, but it is rather rare.) - String description = String.format( - Locale.US, - "Deadline exceeded after up to %d ns of fault-injected delay", - finalDelayNanos); - if (status.getDescription() != null) { - description = description + ": " + status.getDescription(); - } - status = Status.DEADLINE_EXCEEDED - .withDescription(description).withCause(status.getCause()); - // Replace trailers to prevent mixing sources of status and trailers. 
- trailers = new Metadata(); + Long delayNanos; + Status abortStatus = null; + if (faultConfig.faultDelay() != null) { + delayNanos = determineFaultDelayNanos(faultConfig.faultDelay(), headers); + } else { + delayNanos = null; + } + if (faultConfig.faultAbort() != null) { + abortStatus = getAbortStatusWithDescription( + determineFaultAbortStatus(faultConfig.faultAbort(), headers)); + } + + Supplier> callSupplier; + if (abortStatus != null) { + callSupplier = Suppliers.ofInstance( + new FailingClientCall(abortStatus, callExecutor)); + } else { + callSupplier = new Supplier>() { + @Override + public ClientCall get() { + return next.newCall(method, callOptions); + } + }; + } + if (delayNanos == null) { + delegate = callSupplier.get(); + delegate().start(listener, headers); + return; + } + + delegate = new DelayInjectedCall<>( + delayNanos, callExecutor, scheduler, callOptions.getDeadline(), callSupplier); + + Listener finalListener = + new SimpleForwardingClientCallListener(listener) { + @Override + public void onClose(Status status, Metadata trailers) { + if (status.getCode().equals(Code.DEADLINE_EXCEEDED)) { + // TODO(zdapeng:) check effective deadline locally, and + // do the following only if the local deadline is exceeded. + // (If the server sends DEADLINE_EXCEEDED for its own deadline, then the + // injected delay does not contribute to the error, because the request is + // only sent out after the delay. There could be a race between local and + // remote, but it is rather rare.) + String description = String.format( + Locale.US, + "Deadline exceeded after up to %d ns of fault-injected delay", + delayNanos); + if (status.getDescription() != null) { + description = description + ": " + status.getDescription(); } - delegate().onClose(status, trailers); + status = Status.DEADLINE_EXCEEDED + .withDescription(description).withCause(status.getCause()); + // Replace trailers to prevent mixing sources of status and trailers. 
+ trailers = new Metadata(); } - }; - delegate().start(finalListener, headers); - } + delegate().onClose(status, trailers); + } + }; + delegate().start(finalListener, headers); } - - return new DeadlineInsightForwardingCall(); - } else { - return new FailingClientCall<>(finalAbortStatus, callExecutor); } + + return new DeadlineInsightForwardingCall(); } } From c3244f6a2391c86578b4c8c3ba3e182cd2b70af1 Mon Sep 17 00:00:00 2001 From: Abhishek Agrawal <81427947+AgraVator@users.noreply.github.com> Date: Thu, 30 Jan 2025 12:52:37 +0530 Subject: [PATCH 4/5] netty: Removed 4096 min buffer size (#11856) * netty: Removed 4096 min buffer size --- .../java/io/grpc/netty/NettyWritableBufferAllocator.java | 5 +---- .../io/grpc/netty/NettyWritableBufferAllocatorTest.java | 7 ------- 2 files changed, 1 insertion(+), 11 deletions(-) diff --git a/netty/src/main/java/io/grpc/netty/NettyWritableBufferAllocator.java b/netty/src/main/java/io/grpc/netty/NettyWritableBufferAllocator.java index 9e93ee1155c..40b84717160 100644 --- a/netty/src/main/java/io/grpc/netty/NettyWritableBufferAllocator.java +++ b/netty/src/main/java/io/grpc/netty/NettyWritableBufferAllocator.java @@ -33,9 +33,6 @@ */ class NettyWritableBufferAllocator implements WritableBufferAllocator { - // Use 4k as our minimum buffer size. - private static final int MIN_BUFFER = 4 * 1024; - // Set the maximum buffer size to 1MB. 
private static final int MAX_BUFFER = 1024 * 1024; @@ -47,7 +44,7 @@ class NettyWritableBufferAllocator implements WritableBufferAllocator { @Override public WritableBuffer allocate(int capacityHint) { - capacityHint = Math.min(MAX_BUFFER, Math.max(MIN_BUFFER, capacityHint)); + capacityHint = Math.min(MAX_BUFFER, capacityHint); return new NettyWritableBuffer(allocator.buffer(capacityHint, capacityHint)); } } diff --git a/netty/src/test/java/io/grpc/netty/NettyWritableBufferAllocatorTest.java b/netty/src/test/java/io/grpc/netty/NettyWritableBufferAllocatorTest.java index d577ec46b03..0b741ae24b3 100644 --- a/netty/src/test/java/io/grpc/netty/NettyWritableBufferAllocatorTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyWritableBufferAllocatorTest.java @@ -40,13 +40,6 @@ protected WritableBufferAllocator allocator() { return allocator; } - @Test - public void testCapacityHasMinimum() { - WritableBuffer buffer = allocator().allocate(100); - assertEquals(0, buffer.readableBytes()); - assertEquals(4096, buffer.writableBytes()); - } - @Test public void testCapacityIsExactAboveMinimum() { WritableBuffer buffer = allocator().allocate(9000); From dad68ffd5543f770a6aec822b006f3cd17a9d917 Mon Sep 17 00:00:00 2001 From: AgraVator Date: Wed, 5 Feb 2025 14:34:58 +0530 Subject: [PATCH 5/5] turns the flag in a var for better efficiency --- core/src/main/java/io/grpc/internal/RetriableStream.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/RetriableStream.java b/core/src/main/java/io/grpc/internal/RetriableStream.java index 0cc2848ffbd..7765408a627 100644 --- a/core/src/main/java/io/grpc/internal/RetriableStream.java +++ b/core/src/main/java/io/grpc/internal/RetriableStream.java @@ -846,12 +846,11 @@ public void run() { } } - private static boolean isExperimentalRetryJitterEnabled() { - return GrpcUtil.getFlag("GRPC_EXPERIMENTAL_RETRY_JITTER", true); - } + private static final boolean 
isExperimentalRetryJitterEnabled = GrpcUtil .getFlag("GRPC_EXPERIMENTAL_RETRY_JITTER", true); public static long intervalWithJitter(long intervalNanos) { - double inverseJitterFactor = isExperimentalRetryJitterEnabled() + double inverseJitterFactor = isExperimentalRetryJitterEnabled ? 0.8 * random.nextDouble() + 0.4 : random.nextDouble(); return (long) (intervalNanos * inverseJitterFactor); }