Skip to content

Commit 988a8c9

Browse files
committed
core: Fix NPE race during hedging
The problem was one hedge was committed before another had drained start(). This was not testable because HedgingRunnable checks whether scheduledHedgingRef is cancelled, which is racy, but there's no way to deterministically trigger either race. The same problem couldn't be triggered with retries because only one attempt will be draining at a time. Retries with cancellation also couldn't trigger it, for the surprising reason that the noop stream used in cancel() wasn't considered drained. This commit marks the noop stream as drained with cancel(), which allows memory to be garbage collected sooner and exposes the race for tests. That then showed the stream as hanging, because inFlightSubStreams wasn't being decremented. Fixes grpc#9185
1 parent 92b4fae commit 988a8c9

File tree

2 files changed

+66
-20
lines changed

2 files changed

+66
-20
lines changed

core/src/main/java/io/grpc/internal/RetriableStream.java

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -282,14 +282,12 @@ private void drain(Substream substream) {
282282

283283
synchronized (lock) {
284284
savedState = state;
285-
if (streamStarted) {
286-
if (savedState.winningSubstream != null && savedState.winningSubstream != substream) {
287-
// committed but not me, to be cancelled
288-
break;
289-
}
290-
if (savedState.cancelled) {
291-
break;
292-
}
285+
if (savedState.winningSubstream != null && savedState.winningSubstream != substream) {
286+
// committed but not me, to be cancelled
287+
break;
288+
}
289+
if (savedState.cancelled) {
290+
break;
293291
}
294292
if (index == savedState.buffer.size()) { // I'm drained
295293
state = savedState.substreamDrained(substream);
@@ -326,15 +324,13 @@ public void run() {
326324
if (bufferEntry instanceof RetriableStream.StartEntry) {
327325
streamStarted = true;
328326
}
329-
if (streamStarted) {
330-
savedState = state;
331-
if (savedState.winningSubstream != null && savedState.winningSubstream != substream) {
332-
// committed but not me, to be cancelled
333-
break;
334-
}
335-
if (savedState.cancelled) {
336-
break;
337-
}
327+
savedState = state;
328+
if (savedState.winningSubstream != null && savedState.winningSubstream != substream) {
329+
// committed but not me, to be cancelled
330+
break;
331+
}
332+
if (savedState.cancelled) {
333+
break;
338334
}
339335
}
340336
}
@@ -344,6 +340,10 @@ public void run() {
344340
return;
345341
}
346342

343+
if (!streamStarted) {
344+
// Start stream so inFlightSubStreams is decremented in Sublistener.closed()
345+
substream.stream.start(new Sublistener(substream));
346+
}
347347
substream.stream.cancel(
348348
state.winningSubstream == substream ? cancellationStatus : CANCELLED_BECAUSE_COMMITTED);
349349
}
@@ -484,6 +484,8 @@ public void run() {
484484
}
485485

486486
if (cancelled) {
487+
// Start stream so inFlightSubStreams is decremented in Sublistener.closed()
488+
newSubstream.stream.start(new Sublistener(newSubstream));
487489
newSubstream.stream.cancel(Status.CANCELLED.withDescription("Unneeded hedging"));
488490
return;
489491
}
@@ -507,6 +509,9 @@ public final void cancel(final Status reason) {
507509
Runnable runnable = commit(noopSubstream);
508510

509511
if (runnable != null) {
512+
synchronized (lock) {
513+
state = state.substreamDrained(noopSubstream);
514+
}
510515
runnable.run();
511516
safeCloseMasterListener(reason, RpcProgress.PROCESSED, new Metadata());
512517
return;

core/src/test/java/io/grpc/internal/RetriableStreamTest.java

Lines changed: 44 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -188,18 +188,21 @@ Status prestart() {
188188
}
189189
}
190190

191-
private final RetriableStream<String> retriableStream =
191+
private RetriableStream<String> retriableStream =
192192
newThrottledRetriableStream(null /* throttle */);
193193
private final RetriableStream<String> hedgingStream =
194194
newThrottledHedgingStream(null /* throttle */);
195195

196196
private ClientStreamTracer bufferSizeTracer;
197197

198198
private RetriableStream<String> newThrottledRetriableStream(Throttle throttle) {
199+
return newThrottledRetriableStream(throttle, MoreExecutors.directExecutor());
200+
}
201+
202+
private RetriableStream<String> newThrottledRetriableStream(Throttle throttle, Executor drainer) {
199203
return new RecordedRetriableStream(
200204
method, new Metadata(), channelBufferUsed, PER_RPC_BUFFER_LIMIT, CHANNEL_BUFFER_LIMIT,
201-
MoreExecutors.directExecutor(), fakeClock.getScheduledExecutorService(), RETRY_POLICY,
202-
null, throttle);
205+
drainer, fakeClock.getScheduledExecutorService(), RETRY_POLICY, null, throttle);
203206
}
204207

205208
private RetriableStream<String> newThrottledHedgingStream(Throttle throttle) {
@@ -598,6 +601,44 @@ public void retry_cancel_closed() {
598601
inOrder.verify(retriableStreamRecorder, never()).postCommit();
599602
}
600603

604+
@Test
605+
public void transparentRetry_cancel_race() {
606+
FakeClock drainer = new FakeClock();
607+
retriableStream = newThrottledRetriableStream(null, drainer.getScheduledExecutorService());
608+
ClientStream mockStream1 = mock(ClientStream.class);
609+
doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0);
610+
InOrder inOrder = inOrder(retriableStreamRecorder);
611+
612+
retriableStream.start(masterListener);
613+
614+
ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
615+
ArgumentCaptor.forClass(ClientStreamListener.class);
616+
verify(mockStream1).start(sublistenerCaptor1.capture());
617+
618+
// retry, but don't drain
619+
ClientStream mockStream2 = mock(ClientStream.class);
620+
doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(0);
621+
sublistenerCaptor1.getValue().closed(
622+
Status.fromCode(NON_RETRIABLE_STATUS_CODE), MISCARRIED, new Metadata());
623+
assertEquals(1, drainer.numPendingTasks());
624+
625+
// cancel
626+
retriableStream.cancel(Status.CANCELLED);
627+
// drain transparent retry
628+
drainer.runDueTasks();
629+
inOrder.verify(retriableStreamRecorder).postCommit();
630+
631+
ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
632+
ArgumentCaptor.forClass(ClientStreamListener.class);
633+
verify(mockStream2).start(sublistenerCaptor2.capture());
634+
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
635+
verify(mockStream2).cancel(statusCaptor.capture());
636+
assertEquals(Status.CANCELLED.getCode(), statusCaptor.getValue().getCode());
637+
assertEquals(CANCELLED_BECAUSE_COMMITTED, statusCaptor.getValue().getDescription());
638+
sublistenerCaptor2.getValue().closed(statusCaptor.getValue(), PROCESSED, new Metadata());
639+
verify(masterListener).closed(same(Status.CANCELLED), same(PROCESSED), any(Metadata.class));
640+
}
641+
601642
@Test
602643
public void unretriableClosed_cancel() {
603644
ClientStream mockStream1 = mock(ClientStream.class);

0 commit comments

Comments
 (0)