25
25
import org .elasticsearch .cluster .routing .ShardRouting ;
26
26
import org .elasticsearch .cluster .routing .ShardRoutingState ;
27
27
import org .elasticsearch .cluster .routing .allocation .decider .EnableAllocationDecider ;
28
+ import org .elasticsearch .common .Priority ;
28
29
import org .elasticsearch .common .settings .Settings ;
29
30
import org .elasticsearch .common .util .concurrent .ConcurrentCollections ;
30
31
import org .elasticsearch .env .Environment ;
45
46
import java .util .Locale ;
46
47
import java .util .Map ;
47
48
import java .util .Set ;
49
+ import java .util .concurrent .TimeUnit ;
48
50
import java .util .stream .Collectors ;
49
51
50
52
import static java .util .Collections .singletonList ;
56
58
import static org .hamcrest .Matchers .hasSize ;
57
59
import static org .hamcrest .Matchers .notNullValue ;
58
60
59
- @ ESIntegTestCase .ClusterScope (scope = ESIntegTestCase .Scope .TEST )
61
+ @ ESIntegTestCase .ClusterScope (scope = ESIntegTestCase .Scope .TEST , numDataNodes = 0 , numClientNodes = 0 )
60
62
public class IndexFoldersDeletionListenerIT extends ESIntegTestCase {
61
63
62
64
@ Override
@@ -67,26 +69,37 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
67
69
}
68
70
69
71
public void testListenersInvokedWhenIndexIsDeleted () throws Exception {
72
+ final String masterNode = internalCluster ().startMasterOnlyNode ();
73
+ internalCluster ().startDataOnlyNodes (2 );
74
+ ensureStableCluster (2 + 1 , masterNode );
75
+
70
76
final String indexName = randomAlphaOfLength (10 ).toLowerCase (Locale .ROOT );
71
- createIndex (indexName );
77
+ createIndex (indexName , Settings .builder ()
78
+ .put (IndexMetadata .SETTING_NUMBER_OF_SHARDS , 2 * between (1 , 2 ))
79
+ .put (IndexMetadata .SETTING_NUMBER_OF_REPLICAS , between (0 , 1 ))
80
+ .build ());
72
81
73
82
final NumShards numShards = getNumShards (indexName );
74
- ensureClusterSizeConsistency (); // wait for a stable cluster
75
- ensureGreen (indexName ); // wait for no relocation
76
-
77
- final ClusterState clusterState = clusterService ().state ();
83
+ assertFalse (client ().admin ().cluster ().prepareHealth ()
84
+ .setIndices (indexName )
85
+ .setWaitForGreenStatus ()
86
+ .setWaitForEvents (Priority .LANGUID )
87
+ .setWaitForNoRelocatingShards (true )
88
+ .setWaitForNoInitializingShards (true )
89
+ .get ()
90
+ .isTimedOut ());
91
+
92
+ final ClusterState clusterState = internalCluster ().clusterService (masterNode ).state ();
78
93
final Index index = clusterState .metadata ().index (indexName ).getIndex ();
79
94
final Map <String , List <ShardRouting >> shardsByNodes = shardRoutingsByNodes (clusterState , index );
80
95
assertThat (shardsByNodes .values ().stream ().mapToInt (List ::size ).sum (), equalTo (numShards .totalNumShards ));
81
96
82
97
for (Map .Entry <String , List <ShardRouting >> shardsByNode : shardsByNodes .entrySet ()) {
83
- final String nodeName = shardsByNode .getKey ();
84
- final IndexFoldersDeletionListenerPlugin plugin = plugin (nodeName );
85
- assertTrue ("Expecting no indices deleted on node " + nodeName , plugin .deletedIndices .isEmpty ());
86
- assertTrue ("Expecting no shards deleted on node " + nodeName , plugin .deletedShards .isEmpty ());
98
+ assertNoDeletions (shardsByNode .getKey ());
87
99
}
88
100
89
101
assertAcked (client ().admin ().indices ().prepareDelete (indexName ));
102
+ assertPendingDeletesProcessed ();
90
103
91
104
assertBusy (() -> {
92
105
for (Map .Entry <String , List <ShardRouting >> shardsByNode : shardsByNodes .entrySet ()) {
@@ -106,30 +119,37 @@ public void testListenersInvokedWhenIndexIsDeleted() throws Exception {
106
119
deletedShards .contains (shardId ));
107
120
}
108
121
}
109
- });
122
+ }, 30L , TimeUnit . SECONDS );
110
123
}
111
124
112
125
public void testListenersInvokedWhenIndexIsRelocated () throws Exception {
113
- internalCluster ().ensureAtLeastNumDataNodes (4 );
126
+ final String masterNode = internalCluster ().startMasterOnlyNode ();
127
+ internalCluster ().startDataOnlyNodes (4 );
128
+ ensureStableCluster (4 + 1 , masterNode );
129
+
114
130
final String indexName = randomAlphaOfLength (10 ).toLowerCase (Locale .ROOT );
115
131
createIndex (indexName , Settings .builder ()
116
- .put (IndexMetadata .SETTING_NUMBER_OF_SHARDS , between (4 , 10 ))
132
+ .put (IndexMetadata .SETTING_NUMBER_OF_SHARDS , 4 * between (1 , 2 ))
117
133
.put (IndexMetadata .SETTING_NUMBER_OF_REPLICAS , between (0 , 1 ))
118
134
.build ());
119
135
120
136
final NumShards numShards = getNumShards (indexName );
121
- ensureGreen (indexName );
122
-
123
- final ClusterState clusterState = clusterService ().state ();
137
+ assertFalse (client ().admin ().cluster ().prepareHealth ()
138
+ .setIndices (indexName )
139
+ .setWaitForGreenStatus ()
140
+ .setWaitForEvents (Priority .LANGUID )
141
+ .setWaitForNoRelocatingShards (true )
142
+ .setWaitForNoInitializingShards (true )
143
+ .get ()
144
+ .isTimedOut ());
145
+
146
+ final ClusterState clusterState = internalCluster ().clusterService (masterNode ).state ();
124
147
final Index index = clusterState .metadata ().index (indexName ).getIndex ();
125
148
final Map <String , List <ShardRouting >> shardsByNodes = shardRoutingsByNodes (clusterState , index );
126
149
assertThat (shardsByNodes .values ().stream ().mapToInt (List ::size ).sum (), equalTo (numShards .totalNumShards ));
127
150
128
151
for (Map .Entry <String , List <ShardRouting >> shardsByNode : shardsByNodes .entrySet ()) {
129
- final String nodeName = shardsByNode .getKey ();
130
- final IndexFoldersDeletionListenerPlugin plugin = plugin (nodeName );
131
- assertTrue ("Expecting no indices deleted on node " + nodeName , plugin .deletedIndices .isEmpty ());
132
- assertTrue ("Expecting no shards deleted on node " + nodeName , plugin .deletedShards .isEmpty ());
152
+ assertNoDeletions (shardsByNode .getKey ());
133
153
}
134
154
135
155
final List <String > excludedNodes = randomSubsetOf (2 , shardsByNodes .keySet ());
@@ -159,48 +179,58 @@ public void testListenersInvokedWhenIndexIsRelocated() throws Exception {
159
179
deletedShards .contains (shardId ));
160
180
}
161
181
} else {
162
- assertTrue ("Expecting no indices deleted on node " + nodeName , plugin .deletedIndices .isEmpty ());
163
- assertTrue ("Expecting no shards deleted on node " + nodeName , plugin .deletedShards .isEmpty ());
182
+ assertNoDeletions (nodeName );
164
183
}
165
184
}
166
- });
185
+ }, 30L , TimeUnit . SECONDS );
167
186
}
168
187
169
188
public void testListenersInvokedWhenIndexIsDangling () throws Exception {
170
- internalCluster ().ensureAtLeastNumDataNodes (4 );
189
+ final String masterNode = internalCluster ().startMasterOnlyNode ();
190
+ internalCluster ().startDataOnlyNodes (4 );
191
+ ensureStableCluster (4 + 1 , masterNode );
192
+
171
193
final String indexName = randomAlphaOfLength (10 ).toLowerCase (Locale .ROOT );
172
194
createIndex (indexName , Settings .builder ()
173
- .put (IndexMetadata .SETTING_NUMBER_OF_SHARDS , between (4 , 10 ))
195
+ .put (IndexMetadata .SETTING_NUMBER_OF_SHARDS , 4 * between (1 , 2 ))
174
196
.put (IndexMetadata .SETTING_NUMBER_OF_REPLICAS , between (0 , 1 ))
175
197
.build ());
176
198
177
199
final NumShards numShards = getNumShards (indexName );
178
- ensureGreen (indexName );
179
-
180
- final ClusterState clusterState = clusterService ().state ();
200
+ assertFalse (client ().admin ().cluster ().prepareHealth ()
201
+ .setIndices (indexName )
202
+ .setWaitForGreenStatus ()
203
+ .setWaitForEvents (Priority .LANGUID )
204
+ .setWaitForNoRelocatingShards (true )
205
+ .setWaitForNoInitializingShards (true )
206
+ .get ()
207
+ .isTimedOut ());
208
+
209
+ final ClusterState clusterState = internalCluster ().clusterService (masterNode ).state ();
181
210
final Index index = clusterState .metadata ().index (indexName ).getIndex ();
182
211
final Map <String , List <ShardRouting >> shardsByNodes = shardRoutingsByNodes (clusterState , index );
183
212
assertThat (shardsByNodes .values ().stream ().mapToInt (List ::size ).sum (), equalTo (numShards .totalNumShards ));
184
213
185
214
for (Map .Entry <String , List <ShardRouting >> shardsByNode : shardsByNodes .entrySet ()) {
186
- final String nodeName = shardsByNode .getKey ();
187
- final IndexFoldersDeletionListenerPlugin plugin = plugin (nodeName );
188
- assertTrue ("Expecting no indices deleted on node " + nodeName , plugin .deletedIndices .isEmpty ());
189
- assertTrue ("Expecting no shards deleted on node " + nodeName , plugin .deletedShards .isEmpty ());
215
+ assertNoDeletions (shardsByNode .getKey ());
190
216
}
191
217
192
218
final String stoppedNode = randomFrom (shardsByNodes .keySet ());
193
219
final Settings stoppedNodeDataPathSettings = internalCluster ().dataPathSettings (stoppedNode );
194
220
internalCluster ().stopRandomNode (InternalTestCluster .nameFilter (stoppedNode ));
221
+ ensureStableCluster (3 + 1 , masterNode );
195
222
196
223
assertAcked (client ().admin ().indices ().prepareDelete (indexName ));
197
224
198
225
final String restartedNode = internalCluster ().startNode (stoppedNodeDataPathSettings );
226
+ ensureStableCluster (4 + 1 , masterNode );
227
+ assertPendingDeletesProcessed ();
228
+
199
229
assertBusy (() -> {
200
230
final IndexFoldersDeletionListenerPlugin plugin = plugin (restartedNode );
201
231
assertTrue ("Listener should have been notified of deletion of index " + index + " on node " + restartedNode ,
202
232
plugin .deletedIndices .contains (index ));
203
- });
233
+ }, 30L , TimeUnit . SECONDS );
204
234
}
205
235
206
236
public void testListenersInvokedWhenIndexHasLeftOverShard () throws Exception {
@@ -229,6 +259,7 @@ public void testListenersInvokedWhenIndexHasLeftOverShard() throws Exception {
229
259
230
260
logger .debug ("--> stopping data node [{}], the data left on disk will be injected as left-overs in a newer data node" , dataNode );
231
261
internalCluster ().stopRandomNode (InternalTestCluster .nameFilter (dataNode ));
262
+ ensureStableCluster (1 , masterNode );
232
263
233
264
logger .debug ("--> deleting leftover indices" );
234
265
assertAcked (client ().admin ().indices ().prepareDelete ("index-*" ));
@@ -266,6 +297,7 @@ public void testListenersInvokedWhenIndexHasLeftOverShard() throws Exception {
266
297
dataPaths .stream ().map (p -> p .toAbsolutePath ().toString ()).collect (Collectors .toList ()))
267
298
.putNull (Environment .PATH_SHARED_DATA_SETTING .getKey ())
268
299
.build ());
300
+ ensureStableCluster (1 + 1 , masterNode );
269
301
270
302
final IndexFoldersDeletionListenerPlugin plugin = plugin (dataNode );
271
303
assertTrue ("Expecting no shards deleted on node " + dataNode , plugin .deletedShards .isEmpty ());
@@ -330,4 +362,12 @@ private static void assertPendingDeletesProcessed() throws Exception {
330
362
services .forEach (indicesService -> assertFalse (indicesService .hasUncompletedPendingDeletes ()));
331
363
});
332
364
}
365
+
366
+ private static void assertNoDeletions (String nodeName ) {
367
+ final IndexFoldersDeletionListenerPlugin plugin = plugin (nodeName );
368
+ assertTrue ("Expecting no indices deleted on node [" + nodeName + "] but got: " + plugin .deletedIndices ,
369
+ plugin .deletedIndices .isEmpty ());
370
+ assertTrue ("Expecting no shards deleted on node [" + nodeName + "] but got: " + plugin .deletedShards ,
371
+ plugin .deletedShards .isEmpty ());
372
+ }
333
373
}
0 commit comments