Skip to content

Commit 87a3b15

Browse files
authored
Fix retry race condition that can lead to double decrementing inFlightSubStreams and so miss calling closed (#11026) (#11035)
1 parent 09c8616 commit 87a3b15

File tree

2 files changed

+37
-20
lines changed

2 files changed

+37
-20
lines changed

core/src/main/java/io/grpc/internal/RetriableStream.java

Lines changed: 35 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -149,11 +149,10 @@ public void uncaughtException(Thread t, Throwable e) {
149149
this.throttle = throttle;
150150
}
151151

152-
@SuppressWarnings("GuardedBy")
152+
@SuppressWarnings("GuardedBy") // TODO(b/145386688) this.lock==ScheduledCancellor.lock so ok
153153
@Nullable // null if already committed
154154
@CheckReturnValue
155155
private Runnable commit(final Substream winningSubstream) {
156-
157156
synchronized (lock) {
158157
if (state.winningSubstream != null) {
159158
return null;
@@ -165,10 +164,9 @@ private Runnable commit(final Substream winningSubstream) {
165164
// subtract the share of this RPC from channelBufferUsed.
166165
channelBufferUsed.addAndGet(-perRpcBufferUsed);
167166

167+
final boolean wasCancelled = (scheduledRetry != null) ? scheduledRetry.isCancelled() : false;
168168
final Future<?> retryFuture;
169169
if (scheduledRetry != null) {
170-
// TODO(b/145386688): This access should be guarded by 'this.scheduledRetry.lock'; instead
171-
// found: 'this.lock'
172170
retryFuture = scheduledRetry.markCancelled();
173171
scheduledRetry = null;
174172
} else {
@@ -177,8 +175,6 @@ private Runnable commit(final Substream winningSubstream) {
177175
// cancel the scheduled hedging if it is scheduled prior to the commitment
178176
final Future<?> hedgingFuture;
179177
if (scheduledHedging != null) {
180-
// TODO(b/145386688): This access should be guarded by 'this.scheduledHedging.lock'; instead
181-
// found: 'this.lock'
182178
hedgingFuture = scheduledHedging.markCancelled();
183179
scheduledHedging = null;
184180
} else {
@@ -195,11 +191,22 @@ public void run() {
195191
}
196192
}
197193
if (retryFuture != null) {
198-
boolean cancelled = retryFuture.cancel(false);
199-
if (cancelled) {
200-
inFlightSubStreams.decrementAndGet();
194+
retryFuture.cancel(false);
195+
if (!wasCancelled && inFlightSubStreams.decrementAndGet() == Integer.MIN_VALUE) {
196+
assert savedCloseMasterListenerReason != null;
197+
listenerSerializeExecutor.execute(
198+
new Runnable() {
199+
@Override
200+
public void run() {
201+
isClosed = true;
202+
masterListener.closed(savedCloseMasterListenerReason.status,
203+
savedCloseMasterListenerReason.progress,
204+
savedCloseMasterListenerReason.metadata);
205+
}
206+
});
201207
}
202208
}
209+
203210
if (hedgingFuture != null) {
204211
hedgingFuture.cancel(false);
205212
}
@@ -418,7 +425,7 @@ public final void start(ClientStreamListener listener) {
418425
drain(substream);
419426
}
420427

421-
@SuppressWarnings("GuardedBy")
428+
@SuppressWarnings("GuardedBy") // TODO(b/145386688) this.lock==ScheduledCancellor.lock so ok
422429
private void pushbackHedging(@Nullable Integer delayMillis) {
423430
if (delayMillis == null) {
424431
return;
@@ -437,8 +444,6 @@ private void pushbackHedging(@Nullable Integer delayMillis) {
437444
return;
438445
}
439446

440-
// TODO(b/145386688): This access should be guarded by 'this.scheduledHedging.lock'; instead
441-
// found: 'this.lock'
442447
futureToBeCancelled = scheduledHedging.markCancelled();
443448
scheduledHedging = future = new FutureCanceller(lock);
444449
}
@@ -472,16 +477,13 @@ public void run() {
472477
}
473478
callExecutor.execute(
474479
new Runnable() {
475-
@SuppressWarnings("GuardedBy")
480+
@SuppressWarnings("GuardedBy") //TODO(b/145386688) lock==ScheduledCancellor.lock so ok
476481
@Override
477482
public void run() {
478483
boolean cancelled = false;
479484
FutureCanceller future = null;
480485

481486
synchronized (lock) {
482-
// TODO(b/145386688): This access should be guarded by
483-
// 'HedgingRunnable.this.scheduledHedgingRef.lock'; instead found:
484-
// 'RetriableStream.this.lock'
485487
if (scheduledHedgingRef.isCancelled()) {
486488
cancelled = true;
487489
} else {
@@ -813,13 +815,11 @@ private boolean hasPotentialHedging(State state) {
813815
&& !state.hedgingFrozen;
814816
}
815817

816-
@SuppressWarnings("GuardedBy")
818+
@SuppressWarnings("GuardedBy") // TODO(b/145386688) this.lock==ScheduledCancellor.lock so ok
817819
private void freezeHedging() {
818820
Future<?> futureToBeCancelled = null;
819821
synchronized (lock) {
820822
if (scheduledHedging != null) {
821-
// TODO(b/145386688): This access should be guarded by 'this.scheduledHedging.lock'; instead
822-
// found: 'this.lock'
823823
futureToBeCancelled = scheduledHedging.markCancelled();
824824
scheduledHedging = null;
825825
}
@@ -1002,9 +1002,19 @@ public void run() {
10021002
synchronized (lock) {
10031003
scheduledRetry = scheduledRetryCopy = new FutureCanceller(lock);
10041004
}
1005+
10051006
class RetryBackoffRunnable implements Runnable {
10061007
@Override
1008+
@SuppressWarnings("FutureReturnValueIgnored")
10071009
public void run() {
1010+
synchronized (scheduledRetryCopy.lock) {
1011+
if (scheduledRetryCopy.isCancelled()) {
1012+
return;
1013+
} else {
1014+
scheduledRetryCopy.markCancelled();
1015+
}
1016+
}
1017+
10081018
callExecutor.execute(
10091019
new Runnable() {
10101020
@Override
@@ -1566,11 +1576,16 @@ private static final class FutureCanceller {
15661576
}
15671577

15681578
void setFuture(Future<?> future) {
1579+
boolean wasCancelled;
15691580
synchronized (lock) {
1570-
if (!cancelled) {
1581+
wasCancelled = cancelled;
1582+
if (!wasCancelled) {
15711583
this.future = future;
15721584
}
15731585
}
1586+
if (wasCancelled) {
1587+
future.cancel(false);
1588+
}
15741589
}
15751590

15761591
@GuardedBy("lock")

core/src/test/java/io/grpc/internal/RetriableStreamTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -705,6 +705,7 @@ public void retry_unretriableClosed_cancel() {
705705
// cancel
706706
retriableStream.cancel(Status.CANCELLED);
707707
inOrder.verify(retriableStreamRecorder, never()).postCommit();
708+
verify(masterListener, times(1)).closed(any(), any(), any());
708709
}
709710

710711
@Test
@@ -733,6 +734,7 @@ public void retry_cancelWhileBackoff() {
733734

734735
verifyNoMoreInteractions(mockStream1);
735736
verifyNoMoreInteractions(mockStream2);
737+
verify(masterListener, times(1)).closed(any(), any(), any());
736738
}
737739

738740
@Test

0 commit comments

Comments
 (0)