17
17
* under the License.
18
18
*/
19
19
20
- package org .elasticsearch .indices . state ;
20
+ package org .elasticsearch .cluster . coordination ;
21
21
22
22
import org .elasticsearch .ElasticsearchParseException ;
23
23
import org .elasticsearch .Version ;
24
24
import org .elasticsearch .action .ActionFuture ;
25
- import org .elasticsearch .action .ActionListener ;
25
+ import org .elasticsearch .action .ActionRequest ;
26
+ import org .elasticsearch .action .ActionRequestBuilder ;
27
+ import org .elasticsearch .action .ActionResponse ;
26
28
import org .elasticsearch .action .index .IndexResponse ;
27
29
import org .elasticsearch .action .support .master .AcknowledgedResponse ;
28
30
import org .elasticsearch .cluster .ClusterState ;
40
42
import org .elasticsearch .common .collect .ImmutableOpenMap ;
41
43
import org .elasticsearch .common .settings .Settings ;
42
44
import org .elasticsearch .common .unit .TimeValue ;
43
- import org .elasticsearch .discovery .DiscoverySettings ;
45
+ import org .elasticsearch .discovery .Discovery ;
44
46
import org .elasticsearch .index .Index ;
45
47
import org .elasticsearch .index .IndexService ;
46
48
import org .elasticsearch .index .mapper .DocumentMapper ;
51
53
import org .elasticsearch .test .disruption .BlockClusterStateProcessing ;
52
54
import org .elasticsearch .test .junit .annotations .TestLogging ;
53
55
54
- import java .util .Arrays ;
55
56
import java .util .List ;
56
57
import java .util .Map ;
57
- import java .util .concurrent .atomic . AtomicReference ;
58
+ import java .util .concurrent .TimeUnit ;
58
59
59
60
import static java .util .Collections .emptyMap ;
60
61
import static java .util .Collections .emptySet ;
@@ -86,7 +87,7 @@ protected int numberOfReplicas() {
86
87
return 0 ;
87
88
}
88
89
89
- public void testAssignmentWithJustAddedNodes () throws Exception {
90
+ public void testAssignmentWithJustAddedNodes () {
90
91
internalCluster ().startNode ();
91
92
final String index = "index" ;
92
93
prepareCreate (index ).setSettings (Settings .builder ().put (IndexMetaData .SETTING_NUMBER_OF_SHARDS , 1 )
@@ -149,22 +150,20 @@ public void onFailure(String source, Exception e) {
149
150
});
150
151
}
151
152
153
+ private <Req extends ActionRequest , Res extends ActionResponse > ActionFuture <Res > executeAndCancelCommittedPublication (
154
+ ActionRequestBuilder <Req , Res > req ) throws Exception {
155
+ ActionFuture <Res > future = req .execute ();
156
+ assertBusy (() -> assertTrue (((Coordinator )internalCluster ().getMasterNodeInstance (Discovery .class )).cancelCommittedPublication ()));
157
+ return future ;
158
+ }
159
+
152
160
public void testDeleteCreateInOneBulk () throws Exception {
153
- internalCluster ().startMasterOnlyNode (Settings .builder ()
154
- .put (TestZenDiscovery .USE_ZEN2 .getKey (), false ) // TODO: convert test to support Zen2
155
- .build ());
156
- String dataNode = internalCluster ().startDataOnlyNode (Settings .builder ()
157
- .put (TestZenDiscovery .USE_ZEN2 .getKey (), false ) // TODO: convert test to support Zen2
158
- .build ());
161
+ internalCluster ().startMasterOnlyNode ();
162
+ String dataNode = internalCluster ().startDataOnlyNode ();
159
163
assertFalse (client ().admin ().cluster ().prepareHealth ().setWaitForNodes ("2" ).get ().isTimedOut ());
160
164
prepareCreate ("test" ).setSettings (Settings .builder ().put (IndexMetaData .SETTING_NUMBER_OF_REPLICAS , 0 )).addMapping ("type" ).get ();
161
165
ensureGreen ("test" );
162
166
163
- // now that the cluster is stable, remove publishing timeout
164
- assertAcked (client ().admin ().cluster ().prepareUpdateSettings ().setTransientSettings (Settings .builder ()
165
- .put (DiscoverySettings .PUBLISH_TIMEOUT_SETTING .getKey (), "0" )
166
- .put (DiscoverySettings .COMMIT_TIMEOUT_SETTING .getKey (), "30s" )));
167
-
168
167
// block none master node.
169
168
BlockClusterStateProcessing disruption = new BlockClusterStateProcessing (dataNode , random ());
170
169
internalCluster ().setDisruptionScheme (disruption );
@@ -173,10 +172,14 @@ public void testDeleteCreateInOneBulk() throws Exception {
173
172
refresh ();
174
173
disruption .startDisrupting ();
175
174
logger .info ("--> delete index and recreate it" );
176
- assertFalse (client ().admin ().indices ().prepareDelete ("test" ).setTimeout ("200ms" ).get ().isAcknowledged ());
177
- assertFalse (prepareCreate ("test" ).setTimeout ("200ms" ).setSettings (Settings .builder ().put (IndexMetaData
178
- .SETTING_NUMBER_OF_REPLICAS , 0 ).put (IndexMetaData .SETTING_WAIT_FOR_ACTIVE_SHARDS .getKey (), "0" )).get ().isAcknowledged ());
175
+ executeAndCancelCommittedPublication (client ().admin ().indices ().prepareDelete ("test" ).setTimeout ("0s" ))
176
+ .get (10 , TimeUnit .SECONDS );
177
+ executeAndCancelCommittedPublication (prepareCreate ("test" ).setSettings (Settings .builder ().put (IndexMetaData
178
+ .SETTING_NUMBER_OF_REPLICAS , 0 ).put (IndexMetaData .SETTING_WAIT_FOR_ACTIVE_SHARDS .getKey (), "0" )).setTimeout ("0s" ))
179
+ .get (10 , TimeUnit .SECONDS );
180
+
179
181
logger .info ("--> letting cluster proceed" );
182
+
180
183
disruption .stopDisrupting ();
181
184
ensureGreen (TimeValue .timeValueMinutes (30 ), "test" );
182
185
// due to publish_timeout of 0, wait for data node to have cluster state fully applied
@@ -196,12 +199,7 @@ public void testDelayedMappingPropagationOnPrimary() throws Exception {
196
199
// but the change might not be on the node that performed the indexing
197
200
// operation yet
198
201
199
- Settings settings = Settings .builder ()
200
- .put (DiscoverySettings .COMMIT_TIMEOUT_SETTING .getKey (), "30s" ) // explicitly set so it won't default to publish timeout
201
- .put (DiscoverySettings .PUBLISH_TIMEOUT_SETTING .getKey (), "0s" ) // don't wait post commit as we are blocking things by design
202
- .put (TestZenDiscovery .USE_ZEN2 .getKey (), false ) // TODO: convert test to support Zen2
203
- .build ();
204
- final List <String > nodeNames = internalCluster ().startNodes (2 , settings );
202
+ final List <String > nodeNames = internalCluster ().startNodes (2 );
205
203
assertFalse (client ().admin ().cluster ().prepareHealth ().setWaitForNodes ("2" ).get ().isTimedOut ());
206
204
207
205
final String master = internalCluster ().getMasterName ();
@@ -242,19 +240,10 @@ public void testDelayedMappingPropagationOnPrimary() throws Exception {
242
240
disruption .startDisrupting ();
243
241
244
242
// Add a new mapping...
245
- final AtomicReference <Object > putMappingResponse = new AtomicReference <>();
246
- client ().admin ().indices ().preparePutMapping ("index" ).setType ("type" ).setSource ("field" , "type=long" ).execute (
247
- new ActionListener <AcknowledgedResponse >() {
248
- @ Override
249
- public void onResponse (AcknowledgedResponse response ) {
250
- putMappingResponse .set (response );
251
- }
243
+ ActionFuture <AcknowledgedResponse > putMappingResponse =
244
+ executeAndCancelCommittedPublication (client ().admin ().indices ().preparePutMapping ("index" )
245
+ .setType ("type" ).setSource ("field" , "type=long" ));
252
246
253
- @ Override
254
- public void onFailure (Exception e ) {
255
- putMappingResponse .set (e );
256
- }
257
- });
258
247
// ...and wait for mappings to be available on master
259
248
assertBusy (() -> {
260
249
ImmutableOpenMap <String , MappingMetaData > indexMappings = client ().admin ().indices ()
@@ -273,36 +262,24 @@ public void onFailure(Exception e) {
273
262
assertNotNull (fieldMapping );
274
263
});
275
264
276
- final AtomicReference <Object > docIndexResponse = new AtomicReference <>();
277
- client ().prepareIndex ("index" , "type" , "1" ).setSource ("field" , 42 ).execute (new ActionListener <IndexResponse >() {
278
- @ Override
279
- public void onResponse (IndexResponse response ) {
280
- docIndexResponse .set (response );
281
- }
282
-
283
- @ Override
284
- public void onFailure (Exception e ) {
285
- docIndexResponse .set (e );
286
- }
287
- });
265
+ // this request does not change the cluster state, because mapping is already created,
266
+ // we don't await and cancel committed publication
267
+ ActionFuture <IndexResponse > docIndexResponse =
268
+ client ().prepareIndex ("index" , "type" , "1" ).setSource ("field" , 42 ).execute ();
288
269
289
270
// Wait a bit to make sure that the reason why we did not get a response
290
271
// is that cluster state processing is blocked and not just that it takes
291
272
// time to process the indexing request
292
273
Thread .sleep (100 );
293
- assertThat (putMappingResponse .get (), equalTo ( null ));
294
- assertThat (docIndexResponse .get (), equalTo ( null ));
274
+ assertFalse (putMappingResponse .isDone ( ));
275
+ assertFalse (docIndexResponse .isDone ( ));
295
276
296
277
// Now make sure the indexing request finishes successfully
297
278
disruption .stopDisrupting ();
298
279
assertBusy (() -> {
299
- assertThat (putMappingResponse .get (), instanceOf (AcknowledgedResponse .class ));
300
- AcknowledgedResponse resp = (AcknowledgedResponse ) putMappingResponse .get ();
301
- assertTrue (resp .isAcknowledged ());
302
- assertThat (docIndexResponse .get (), instanceOf (IndexResponse .class ));
303
- IndexResponse docResp = (IndexResponse ) docIndexResponse .get ();
304
- assertEquals (Arrays .toString (docResp .getShardInfo ().getFailures ()),
305
- 1 , docResp .getShardInfo ().getTotal ());
280
+ assertTrue (putMappingResponse .get (10 , TimeUnit .SECONDS ).isAcknowledged ());
281
+ assertThat (docIndexResponse .get (10 , TimeUnit .SECONDS ), instanceOf (IndexResponse .class ));
282
+ assertEquals (1 , docIndexResponse .get (10 , TimeUnit .SECONDS ).getShardInfo ().getTotal ());
306
283
});
307
284
}
308
285
@@ -312,12 +289,7 @@ public void testDelayedMappingPropagationOnReplica() throws Exception {
312
289
// Here we want to test that everything goes well if the mappings that
313
290
// are needed for a document are not available on the replica at the
314
291
// time of indexing it
315
- final List <String > nodeNames = internalCluster ().startNodes (2 ,
316
- Settings .builder ()
317
- .put (DiscoverySettings .COMMIT_TIMEOUT_SETTING .getKey (), "30s" ) // explicitly set so it won't default to publish timeout
318
- .put (DiscoverySettings .PUBLISH_TIMEOUT_SETTING .getKey (), "0s" ) // don't wait post commit as we are blocking things by design
319
- .put (TestZenDiscovery .USE_ZEN2 .getKey (), false ) // TODO: convert test to support Zen2
320
- .build ());
292
+ final List <String > nodeNames = internalCluster ().startNodes (2 );
321
293
assertFalse (client ().admin ().cluster ().prepareHealth ().setWaitForNodes ("2" ).get ().isTimedOut ());
322
294
323
295
final String master = internalCluster ().getMasterName ();
@@ -359,19 +331,10 @@ public void testDelayedMappingPropagationOnReplica() throws Exception {
359
331
BlockClusterStateProcessing disruption = new BlockClusterStateProcessing (otherNode , random ());
360
332
internalCluster ().setDisruptionScheme (disruption );
361
333
disruption .startDisrupting ();
362
- final AtomicReference <Object > putMappingResponse = new AtomicReference <>();
363
- client ().admin ().indices ().preparePutMapping ("index" ).setType ("type" ).setSource ("field" , "type=long" ).execute (
364
- new ActionListener <AcknowledgedResponse >() {
365
- @ Override
366
- public void onResponse (AcknowledgedResponse response ) {
367
- putMappingResponse .set (response );
368
- }
334
+ final ActionFuture <AcknowledgedResponse > putMappingResponse =
335
+ executeAndCancelCommittedPublication (client ().admin ().indices ().preparePutMapping ("index" )
336
+ .setType ("type" ).setSource ("field" , "type=long" ));
369
337
370
- @ Override
371
- public void onFailure (Exception e ) {
372
- putMappingResponse .set (e );
373
- }
374
- });
375
338
final Index index = resolveIndex ("index" );
376
339
// Wait for mappings to be available on master
377
340
assertBusy (() -> {
@@ -384,25 +347,17 @@ public void onFailure(Exception e) {
384
347
assertNotNull (mapper .mappers ().getMapper ("field" ));
385
348
});
386
349
387
- final AtomicReference <Object > docIndexResponse = new AtomicReference <>();
388
- client ().prepareIndex ("index" , "type" , "1" ).setSource ("field" , 42 ).execute (new ActionListener <IndexResponse >() {
389
- @ Override
390
- public void onResponse (IndexResponse response ) {
391
- docIndexResponse .set (response );
392
- }
393
-
394
- @ Override
395
- public void onFailure (Exception e ) {
396
- docIndexResponse .set (e );
397
- }
398
- });
350
+ final ActionFuture <IndexResponse > docIndexResponse = client ().prepareIndex ("index" , "type" , "1" ).setSource ("field" , 42 ).execute ();
399
351
400
352
assertBusy (() -> assertTrue (client ().prepareGet ("index" , "type" , "1" ).get ().isExists ()));
401
353
402
354
// index another document, this time using dynamic mappings.
403
355
// The ack timeout of 0 on dynamic mapping updates makes it possible for the document to be indexed on the primary, even
404
356
// if the dynamic mapping update is not applied on the replica yet.
405
- ActionFuture <IndexResponse > dynamicMappingsFut = client ().prepareIndex ("index" , "type" , "2" ).setSource ("field2" , 42 ).execute ();
357
+ // this request does not change the cluster state, because the mapping is dynamic,
358
+ // we need to await and cancel committed publication
359
+ ActionFuture <IndexResponse > dynamicMappingsFut =
360
+ executeAndCancelCommittedPublication (client ().prepareIndex ("index" , "type" , "2" ).setSource ("field2" , 42 ));
406
361
407
362
// ...and wait for second mapping to be available on master
408
363
assertBusy (() -> {
@@ -421,22 +376,18 @@ public void onFailure(Exception e) {
421
376
// We wait on purpose to make sure that the document is not indexed because the shard operation is stalled
422
377
// and not just because it takes time to replicate the indexing request to the replica
423
378
Thread .sleep (100 );
424
- assertThat (putMappingResponse .get (), equalTo ( null ));
425
- assertThat (docIndexResponse .get (), equalTo ( null ));
379
+ assertFalse (putMappingResponse .isDone ( ));
380
+ assertFalse (docIndexResponse .isDone ( ));
426
381
427
382
// Now make sure the indexing request finishes successfully
428
383
disruption .stopDisrupting ();
429
384
assertBusy (() -> {
430
- assertThat (putMappingResponse .get (), instanceOf (AcknowledgedResponse .class ));
431
- AcknowledgedResponse resp = (AcknowledgedResponse ) putMappingResponse .get ();
432
- assertTrue (resp .isAcknowledged ());
433
- assertThat (docIndexResponse .get (), instanceOf (IndexResponse .class ));
434
- IndexResponse docResp = (IndexResponse ) docIndexResponse .get ();
435
- assertEquals (Arrays .toString (docResp .getShardInfo ().getFailures ()),
436
- 2 , docResp .getShardInfo ().getTotal ()); // both shards should have succeeded
385
+ assertTrue (putMappingResponse .get (10 , TimeUnit .SECONDS ).isAcknowledged ());
386
+ assertThat (docIndexResponse .get (10 , TimeUnit .SECONDS ), instanceOf (IndexResponse .class ));
387
+ assertEquals (2 , docIndexResponse .get (10 , TimeUnit .SECONDS ).getShardInfo ().getTotal ()); // both shards should have succeeded
437
388
});
438
389
439
- assertThat (dynamicMappingsFut .get ().getResult (), equalTo (CREATED ));
390
+ assertThat (dynamicMappingsFut .get (10 , TimeUnit . SECONDS ).getResult (), equalTo (CREATED ));
440
391
}
441
392
442
393
}
0 commit comments