21
21
22
22
import org .elasticsearch .action .DocWriteResponse ;
23
23
import org .elasticsearch .action .admin .cluster .health .ClusterHealthResponse ;
24
+ import org .elasticsearch .action .admin .cluster .node .hotthreads .NodeHotThreads ;
24
25
import org .elasticsearch .action .delete .DeleteResponse ;
25
26
import org .elasticsearch .action .index .IndexResponse ;
26
27
import org .elasticsearch .cluster .ClusterState ;
27
28
import org .elasticsearch .cluster .node .DiscoveryNode ;
28
29
import org .elasticsearch .cluster .routing .allocation .command .MoveAllocationCommand ;
29
30
import org .elasticsearch .common .Priority ;
30
31
import org .elasticsearch .common .settings .Settings ;
32
+ import org .elasticsearch .common .unit .TimeValue ;
31
33
import org .elasticsearch .index .query .QueryBuilders ;
32
34
import org .elasticsearch .test .ESIntegTestCase ;
33
35
import org .elasticsearch .test .hamcrest .ElasticsearchAssertions ;
34
36
35
37
import java .util .concurrent .atomic .AtomicBoolean ;
36
38
import java .util .concurrent .atomic .AtomicInteger ;
39
+ import java .util .stream .Collectors ;
37
40
38
- import static org .hamcrest .Matchers .equalTo ;
39
41
40
42
@ ESIntegTestCase .ClusterScope (scope = ESIntegTestCase .Scope .TEST )
41
43
public class IndexPrimaryRelocationIT extends ESIntegTestCase {
@@ -54,7 +56,7 @@ public void testPrimaryRelocationWhileIndexing() throws Exception {
54
56
Thread indexingThread = new Thread () {
55
57
@ Override
56
58
public void run () {
57
- while (finished .get () == false ) {
59
+ while (finished .get () == false && numAutoGenDocs . get () < 10_000 ) {
58
60
IndexResponse indexResponse = client ().prepareIndex ("test" , "type" , "id" ).setSource ("field" , "value" ).get ();
59
61
assertEquals (DocWriteResponse .Result .CREATED , indexResponse .getResult ());
60
62
DeleteResponse deleteResponse = client ().prepareDelete ("test" , "type" , "id" ).get ();
@@ -80,8 +82,18 @@ public void run() {
80
82
.add (new MoveAllocationCommand ("test" , 0 , relocationSource .getId (), relocationTarget .getId ()))
81
83
.execute ().actionGet ();
82
84
ClusterHealthResponse clusterHealthResponse = client ().admin ().cluster ().prepareHealth ()
85
+ .setTimeout (TimeValue .timeValueSeconds (60 ))
83
86
.setWaitForEvents (Priority .LANGUID ).setWaitForNoRelocatingShards (true ).execute ().actionGet ();
84
- assertThat (clusterHealthResponse .isTimedOut (), equalTo (false ));
87
+ if (clusterHealthResponse .isTimedOut ()) {
88
+ final String hotThreads = client ().admin ().cluster ().prepareNodesHotThreads ().setIgnoreIdleThreads (false ).get ().getNodes ()
89
+ .stream ().map (NodeHotThreads ::getHotThreads ).collect (Collectors .joining ("\n " ));
90
+ final ClusterState clusterState = client ().admin ().cluster ().prepareState ().get ().getState ();
91
+ logger .info ("timed out for waiting for relocation iteration [{}] \n cluster state {} \n hot threads {}" ,
92
+ i , clusterState , hotThreads );
93
+ finished .set (true );
94
+ indexingThread .join ();
95
+ throw new AssertionError ("timed out waiting for relocation iteration [" + i + "] " );
96
+ }
85
97
logger .info ("--> [iteration {}] relocation complete" , i );
86
98
relocationSource = relocationTarget ;
87
99
// indexing process aborted early, no need for more relocations as test has already failed
0 commit comments