@@ -117,7 +117,8 @@ private SecurityIndexManager(Client client, ClusterService clusterService, Strin
117
117
clusterService .addListener (this );
118
118
}
119
119
120
- private SecurityIndexManager (Client client , String aliasName , String internalIndexName , int internalIndexFormat ,
120
+ // protected for testing
121
+ protected SecurityIndexManager (Client client , String aliasName , String internalIndexName , int internalIndexFormat ,
121
122
Supplier <byte []> mappingSourceSupplier , State indexState ) {
122
123
this .aliasName = aliasName ;
123
124
this .internalIndexName = internalIndexName ;
@@ -351,65 +352,68 @@ public void checkIndexVersionThenExecute(final Consumer<Exception> consumer, fin
351
352
*/
352
353
public void prepareIndexIfNeededThenExecute (final Consumer <Exception > consumer , final Runnable andThen ) {
353
354
final State indexState = this .indexState ; // use a local copy so all checks execute against the same state!
354
- // TODO we should improve this so we don't fire off a bunch of requests to do the same thing (create or update mappings)
355
- if (indexState == State .UNRECOVERED_STATE ) {
356
- consumer .accept (new ElasticsearchStatusException (
357
- "Cluster state has not been recovered yet, cannot write to the [" + indexState .concreteIndexName + "] index" ,
358
- RestStatus .SERVICE_UNAVAILABLE ));
359
- } else if (indexState .indexExists () && indexState .isIndexUpToDate == false ) {
360
- consumer .accept (new IllegalStateException (
361
- "Index [" + indexState .concreteIndexName + "] is not on the current version."
362
- + "Security features relying on the index will not be available until the upgrade API is run on the index" ));
363
- } else if (indexState .indexExists () == false ) {
364
- assert indexState .concreteIndexName != null ;
365
- logger .info ("security index does not exist. Creating [{}] with alias [{}]" , indexState .concreteIndexName , this .aliasName );
366
- final byte [] mappingSource = mappingSourceSupplier .get ();
367
- final Tuple <String , Settings > mappingAndSettings = parseMappingAndSettingsFromTemplateBytes (mappingSource );
368
- CreateIndexRequest request = new CreateIndexRequest (indexState .concreteIndexName )
369
- .alias (new Alias (this .aliasName ))
370
- .mapping (MapperService .SINGLE_MAPPING_NAME , mappingAndSettings .v1 (), XContentType .JSON )
371
- .waitForActiveShards (ActiveShardCount .ALL )
372
- .settings (mappingAndSettings .v2 ());
373
- executeAsyncWithOrigin (client .threadPool ().getThreadContext (), SECURITY_ORIGIN , request ,
374
- new ActionListener <CreateIndexResponse >() {
375
- @ Override
376
- public void onResponse (CreateIndexResponse createIndexResponse ) {
377
- if (createIndexResponse .isAcknowledged ()) {
378
- andThen .run ();
379
- } else {
380
- consumer .accept (new ElasticsearchException ("Failed to create security index" ));
381
- }
382
- }
383
-
384
- @ Override
385
- public void onFailure (Exception e ) {
386
- final Throwable cause = ExceptionsHelper .unwrapCause (e );
387
- if (cause instanceof ResourceAlreadyExistsException ) {
388
- // the index already exists - it was probably just created so this
389
- // node hasn't yet received the cluster state update with the index
390
- andThen .run ();
391
- } else {
392
- consumer .accept (e );
393
- }
355
+ try {
356
+ // TODO we should improve this so we don't fire off a bunch of requests to do the same thing (create or update mappings)
357
+ if (indexState == State .UNRECOVERED_STATE ) {
358
+ throw new ElasticsearchStatusException (
359
+ "Cluster state has not been recovered yet, cannot write to the [" + indexState .concreteIndexName + "] index" ,
360
+ RestStatus .SERVICE_UNAVAILABLE );
361
+ } else if (indexState .indexExists () && indexState .isIndexUpToDate == false ) {
362
+ throw new IllegalStateException ("Index [" + indexState .concreteIndexName + "] is not on the current version."
363
+ + "Security features relying on the index will not be available until the upgrade API is run on the index" );
364
+ } else if (indexState .indexExists () == false ) {
365
+ assert indexState .concreteIndexName != null ;
366
+ logger .info ("security index does not exist. Creating [{}] with alias [{}]" , indexState .concreteIndexName , this .aliasName );
367
+ final byte [] mappingSource = mappingSourceSupplier .get ();
368
+ final Tuple <String , Settings > mappingAndSettings = parseMappingAndSettingsFromTemplateBytes (mappingSource );
369
+ CreateIndexRequest request = new CreateIndexRequest (indexState .concreteIndexName )
370
+ .alias (new Alias (this .aliasName ))
371
+ .mapping (MapperService .SINGLE_MAPPING_NAME , mappingAndSettings .v1 (), XContentType .JSON )
372
+ .waitForActiveShards (ActiveShardCount .ALL )
373
+ .settings (mappingAndSettings .v2 ());
374
+ executeAsyncWithOrigin (client .threadPool ().getThreadContext (), SECURITY_ORIGIN , request ,
375
+ new ActionListener <CreateIndexResponse >() {
376
+ @ Override
377
+ public void onResponse (CreateIndexResponse createIndexResponse ) {
378
+ if (createIndexResponse .isAcknowledged ()) {
379
+ andThen .run ();
380
+ } else {
381
+ consumer .accept (new ElasticsearchException ("Failed to create security index" ));
394
382
}
395
- }, client .admin ().indices ()::create );
396
- } else if (indexState .mappingUpToDate == false ) {
397
- logger .info ("Index [{}] (alias [{}]) is not up to date. Updating mapping" , indexState .concreteIndexName , this .aliasName );
398
- final byte [] mappingSource = mappingSourceSupplier .get ();
399
- final Tuple <String , Settings > mappingAndSettings = parseMappingAndSettingsFromTemplateBytes (mappingSource );
400
- PutMappingRequest request = new PutMappingRequest (indexState .concreteIndexName )
401
- .source (mappingAndSettings .v1 (), XContentType .JSON )
402
- .type (MapperService .SINGLE_MAPPING_NAME );
403
- executeAsyncWithOrigin (client .threadPool ().getThreadContext (), SECURITY_ORIGIN , request ,
404
- ActionListener .<AcknowledgedResponse >wrap (putMappingResponse -> {
405
- if (putMappingResponse .isAcknowledged ()) {
383
+ }
384
+
385
+ @ Override
386
+ public void onFailure (Exception e ) {
387
+ final Throwable cause = ExceptionsHelper .unwrapCause (e );
388
+ if (cause instanceof ResourceAlreadyExistsException ) {
389
+ // the index already exists - it was probably just created so this
390
+ // node hasn't yet received the cluster state update with the index
406
391
andThen .run ();
407
392
} else {
408
- consumer .accept (new IllegalStateException ( "put mapping request was not acknowledged" ) );
393
+ consumer .accept (e );
409
394
}
410
- }, consumer ), client .admin ().indices ()::putMapping );
411
- } else {
412
- andThen .run ();
395
+ }
396
+ }, client .admin ().indices ()::create );
397
+ } else if (indexState .mappingUpToDate == false ) {
398
+ logger .info ("Index [{}] (alias [{}]) is not up to date. Updating mapping" , indexState .concreteIndexName , this .aliasName );
399
+ final byte [] mappingSource = mappingSourceSupplier .get ();
400
+ final Tuple <String , Settings > mappingAndSettings = parseMappingAndSettingsFromTemplateBytes (mappingSource );
401
+ PutMappingRequest request = new PutMappingRequest (indexState .concreteIndexName )
402
+ .source (mappingAndSettings .v1 (), XContentType .JSON )
403
+ .type (MapperService .SINGLE_MAPPING_NAME );
404
+ executeAsyncWithOrigin (client .threadPool ().getThreadContext (), SECURITY_ORIGIN , request ,
405
+ ActionListener .<AcknowledgedResponse >wrap (putMappingResponse -> {
406
+ if (putMappingResponse .isAcknowledged ()) {
407
+ andThen .run ();
408
+ } else {
409
+ consumer .accept (new IllegalStateException ("put mapping request was not acknowledged" ));
410
+ }
411
+ }, consumer ), client .admin ().indices ()::putMapping );
412
+ } else {
413
+ andThen .run ();
414
+ }
415
+ } catch (Exception e ) {
416
+ consumer .accept (e );
413
417
}
414
418
}
415
419
@@ -433,7 +437,7 @@ private static byte[] readTemplateAsBytes(String templateName) {
433
437
SecurityIndexManager .TEMPLATE_VERSION_PATTERN ).getBytes (StandardCharsets .UTF_8 );
434
438
}
435
439
436
- private static Tuple <String , Settings > parseMappingAndSettingsFromTemplateBytes (byte [] template ) {
440
+ private static Tuple <String , Settings > parseMappingAndSettingsFromTemplateBytes (byte [] template ) throws IOException {
437
441
final PutIndexTemplateRequest request = new PutIndexTemplateRequest ("name_is_not_important" ).source (template , XContentType .JSON );
438
442
final String mappingSource = request .mappings ().get (MapperService .SINGLE_MAPPING_NAME );
439
443
try (XContentParser parser = XContentType .JSON .xContent ().createParser (NamedXContentRegistry .EMPTY ,
@@ -446,8 +450,6 @@ private static Tuple<String, Settings> parseMappingAndSettingsFromTemplateBytes(
446
450
XContentBuilder builder = JsonXContent .contentBuilder ();
447
451
builder .generator ().copyCurrentStructure (parser );
448
452
return new Tuple <>(Strings .toString (builder ), request .settings ());
449
- } catch (IOException e ) {
450
- throw ExceptionsHelper .convertToRuntime (e );
451
453
}
452
454
}
453
455
0 commit comments