36
36
import java .util .Collections ;
37
37
import java .util .List ;
38
38
import java .util .Locale ;
39
+ import java .util .concurrent .CountDownLatch ;
39
40
import java .util .concurrent .TimeUnit ;
40
41
import java .util .concurrent .atomic .AtomicBoolean ;
41
42
import java .util .concurrent .atomic .AtomicInteger ;
@@ -499,6 +500,9 @@ public void testPauseAndResumeWithMultipleAutoFollowPatterns() throws Exception
499
500
500
501
final AtomicBoolean running = new AtomicBoolean (true );
501
502
final AtomicInteger leaderIndices = new AtomicInteger (0 );
503
+ final CountDownLatch latchThree = new CountDownLatch (3 );
504
+ final CountDownLatch latchSix = new CountDownLatch (6 );
505
+ final CountDownLatch latchNine = new CountDownLatch (9 );
502
506
503
507
// start creating new indices on the remote cluster
504
508
final Thread createNewLeaderIndicesThread = new Thread (() -> {
@@ -513,6 +517,9 @@ public void testPauseAndResumeWithMultipleAutoFollowPatterns() throws Exception
513
517
} else {
514
518
Thread .sleep (200L );
515
519
}
520
+ latchThree .countDown ();
521
+ latchSix .countDown ();
522
+ latchNine .countDown ();
516
523
} catch (Exception e ) {
517
524
throw new AssertionError (e );
518
525
}
@@ -521,23 +528,29 @@ public void testPauseAndResumeWithMultipleAutoFollowPatterns() throws Exception
521
528
createNewLeaderIndicesThread .start ();
522
529
523
530
// wait for 3 leader indices to be created on the remote cluster
524
- assertBusy (() -> assertThat (leaderIndices .get (), greaterThanOrEqualTo (3 )));
525
- assertBusy (() -> assertThat (getAutoFollowStats ().getNumberOfSuccessfulFollowIndices (), greaterThanOrEqualTo (3L )));
531
+ latchThree .await (30L , TimeUnit .SECONDS );
532
+ assertThat (leaderIndices .get (), greaterThanOrEqualTo (3 ));
533
+ assertBusy (() -> assertThat (getAutoFollowStats ().getNumberOfSuccessfulFollowIndices (), greaterThanOrEqualTo (3L )),
534
+ 30L , TimeUnit .SECONDS );
526
535
527
536
// now pause some random patterns
528
537
pausedAutoFollowerPatterns .forEach (this ::pauseAutoFollowPattern );
529
538
assertBusy (() -> autoFollowPatterns .forEach (pattern ->
530
- assertThat (getAutoFollowPattern (pattern ).isActive (), equalTo (pausedAutoFollowerPatterns .contains (pattern ) == false ))));
539
+ assertThat (getAutoFollowPattern (pattern ).isActive (), equalTo (pausedAutoFollowerPatterns .contains (pattern ) == false ))),
540
+ 30L , TimeUnit .SECONDS );
531
541
532
542
// wait for more leader indices to be created on the remote cluster
533
- assertBusy (() -> assertThat (leaderIndices .get (), greaterThanOrEqualTo (6 )));
543
+ latchSix .await (30L , TimeUnit .SECONDS );
544
+ assertThat (leaderIndices .get (), greaterThanOrEqualTo (6 ));
534
545
535
546
// resume auto follow patterns
536
547
pausedAutoFollowerPatterns .forEach (this ::resumeAutoFollowPattern );
537
- assertBusy (() -> autoFollowPatterns .forEach (pattern -> assertTrue (getAutoFollowPattern (pattern ).isActive ())));
548
+ assertBusy (() -> autoFollowPatterns .forEach (pattern -> assertTrue (getAutoFollowPattern (pattern ).isActive ())),
549
+ 30L , TimeUnit .SECONDS );
538
550
539
551
// wait for more leader indices to be created on the remote cluster
540
- assertBusy (() -> assertThat (leaderIndices .get (), greaterThanOrEqualTo (9 )));
552
+ latchNine .await (30L , TimeUnit .SECONDS );
553
+ assertThat (leaderIndices .get (), greaterThanOrEqualTo (9 ));
541
554
assertBusy (() -> assertThat (getAutoFollowStats ().getNumberOfSuccessfulFollowIndices (), greaterThanOrEqualTo (9L )),
542
555
30L , TimeUnit .SECONDS );
543
556
0 commit comments