24
24
import org .apache .logging .log4j .Logger ;
25
25
import org .elasticsearch .action .admin .cluster .health .ClusterHealthResponse ;
26
26
import org .elasticsearch .cluster .ClusterState ;
27
+ import org .elasticsearch .cluster .metadata .IndexMetaData ;
28
+ import org .elasticsearch .cluster .metadata .IndexMetaData .State ;
27
29
import org .elasticsearch .cluster .routing .IndexRoutingTable ;
28
30
import org .elasticsearch .cluster .routing .IndexShardRoutingTable ;
29
31
import org .elasticsearch .cluster .routing .ShardRouting ;
33
35
import org .elasticsearch .test .ESIntegTestCase ;
34
36
import org .elasticsearch .test .ESIntegTestCase .ClusterScope ;
35
37
38
+ import java .util .Arrays ;
36
39
import java .util .List ;
37
40
import java .util .concurrent .TimeUnit ;
38
41
42
+ import static org .elasticsearch .test .hamcrest .ElasticsearchAssertions .assertAcked ;
39
43
import static org .hamcrest .Matchers .anyOf ;
40
44
import static org .hamcrest .Matchers .equalTo ;
41
45
@@ -54,7 +58,6 @@ public void testSimpleAwareness() throws Exception {
54
58
.put ("cluster.routing.allocation.awareness.attributes" , "rack_id" )
55
59
.build ();
56
60
57
-
58
61
logger .info ("--> starting 2 nodes on the same rack" );
59
62
internalCluster ().startNodes (2 , Settings .builder ().put (commonSettings ).put ("node.attr.rack_id" , "rack_1" ).build ());
60
63
@@ -68,21 +71,33 @@ public void testSimpleAwareness() throws Exception {
68
71
69
72
ensureGreen ();
70
73
74
+ final List <String > indicesToClose = randomSubsetOf (Arrays .asList ("test1" , "test2" ));
75
+ indicesToClose .forEach (indexToClose -> assertAcked (client ().admin ().indices ().prepareClose (indexToClose ).get ()));
76
+
71
77
logger .info ("--> starting 1 node on a different rack" );
72
78
final String node3 = internalCluster ().startNode (Settings .builder ().put (commonSettings ).put ("node.attr.rack_id" , "rack_2" ).build ());
73
79
74
80
// On slow machines the initial relocation might be delayed
75
81
assertThat (awaitBusy (
76
82
() -> {
77
83
logger .info ("--> waiting for no relocation" );
78
- ClusterHealthResponse clusterHealth = client ().admin ().cluster ().prepareHealth ().setWaitForEvents (Priority .LANGUID )
79
- .setWaitForGreenStatus ().setWaitForNodes ("3" ).setWaitForNoRelocatingShards (true ).get ();
84
+ ClusterHealthResponse clusterHealth = client ().admin ().cluster ().prepareHealth ()
85
+ .setIndices ("test1" , "test2" )
86
+ .setWaitForEvents (Priority .LANGUID )
87
+ .setWaitForGreenStatus ()
88
+ .setWaitForNodes ("3" )
89
+ .setWaitForNoRelocatingShards (true )
90
+ .get ();
80
91
if (clusterHealth .isTimedOut ()) {
81
92
return false ;
82
93
}
83
94
84
95
logger .info ("--> checking current state" );
85
96
ClusterState clusterState = client ().admin ().cluster ().prepareState ().execute ().actionGet ().getState ();
97
+ // check that closed indices are effectively closed
98
+ if (indicesToClose .stream ().anyMatch (index -> clusterState .metaData ().index (index ).getState () != State .CLOSE )) {
99
+ return false ;
100
+ }
86
101
// verify that we have all the primaries on node3
87
102
ObjectIntHashMap <String > counts = new ObjectIntHashMap <>();
88
103
for (IndexRoutingTable indexRoutingTable : clusterState .routingTable ()) {
@@ -99,7 +114,7 @@ public void testSimpleAwareness() throws Exception {
99
114
), equalTo (true ));
100
115
}
101
116
102
- public void testAwarenessZones () throws Exception {
117
+ public void testAwarenessZones () {
103
118
Settings commonSettings = Settings .builder ()
104
119
.put (AwarenessAllocationDecider .CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING .getKey () + "zone.values" , "a,b" )
105
120
.put (AwarenessAllocationDecider .CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING .getKey (), "zone" )
@@ -121,12 +136,20 @@ public void testAwarenessZones() throws Exception {
121
136
ClusterHealthResponse health = client ().admin ().cluster ().prepareHealth ().setWaitForNodes ("4" ).execute ().actionGet ();
122
137
assertThat (health .isTimedOut (), equalTo (false ));
123
138
124
- client ().admin ().indices ().prepareCreate ("test" )
125
- .setSettings (Settings .builder ().put ("index.number_of_shards" , 5 )
126
- .put ("index.number_of_replicas" , 1 )).execute ().actionGet ();
139
+ createIndex ("test" , Settings .builder ()
140
+ .put (IndexMetaData .SETTING_NUMBER_OF_SHARDS , 5 )
141
+ .put (IndexMetaData .SETTING_NUMBER_OF_REPLICAS , 1 )
142
+ .build ());
143
+
144
+ if (randomBoolean ()) {
145
+ assertAcked (client ().admin ().indices ().prepareClose ("test" ));
146
+ }
127
147
128
148
logger .info ("--> waiting for shards to be allocated" );
129
- health = client ().admin ().cluster ().prepareHealth ().setWaitForEvents (Priority .LANGUID ).setWaitForGreenStatus ()
149
+ health = client ().admin ().cluster ().prepareHealth ()
150
+ .setIndices ("test" )
151
+ .setWaitForEvents (Priority .LANGUID )
152
+ .setWaitForGreenStatus ()
130
153
.setWaitForNoRelocatingShards (true ).execute ().actionGet ();
131
154
assertThat (health .isTimedOut (), equalTo (false ));
132
155
@@ -146,7 +169,7 @@ public void testAwarenessZones() throws Exception {
146
169
assertThat (counts .get (B_0 ), anyOf (equalTo (2 ),equalTo (3 )));
147
170
}
148
171
149
- public void testAwarenessZonesIncrementalNodes () throws Exception {
172
+ public void testAwarenessZonesIncrementalNodes () {
150
173
Settings commonSettings = Settings .builder ()
151
174
.put ("cluster.routing.allocation.awareness.force.zone.values" , "a,b" )
152
175
.put ("cluster.routing.allocation.awareness.attributes" , "zone" )
@@ -159,11 +182,23 @@ public void testAwarenessZonesIncrementalNodes() throws Exception {
159
182
);
160
183
String A_0 = nodes .get (0 );
161
184
String B_0 = nodes .get (1 );
162
- client ().admin ().indices ().prepareCreate ("test" )
163
- .setSettings (Settings .builder ().put ("index.number_of_shards" , 5 )
164
- .put ("index.number_of_replicas" , 1 )).execute ().actionGet ();
165
- ClusterHealthResponse health = client ().admin ().cluster ().prepareHealth ().setWaitForEvents (Priority .LANGUID )
166
- .setWaitForGreenStatus ().setWaitForNodes ("2" ).setWaitForNoRelocatingShards (true ).execute ().actionGet ();
185
+
186
+ createIndex ("test" , Settings .builder ()
187
+ .put (IndexMetaData .SETTING_NUMBER_OF_SHARDS , 5 )
188
+ .put (IndexMetaData .SETTING_NUMBER_OF_REPLICAS , 1 )
189
+ .build ());
190
+
191
+ if (randomBoolean ()) {
192
+ assertAcked (client ().admin ().indices ().prepareClose ("test" ));
193
+ }
194
+
195
+ ClusterHealthResponse health = client ().admin ().cluster ().prepareHealth ()
196
+ .setIndices ("test" )
197
+ .setWaitForEvents (Priority .LANGUID )
198
+ .setWaitForGreenStatus ()
199
+ .setWaitForNodes ("2" )
200
+ .setWaitForNoRelocatingShards (true )
201
+ .execute ().actionGet ();
167
202
assertThat (health .isTimedOut (), equalTo (false ));
168
203
ClusterState clusterState = client ().admin ().cluster ().prepareState ().execute ().actionGet ().getState ();
169
204
ObjectIntHashMap <String > counts = new ObjectIntHashMap <>();
@@ -180,12 +215,22 @@ public void testAwarenessZonesIncrementalNodes() throws Exception {
180
215
logger .info ("--> starting another node in zone 'b'" );
181
216
182
217
String B_1 = internalCluster ().startNode (Settings .builder ().put (commonSettings ).put ("node.attr.zone" , "b" ).build ());
183
- health = client ().admin ().cluster ().prepareHealth ().setWaitForEvents (Priority .LANGUID ).setWaitForGreenStatus ()
184
- .setWaitForNodes ("3" ).execute ().actionGet ();
218
+ health = client ().admin ().cluster ().prepareHealth ()
219
+ .setIndices ("test" )
220
+ .setWaitForEvents (Priority .LANGUID )
221
+ .setWaitForGreenStatus ()
222
+ .setWaitForNodes ("3" )
223
+ .execute ().actionGet ();
185
224
assertThat (health .isTimedOut (), equalTo (false ));
186
225
client ().admin ().cluster ().prepareReroute ().get ();
187
- health = client ().admin ().cluster ().prepareHealth ().setWaitForEvents (Priority .LANGUID ).setWaitForGreenStatus ()
188
- .setWaitForNodes ("3" ).setWaitForActiveShards (10 ).setWaitForNoRelocatingShards (true ).execute ().actionGet ();
226
+ health = client ().admin ().cluster ().prepareHealth ()
227
+ .setIndices ("test" )
228
+ .setWaitForEvents (Priority .LANGUID )
229
+ .setWaitForGreenStatus ()
230
+ .setWaitForNodes ("3" )
231
+ .setWaitForActiveShards (10 )
232
+ .setWaitForNoRelocatingShards (true )
233
+ .execute ().actionGet ();
189
234
190
235
assertThat (health .isTimedOut (), equalTo (false ));
191
236
clusterState = client ().admin ().cluster ().prepareState ().execute ().actionGet ().getState ();
@@ -204,12 +249,22 @@ public void testAwarenessZonesIncrementalNodes() throws Exception {
204
249
assertThat (counts .get (B_1 ), equalTo (2 ));
205
250
206
251
String noZoneNode = internalCluster ().startNode ();
207
- health = client ().admin ().cluster ().prepareHealth ().setWaitForEvents (Priority .LANGUID ).setWaitForGreenStatus ()
208
- .setWaitForNodes ("4" ).execute ().actionGet ();
252
+ health = client ().admin ().cluster ().prepareHealth ()
253
+ .setIndices ("test" )
254
+ .setWaitForEvents (Priority .LANGUID )
255
+ .setWaitForGreenStatus ()
256
+ .setWaitForNodes ("4" )
257
+ .execute ().actionGet ();
209
258
assertThat (health .isTimedOut (), equalTo (false ));
210
259
client ().admin ().cluster ().prepareReroute ().get ();
211
- health = client ().admin ().cluster ().prepareHealth ().setWaitForEvents (Priority .LANGUID ).setWaitForGreenStatus ()
212
- .setWaitForNodes ("4" ).setWaitForActiveShards (10 ).setWaitForNoRelocatingShards (true ).execute ().actionGet ();
260
+ health = client ().admin ().cluster ().prepareHealth ()
261
+ .setIndices ("test" )
262
+ .setWaitForEvents (Priority .LANGUID )
263
+ .setWaitForGreenStatus ()
264
+ .setWaitForNodes ("4" )
265
+ .setWaitForActiveShards (10 )
266
+ .setWaitForNoRelocatingShards (true )
267
+ .execute ().actionGet ();
213
268
214
269
assertThat (health .isTimedOut (), equalTo (false ));
215
270
clusterState = client ().admin ().cluster ().prepareState ().execute ().actionGet ().getState ();
@@ -231,8 +286,14 @@ public void testAwarenessZonesIncrementalNodes() throws Exception {
231
286
client ().admin ().cluster ().prepareUpdateSettings ()
232
287
.setTransientSettings (Settings .builder ().put ("cluster.routing.allocation.awareness.attributes" , "" ).build ()).get ();
233
288
234
- health = client ().admin ().cluster ().prepareHealth ().setWaitForEvents (Priority .LANGUID ).setWaitForGreenStatus ()
235
- .setWaitForNodes ("4" ).setWaitForActiveShards (10 ).setWaitForNoRelocatingShards (true ).execute ().actionGet ();
289
+ health = client ().admin ().cluster ().prepareHealth ()
290
+ .setIndices ("test" )
291
+ .setWaitForEvents (Priority .LANGUID )
292
+ .setWaitForGreenStatus ()
293
+ .setWaitForNodes ("4" )
294
+ .setWaitForActiveShards (10 )
295
+ .setWaitForNoRelocatingShards (true )
296
+ .execute ().actionGet ();
236
297
237
298
assertThat (health .isTimedOut (), equalTo (false ));
238
299
clusterState = client ().admin ().cluster ().prepareState ().execute ().actionGet ().getState ();
0 commit comments