27
27
import org .elasticsearch .action .admin .cluster .health .ClusterHealthResponse ;
28
28
import org .elasticsearch .action .admin .cluster .reroute .ClusterRerouteResponse ;
29
29
import org .elasticsearch .action .index .IndexRequestBuilder ;
30
+ import org .elasticsearch .action .index .IndexResponse ;
30
31
import org .elasticsearch .action .search .SearchResponse ;
31
32
import org .elasticsearch .action .support .WriteRequest ;
32
33
import org .elasticsearch .client .Client ;
@@ -552,7 +553,7 @@ public void testRelocateWhileWaitingForRefresh() {
552
553
assertThat (client ().prepareSearch ("test" ).setSize (0 ).execute ().actionGet ().getHits ().getTotalHits (), equalTo (20L ));
553
554
}
554
555
555
- public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh () {
556
+ public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh () throws Exception {
556
557
logger .info ("--> starting [node1] ..." );
557
558
final String node1 = internalCluster ().startNode ();
558
559
@@ -570,9 +571,11 @@ public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() {
570
571
logger .info ("--> flush so we have an actual index" );
571
572
client ().admin ().indices ().prepareFlush ().execute ().actionGet ();
572
573
logger .info ("--> index more docs so we have something in the translog" );
574
+ final List <ActionFuture <IndexResponse >> pendingIndexResponses = new ArrayList <>();
573
575
for (int i = 10 ; i < 20 ; i ++) {
574
- client ().prepareIndex ("test" , "type" , Integer .toString (i )).setRefreshPolicy (WriteRequest .RefreshPolicy .WAIT_UNTIL )
575
- .setSource ("field" , "value" + i ).execute ();
576
+ pendingIndexResponses .add (client ().prepareIndex ("test" , "type" , Integer .toString (i ))
577
+ .setRefreshPolicy (WriteRequest .RefreshPolicy .WAIT_UNTIL )
578
+ .setSource ("field" , "value" + i ).execute ());
576
579
}
577
580
578
581
logger .info ("--> start another node" );
@@ -587,16 +590,21 @@ public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() {
587
590
.execute ();
588
591
logger .info ("--> index 100 docs while relocating" );
589
592
for (int i = 20 ; i < 120 ; i ++) {
590
- client ().prepareIndex ("test" , "type" , Integer .toString (i )).setRefreshPolicy (WriteRequest .RefreshPolicy .WAIT_UNTIL )
591
- .setSource ("field" , "value" + i ).execute ();
593
+ pendingIndexResponses .add (client ().prepareIndex ("test" , "type" , Integer .toString (i ))
594
+ .setRefreshPolicy (WriteRequest .RefreshPolicy .WAIT_UNTIL )
595
+ .setSource ("field" , "value" + i ).execute ());
592
596
}
593
597
relocationListener .actionGet ();
594
598
clusterHealthResponse = client ().admin ().cluster ().prepareHealth ().setWaitForEvents (Priority .LANGUID )
595
599
.setWaitForNoRelocatingShards (true ).setTimeout (ACCEPTABLE_RELOCATION_TIME ).execute ().actionGet ();
596
600
assertThat (clusterHealthResponse .isTimedOut (), equalTo (false ));
597
601
598
602
logger .info ("--> verifying count" );
599
- client ().admin ().indices ().prepareRefresh ().execute ().actionGet ();
603
+ assertBusy (() -> {
604
+ client ().admin ().indices ().prepareRefresh ().execute ().actionGet ();
605
+ assertTrue (pendingIndexResponses .stream ().allMatch (ActionFuture ::isDone ));
606
+ }, 1 , TimeUnit .MINUTES );
607
+
600
608
assertThat (client ().prepareSearch ("test" ).setSize (0 ).execute ().actionGet ().getHits ().getTotalHits (), equalTo (120L ));
601
609
}
602
610
0 commit comments