20
20
package org .elasticsearch .cluster ;
21
21
22
22
import org .elasticsearch .action .admin .cluster .health .ClusterHealthResponse ;
23
+ import org .elasticsearch .action .admin .cluster .state .ClusterStateResponse ;
24
+ import org .elasticsearch .action .admin .indices .create .CreateIndexResponse ;
23
25
import org .elasticsearch .cluster .health .ClusterHealthStatus ;
26
+ import org .elasticsearch .cluster .metadata .IndexMetaData ;
27
+ import org .elasticsearch .cluster .routing .ShardRouting ;
28
+ import org .elasticsearch .cluster .routing .allocation .decider .ThrottlingAllocationDecider ;
29
+ import org .elasticsearch .cluster .service .ClusterService ;
24
30
import org .elasticsearch .common .Priority ;
31
+ import org .elasticsearch .common .settings .Settings ;
32
+ import org .elasticsearch .common .unit .TimeValue ;
25
33
import org .elasticsearch .test .ESIntegTestCase ;
34
+ import org .elasticsearch .threadpool .ThreadPool ;
35
+ import org .junit .AfterClass ;
36
+ import org .junit .BeforeClass ;
37
+
38
+ import java .util .concurrent .TimeUnit ;
26
39
27
40
import static org .hamcrest .Matchers .equalTo ;
28
41
29
42
public class ClusterHealthIT extends ESIntegTestCase {
43
+
44
+ private static ThreadPool threadPool ;
45
+
46
+ @ BeforeClass
47
+ public static void customBeforeClass () throws Exception {
48
+ threadPool = new ThreadPool ("ClusterHealthIT" );
49
+ }
50
+
51
+ @ AfterClass
52
+ public static void customAfterClass () throws Exception {
53
+ ThreadPool .terminate (threadPool , 10L , TimeUnit .SECONDS );
54
+ threadPool = null ;
55
+ }
56
+
30
57
public void testSimpleLocalHealth () {
31
58
createIndex ("test" );
32
59
ensureGreen (); // master should thing it's green now.
33
60
34
61
for (String node : internalCluster ().getNodeNames ()) {
35
62
// a very high time out, which should never fire due to the local flag
36
- ClusterHealthResponse health = client (node ).admin ().cluster ().prepareHealth ().setLocal (true ).setWaitForEvents (Priority .LANGUID ).setTimeout ("30s" ).get ("10s" );
63
+ ClusterHealthResponse health = client (node ).admin ().cluster ().prepareHealth ()
64
+ .setLocal (true )
65
+ .setWaitForEvents (Priority .LANGUID )
66
+ .setTimeout ("30s" )
67
+ .get ("10s" );
37
68
assertThat (health .getStatus (), equalTo (ClusterHealthStatus .GREEN ));
38
69
assertThat (health .isTimedOut (), equalTo (false ));
39
70
}
40
71
}
41
72
42
73
public void testHealth () {
43
74
logger .info ("--> running cluster health on an index that does not exists" );
44
- ClusterHealthResponse healthResponse = client ().admin ().cluster ().prepareHealth ("test1" ).setWaitForYellowStatus ().setTimeout ("1s" ).execute ().actionGet ();
75
+ ClusterHealthResponse healthResponse = client ().admin ().cluster ().prepareHealth ("test1" )
76
+ .setWaitForYellowStatus ()
77
+ .setTimeout ("1s" )
78
+ .execute ()
79
+ .actionGet ();
45
80
assertThat (healthResponse .isTimedOut (), equalTo (true ));
46
81
assertThat (healthResponse .getStatus (), equalTo (ClusterHealthStatus .RED ));
47
82
assertThat (healthResponse .getIndices ().isEmpty (), equalTo (true ));
@@ -62,10 +97,91 @@ public void testHealth() {
62
97
assertThat (healthResponse .getIndices ().get ("test1" ).getStatus (), equalTo (ClusterHealthStatus .GREEN ));
63
98
64
99
logger .info ("--> running cluster health on an index that does exists and an index that doesn't exists" );
65
- healthResponse = client ().admin ().cluster ().prepareHealth ("test1" , "test2" ).setWaitForYellowStatus ().setTimeout ("1s" ).execute ().actionGet ();
100
+ healthResponse = client ().admin ().cluster ().prepareHealth ("test1" , "test2" )
101
+ .setWaitForYellowStatus ()
102
+ .setTimeout ("1s" )
103
+ .execute ()
104
+ .actionGet ();
66
105
assertThat (healthResponse .isTimedOut (), equalTo (true ));
67
106
assertThat (healthResponse .getStatus (), equalTo (ClusterHealthStatus .RED ));
68
107
assertThat (healthResponse .getIndices ().get ("test1" ).getStatus (), equalTo (ClusterHealthStatus .GREEN ));
69
108
assertThat (healthResponse .getIndices ().size (), equalTo (1 ));
70
109
}
71
- }
110
+
111
+ public void testHealthOnIndexCreation () throws Exception {
112
+ final int numNodes = randomIntBetween (2 , 5 );
113
+ logger .info ("--> starting {} nodes" , numNodes );
114
+ final Settings nodeSettings = Settings .builder ().put (
115
+ ThrottlingAllocationDecider .CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING .getKey (), numNodes
116
+ ).build ();
117
+ internalCluster ().ensureAtLeastNumDataNodes (numNodes , nodeSettings );
118
+
119
+ ClusterHealthResponse healthResponse = client ().admin ().cluster ()
120
+ .prepareHealth ()
121
+ .setWaitForGreenStatus ()
122
+ .setTimeout ("10s" )
123
+ .execute ()
124
+ .actionGet ();
125
+ assertThat (healthResponse .isTimedOut (), equalTo (false ));
126
+ assertThat (healthResponse .getStatus (), equalTo (ClusterHealthStatus .GREEN ));
127
+
128
+ // first, register a cluster state observer that checks cluster health
129
+ // upon index creation in the cluster state
130
+ final String masterNode = internalCluster ().getMasterName ();
131
+ final String indexName = "test-idx" ;
132
+ final ClusterService clusterService = internalCluster ().clusterService (masterNode );
133
+ final ClusterStateObserver observer = new ClusterStateObserver (clusterService , logger , threadPool .getThreadContext ());
134
+ final ClusterStateObserver .ChangePredicate validationPredicate = new ClusterStateObserver .ValidationPredicate () {
135
+ @ Override
136
+ protected boolean validate (ClusterState newState ) {
137
+ return newState .status () == ClusterState .ClusterStateStatus .APPLIED
138
+ && newState .metaData ().hasIndex (indexName );
139
+ }
140
+ };
141
+
142
+ final ClusterStateObserver .Listener stateListener = new ClusterStateObserver .Listener () {
143
+ @ Override
144
+ public void onNewClusterState (ClusterState clusterState ) {
145
+ // make sure we have inactive primaries
146
+ // see if we can terminate observing on the cluster state
147
+ final ClusterStateResponse csResponse = client ().admin ().cluster ().prepareState ().execute ().actionGet ();
148
+ boolean inactivePrimaries = false ;
149
+ for (ShardRouting shardRouting : csResponse .getState ().routingTable ().allShards (indexName )) {
150
+ if (shardRouting .primary () == false ) {
151
+ continue ;
152
+ }
153
+ if (shardRouting .active () == false ) {
154
+ inactivePrimaries = true ;
155
+ break ;
156
+ }
157
+ }
158
+ assertTrue (inactivePrimaries );
159
+ // verify cluster health is YELLOW (even though primaries are still being allocated)
160
+ final ClusterHealthResponse response = client ().admin ().cluster ().prepareHealth (indexName ).get ();
161
+ assertThat (response .getStatus (), equalTo (ClusterHealthStatus .YELLOW ));
162
+ }
163
+ @ Override
164
+ public void onClusterServiceClose () {
165
+ fail ("cluster service should not have closed" );
166
+ }
167
+ @ Override
168
+ public void onTimeout (TimeValue timeout ) {
169
+ fail ("timeout on cluster state observer" );
170
+ }
171
+ };
172
+ observer .waitForNextChange (stateListener , validationPredicate , TimeValue .timeValueSeconds (30L ));
173
+ final Settings settings = Settings .builder ().put (IndexMetaData .SETTING_NUMBER_OF_SHARDS , numNodes )
174
+ .put (IndexMetaData .SETTING_NUMBER_OF_REPLICAS , numNodes - 1 )
175
+ .build ();
176
+ CreateIndexResponse response = client ().admin ().indices ().prepareCreate (indexName )
177
+ .setSettings (settings )
178
+ .execute ()
179
+ .actionGet ();
180
+ assertTrue (response .isAcknowledged ());
181
+
182
+ // now, make sure we eventually get to the green state,
183
+ // we have at least two nodes so this should happen
184
+ ensureGreen (indexName );
185
+ }
186
+
187
+ }
0 commit comments