@@ -108,6 +108,7 @@ public class RetryTest {
108
108
@ SuppressWarnings ("unchecked" )
109
109
private ClientCall .Listener <Integer > mockCallListener =
110
110
mock (ClientCall .Listener .class , delegatesTo (testCallListener ));
111
+ private java .util .concurrent .ScheduledFuture <?> activeFuture = null ;
111
112
112
113
private CountDownLatch backoffLatch = new CountDownLatch (1 );
113
114
private final EventLoopGroup group = new DefaultEventLoopGroup () {
@@ -118,7 +119,7 @@ public ScheduledFuture<?> schedule(
118
119
if (!command .getClass ().getName ().contains ("RetryBackoffRunnable" )) {
119
120
return super .schedule (command , delay , unit );
120
121
}
121
- fakeClock .getScheduledExecutorService ().schedule (
122
+ activeFuture = fakeClock .getScheduledExecutorService ().schedule (
122
123
new Runnable () {
123
124
@ Override
124
125
public void run () {
@@ -248,15 +249,22 @@ private void assertInboundWireSizeRecorded(long length) throws Exception {
248
249
249
250
private void assertRpcStatusRecorded (
250
251
Status .Code code , long roundtripLatencyMs , long outboundMessages ) throws Exception {
252
+ assertRpcStatusRecorded (code , roundtripLatencyMs , 0 , outboundMessages );
253
+ }
254
+
255
+ private void assertRpcStatusRecorded (
256
+ Status .Code code , long roundtripLatencyMs , long toleranceMs , long outboundMessages )
257
+ throws Exception {
251
258
MetricsRecord record = clientStatsRecorder .pollRecord (7 , SECONDS );
252
259
assertNotNull (record );
253
260
TagValue statusTag = record .tags .get (RpcMeasureConstants .GRPC_CLIENT_STATUS );
254
261
assertNotNull (statusTag );
255
262
assertThat (statusTag .asString ()).isEqualTo (code .toString ());
256
263
assertThat (record .getMetricAsLongOrFail (DeprecatedCensusConstants .RPC_CLIENT_FINISHED_COUNT ))
257
264
.isEqualTo (1 );
258
- assertThat (record .getMetricAsLongOrFail (RpcMeasureConstants .GRPC_CLIENT_ROUNDTRIP_LATENCY ))
259
- .isEqualTo (roundtripLatencyMs );
265
+ long roundtripLatency =
266
+ record .getMetricAsLongOrFail (RpcMeasureConstants .GRPC_CLIENT_ROUNDTRIP_LATENCY );
267
+ assertThat (Math .abs (roundtripLatency - roundtripLatencyMs )).isAtMost (toleranceMs );
260
268
assertThat (record .getMetricAsLongOrFail (RpcMeasureConstants .GRPC_CLIENT_SENT_MESSAGES_PER_RPC ))
261
269
.isEqualTo (outboundMessages );
262
270
}
@@ -303,10 +311,12 @@ public void retryUntilBufferLimitExceeded() throws Exception {
303
311
call .sendMessage (message );
304
312
305
313
// let attempt fail
306
- testCallListener .clear ();
314
+ testCallListener .reset ();
307
315
serverCall .close (
308
316
Status .UNAVAILABLE .withDescription ("2nd attempt failed" ),
309
317
new Metadata ());
318
+ fakeClock .forwardTime (1 , SECONDS );
319
+ activeFuture .get (1 , SECONDS ); // Make sure the close is done.
310
320
// no more retry
311
321
testCallListener .verifyDescription ("2nd attempt failed" , 5000 );
312
322
}
@@ -420,9 +430,12 @@ public void streamClosed(Status status) {
420
430
call .cancel ("Cancelled before commit" , null );
421
431
// Let the netty substream listener be closed.
422
432
streamClosedLatch .countDown ();
433
+ assertNotNull ("No activeFuture" , activeFuture );
434
+ fakeClock .forwardTime (1 , SECONDS );
435
+ activeFuture .get (1 , SECONDS );
423
436
// The call listener is closed.
424
437
verify (mockCallListener , timeout (5000 )).onClose (any (Status .class ), any (Metadata .class ));
425
- assertRpcStatusRecorded (Code .CANCELLED , 17_000 , 1 );
438
+ assertRpcStatusRecorded (Code .CANCELLED , 18_000 , 1 );
426
439
assertRetryStatsRecorded (1 , 0 , 0 );
427
440
}
428
441
@@ -551,7 +564,7 @@ public void onClose(Status status, Metadata trailers) {
551
564
closeLatch .countDown ();
552
565
}
553
566
554
- void clear () {
567
+ void reset () {
555
568
status = null ;
556
569
closeLatch = new CountDownLatch (1 );
557
570
}
0 commit comments