@@ -147,6 +147,17 @@ public double nextDouble() {
147
147
private final ChannelBufferMeter channelBufferUsed = new ChannelBufferMeter ();
148
148
private final FakeClock fakeClock = new FakeClock ();
149
149
150
+ private static long calculateBackoffWithRetries (int retryCount ) {
151
+ // Calculate the exponential backoff delay with jitter
152
+ double exponent = retryCount > 0 ? Math .pow (BACKOFF_MULTIPLIER , retryCount ) : 1 ;
153
+ long delay = (long ) (INITIAL_BACKOFF_IN_SECONDS * exponent );
154
+ return RetriableStream .intervalWithJitter (delay );
155
+ }
156
+
157
+ private static long calculateMaxBackoff () {
158
+ return RetriableStream .intervalWithJitter (MAX_BACKOFF_IN_SECONDS );
159
+ }
160
+
150
161
private final class RecordedRetriableStream extends RetriableStream <String > {
151
162
RecordedRetriableStream (MethodDescriptor <String , ?> method , Metadata headers ,
152
163
ChannelBufferMeter channelBufferUsed , long perRpcBufferLimit , long channelBufferLimit ,
@@ -307,7 +318,7 @@ public Void answer(InvocationOnMock in) {
307
318
retriableStream .sendMessage ("msg1 during backoff1" );
308
319
retriableStream .sendMessage ("msg2 during backoff1" );
309
320
310
- fakeClock .forwardTime (( long ) ( INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM ) - 1L , TimeUnit .SECONDS );
321
+ fakeClock .forwardTime (calculateBackoffWithRetries ( 0 ) - 1L , TimeUnit .SECONDS );
311
322
inOrder .verifyNoMoreInteractions ();
312
323
assertEquals (1 , fakeClock .numPendingTasks ());
313
324
fakeClock .forwardTime (1L , TimeUnit .SECONDS );
@@ -364,9 +375,7 @@ public Void answer(InvocationOnMock in) {
364
375
retriableStream .sendMessage ("msg2 during backoff2" );
365
376
retriableStream .sendMessage ("msg3 during backoff2" );
366
377
367
- fakeClock .forwardTime (
368
- (long ) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * FAKE_RANDOM ) - 1L ,
369
- TimeUnit .SECONDS );
378
+ fakeClock .forwardTime (calculateBackoffWithRetries (1 ) - 1L , TimeUnit .SECONDS );
370
379
inOrder .verifyNoMoreInteractions ();
371
380
assertEquals (1 , fakeClock .numPendingTasks ());
372
381
fakeClock .forwardTime (1L , TimeUnit .SECONDS );
@@ -459,7 +468,7 @@ public void retry_headersRead_cancel() {
459
468
sublistenerCaptor1 .getValue ().closed (
460
469
Status .fromCode (RETRIABLE_STATUS_CODE_1 ), PROCESSED , new Metadata ());
461
470
assertEquals (1 , fakeClock .numPendingTasks ());
462
- fakeClock .forwardTime (( long ) ( INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM ), TimeUnit .SECONDS );
471
+ fakeClock .forwardTime (calculateBackoffWithRetries ( 0 ), TimeUnit .SECONDS );
463
472
464
473
ArgumentCaptor <ClientStreamListener > sublistenerCaptor2 =
465
474
ArgumentCaptor .forClass (ClientStreamListener .class );
@@ -518,7 +527,7 @@ public void retry_headersRead_closed() {
518
527
doReturn (mockStream2 ).when (retriableStreamRecorder ).newSubstream (1 );
519
528
sublistenerCaptor1 .getValue ().closed (
520
529
Status .fromCode (RETRIABLE_STATUS_CODE_1 ), PROCESSED , new Metadata ());
521
- fakeClock .forwardTime (( long ) ( INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM ), TimeUnit .SECONDS );
530
+ fakeClock .forwardTime (calculateBackoffWithRetries ( 0 ), TimeUnit .SECONDS );
522
531
523
532
ArgumentCaptor <ClientStreamListener > sublistenerCaptor2 =
524
533
ArgumentCaptor .forClass (ClientStreamListener .class );
@@ -584,7 +593,7 @@ public void retry_cancel_closed() {
584
593
doReturn (mockStream2 ).when (retriableStreamRecorder ).newSubstream (1 );
585
594
sublistenerCaptor1 .getValue ().closed (
586
595
Status .fromCode (RETRIABLE_STATUS_CODE_1 ), PROCESSED , new Metadata ());
587
- fakeClock .forwardTime (( long ) ( INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM ), TimeUnit .SECONDS );
596
+ fakeClock .forwardTime (calculateBackoffWithRetries ( 0 ), TimeUnit .SECONDS );
588
597
589
598
ArgumentCaptor <ClientStreamListener > sublistenerCaptor2 =
590
599
ArgumentCaptor .forClass (ClientStreamListener .class );
@@ -687,7 +696,7 @@ public void retry_unretriableClosed_cancel() {
687
696
doReturn (mockStream2 ).when (retriableStreamRecorder ).newSubstream (1 );
688
697
sublistenerCaptor1 .getValue ().closed (
689
698
Status .fromCode (RETRIABLE_STATUS_CODE_1 ), PROCESSED , new Metadata ());
690
- fakeClock .forwardTime (( long ) ( INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM ), TimeUnit .SECONDS );
699
+ fakeClock .forwardTime (calculateBackoffWithRetries ( 0 ), TimeUnit .SECONDS );
691
700
692
701
ArgumentCaptor <ClientStreamListener > sublistenerCaptor2 =
693
702
ArgumentCaptor .forClass (ClientStreamListener .class );
@@ -821,7 +830,7 @@ public boolean isReady() {
821
830
// send more requests during backoff
822
831
retriableStream .request (789 );
823
832
824
- fakeClock .forwardTime (( long ) ( INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM ), TimeUnit .SECONDS );
833
+ fakeClock .forwardTime (calculateBackoffWithRetries ( 0 ), TimeUnit .SECONDS );
825
834
826
835
inOrder .verify (mockStream2 ).start (sublistenerCaptor2 .get ());
827
836
inOrder .verify (mockStream2 ).request (3 );
@@ -875,7 +884,7 @@ public void request(int numMessages) {
875
884
doReturn (mockStream2 ).when (retriableStreamRecorder ).newSubstream (1 );
876
885
sublistenerCaptor1 .getValue ().closed (
877
886
Status .fromCode (RETRIABLE_STATUS_CODE_1 ), PROCESSED , new Metadata ());
878
- fakeClock .forwardTime (( long ) ( INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM ), TimeUnit .SECONDS );
887
+ fakeClock .forwardTime (calculateBackoffWithRetries ( 0 ), TimeUnit .SECONDS );
879
888
880
889
inOrder .verify (mockStream2 ).start (sublistenerCaptor2 .capture ());
881
890
inOrder .verify (mockStream2 ).request (3 );
@@ -920,7 +929,7 @@ public void start(ClientStreamListener listener) {
920
929
doReturn (mockStream2 ).when (retriableStreamRecorder ).newSubstream (1 );
921
930
sublistenerCaptor1 .getValue ().closed (
922
931
Status .fromCode (RETRIABLE_STATUS_CODE_1 ), PROCESSED , new Metadata ());
923
- fakeClock .forwardTime (( long ) ( INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM ), TimeUnit .SECONDS );
932
+ fakeClock .forwardTime (calculateBackoffWithRetries ( 0 ), TimeUnit .SECONDS );
924
933
925
934
inOrder .verify (mockStream2 ).start (sublistenerCaptor2 .capture ());
926
935
inOrder .verify (retriableStreamRecorder ).postCommit ();
@@ -1028,7 +1037,7 @@ public boolean isReady() {
1028
1037
retriableStream .request (789 );
1029
1038
readiness .add (retriableStream .isReady ()); // expected false b/c in backoff
1030
1039
1031
- fakeClock .forwardTime (( long ) ( INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM ), TimeUnit .SECONDS );
1040
+ fakeClock .forwardTime (calculateBackoffWithRetries ( 0 ), TimeUnit .SECONDS );
1032
1041
1033
1042
verify (mockStream2 ).start (any (ClientStreamListener .class ));
1034
1043
readiness .add (retriableStream .isReady ()); // expected true
@@ -1110,7 +1119,7 @@ public void addPrevRetryAttemptsToRespHeaders() {
1110
1119
doReturn (mockStream2 ).when (retriableStreamRecorder ).newSubstream (1 );
1111
1120
sublistenerCaptor1 .getValue ().closed (
1112
1121
Status .fromCode (RETRIABLE_STATUS_CODE_1 ), PROCESSED , new Metadata ());
1113
- fakeClock .forwardTime (( long ) ( INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM ), TimeUnit .SECONDS );
1122
+ fakeClock .forwardTime (calculateBackoffWithRetries ( 0 ), TimeUnit .SECONDS );
1114
1123
1115
1124
ArgumentCaptor <ClientStreamListener > sublistenerCaptor2 =
1116
1125
ArgumentCaptor .forClass (ClientStreamListener .class );
@@ -1160,13 +1169,12 @@ public void start(ClientStreamListener listener) {
1160
1169
listener1 .closed (
1161
1170
Status .fromCode (RETRIABLE_STATUS_CODE_1 ), PROCESSED , new Metadata ());
1162
1171
assertEquals (1 , fakeClock .numPendingTasks ());
1163
- fakeClock .forwardTime (( long ) ( INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM ), TimeUnit .SECONDS );
1172
+ fakeClock .forwardTime (calculateBackoffWithRetries ( 0 ), TimeUnit .SECONDS );
1164
1173
assertEquals (1 , fakeClock .numPendingTasks ());
1165
1174
1166
1175
// send requests during backoff
1167
1176
retriableStream .request (3 );
1168
- fakeClock .forwardTime (
1169
- (long ) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * FAKE_RANDOM ), TimeUnit .SECONDS );
1177
+ fakeClock .forwardTime (calculateBackoffWithRetries (1 ), TimeUnit .SECONDS );
1170
1178
1171
1179
retriableStream .request (1 );
1172
1180
verify (mockStream1 , never ()).request (anyInt ());
@@ -1207,7 +1215,7 @@ public void start(ClientStreamListener listener) {
1207
1215
// retry
1208
1216
listener1 .closed (
1209
1217
Status .fromCode (RETRIABLE_STATUS_CODE_1 ), PROCESSED , new Metadata ());
1210
- fakeClock .forwardTime (( long ) ( INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM ), TimeUnit .SECONDS );
1218
+ fakeClock .forwardTime (calculateBackoffWithRetries ( 0 ), TimeUnit .SECONDS );
1211
1219
1212
1220
verify (mockStream2 ).start (any (ClientStreamListener .class ));
1213
1221
verify (retriableStreamRecorder ).postCommit ();
@@ -1260,7 +1268,7 @@ public void perRpcBufferLimitExceededDuringBackoff() {
1260
1268
bufferSizeTracer .outboundWireSize (2 );
1261
1269
verify (retriableStreamRecorder , never ()).postCommit ();
1262
1270
1263
- fakeClock .forwardTime (( long ) ( INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM ), TimeUnit .SECONDS );
1271
+ fakeClock .forwardTime (calculateBackoffWithRetries ( 0 ), TimeUnit .SECONDS );
1264
1272
verify (mockStream2 ).start (any (ClientStreamListener .class ));
1265
1273
verify (mockStream2 ).isReady ();
1266
1274
@@ -1332,7 +1340,7 @@ public void expBackoff_maxBackoff_maxRetryAttempts() {
1332
1340
sublistenerCaptor1 .getValue ().closed (
1333
1341
Status .fromCode (RETRIABLE_STATUS_CODE_1 ), PROCESSED , new Metadata ());
1334
1342
assertEquals (1 , fakeClock .numPendingTasks ());
1335
- fakeClock .forwardTime (( long ) ( INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM ) - 1L , TimeUnit .SECONDS );
1343
+ fakeClock .forwardTime (calculateBackoffWithRetries ( 0 ) - 1L , TimeUnit .SECONDS );
1336
1344
assertEquals (1 , fakeClock .numPendingTasks ());
1337
1345
fakeClock .forwardTime (1L , TimeUnit .SECONDS );
1338
1346
assertEquals (0 , fakeClock .numPendingTasks ());
@@ -1347,9 +1355,7 @@ public void expBackoff_maxBackoff_maxRetryAttempts() {
1347
1355
sublistenerCaptor2 .getValue ().closed (
1348
1356
Status .fromCode (RETRIABLE_STATUS_CODE_2 ), PROCESSED , new Metadata ());
1349
1357
assertEquals (1 , fakeClock .numPendingTasks ());
1350
- fakeClock .forwardTime (
1351
- (long ) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * FAKE_RANDOM ) - 1L ,
1352
- TimeUnit .SECONDS );
1358
+ fakeClock .forwardTime (calculateBackoffWithRetries (1 ) - 1L , TimeUnit .SECONDS );
1353
1359
assertEquals (1 , fakeClock .numPendingTasks ());
1354
1360
fakeClock .forwardTime (1L , TimeUnit .SECONDS );
1355
1361
assertEquals (0 , fakeClock .numPendingTasks ());
@@ -1364,10 +1370,7 @@ public void expBackoff_maxBackoff_maxRetryAttempts() {
1364
1370
sublistenerCaptor3 .getValue ().closed (
1365
1371
Status .fromCode (RETRIABLE_STATUS_CODE_1 ), PROCESSED , new Metadata ());
1366
1372
assertEquals (1 , fakeClock .numPendingTasks ());
1367
- fakeClock .forwardTime (
1368
- (long ) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * BACKOFF_MULTIPLIER * FAKE_RANDOM )
1369
- - 1L ,
1370
- TimeUnit .SECONDS );
1373
+ fakeClock .forwardTime (calculateBackoffWithRetries (2 ) - 1L , TimeUnit .SECONDS );
1371
1374
assertEquals (1 , fakeClock .numPendingTasks ());
1372
1375
fakeClock .forwardTime (1L , TimeUnit .SECONDS );
1373
1376
assertEquals (0 , fakeClock .numPendingTasks ());
@@ -1382,7 +1385,7 @@ public void expBackoff_maxBackoff_maxRetryAttempts() {
1382
1385
sublistenerCaptor4 .getValue ().closed (
1383
1386
Status .fromCode (RETRIABLE_STATUS_CODE_2 ), PROCESSED , new Metadata ());
1384
1387
assertEquals (1 , fakeClock .numPendingTasks ());
1385
- fakeClock .forwardTime (( long ) ( MAX_BACKOFF_IN_SECONDS * FAKE_RANDOM ) - 1L , TimeUnit .SECONDS );
1388
+ fakeClock .forwardTime (calculateMaxBackoff ( ) - 1L , TimeUnit .SECONDS );
1386
1389
assertEquals (1 , fakeClock .numPendingTasks ());
1387
1390
fakeClock .forwardTime (1L , TimeUnit .SECONDS );
1388
1391
assertEquals (0 , fakeClock .numPendingTasks ());
@@ -1397,7 +1400,7 @@ public void expBackoff_maxBackoff_maxRetryAttempts() {
1397
1400
sublistenerCaptor5 .getValue ().closed (
1398
1401
Status .fromCode (RETRIABLE_STATUS_CODE_2 ), PROCESSED , new Metadata ());
1399
1402
assertEquals (1 , fakeClock .numPendingTasks ());
1400
- fakeClock .forwardTime (( long ) ( MAX_BACKOFF_IN_SECONDS * FAKE_RANDOM ) - 1L , TimeUnit .SECONDS );
1403
+ fakeClock .forwardTime (calculateMaxBackoff ( ) - 1L , TimeUnit .SECONDS );
1401
1404
assertEquals (1 , fakeClock .numPendingTasks ());
1402
1405
fakeClock .forwardTime (1L , TimeUnit .SECONDS );
1403
1406
assertEquals (0 , fakeClock .numPendingTasks ());
@@ -1480,7 +1483,7 @@ public void pushback() {
1480
1483
sublistenerCaptor3 .getValue ().closed (
1481
1484
Status .fromCode (RETRIABLE_STATUS_CODE_1 ), PROCESSED , new Metadata ());
1482
1485
assertEquals (1 , fakeClock .numPendingTasks ());
1483
- fakeClock .forwardTime (( long ) ( INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM ) - 1L , TimeUnit .SECONDS );
1486
+ fakeClock .forwardTime (calculateBackoffWithRetries ( 0 ) - 1L , TimeUnit .SECONDS );
1484
1487
assertEquals (1 , fakeClock .numPendingTasks ());
1485
1488
fakeClock .forwardTime (1L , TimeUnit .SECONDS );
1486
1489
assertEquals (0 , fakeClock .numPendingTasks ());
@@ -1495,9 +1498,7 @@ public void pushback() {
1495
1498
sublistenerCaptor4 .getValue ().closed (
1496
1499
Status .fromCode (RETRIABLE_STATUS_CODE_2 ), PROCESSED , new Metadata ());
1497
1500
assertEquals (1 , fakeClock .numPendingTasks ());
1498
- fakeClock .forwardTime (
1499
- (long ) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * FAKE_RANDOM ) - 1L ,
1500
- TimeUnit .SECONDS );
1501
+ fakeClock .forwardTime (calculateBackoffWithRetries (1 ) - 1L , TimeUnit .SECONDS );
1501
1502
assertEquals (1 , fakeClock .numPendingTasks ());
1502
1503
fakeClock .forwardTime (1L , TimeUnit .SECONDS );
1503
1504
assertEquals (0 , fakeClock .numPendingTasks ());
@@ -1512,10 +1513,7 @@ public void pushback() {
1512
1513
sublistenerCaptor5 .getValue ().closed (
1513
1514
Status .fromCode (RETRIABLE_STATUS_CODE_2 ), PROCESSED , new Metadata ());
1514
1515
assertEquals (1 , fakeClock .numPendingTasks ());
1515
- fakeClock .forwardTime (
1516
- (long ) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * BACKOFF_MULTIPLIER * FAKE_RANDOM )
1517
- - 1L ,
1518
- TimeUnit .SECONDS );
1516
+ fakeClock .forwardTime (calculateBackoffWithRetries (2 ) - 1L , TimeUnit .SECONDS );
1519
1517
assertEquals (1 , fakeClock .numPendingTasks ());
1520
1518
fakeClock .forwardTime (1L , TimeUnit .SECONDS );
1521
1519
assertEquals (0 , fakeClock .numPendingTasks ());
@@ -1804,7 +1802,7 @@ public void transparentRetry_onlyOnceOnRefused() {
1804
1802
.closed (Status .fromCode (RETRIABLE_STATUS_CODE_1 ), REFUSED , new Metadata ());
1805
1803
1806
1804
assertEquals (1 , fakeClock .numPendingTasks ());
1807
- fakeClock .forwardTime (( long ) ( INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM ), TimeUnit .SECONDS );
1805
+ fakeClock .forwardTime (calculateBackoffWithRetries ( 0 ), TimeUnit .SECONDS );
1808
1806
inOrder .verify (retriableStreamRecorder ).newSubstream (1 );
1809
1807
ArgumentCaptor <ClientStreamListener > sublistenerCaptor3 =
1810
1808
ArgumentCaptor .forClass (ClientStreamListener .class );
@@ -1907,7 +1905,7 @@ public void normalRetry_thenNoTransparentRetry_butNormalRetry() {
1907
1905
.closed (Status .fromCode (RETRIABLE_STATUS_CODE_1 ), PROCESSED , new Metadata ());
1908
1906
1909
1907
assertEquals (1 , fakeClock .numPendingTasks ());
1910
- fakeClock .forwardTime (( long ) ( INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM ), TimeUnit .SECONDS );
1908
+ fakeClock .forwardTime (calculateBackoffWithRetries ( 0 ), TimeUnit .SECONDS );
1911
1909
inOrder .verify (retriableStreamRecorder ).newSubstream (1 );
1912
1910
ArgumentCaptor <ClientStreamListener > sublistenerCaptor2 =
1913
1911
ArgumentCaptor .forClass (ClientStreamListener .class );
@@ -1923,8 +1921,7 @@ public void normalRetry_thenNoTransparentRetry_butNormalRetry() {
1923
1921
.closed (Status .fromCode (RETRIABLE_STATUS_CODE_1 ), REFUSED , new Metadata ());
1924
1922
1925
1923
assertEquals (1 , fakeClock .numPendingTasks ());
1926
- fakeClock .forwardTime (
1927
- (long ) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * FAKE_RANDOM ), TimeUnit .SECONDS );
1924
+ fakeClock .forwardTime (calculateBackoffWithRetries (1 ), TimeUnit .SECONDS );
1928
1925
inOrder .verify (retriableStreamRecorder ).newSubstream (2 );
1929
1926
ArgumentCaptor <ClientStreamListener > sublistenerCaptor3 =
1930
1927
ArgumentCaptor .forClass (ClientStreamListener .class );
@@ -1960,7 +1957,7 @@ public void normalRetry_thenNoTransparentRetry_andNoMoreRetry() {
1960
1957
.closed (Status .fromCode (RETRIABLE_STATUS_CODE_1 ), PROCESSED , new Metadata ());
1961
1958
1962
1959
assertEquals (1 , fakeClock .numPendingTasks ());
1963
- fakeClock .forwardTime (( long ) ( INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM ), TimeUnit .SECONDS );
1960
+ fakeClock .forwardTime (calculateBackoffWithRetries ( 0 ), TimeUnit .SECONDS );
1964
1961
inOrder .verify (retriableStreamRecorder ).newSubstream (1 );
1965
1962
ArgumentCaptor <ClientStreamListener > sublistenerCaptor2 =
1966
1963
ArgumentCaptor .forClass (ClientStreamListener .class );
0 commit comments