@@ -108,7 +108,7 @@ public void testRetentionLeasesSyncedOnAdd() throws Exception {
108
108
final long retainingSequenceNumber = randomLongBetween (0 , Long .MAX_VALUE );
109
109
final String source = randomAlphaOfLength (8 );
110
110
final CountDownLatch latch = new CountDownLatch (1 );
111
- final ActionListener <ReplicationResponse > listener = ActionListener . wrap ( r -> latch . countDown (), e -> fail ( e . toString ()) );
111
+ final ActionListener <ReplicationResponse > listener = countDownLatchListener ( latch );
112
112
// simulate a peer recovery which locks the soft deletes policy on the primary
113
113
final Closeable retentionLock = randomBoolean () ? primary .acquireRetentionLock () : () -> {};
114
114
currentRetentionLeases .put (id , primary .addRetentionLease (id , retainingSequenceNumber , source , listener ));
@@ -155,7 +155,7 @@ public void testRetentionLeaseSyncedOnRemove() throws Exception {
155
155
final long retainingSequenceNumber = randomLongBetween (0 , Long .MAX_VALUE );
156
156
final String source = randomAlphaOfLength (8 );
157
157
final CountDownLatch latch = new CountDownLatch (1 );
158
- final ActionListener <ReplicationResponse > listener = ActionListener . wrap ( r -> latch . countDown (), e -> fail ( e . toString ()) );
158
+ final ActionListener <ReplicationResponse > listener = countDownLatchListener ( latch );
159
159
// simulate a peer recovery which locks the soft deletes policy on the primary
160
160
final Closeable retentionLock = randomBoolean () ? primary .acquireRetentionLock () : () -> {};
161
161
currentRetentionLeases .put (id , primary .addRetentionLease (id , retainingSequenceNumber , source , listener ));
@@ -166,7 +166,7 @@ public void testRetentionLeaseSyncedOnRemove() throws Exception {
166
166
for (int i = 0 ; i < length ; i ++) {
167
167
final String id = randomFrom (currentRetentionLeases .keySet ());
168
168
final CountDownLatch latch = new CountDownLatch (1 );
169
- primary .removeRetentionLease (id , ActionListener . wrap ( r -> latch . countDown (), e -> fail ( e . toString ()) ));
169
+ primary .removeRetentionLease (id , countDownLatchListener ( latch ));
170
170
// simulate a peer recovery which locks the soft deletes policy on the primary
171
171
final Closeable retentionLock = randomBoolean () ? primary .acquireRetentionLock () : () -> {};
172
172
currentRetentionLeases .remove (id );
@@ -228,7 +228,7 @@ public void testRetentionLeasesSyncOnExpiration() throws Exception {
228
228
final long retainingSequenceNumber = randomLongBetween (0 , Long .MAX_VALUE );
229
229
final String source = randomAlphaOfLength (8 );
230
230
final CountDownLatch latch = new CountDownLatch (1 );
231
- final ActionListener <ReplicationResponse > listener = ActionListener . wrap ( r -> latch . countDown (), e -> fail ( e . toString ()) );
231
+ final ActionListener <ReplicationResponse > listener = countDownLatchListener ( latch );
232
232
final RetentionLease currentRetentionLease = primary .addRetentionLease (id , retainingSequenceNumber , source , listener );
233
233
final long now = System .nanoTime ();
234
234
latch .await ();
@@ -390,7 +390,7 @@ public void testRetentionLeasesSyncOnRecovery() throws Exception {
390
390
final long retainingSequenceNumber = randomLongBetween (0 , Long .MAX_VALUE );
391
391
final String source = randomAlphaOfLength (8 );
392
392
final CountDownLatch latch = new CountDownLatch (1 );
393
- final ActionListener <ReplicationResponse > listener = ActionListener . wrap ( r -> latch . countDown (), e -> fail ( e . toString ()) );
393
+ final ActionListener <ReplicationResponse > listener = countDownLatchListener ( latch );
394
394
currentRetentionLeases .put (id , primary .addRetentionLease (id , retainingSequenceNumber , source , listener ));
395
395
latch .await ();
396
396
currentRetentionLeases .put (id , primary .renewRetentionLease (id , retainingSequenceNumber , source ));
@@ -479,7 +479,7 @@ public void testCanRenewRetentionLeaseUnderBlock() throws InterruptedException {
479
479
*/
480
480
assertBusy (() -> assertThat (primary .loadRetentionLeases ().leases (), contains (retentionLease .get ())));
481
481
} catch (final Exception e ) {
482
- fail ( e . toString () );
482
+ failWithException ( e );
483
483
}
484
484
});
485
485
@@ -516,7 +516,7 @@ private void runUnderBlockTest(
516
516
517
517
final String source = randomAlphaOfLength (8 );
518
518
final CountDownLatch latch = new CountDownLatch (1 );
519
- final ActionListener <ReplicationResponse > listener = ActionListener . wrap ( r -> latch . countDown (), e -> fail ( e . toString ()) );
519
+ final ActionListener <ReplicationResponse > listener = countDownLatchListener ( latch );
520
520
primary .addRetentionLease (idForInitialRetentionLease , initialRetainingSequenceNumber , source , listener );
521
521
latch .await ();
522
522
@@ -545,7 +545,7 @@ public void onResponse(final ReplicationResponse replicationResponse) {
545
545
546
546
@ Override
547
547
public void onFailure (final Exception e ) {
548
- fail ( e . toString () );
548
+ failWithException ( e );
549
549
}
550
550
551
551
});
@@ -598,7 +598,7 @@ public void testCanRenewRetentionLeaseWithoutWaitingForShards() throws Interrupt
598
598
*/
599
599
assertBusy (() -> assertThat (primary .loadRetentionLeases ().leases (), contains (retentionLease .get ())));
600
600
} catch (final Exception e ) {
601
- fail ( e . toString () );
601
+ failWithException ( e );
602
602
}
603
603
});
604
604
@@ -637,7 +637,7 @@ private void runWaitForShardsTest(
637
637
638
638
final String source = randomAlphaOfLength (8 );
639
639
final CountDownLatch latch = new CountDownLatch (1 );
640
- final ActionListener <ReplicationResponse > listener = ActionListener . wrap ( r -> latch . countDown (), e -> fail ( e . toString ()) );
640
+ final ActionListener <ReplicationResponse > listener = countDownLatchListener ( latch );
641
641
primary .addRetentionLease (idForInitialRetentionLease , initialRetainingSequenceNumber , source , listener );
642
642
latch .await ();
643
643
@@ -665,7 +665,7 @@ public void onResponse(final ReplicationResponse replicationResponse) {
665
665
666
666
@ Override
667
667
public void onFailure (final Exception e ) {
668
- fail ( e . toString () );
668
+ failWithException ( e );
669
669
}
670
670
671
671
});
@@ -674,4 +674,12 @@ public void onFailure(final Exception e) {
674
674
afterSync .accept (primary );
675
675
}
676
676
677
+ private static void failWithException (Exception e ) {
678
+ throw new AssertionError ("unexpected" , e );
679
+ }
680
+
681
+ private static ActionListener <ReplicationResponse > countDownLatchListener (CountDownLatch latch ) {
682
+ return ActionListener .wrap (r -> latch .countDown (), RetentionLeaseIT ::failWithException );
683
+ }
684
+
677
685
}
0 commit comments