@@ -149,11 +149,10 @@ public void uncaughtException(Thread t, Throwable e) {
149
149
this .throttle = throttle ;
150
150
}
151
151
152
- @ SuppressWarnings ("GuardedBy" )
152
+ @ SuppressWarnings ("GuardedBy" ) // TODO(b/145386688) this.lock==ScheduledCancellor.lock so ok
153
153
@ Nullable // null if already committed
154
154
@ CheckReturnValue
155
155
private Runnable commit (final Substream winningSubstream ) {
156
-
157
156
synchronized (lock ) {
158
157
if (state .winningSubstream != null ) {
159
158
return null ;
@@ -165,10 +164,9 @@ private Runnable commit(final Substream winningSubstream) {
165
164
// subtract the share of this RPC from channelBufferUsed.
166
165
channelBufferUsed .addAndGet (-perRpcBufferUsed );
167
166
167
+ final boolean wasCancelled = (scheduledRetry != null ) ? scheduledRetry .isCancelled () : false ;
168
168
final Future <?> retryFuture ;
169
169
if (scheduledRetry != null ) {
170
- // TODO(b/145386688): This access should be guarded by 'this.scheduledRetry.lock'; instead
171
- // found: 'this.lock'
172
170
retryFuture = scheduledRetry .markCancelled ();
173
171
scheduledRetry = null ;
174
172
} else {
@@ -177,8 +175,6 @@ private Runnable commit(final Substream winningSubstream) {
177
175
// cancel the scheduled hedging if it is scheduled prior to the commitment
178
176
final Future <?> hedgingFuture ;
179
177
if (scheduledHedging != null ) {
180
- // TODO(b/145386688): This access should be guarded by 'this.scheduledHedging.lock'; instead
181
- // found: 'this.lock'
182
178
hedgingFuture = scheduledHedging .markCancelled ();
183
179
scheduledHedging = null ;
184
180
} else {
@@ -195,11 +191,22 @@ public void run() {
195
191
}
196
192
}
197
193
if (retryFuture != null ) {
198
- boolean cancelled = retryFuture .cancel (false );
199
- if (cancelled ) {
200
- inFlightSubStreams .decrementAndGet ();
194
+ retryFuture .cancel (false );
195
+ if (!wasCancelled && inFlightSubStreams .decrementAndGet () == Integer .MIN_VALUE ) {
196
+ assert savedCloseMasterListenerReason != null ;
197
+ listenerSerializeExecutor .execute (
198
+ new Runnable () {
199
+ @ Override
200
+ public void run () {
201
+ isClosed = true ;
202
+ masterListener .closed (savedCloseMasterListenerReason .status ,
203
+ savedCloseMasterListenerReason .progress ,
204
+ savedCloseMasterListenerReason .metadata );
205
+ }
206
+ });
201
207
}
202
208
}
209
+
203
210
if (hedgingFuture != null ) {
204
211
hedgingFuture .cancel (false );
205
212
}
@@ -418,7 +425,7 @@ public final void start(ClientStreamListener listener) {
418
425
drain (substream );
419
426
}
420
427
421
- @ SuppressWarnings ("GuardedBy" )
428
+ @ SuppressWarnings ("GuardedBy" ) // TODO(b/145386688) this.lock==ScheduledCancellor.lock so ok
422
429
private void pushbackHedging (@ Nullable Integer delayMillis ) {
423
430
if (delayMillis == null ) {
424
431
return ;
@@ -437,8 +444,6 @@ private void pushbackHedging(@Nullable Integer delayMillis) {
437
444
return ;
438
445
}
439
446
440
- // TODO(b/145386688): This access should be guarded by 'this.scheduledHedging.lock'; instead
441
- // found: 'this.lock'
442
447
futureToBeCancelled = scheduledHedging .markCancelled ();
443
448
scheduledHedging = future = new FutureCanceller (lock );
444
449
}
@@ -472,16 +477,13 @@ public void run() {
472
477
}
473
478
callExecutor .execute (
474
479
new Runnable () {
475
- @ SuppressWarnings ("GuardedBy" )
480
+ @ SuppressWarnings ("GuardedBy" ) //TODO(b/145386688) lock==ScheduledCancellor.lock so ok
476
481
@ Override
477
482
public void run () {
478
483
boolean cancelled = false ;
479
484
FutureCanceller future = null ;
480
485
481
486
synchronized (lock ) {
482
- // TODO(b/145386688): This access should be guarded by
483
- // 'HedgingRunnable.this.scheduledHedgingRef.lock'; instead found:
484
- // 'RetriableStream.this.lock'
485
487
if (scheduledHedgingRef .isCancelled ()) {
486
488
cancelled = true ;
487
489
} else {
@@ -813,13 +815,11 @@ private boolean hasPotentialHedging(State state) {
813
815
&& !state .hedgingFrozen ;
814
816
}
815
817
816
- @ SuppressWarnings ("GuardedBy" )
818
+ @ SuppressWarnings ("GuardedBy" ) // TODO(b/145386688) this.lock==ScheduledCancellor.lock so ok
817
819
private void freezeHedging () {
818
820
Future <?> futureToBeCancelled = null ;
819
821
synchronized (lock ) {
820
822
if (scheduledHedging != null ) {
821
- // TODO(b/145386688): This access should be guarded by 'this.scheduledHedging.lock'; instead
822
- // found: 'this.lock'
823
823
futureToBeCancelled = scheduledHedging .markCancelled ();
824
824
scheduledHedging = null ;
825
825
}
@@ -1002,9 +1002,19 @@ public void run() {
1002
1002
synchronized (lock ) {
1003
1003
scheduledRetry = scheduledRetryCopy = new FutureCanceller (lock );
1004
1004
}
1005
+
1005
1006
class RetryBackoffRunnable implements Runnable {
1006
1007
@ Override
1008
+ @ SuppressWarnings ("FutureReturnValueIgnored" )
1007
1009
public void run () {
1010
+ synchronized (scheduledRetryCopy .lock ) {
1011
+ if (scheduledRetryCopy .isCancelled ()) {
1012
+ return ;
1013
+ } else {
1014
+ scheduledRetryCopy .markCancelled ();
1015
+ }
1016
+ }
1017
+
1008
1018
callExecutor .execute (
1009
1019
new Runnable () {
1010
1020
@ Override
@@ -1566,11 +1576,16 @@ private static final class FutureCanceller {
1566
1576
}
1567
1577
1568
1578
void setFuture (Future <?> future ) {
1579
+ boolean wasCancelled ;
1569
1580
synchronized (lock ) {
1570
- if (!cancelled ) {
1581
+ wasCancelled = cancelled ;
1582
+ if (!wasCancelled ) {
1571
1583
this .future = future ;
1572
1584
}
1573
1585
}
1586
+ if (wasCancelled ) {
1587
+ future .cancel (false );
1588
+ }
1574
1589
}
1575
1590
1576
1591
@ GuardedBy ("lock" )
0 commit comments