25
25
import org .elasticsearch .cluster .routing .ShardRouting ;
26
26
import org .elasticsearch .cluster .routing .ShardRoutingState ;
27
27
import org .elasticsearch .cluster .routing .allocation .decider .EnableAllocationDecider ;
28
+ import org .elasticsearch .common .Priority ;
28
29
import org .elasticsearch .common .settings .Settings ;
29
30
import org .elasticsearch .common .util .concurrent .ConcurrentCollections ;
30
31
import org .elasticsearch .env .Environment ;
45
46
import java .util .Locale ;
46
47
import java .util .Map ;
47
48
import java .util .Set ;
49
+ import java .util .concurrent .TimeUnit ;
48
50
import java .util .stream .Collectors ;
49
51
50
52
import static org .elasticsearch .env .NodeEnvironment .INDICES_FOLDER ;
55
57
import static org .hamcrest .Matchers .hasSize ;
56
58
import static org .hamcrest .Matchers .notNullValue ;
57
59
58
- @ ESIntegTestCase .ClusterScope (scope = ESIntegTestCase .Scope .TEST )
60
+ @ ESIntegTestCase .ClusterScope (scope = ESIntegTestCase .Scope .TEST , numDataNodes = 0 , numClientNodes = 0 )
59
61
public class IndexFoldersDeletionListenerIT extends ESIntegTestCase {
60
62
61
63
@ Override
@@ -66,26 +68,37 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
66
68
}
67
69
68
70
public void testListenersInvokedWhenIndexIsDeleted () throws Exception {
71
+ final String masterNode = internalCluster ().startMasterOnlyNode ();
72
+ internalCluster ().startDataOnlyNodes (2 );
73
+ ensureStableCluster (2 + 1 , masterNode );
74
+
69
75
final String indexName = randomAlphaOfLength (10 ).toLowerCase (Locale .ROOT );
70
- createIndex (indexName );
76
+ createIndex (indexName , Settings .builder ()
77
+ .put (IndexMetadata .SETTING_NUMBER_OF_SHARDS , 2 * between (1 , 2 ))
78
+ .put (IndexMetadata .SETTING_NUMBER_OF_REPLICAS , between (0 , 1 ))
79
+ .build ());
71
80
72
81
final NumShards numShards = getNumShards (indexName );
73
- ensureClusterSizeConsistency (); // wait for a stable cluster
74
- ensureGreen (indexName ); // wait for no relocation
75
-
76
- final ClusterState clusterState = clusterService ().state ();
82
+ assertFalse (client ().admin ().cluster ().prepareHealth ()
83
+ .setIndices (indexName )
84
+ .setWaitForGreenStatus ()
85
+ .setWaitForEvents (Priority .LANGUID )
86
+ .setWaitForNoRelocatingShards (true )
87
+ .setWaitForNoInitializingShards (true )
88
+ .get ()
89
+ .isTimedOut ());
90
+
91
+ final ClusterState clusterState = internalCluster ().clusterService (masterNode ).state ();
77
92
final Index index = clusterState .metadata ().index (indexName ).getIndex ();
78
93
final Map <String , List <ShardRouting >> shardsByNodes = shardRoutingsByNodes (clusterState , index );
79
94
assertThat (shardsByNodes .values ().stream ().mapToInt (List ::size ).sum (), equalTo (numShards .totalNumShards ));
80
95
81
96
for (Map .Entry <String , List <ShardRouting >> shardsByNode : shardsByNodes .entrySet ()) {
82
- final String nodeName = shardsByNode .getKey ();
83
- final IndexFoldersDeletionListenerPlugin plugin = plugin (nodeName );
84
- assertTrue ("Expecting no indices deleted on node " + nodeName , plugin .deletedIndices .isEmpty ());
85
- assertTrue ("Expecting no shards deleted on node " + nodeName , plugin .deletedShards .isEmpty ());
97
+ assertNoDeletions (shardsByNode .getKey ());
86
98
}
87
99
88
100
assertAcked (client ().admin ().indices ().prepareDelete (indexName ));
101
+ assertPendingDeletesProcessed ();
89
102
90
103
assertBusy (() -> {
91
104
for (Map .Entry <String , List <ShardRouting >> shardsByNode : shardsByNodes .entrySet ()) {
@@ -105,30 +118,37 @@ public void testListenersInvokedWhenIndexIsDeleted() throws Exception {
105
118
deletedShards .contains (shardId ));
106
119
}
107
120
}
108
- });
121
+ }, 30L , TimeUnit . SECONDS );
109
122
}
110
123
111
124
public void testListenersInvokedWhenIndexIsRelocated () throws Exception {
112
- internalCluster ().ensureAtLeastNumDataNodes (4 );
125
+ final String masterNode = internalCluster ().startMasterOnlyNode ();
126
+ internalCluster ().startDataOnlyNodes (4 );
127
+ ensureStableCluster (4 + 1 , masterNode );
128
+
113
129
final String indexName = randomAlphaOfLength (10 ).toLowerCase (Locale .ROOT );
114
130
createIndex (indexName , Settings .builder ()
115
- .put (IndexMetadata .SETTING_NUMBER_OF_SHARDS , between (4 , 10 ))
131
+ .put (IndexMetadata .SETTING_NUMBER_OF_SHARDS , 4 * between (1 , 2 ))
116
132
.put (IndexMetadata .SETTING_NUMBER_OF_REPLICAS , between (0 , 1 ))
117
133
.build ());
118
134
119
135
final NumShards numShards = getNumShards (indexName );
120
- ensureGreen (indexName );
121
-
122
- final ClusterState clusterState = clusterService ().state ();
136
+ assertFalse (client ().admin ().cluster ().prepareHealth ()
137
+ .setIndices (indexName )
138
+ .setWaitForGreenStatus ()
139
+ .setWaitForEvents (Priority .LANGUID )
140
+ .setWaitForNoRelocatingShards (true )
141
+ .setWaitForNoInitializingShards (true )
142
+ .get ()
143
+ .isTimedOut ());
144
+
145
+ final ClusterState clusterState = internalCluster ().clusterService (masterNode ).state ();
123
146
final Index index = clusterState .metadata ().index (indexName ).getIndex ();
124
147
final Map <String , List <ShardRouting >> shardsByNodes = shardRoutingsByNodes (clusterState , index );
125
148
assertThat (shardsByNodes .values ().stream ().mapToInt (List ::size ).sum (), equalTo (numShards .totalNumShards ));
126
149
127
150
for (Map .Entry <String , List <ShardRouting >> shardsByNode : shardsByNodes .entrySet ()) {
128
- final String nodeName = shardsByNode .getKey ();
129
- final IndexFoldersDeletionListenerPlugin plugin = plugin (nodeName );
130
- assertTrue ("Expecting no indices deleted on node " + nodeName , plugin .deletedIndices .isEmpty ());
131
- assertTrue ("Expecting no shards deleted on node " + nodeName , plugin .deletedShards .isEmpty ());
151
+ assertNoDeletions (shardsByNode .getKey ());
132
152
}
133
153
134
154
final List <String > excludedNodes = randomSubsetOf (2 , shardsByNodes .keySet ());
@@ -158,48 +178,58 @@ public void testListenersInvokedWhenIndexIsRelocated() throws Exception {
158
178
deletedShards .contains (shardId ));
159
179
}
160
180
} else {
161
- assertTrue ("Expecting no indices deleted on node " + nodeName , plugin .deletedIndices .isEmpty ());
162
- assertTrue ("Expecting no shards deleted on node " + nodeName , plugin .deletedShards .isEmpty ());
181
+ assertNoDeletions (nodeName );
163
182
}
164
183
}
165
- });
184
+ }, 30L , TimeUnit . SECONDS );
166
185
}
167
186
168
187
public void testListenersInvokedWhenIndexIsDangling () throws Exception {
169
- internalCluster ().ensureAtLeastNumDataNodes (4 );
188
+ final String masterNode = internalCluster ().startMasterOnlyNode ();
189
+ internalCluster ().startDataOnlyNodes (4 );
190
+ ensureStableCluster (4 + 1 , masterNode );
191
+
170
192
final String indexName = randomAlphaOfLength (10 ).toLowerCase (Locale .ROOT );
171
193
createIndex (indexName , Settings .builder ()
172
- .put (IndexMetadata .SETTING_NUMBER_OF_SHARDS , between (4 , 10 ))
194
+ .put (IndexMetadata .SETTING_NUMBER_OF_SHARDS , 4 * between (1 , 2 ))
173
195
.put (IndexMetadata .SETTING_NUMBER_OF_REPLICAS , between (0 , 1 ))
174
196
.build ());
175
197
176
198
final NumShards numShards = getNumShards (indexName );
177
- ensureGreen (indexName );
178
-
179
- final ClusterState clusterState = clusterService ().state ();
199
+ assertFalse (client ().admin ().cluster ().prepareHealth ()
200
+ .setIndices (indexName )
201
+ .setWaitForGreenStatus ()
202
+ .setWaitForEvents (Priority .LANGUID )
203
+ .setWaitForNoRelocatingShards (true )
204
+ .setWaitForNoInitializingShards (true )
205
+ .get ()
206
+ .isTimedOut ());
207
+
208
+ final ClusterState clusterState = internalCluster ().clusterService (masterNode ).state ();
180
209
final Index index = clusterState .metadata ().index (indexName ).getIndex ();
181
210
final Map <String , List <ShardRouting >> shardsByNodes = shardRoutingsByNodes (clusterState , index );
182
211
assertThat (shardsByNodes .values ().stream ().mapToInt (List ::size ).sum (), equalTo (numShards .totalNumShards ));
183
212
184
213
for (Map .Entry <String , List <ShardRouting >> shardsByNode : shardsByNodes .entrySet ()) {
185
- final String nodeName = shardsByNode .getKey ();
186
- final IndexFoldersDeletionListenerPlugin plugin = plugin (nodeName );
187
- assertTrue ("Expecting no indices deleted on node " + nodeName , plugin .deletedIndices .isEmpty ());
188
- assertTrue ("Expecting no shards deleted on node " + nodeName , plugin .deletedShards .isEmpty ());
214
+ assertNoDeletions (shardsByNode .getKey ());
189
215
}
190
216
191
217
final String stoppedNode = randomFrom (shardsByNodes .keySet ());
192
218
final Settings stoppedNodeDataPathSettings = internalCluster ().dataPathSettings (stoppedNode );
193
219
internalCluster ().stopRandomNode (InternalTestCluster .nameFilter (stoppedNode ));
220
+ ensureStableCluster (3 + 1 , masterNode );
194
221
195
222
assertAcked (client ().admin ().indices ().prepareDelete (indexName ));
196
223
197
224
final String restartedNode = internalCluster ().startNode (stoppedNodeDataPathSettings );
225
+ ensureStableCluster (4 + 1 , masterNode );
226
+ assertPendingDeletesProcessed ();
227
+
198
228
assertBusy (() -> {
199
229
final IndexFoldersDeletionListenerPlugin plugin = plugin (restartedNode );
200
230
assertTrue ("Listener should have been notified of deletion of index " + index + " on node " + restartedNode ,
201
231
plugin .deletedIndices .contains (index ));
202
- });
232
+ }, 30L , TimeUnit . SECONDS );
203
233
}
204
234
205
235
public void testListenersInvokedWhenIndexHasLeftOverShard () throws Exception {
@@ -228,6 +258,7 @@ public void testListenersInvokedWhenIndexHasLeftOverShard() throws Exception {
228
258
229
259
logger .debug ("--> stopping data node [{}], the data left on disk will be injected as left-overs in a newer data node" , dataNode );
230
260
internalCluster ().stopRandomNode (InternalTestCluster .nameFilter (dataNode ));
261
+ ensureStableCluster (1 , masterNode );
231
262
232
263
logger .debug ("--> deleting leftover indices" );
233
264
assertAcked (client ().admin ().indices ().prepareDelete ("index-*" ));
@@ -264,6 +295,7 @@ public void testListenersInvokedWhenIndexHasLeftOverShard() throws Exception {
264
295
dataPaths .stream ().map (p -> p .toAbsolutePath ().toString ()).collect (Collectors .toList ()))
265
296
.putNull (Environment .PATH_SHARED_DATA_SETTING .getKey ())
266
297
.build ());
298
+ ensureStableCluster (1 + 1 , masterNode );
267
299
268
300
final IndexFoldersDeletionListenerPlugin plugin = plugin (dataNode );
269
301
assertTrue ("Expecting no shards deleted on node " + dataNode , plugin .deletedShards .isEmpty ());
@@ -328,4 +360,12 @@ private static void assertPendingDeletesProcessed() throws Exception {
328
360
services .forEach (indicesService -> assertFalse (indicesService .hasUncompletedPendingDeletes ()));
329
361
});
330
362
}
363
+
364
+ private static void assertNoDeletions (String nodeName ) {
365
+ final IndexFoldersDeletionListenerPlugin plugin = plugin (nodeName );
366
+ assertTrue ("Expecting no indices deleted on node [" + nodeName + "] but got: " + plugin .deletedIndices ,
367
+ plugin .deletedIndices .isEmpty ());
368
+ assertTrue ("Expecting no shards deleted on node [" + nodeName + "] but got: " + plugin .deletedShards ,
369
+ plugin .deletedShards .isEmpty ());
370
+ }
331
371
}
0 commit comments