Skip to content

Commit 09c8616

Browse files
authored
Fix retries that timeout hanging forever. [ Backport to 1.61.x] (grpc#10886)
* Fix retries that timeout hanging forever. (grpc#10855) Fixes grpc#10336 * Fix flaky retry tests (grpc#10887) * Reorder tracing and actually closing listener to eliminate test flakiness * Use real value rather than mock for flaky test
1 parent a00d395 commit 09c8616

File tree

3 files changed

+41
-10
lines changed

3 files changed

+41
-10
lines changed

core/src/main/java/io/grpc/internal/AbstractClientStream.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -455,10 +455,10 @@ private void closeListener(
455455
if (!listenerClosed) {
456456
listenerClosed = true;
457457
statsTraceCtx.streamClosed(status);
458-
listener().closed(status, rpcProgress, trailers);
459458
if (getTransportTracer() != null) {
460459
getTransportTracer().reportStreamClosed(status.isOk());
461460
}
461+
listener().closed(status, rpcProgress, trailers);
462462
}
463463
}
464464
}

core/src/main/java/io/grpc/internal/RetriableStream.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,10 @@ public void run() {
195195
}
196196
}
197197
if (retryFuture != null) {
198-
retryFuture.cancel(false);
198+
boolean cancelled = retryFuture.cancel(false);
199+
if (cancelled) {
200+
inFlightSubStreams.decrementAndGet();
201+
}
199202
}
200203
if (hedgingFuture != null) {
201204
hedgingFuture.cancel(false);

interop-testing/src/test/java/io/grpc/testing/integration/RetryTest.java

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@
1818

1919
import static com.google.common.truth.Truth.assertThat;
2020
import static java.util.concurrent.TimeUnit.SECONDS;
21+
import static org.junit.Assert.assertNotNull;
22+
import static org.mockito.AdditionalAnswers.delegatesTo;
2123
import static org.mockito.ArgumentMatchers.any;
24+
import static org.mockito.Mockito.mock;
2225
import static org.mockito.Mockito.never;
2326
import static org.mockito.Mockito.timeout;
2427
import static org.mockito.Mockito.verify;
@@ -78,8 +81,6 @@
7881
import org.junit.Test;
7982
import org.junit.runner.RunWith;
8083
import org.junit.runners.JUnit4;
81-
import org.mockito.ArgumentCaptor;
82-
import org.mockito.Mock;
8384
import org.mockito.junit.MockitoJUnit;
8485
import org.mockito.junit.MockitoRule;
8586

@@ -103,8 +104,11 @@ public class RetryTest {
103104
@Rule
104105
public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule();
105106
private final FakeClock fakeClock = new FakeClock();
106-
@Mock
107-
private ClientCall.Listener<Integer> mockCallListener;
107+
private TestListener testCallListener = new TestListener();
108+
@SuppressWarnings("unchecked")
109+
private ClientCall.Listener<Integer> mockCallListener =
110+
mock(ClientCall.Listener.class, delegatesTo(testCallListener));
111+
108112
private CountDownLatch backoffLatch = new CountDownLatch(1);
109113
private final EventLoopGroup group = new DefaultEventLoopGroup() {
110114
@SuppressWarnings("FutureReturnValueIgnored")
@@ -244,8 +248,10 @@ private void assertInboundWireSizeRecorded(long length) throws Exception {
244248

245249
private void assertRpcStatusRecorded(
246250
Status.Code code, long roundtripLatencyMs, long outboundMessages) throws Exception {
247-
MetricsRecord record = clientStatsRecorder.pollRecord(5, SECONDS);
251+
MetricsRecord record = clientStatsRecorder.pollRecord(7, SECONDS);
252+
assertNotNull(record);
248253
TagValue statusTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_STATUS);
254+
assertNotNull(statusTag);
249255
assertThat(statusTag.asString()).isEqualTo(code.toString());
250256
assertThat(record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_FINISHED_COUNT))
251257
.isEqualTo(1);
@@ -295,14 +301,14 @@ public void retryUntilBufferLimitExceeded() throws Exception {
295301
verify(mockCallListener, never()).onClose(any(Status.class), any(Metadata.class));
296302
// send one more message, should exceed buffer limit
297303
call.sendMessage(message);
304+
298305
// let attempt fail
306+
testCallListener.clear();
299307
serverCall.close(
300308
Status.UNAVAILABLE.withDescription("2nd attempt failed"),
301309
new Metadata());
302310
// no more retry
303-
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
304-
verify(mockCallListener, timeout(5000)).onClose(statusCaptor.capture(), any(Metadata.class));
305-
assertThat(statusCaptor.getValue().getDescription()).contains("2nd attempt failed");
311+
testCallListener.verifyDescription("2nd attempt failed", 5000);
306312
}
307313

308314
@Test
@@ -534,4 +540,26 @@ public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata header
534540
assertRpcStatusRecorded(Code.INVALID_ARGUMENT, 0, 0);
535541
assertRetryStatsRecorded(0, 1, 0);
536542
}
543+
544+
private static class TestListener extends ClientCall.Listener<Integer> {
545+
Status status = null;
546+
private CountDownLatch closeLatch = new CountDownLatch(1);
547+
548+
@Override
549+
public void onClose(Status status, Metadata trailers) {
550+
this.status = status;
551+
closeLatch.countDown();
552+
}
553+
554+
void clear() {
555+
status = null;
556+
closeLatch = new CountDownLatch(1);
557+
}
558+
559+
void verifyDescription(String description, long timeoutMs) throws InterruptedException {
560+
closeLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
561+
assertNotNull(status);
562+
assertThat(status.getDescription()).contains(description);
563+
}
564+
}
537565
}

0 commit comments

Comments
 (0)