20
20
import org .elasticsearch .action .admin .indices .alias .Alias ;
21
21
import org .elasticsearch .action .admin .indices .create .CreateIndexRequest ;
22
22
import org .elasticsearch .action .admin .indices .create .CreateIndexResponse ;
23
+ import org .elasticsearch .action .admin .indices .mapping .put .PutMappingRequest ;
23
24
import org .elasticsearch .action .support .ActiveShardCount ;
25
+ import org .elasticsearch .action .support .master .AcknowledgedResponse ;
24
26
import org .elasticsearch .client .Client ;
25
27
import org .elasticsearch .cluster .ClusterChangedEvent ;
26
28
import org .elasticsearch .cluster .ClusterState ;
33
35
import org .elasticsearch .cluster .metadata .Metadata ;
34
36
import org .elasticsearch .cluster .routing .IndexRoutingTable ;
35
37
import org .elasticsearch .cluster .service .ClusterService ;
38
+ import org .elasticsearch .common .xcontent .XContentType ;
36
39
import org .elasticsearch .gateway .GatewayService ;
37
40
import org .elasticsearch .index .Index ;
38
41
import org .elasticsearch .index .IndexNotFoundException ;
49
52
import java .util .concurrent .CopyOnWriteArrayList ;
50
53
import java .util .function .BiConsumer ;
51
54
import java .util .function .Consumer ;
55
+ import java .util .function .Predicate ;
52
56
import java .util .stream .Collectors ;
53
57
54
58
import static org .elasticsearch .cluster .metadata .IndexMetadata .INDEX_FORMAT_SETTING ;
@@ -118,6 +122,10 @@ public boolean isAvailable() {
118
122
return this .indexState .indexAvailable ;
119
123
}
120
124
125
+ public boolean isMappingUpToDate () {
126
+ return this .indexState .mappingUpToDate ;
127
+ }
128
+
121
129
public boolean isStateRecovered () {
122
130
return this .indexState != State .UNRECOVERED_STATE ;
123
131
}
@@ -161,6 +169,7 @@ public void clusterChanged(ClusterChangedEvent event) {
161
169
final boolean isIndexUpToDate = indexMetadata == null ||
162
170
INDEX_FORMAT_SETTING .get (indexMetadata .getSettings ()) == systemIndexDescriptor .getIndexFormat ();
163
171
final boolean indexAvailable = checkIndexAvailable (event .state ());
172
+ final boolean mappingIsUpToDate = indexMetadata == null || checkIndexMappingUpToDate (event .state ());
164
173
final Version mappingVersion = oldestIndexMappingVersion (event .state ());
165
174
final String concreteIndexName = indexMetadata == null
166
175
? systemIndexDescriptor .getPrimaryIndex ()
@@ -180,8 +189,8 @@ public void clusterChanged(ClusterChangedEvent event) {
180
189
final IndexRoutingTable routingTable = event .state ().getRoutingTable ().index (indexMetadata .getIndex ());
181
190
indexHealth = new ClusterIndexHealth (indexMetadata , routingTable ).getStatus ();
182
191
}
183
- final State newState = new State (creationTime , isIndexUpToDate , indexAvailable , mappingVersion ,
184
- concreteIndexName , indexHealth , indexState );
192
+ final State newState = new State (creationTime , isIndexUpToDate , indexAvailable , mappingIsUpToDate , mappingVersion ,
193
+ concreteIndexName , indexHealth , indexState , event . state (). nodes (). getMinNodeVersion () );
185
194
this .indexState = newState ;
186
195
187
196
if (newState .equals (previousState ) == false ) {
@@ -211,6 +220,26 @@ private boolean checkIndexAvailable(ClusterState state) {
211
220
}
212
221
}
213
222
223
+ private boolean checkIndexMappingUpToDate (ClusterState clusterState ) {
224
+ /*
225
+ * The method reference looks wrong here, but it's just counter-intuitive. It expands to:
226
+ *
227
+ * mappingVersion -> Version.CURRENT.onOrBefore(mappingVersion)
228
+ *
229
+ * ...which is true if the mappings have been updated.
230
+ */
231
+ return checkIndexMappingVersionMatches (clusterState , Version .CURRENT ::onOrBefore );
232
+ }
233
+
234
+ private boolean checkIndexMappingVersionMatches (ClusterState clusterState , Predicate <Version > predicate ) {
235
+ return checkIndexMappingVersionMatches (this .systemIndexDescriptor .getAliasName (), clusterState , logger , predicate );
236
+ }
237
+
238
+ public static boolean checkIndexMappingVersionMatches (String indexName , ClusterState clusterState , Logger logger ,
239
+ Predicate <Version > predicate ) {
240
+ return loadIndexMappingVersions (indexName , clusterState , logger ).stream ().allMatch (predicate );
241
+ }
242
+
214
243
private Version oldestIndexMappingVersion (ClusterState clusterState ) {
215
244
final Set <Version > versions = loadIndexMappingVersions (systemIndexDescriptor .getAliasName (), clusterState , logger );
216
245
return versions .stream ().min (Version ::compareTo ).orElse (null );
@@ -220,9 +249,9 @@ private static Set<Version> loadIndexMappingVersions(String aliasName, ClusterSt
220
249
Set <Version > versions = new HashSet <>();
221
250
IndexMetadata indexMetadata = resolveConcreteIndex (aliasName , clusterState .metadata ());
222
251
if (indexMetadata != null ) {
223
- MappingMetadata mmd = indexMetadata .mapping ();
224
- if (mmd != null ) {
225
- versions .add (readMappingVersion (aliasName , mmd , logger ));
252
+ MappingMetadata mappingMetadata = indexMetadata .mapping ();
253
+ if (mappingMetadata != null ) {
254
+ versions .add (readMappingVersion (aliasName , mappingMetadata , logger ));
226
255
}
227
256
}
228
257
return versions ;
@@ -335,6 +364,29 @@ public void onFailure(Exception e) {
335
364
}
336
365
}
337
366
}, client .admin ().indices ()::create );
367
+ } else if (indexState .mappingUpToDate == false ) {
368
+ final String error = systemIndexDescriptor .checkMinimumNodeVersion ("create index" , indexState .minimumNodeVersion );
369
+ if (error != null ) {
370
+ consumer .accept (new IllegalStateException (error ));
371
+ } else {
372
+ logger .info (
373
+ "Index [{}] (alias [{}]) is not up to date. Updating mapping" ,
374
+ indexState .concreteIndexName ,
375
+ systemIndexDescriptor .getAliasName ()
376
+ );
377
+ PutMappingRequest request = new PutMappingRequest (indexState .concreteIndexName ).source (
378
+ systemIndexDescriptor .getMappings (),
379
+ XContentType .JSON
380
+ ).origin (systemIndexDescriptor .getOrigin ());
381
+ executeAsyncWithOrigin (client .threadPool ().getThreadContext (), systemIndexDescriptor .getOrigin (), request ,
382
+ ActionListener .<AcknowledgedResponse >wrap (putMappingResponse -> {
383
+ if (putMappingResponse .isAcknowledged ()) {
384
+ andThen .run ();
385
+ } else {
386
+ consumer .accept (new IllegalStateException ("put mapping request was not acknowledged" ));
387
+ }
388
+ }, consumer ), client .admin ().indices ()::putMapping );
389
+ }
338
390
} else {
339
391
andThen .run ();
340
392
}
@@ -362,24 +414,29 @@ public static boolean isIndexDeleted(State previousState, State currentState) {
362
414
* State of the security index.
363
415
*/
364
416
public static class State {
365
- public static final State UNRECOVERED_STATE = new State (null , false , false , null , null , null , null );
417
+ public static final State UNRECOVERED_STATE = new State (null , false , false , false , null , null , null , null , null );
366
418
public final Instant creationTime ;
367
419
public final boolean isIndexUpToDate ;
368
420
public final boolean indexAvailable ;
421
+ public final boolean mappingUpToDate ;
369
422
public final Version mappingVersion ;
370
423
public final String concreteIndexName ;
371
424
public final ClusterHealthStatus indexHealth ;
372
425
public final IndexMetadata .State indexState ;
426
+ public final Version minimumNodeVersion ;
373
427
374
428
public State (Instant creationTime , boolean isIndexUpToDate , boolean indexAvailable ,
375
- Version mappingVersion , String concreteIndexName , ClusterHealthStatus indexHealth , IndexMetadata .State indexState ) {
429
+ boolean mappingUpToDate , Version mappingVersion , String concreteIndexName , ClusterHealthStatus indexHealth ,
430
+ IndexMetadata .State indexState , Version minimumNodeVersion ) {
376
431
this .creationTime = creationTime ;
377
432
this .isIndexUpToDate = isIndexUpToDate ;
378
433
this .indexAvailable = indexAvailable ;
434
+ this .mappingUpToDate = mappingUpToDate ;
379
435
this .mappingVersion = mappingVersion ;
380
436
this .concreteIndexName = concreteIndexName ;
381
437
this .indexHealth = indexHealth ;
382
438
this .indexState = indexState ;
439
+ this .minimumNodeVersion = minimumNodeVersion ;
383
440
}
384
441
385
442
@ Override
@@ -390,10 +447,12 @@ public boolean equals(Object o) {
390
447
return Objects .equals (creationTime , state .creationTime ) &&
391
448
isIndexUpToDate == state .isIndexUpToDate &&
392
449
indexAvailable == state .indexAvailable &&
450
+ mappingUpToDate == state .mappingUpToDate &&
393
451
Objects .equals (mappingVersion , state .mappingVersion ) &&
394
452
Objects .equals (concreteIndexName , state .concreteIndexName ) &&
395
453
indexHealth == state .indexHealth &&
396
- indexState == state .indexState ;
454
+ indexState == state .indexState &&
455
+ Objects .equals (minimumNodeVersion , state .minimumNodeVersion );
397
456
}
398
457
399
458
public boolean indexExists () {
@@ -402,8 +461,8 @@ public boolean indexExists() {
402
461
403
462
@ Override
404
463
public int hashCode () {
405
- return Objects .hash (creationTime , isIndexUpToDate , indexAvailable , mappingVersion , concreteIndexName ,
406
- indexHealth );
464
+ return Objects .hash (creationTime , isIndexUpToDate , indexAvailable , mappingUpToDate , mappingVersion , concreteIndexName ,
465
+ indexHealth , minimumNodeVersion );
407
466
}
408
467
}
409
468
}
0 commit comments