@@ -442,9 +442,9 @@ public void testIndicesDeleted() throws Exception {
442
442
assertFalse (client ().admin ().indices ().prepareExists (idxName ).get ().isExists ());
443
443
}
444
444
445
- public void testRestartPrimaryNodeWhileIndexing () throws Exception {
445
+ public void testRestartNodeWhileIndexing () throws Exception {
446
446
startCluster (3 );
447
- String index = "failover_index " ;
447
+ String index = "restart_while_indexing " ;
448
448
assertAcked (client ().admin ().indices ().prepareCreate (index ).setSettings (Settings .builder ()
449
449
.put (IndexMetaData .SETTING_NUMBER_OF_SHARDS , 1 ).put (IndexMetaData .SETTING_NUMBER_OF_REPLICAS , between (1 , 2 ))));
450
450
AtomicBoolean stopped = new AtomicBoolean ();
@@ -454,37 +454,29 @@ public void testRestartPrimaryNodeWhileIndexing() throws Exception {
454
454
for (int i = 0 ; i < threads .length ; i ++) {
455
455
threads [i ] = new Thread (() -> {
456
456
while (stopped .get () == false && docID .get () < 5000 ) {
457
- int docId = frequently () ? docID . getAndIncrement () : between ( 0 , docID .getAndIncrement ());
457
+ String id = Integer . toString ( docID .incrementAndGet ());
458
458
try {
459
- IndexResponse response = client ().prepareIndex (index , "_doc" , Integer .toString (docId ))
460
- .setSource ("{\" f\" :" + docId + "}" , XContentType .JSON ).get ();
459
+ IndexResponse response = client ().prepareIndex (index , "_doc" , id ).setSource ("{}" , XContentType .JSON ).get ();
461
460
assertThat (response .getResult (), isOneOf (CREATED , UPDATED ));
462
461
logger .info ("--> index id={} seq_no={}" , response .getId (), response .getSeqNo ());
463
462
ackedDocs .add (response .getId ());
464
463
} catch (ElasticsearchException ignore ) {
465
- logger .info ("--> fail to index id={}" , docId );
464
+ logger .info ("--> fail to index id={}" , id );
466
465
}
467
466
}
468
467
});
469
468
threads [i ].start ();
470
469
}
471
470
ensureGreen (index );
472
471
assertBusy (() -> assertThat (docID .get (), greaterThanOrEqualTo (100 )));
473
- ClusterState clusterState = internalCluster ().clusterService ().state ();
474
- for (ShardRouting shardRouting : clusterState .routingTable ().allShards (index )) {
475
- if (shardRouting .primary ()) {
476
- String nodeName = clusterState .nodes ().get (shardRouting .currentNodeId ()).getName ();
477
- internalCluster ().restartNode (nodeName , new InternalTestCluster .RestartCallback ());
478
- break ;
479
- }
480
- }
472
+ internalCluster ().restartRandomDataNode (new InternalTestCluster .RestartCallback ());
481
473
ensureGreen (index );
482
474
assertBusy (() -> assertThat (docID .get (), greaterThanOrEqualTo (200 )));
483
475
stopped .set (true );
484
476
for (Thread thread : threads ) {
485
477
thread .join ();
486
478
}
487
- clusterState = internalCluster ().clusterService ().state ();
479
+ ClusterState clusterState = internalCluster ().clusterService ().state ();
488
480
for (ShardRouting shardRouting : clusterState .routingTable ().allShards (index )) {
489
481
String nodeName = clusterState .nodes ().get (shardRouting .currentNodeId ()).getName ();
490
482
IndicesService indicesService = internalCluster ().getInstance (IndicesService .class , nodeName );
0 commit comments