Skip to content

Commit 3322700

Browse files
authored
Fix retry race condition that can lead to double decrementing inFlightSubStreams and so miss calling closed (#11026) (#11034)
1 parent eb17e8c commit 3322700

File tree

2 files changed

+37
-17
lines changed

2 files changed

+37
-17
lines changed

core/src/main/java/io/grpc/internal/RetriableStream.java

Lines changed: 35 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -149,11 +149,10 @@ public void uncaughtException(Thread t, Throwable e) {
149149
this.throttle = throttle;
150150
}
151151

152-
@SuppressWarnings("GuardedBy")
152+
@SuppressWarnings("GuardedBy") // TODO(b/145386688) this.lock==ScheduledCancellor.lock so ok
153153
@Nullable // null if already committed
154154
@CheckReturnValue
155155
private Runnable commit(final Substream winningSubstream) {
156-
157156
synchronized (lock) {
158157
if (state.winningSubstream != null) {
159158
return null;
@@ -165,10 +164,9 @@ private Runnable commit(final Substream winningSubstream) {
165164
// subtract the share of this RPC from channelBufferUsed.
166165
channelBufferUsed.addAndGet(-perRpcBufferUsed);
167166

167+
final boolean wasCancelled = (scheduledRetry != null) ? scheduledRetry.isCancelled() : false;
168168
final Future<?> retryFuture;
169169
if (scheduledRetry != null) {
170-
// TODO(b/145386688): This access should be guarded by 'this.scheduledRetry.lock'; instead
171-
// found: 'this.lock'
172170
retryFuture = scheduledRetry.markCancelled();
173171
scheduledRetry = null;
174172
} else {
@@ -177,8 +175,6 @@ private Runnable commit(final Substream winningSubstream) {
177175
// cancel the scheduled hedging if it is scheduled prior to the commitment
178176
final Future<?> hedgingFuture;
179177
if (scheduledHedging != null) {
180-
// TODO(b/145386688): This access should be guarded by 'this.scheduledHedging.lock'; instead
181-
// found: 'this.lock'
182178
hedgingFuture = scheduledHedging.markCancelled();
183179
scheduledHedging = null;
184180
} else {
@@ -196,7 +192,21 @@ public void run() {
196192
}
197193
if (retryFuture != null) {
198194
retryFuture.cancel(false);
195+
if (!wasCancelled && inFlightSubStreams.decrementAndGet() == Integer.MIN_VALUE) {
196+
assert savedCloseMasterListenerReason != null;
197+
listenerSerializeExecutor.execute(
198+
new Runnable() {
199+
@Override
200+
public void run() {
201+
isClosed = true;
202+
masterListener.closed(savedCloseMasterListenerReason.status,
203+
savedCloseMasterListenerReason.progress,
204+
savedCloseMasterListenerReason.metadata);
205+
}
206+
});
207+
}
199208
}
209+
200210
if (hedgingFuture != null) {
201211
hedgingFuture.cancel(false);
202212
}
@@ -415,7 +425,7 @@ public final void start(ClientStreamListener listener) {
415425
drain(substream);
416426
}
417427

418-
@SuppressWarnings("GuardedBy")
428+
@SuppressWarnings("GuardedBy") // TODO(b/145386688) this.lock==ScheduledCancellor.lock so ok
419429
private void pushbackHedging(@Nullable Integer delayMillis) {
420430
if (delayMillis == null) {
421431
return;
@@ -434,8 +444,6 @@ private void pushbackHedging(@Nullable Integer delayMillis) {
434444
return;
435445
}
436446

437-
// TODO(b/145386688): This access should be guarded by 'this.scheduledHedging.lock'; instead
438-
// found: 'this.lock'
439447
futureToBeCancelled = scheduledHedging.markCancelled();
440448
scheduledHedging = future = new FutureCanceller(lock);
441449
}
@@ -469,16 +477,13 @@ public void run() {
469477
}
470478
callExecutor.execute(
471479
new Runnable() {
472-
@SuppressWarnings("GuardedBy")
480+
@SuppressWarnings("GuardedBy") //TODO(b/145386688) lock==ScheduledCancellor.lock so ok
473481
@Override
474482
public void run() {
475483
boolean cancelled = false;
476484
FutureCanceller future = null;
477485

478486
synchronized (lock) {
479-
// TODO(b/145386688): This access should be guarded by
480-
// 'HedgingRunnable.this.scheduledHedgingRef.lock'; instead found:
481-
// 'RetriableStream.this.lock'
482487
if (scheduledHedgingRef.isCancelled()) {
483488
cancelled = true;
484489
} else {
@@ -810,13 +815,11 @@ private boolean hasPotentialHedging(State state) {
810815
&& !state.hedgingFrozen;
811816
}
812817

813-
@SuppressWarnings("GuardedBy")
818+
@SuppressWarnings("GuardedBy") // TODO(b/145386688) this.lock==ScheduledCancellor.lock so ok
814819
private void freezeHedging() {
815820
Future<?> futureToBeCancelled = null;
816821
synchronized (lock) {
817822
if (scheduledHedging != null) {
818-
// TODO(b/145386688): This access should be guarded by 'this.scheduledHedging.lock'; instead
819-
// found: 'this.lock'
820823
futureToBeCancelled = scheduledHedging.markCancelled();
821824
scheduledHedging = null;
822825
}
@@ -999,9 +1002,19 @@ public void run() {
9991002
synchronized (lock) {
10001003
scheduledRetry = scheduledRetryCopy = new FutureCanceller(lock);
10011004
}
1005+
10021006
class RetryBackoffRunnable implements Runnable {
10031007
@Override
1008+
@SuppressWarnings("FutureReturnValueIgnored")
10041009
public void run() {
1010+
synchronized (scheduledRetryCopy.lock) {
1011+
if (scheduledRetryCopy.isCancelled()) {
1012+
return;
1013+
} else {
1014+
scheduledRetryCopy.markCancelled();
1015+
}
1016+
}
1017+
10051018
callExecutor.execute(
10061019
new Runnable() {
10071020
@Override
@@ -1563,11 +1576,16 @@ private static final class FutureCanceller {
15631576
}
15641577

15651578
void setFuture(Future<?> future) {
1579+
boolean wasCancelled;
15661580
synchronized (lock) {
1567-
if (!cancelled) {
1581+
wasCancelled = cancelled;
1582+
if (!wasCancelled) {
15681583
this.future = future;
15691584
}
15701585
}
1586+
if (wasCancelled) {
1587+
future.cancel(false);
1588+
}
15711589
}
15721590

15731591
@GuardedBy("lock")

core/src/test/java/io/grpc/internal/RetriableStreamTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -705,6 +705,7 @@ public void retry_unretriableClosed_cancel() {
705705
// cancel
706706
retriableStream.cancel(Status.CANCELLED);
707707
inOrder.verify(retriableStreamRecorder, never()).postCommit();
708+
verify(masterListener, times(1)).closed(any(), any(), any());
708709
}
709710

710711
@Test
@@ -733,6 +734,7 @@ public void retry_cancelWhileBackoff() {
733734

734735
verifyNoMoreInteractions(mockStream1);
735736
verifyNoMoreInteractions(mockStream2);
737+
verify(masterListener, times(1)).closed(any(), any(), any());
736738
}
737739

738740
@Test

0 commit comments

Comments
 (0)