Skip to content

Commit eaa4ea7

Browse files
committed
core: Fix NPE race during hedging
The problem was that one hedge was committed before another had drained start(). This was not testable because HedgingRunnable checks whether scheduledHedgingRef is cancelled, which is racy, and there is no way to deterministically trigger either race. The same problem couldn't be triggered with retries because only one attempt is draining at a time. Retries with cancellation also couldn't trigger it, for the surprising reason that the noop stream used in cancel() wasn't considered drained. This commit marks the noop stream as drained in cancel(), which allows memory to be garbage collected sooner and exposes the race to tests. That in turn showed the stream hanging, because inFlightSubStreams wasn't being decremented. Fixes #9185.
1 parent a9c66b5 commit eaa4ea7

File tree

2 files changed

+66
-20
lines changed

2 files changed

+66
-20
lines changed

core/src/main/java/io/grpc/internal/RetriableStream.java

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -282,14 +282,12 @@ private void drain(Substream substream) {
282282

283283
synchronized (lock) {
284284
savedState = state;
285-
if (streamStarted) {
286-
if (savedState.winningSubstream != null && savedState.winningSubstream != substream) {
287-
// committed but not me, to be cancelled
288-
break;
289-
}
290-
if (savedState.cancelled) {
291-
break;
292-
}
285+
if (savedState.winningSubstream != null && savedState.winningSubstream != substream) {
286+
// committed but not me, to be cancelled
287+
break;
288+
}
289+
if (savedState.cancelled) {
290+
break;
293291
}
294292
if (index == savedState.buffer.size()) { // I'm drained
295293
state = savedState.substreamDrained(substream);
@@ -326,15 +324,13 @@ public void run() {
326324
if (bufferEntry instanceof RetriableStream.StartEntry) {
327325
streamStarted = true;
328326
}
329-
if (streamStarted) {
330-
savedState = state;
331-
if (savedState.winningSubstream != null && savedState.winningSubstream != substream) {
332-
// committed but not me, to be cancelled
333-
break;
334-
}
335-
if (savedState.cancelled) {
336-
break;
337-
}
327+
savedState = state;
328+
if (savedState.winningSubstream != null && savedState.winningSubstream != substream) {
329+
// committed but not me, to be cancelled
330+
break;
331+
}
332+
if (savedState.cancelled) {
333+
break;
338334
}
339335
}
340336
}
@@ -344,6 +340,10 @@ public void run() {
344340
return;
345341
}
346342

343+
if (!streamStarted) {
344+
// Start stream so inFlightSubStreams is decremented in Sublistener.closed()
345+
substream.stream.start(new Sublistener(substream));
346+
}
347347
substream.stream.cancel(
348348
state.winningSubstream == substream ? cancellationStatus : CANCELLED_BECAUSE_COMMITTED);
349349
}
@@ -484,6 +484,8 @@ public void run() {
484484
}
485485

486486
if (cancelled) {
487+
// Start stream so inFlightSubStreams is decremented in Sublistener.closed()
488+
newSubstream.stream.start(new Sublistener(newSubstream));
487489
newSubstream.stream.cancel(Status.CANCELLED.withDescription("Unneeded hedging"));
488490
return;
489491
}
@@ -507,6 +509,9 @@ public final void cancel(final Status reason) {
507509
Runnable runnable = commit(noopSubstream);
508510

509511
if (runnable != null) {
512+
synchronized (lock) {
513+
state = state.substreamDrained(noopSubstream);
514+
}
510515
runnable.run();
511516
safeCloseMasterListener(reason, RpcProgress.PROCESSED, new Metadata());
512517
return;

core/src/test/java/io/grpc/internal/RetriableStreamTest.java

Lines changed: 44 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -183,18 +183,21 @@ Status prestart() {
183183
}
184184
}
185185

186-
private final RetriableStream<String> retriableStream =
186+
private RetriableStream<String> retriableStream =
187187
newThrottledRetriableStream(null /* throttle */);
188188
private final RetriableStream<String> hedgingStream =
189189
newThrottledHedgingStream(null /* throttle */);
190190

191191
private ClientStreamTracer bufferSizeTracer;
192192

193193
private RetriableStream<String> newThrottledRetriableStream(Throttle throttle) {
194+
return newThrottledRetriableStream(throttle, MoreExecutors.directExecutor());
195+
}
196+
197+
private RetriableStream<String> newThrottledRetriableStream(Throttle throttle, Executor drainer) {
194198
return new RecordedRetriableStream(
195199
method, new Metadata(), channelBufferUsed, PER_RPC_BUFFER_LIMIT, CHANNEL_BUFFER_LIMIT,
196-
MoreExecutors.directExecutor(), fakeClock.getScheduledExecutorService(), RETRY_POLICY,
197-
null, throttle);
200+
drainer, fakeClock.getScheduledExecutorService(), RETRY_POLICY, null, throttle);
198201
}
199202

200203
private RetriableStream<String> newThrottledHedgingStream(Throttle throttle) {
@@ -593,6 +596,44 @@ public void retry_cancel_closed() {
593596
inOrder.verify(retriableStreamRecorder, never()).postCommit();
594597
}
595598

599+
@Test
600+
public void transparentRetry_cancel_race() {
601+
FakeClock drainer = new FakeClock();
602+
retriableStream = newThrottledRetriableStream(null, drainer.getScheduledExecutorService());
603+
ClientStream mockStream1 = mock(ClientStream.class);
604+
doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0);
605+
InOrder inOrder = inOrder(retriableStreamRecorder);
606+
607+
retriableStream.start(masterListener);
608+
609+
ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
610+
ArgumentCaptor.forClass(ClientStreamListener.class);
611+
verify(mockStream1).start(sublistenerCaptor1.capture());
612+
613+
// retry, but don't drain
614+
ClientStream mockStream2 = mock(ClientStream.class);
615+
doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(0);
616+
sublistenerCaptor1.getValue().closed(
617+
Status.fromCode(NON_RETRIABLE_STATUS_CODE), MISCARRIED, new Metadata());
618+
assertEquals(1, drainer.numPendingTasks());
619+
620+
// cancel
621+
retriableStream.cancel(Status.CANCELLED);
622+
// drain transparent retry
623+
drainer.runDueTasks();
624+
inOrder.verify(retriableStreamRecorder).postCommit();
625+
626+
ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
627+
ArgumentCaptor.forClass(ClientStreamListener.class);
628+
verify(mockStream2).start(sublistenerCaptor2.capture());
629+
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
630+
verify(mockStream2).cancel(statusCaptor.capture());
631+
assertEquals(Status.CANCELLED.getCode(), statusCaptor.getValue().getCode());
632+
assertEquals(CANCELLED_BECAUSE_COMMITTED, statusCaptor.getValue().getDescription());
633+
sublistenerCaptor2.getValue().closed(statusCaptor.getValue(), PROCESSED, new Metadata());
634+
verify(masterListener).closed(same(Status.CANCELLED), same(PROCESSED), any(Metadata.class));
635+
}
636+
596637
@Test
597638
public void unretriableClosed_cancel() {
598639
ClientStream mockStream1 = mock(ClientStream.class);

0 commit comments

Comments
 (0)