19
19
20
20
package org .elasticsearch .indices .recovery ;
21
21
22
- import org .apache .lucene .util .Constants ;
23
22
import org .elasticsearch .action .DocWriteResponse ;
24
23
import org .elasticsearch .action .admin .cluster .health .ClusterHealthResponse ;
24
+ import org .elasticsearch .action .admin .cluster .node .hotthreads .NodeHotThreads ;
25
25
import org .elasticsearch .action .delete .DeleteResponse ;
26
26
import org .elasticsearch .action .index .IndexResponse ;
27
27
import org .elasticsearch .cluster .ClusterState ;
28
28
import org .elasticsearch .cluster .node .DiscoveryNode ;
29
29
import org .elasticsearch .cluster .routing .allocation .command .MoveAllocationCommand ;
30
30
import org .elasticsearch .common .Priority ;
31
31
import org .elasticsearch .common .settings .Settings ;
32
+ import org .elasticsearch .common .unit .TimeValue ;
32
33
import org .elasticsearch .index .query .QueryBuilders ;
33
34
import org .elasticsearch .test .ESIntegTestCase ;
34
35
import org .elasticsearch .test .hamcrest .ElasticsearchAssertions ;
35
36
36
37
import java .util .concurrent .atomic .AtomicBoolean ;
37
38
import java .util .concurrent .atomic .AtomicInteger ;
39
+ import java .util .stream .Collectors ;
38
40
39
- import static org .hamcrest .Matchers .equalTo ;
40
41
41
42
@ ESIntegTestCase .ClusterScope (scope = ESIntegTestCase .Scope .TEST )
42
43
public class IndexPrimaryRelocationIT extends ESIntegTestCase {
43
44
44
45
private static final int RELOCATION_COUNT = 15 ;
45
46
46
47
public void testPrimaryRelocationWhileIndexing () throws Exception {
47
- assumeFalse ("https://github.com/elastic/elasticsearch/issues/46526" , Constants .MAC_OS_X );
48
48
internalCluster ().ensureAtLeastNumDataNodes (randomIntBetween (2 , 3 ));
49
49
client ().admin ().indices ().prepareCreate ("test" )
50
50
.setSettings (Settings .builder ().put ("index.number_of_shards" , 1 ).put ("index.number_of_replicas" , 0 ))
@@ -56,7 +56,7 @@ public void testPrimaryRelocationWhileIndexing() throws Exception {
56
56
Thread indexingThread = new Thread () {
57
57
@ Override
58
58
public void run () {
59
- while (finished .get () == false ) {
59
+ while (finished .get () == false && numAutoGenDocs . get () < 10_000 ) {
60
60
IndexResponse indexResponse = client ().prepareIndex ("test" , "type" , "id" ).setSource ("field" , "value" ).get ();
61
61
assertEquals (DocWriteResponse .Result .CREATED , indexResponse .getResult ());
62
62
DeleteResponse deleteResponse = client ().prepareDelete ("test" , "type" , "id" ).get ();
@@ -82,8 +82,18 @@ public void run() {
82
82
.add (new MoveAllocationCommand ("test" , 0 , relocationSource .getId (), relocationTarget .getId ()))
83
83
.execute ().actionGet ();
84
84
ClusterHealthResponse clusterHealthResponse = client ().admin ().cluster ().prepareHealth ()
85
+ .setTimeout (TimeValue .timeValueSeconds (60 ))
85
86
.setWaitForEvents (Priority .LANGUID ).setWaitForNoRelocatingShards (true ).execute ().actionGet ();
86
- assertThat (clusterHealthResponse .isTimedOut (), equalTo (false ));
87
+ if (clusterHealthResponse .isTimedOut ()) {
88
+ final String hotThreads = client ().admin ().cluster ().prepareNodesHotThreads ().setIgnoreIdleThreads (false ).get ().getNodes ()
89
+ .stream ().map (NodeHotThreads ::getHotThreads ).collect (Collectors .joining ("\n " ));
90
+ final ClusterState clusterState = client ().admin ().cluster ().prepareState ().get ().getState ();
91
+ logger .info ("timed out for waiting for relocation iteration [{}] \n cluster state {} \n hot threads {}" ,
92
+ i , clusterState , hotThreads );
93
+ finished .set (true );
94
+ indexingThread .join ();
95
+ throw new AssertionError ("timed out waiting for relocation iteration [" + i + "] " );
96
+ }
87
97
logger .info ("--> [iteration {}] relocation complete" , i );
88
98
relocationSource = relocationTarget ;
89
99
// indexing process aborted early, no need for more relocations as test has already failed
0 commit comments