Skip to content

Commit 610a9c8

Browse files
committed
Introduce mapping version to index metadata (#33147)
This commit introduces mapping version to index metadata. This value is monotonically increasing and is updated on mapping updates. This will be useful in cross-cluster replication so that we can request mapping updates from the leader only when there is a mapping update as opposed to the strategy we employ today which is to request a mapping update any time there is an index metadata update. As index metadata updates can occur for many reasons other than mapping updates, this leads to some unnecessary requests and work in cross-cluster replication.
1 parent 35b4bda commit 610a9c8

File tree

12 files changed

+188
-17
lines changed

12 files changed

+188
-17
lines changed

server/src/main/java/org/elasticsearch/cluster/ClusterState.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,7 @@ public String toString() {
284284
final String TAB = " ";
285285
for (IndexMetaData indexMetaData : metaData) {
286286
sb.append(TAB).append(indexMetaData.getIndex());
287-
sb.append(": v[").append(indexMetaData.getVersion()).append("]\n");
287+
sb.append(": v[").append(indexMetaData.getVersion()).append("], mv[").append(indexMetaData.getMappingVersion()).append("]\n");
288288
for (int shard = 0; shard < indexMetaData.getNumberOfShards(); shard++) {
289289
sb.append(TAB).append(TAB).append(shard).append(": ");
290290
sb.append("p_term [").append(indexMetaData.primaryTerm(shard)).append("], ");

server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.carrotsearch.hppc.cursors.IntObjectCursor;
2424
import com.carrotsearch.hppc.cursors.ObjectCursor;
2525
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
26+
import org.elasticsearch.Assertions;
2627
import org.elasticsearch.Version;
2728
import org.elasticsearch.action.admin.indices.rollover.RolloverInfo;
2829
import org.elasticsearch.action.support.ActiveShardCount;
@@ -291,6 +292,7 @@ public Iterator<Setting<Integer>> settings() {
291292

292293
public static final String KEY_IN_SYNC_ALLOCATIONS = "in_sync_allocations";
293294
static final String KEY_VERSION = "version";
295+
static final String KEY_MAPPING_VERSION = "mapping_version";
294296
static final String KEY_ROUTING_NUM_SHARDS = "routing_num_shards";
295297
static final String KEY_SETTINGS = "settings";
296298
static final String KEY_STATE = "state";
@@ -309,6 +311,9 @@ public Iterator<Setting<Integer>> settings() {
309311

310312
private final Index index;
311313
private final long version;
314+
315+
private final long mappingVersion;
316+
312317
private final long[] primaryTerms;
313318

314319
private final State state;
@@ -336,7 +341,7 @@ public Iterator<Setting<Integer>> settings() {
336341
private final ActiveShardCount waitForActiveShards;
337342
private final ImmutableOpenMap<String, RolloverInfo> rolloverInfos;
338343

339-
private IndexMetaData(Index index, long version, long[] primaryTerms, State state, int numberOfShards, int numberOfReplicas, Settings settings,
344+
private IndexMetaData(Index index, long version, long mappingVersion, long[] primaryTerms, State state, int numberOfShards, int numberOfReplicas, Settings settings,
340345
ImmutableOpenMap<String, MappingMetaData> mappings, ImmutableOpenMap<String, AliasMetaData> aliases,
341346
ImmutableOpenMap<String, Custom> customs, ImmutableOpenIntMap<Set<String>> inSyncAllocationIds,
342347
DiscoveryNodeFilters requireFilters, DiscoveryNodeFilters initialRecoveryFilters, DiscoveryNodeFilters includeFilters, DiscoveryNodeFilters excludeFilters,
@@ -345,6 +350,8 @@ private IndexMetaData(Index index, long version, long[] primaryTerms, State stat
345350

346351
this.index = index;
347352
this.version = version;
353+
assert mappingVersion >= 0 : mappingVersion;
354+
this.mappingVersion = mappingVersion;
348355
this.primaryTerms = primaryTerms;
349356
assert primaryTerms.length == numberOfShards;
350357
this.state = state;
@@ -394,6 +401,9 @@ public long getVersion() {
394401
return this.version;
395402
}
396403

404+
public long getMappingVersion() {
405+
return mappingVersion;
406+
}
397407

398408
/**
399409
* The term of the current selected primary. This is a non-negative number incremented when
@@ -644,6 +654,7 @@ private static class IndexMetaDataDiff implements Diff<IndexMetaData> {
644654
private final String index;
645655
private final int routingNumShards;
646656
private final long version;
657+
private final long mappingVersion;
647658
private final long[] primaryTerms;
648659
private final State state;
649660
private final Settings settings;
@@ -656,6 +667,7 @@ private static class IndexMetaDataDiff implements Diff<IndexMetaData> {
656667
IndexMetaDataDiff(IndexMetaData before, IndexMetaData after) {
657668
index = after.index.getName();
658669
version = after.version;
670+
mappingVersion = after.mappingVersion;
659671
routingNumShards = after.routingNumShards;
660672
state = after.state;
661673
settings = after.settings;
@@ -672,6 +684,11 @@ private static class IndexMetaDataDiff implements Diff<IndexMetaData> {
672684
index = in.readString();
673685
routingNumShards = in.readInt();
674686
version = in.readLong();
687+
if (in.getVersion().onOrAfter(Version.V_6_5_0)) {
688+
mappingVersion = in.readVLong();
689+
} else {
690+
mappingVersion = 1;
691+
}
675692
state = State.fromId(in.readByte());
676693
settings = Settings.readSettingsFromStream(in);
677694
primaryTerms = in.readVLongArray();
@@ -708,6 +725,9 @@ public void writeTo(StreamOutput out) throws IOException {
708725
out.writeString(index);
709726
out.writeInt(routingNumShards);
710727
out.writeLong(version);
728+
if (out.getVersion().onOrAfter(Version.V_6_5_0)) {
729+
out.writeVLong(mappingVersion);
730+
}
711731
out.writeByte(state.id);
712732
Settings.writeSettingsToStream(settings, out);
713733
out.writeVLongArray(primaryTerms);
@@ -724,6 +744,7 @@ public void writeTo(StreamOutput out) throws IOException {
724744
public IndexMetaData apply(IndexMetaData part) {
725745
Builder builder = builder(index);
726746
builder.version(version);
747+
builder.mappingVersion(mappingVersion);
727748
builder.setRoutingNumShards(routingNumShards);
728749
builder.state(state);
729750
builder.settings(settings);
@@ -740,6 +761,11 @@ public IndexMetaData apply(IndexMetaData part) {
740761
public static IndexMetaData readFrom(StreamInput in) throws IOException {
741762
Builder builder = new Builder(in.readString());
742763
builder.version(in.readLong());
764+
if (in.getVersion().onOrAfter(Version.V_6_5_0)) {
765+
builder.mappingVersion(in.readVLong());
766+
} else {
767+
builder.mappingVersion(1);
768+
}
743769
builder.setRoutingNumShards(in.readInt());
744770
builder.state(State.fromId(in.readByte()));
745771
builder.settings(readSettingsFromStream(in));
@@ -779,6 +805,9 @@ public static IndexMetaData readFrom(StreamInput in) throws IOException {
779805
public void writeTo(StreamOutput out) throws IOException {
780806
out.writeString(index.getName()); // uuid will come as part of settings
781807
out.writeLong(version);
808+
if (out.getVersion().onOrAfter(Version.V_6_5_0)) {
809+
out.writeVLong(mappingVersion);
810+
}
782811
out.writeInt(routingNumShards);
783812
out.writeByte(state.id());
784813
writeSettingsToStream(settings, out);
@@ -822,6 +851,7 @@ public static class Builder {
822851
private String index;
823852
private State state = State.OPEN;
824853
private long version = 1;
854+
private long mappingVersion = 1;
825855
private long[] primaryTerms = null;
826856
private Settings settings = Settings.Builder.EMPTY_SETTINGS;
827857
private final ImmutableOpenMap.Builder<String, MappingMetaData> mappings;
@@ -844,6 +874,7 @@ public Builder(IndexMetaData indexMetaData) {
844874
this.index = indexMetaData.getIndex().getName();
845875
this.state = indexMetaData.state;
846876
this.version = indexMetaData.version;
877+
this.mappingVersion = indexMetaData.mappingVersion;
847878
this.settings = indexMetaData.getSettings();
848879
this.primaryTerms = indexMetaData.primaryTerms.clone();
849880
this.mappings = ImmutableOpenMap.builder(indexMetaData.mappings);
@@ -1010,6 +1041,15 @@ public Builder version(long version) {
10101041
return this;
10111042
}
10121043

1044+
public long mappingVersion() {
1045+
return mappingVersion;
1046+
}
1047+
1048+
public Builder mappingVersion(final long mappingVersion) {
1049+
this.mappingVersion = mappingVersion;
1050+
return this;
1051+
}
1052+
10131053
/**
10141054
* returns the primary term for the given shard.
10151055
* See {@link IndexMetaData#primaryTerm(int)} for more information.
@@ -1137,7 +1177,7 @@ public IndexMetaData build() {
11371177

11381178
final String uuid = settings.get(SETTING_INDEX_UUID, INDEX_UUID_NA_VALUE);
11391179

1140-
return new IndexMetaData(new Index(index, uuid), version, primaryTerms, state, numberOfShards, numberOfReplicas, tmpSettings, mappings.build(),
1180+
return new IndexMetaData(new Index(index, uuid), version, mappingVersion, primaryTerms, state, numberOfShards, numberOfReplicas, tmpSettings, mappings.build(),
11411181
tmpAliases.build(), customs.build(), filledInSyncAllocationIds.build(), requireFilters, initialRecoveryFilters, includeFilters, excludeFilters,
11421182
indexCreatedVersion, indexUpgradedVersion, getRoutingNumShards(), routingPartitionSize, waitForActiveShards, rolloverInfos.build());
11431183
}
@@ -1146,6 +1186,7 @@ public static void toXContent(IndexMetaData indexMetaData, XContentBuilder build
11461186
builder.startObject(indexMetaData.getIndex().getName());
11471187

11481188
builder.field(KEY_VERSION, indexMetaData.getVersion());
1189+
builder.field(KEY_MAPPING_VERSION, indexMetaData.getMappingVersion());
11491190
builder.field(KEY_ROUTING_NUM_SHARDS, indexMetaData.getRoutingNumShards());
11501191
builder.field(KEY_STATE, indexMetaData.getState().toString().toLowerCase(Locale.ENGLISH));
11511192

@@ -1219,6 +1260,7 @@ public static IndexMetaData fromXContent(XContentParser parser) throws IOExcepti
12191260
if (token != XContentParser.Token.START_OBJECT) {
12201261
throw new IllegalArgumentException("expected object but got a " + token);
12211262
}
1263+
boolean mappingVersion = false;
12221264
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
12231265
if (token == XContentParser.Token.FIELD_NAME) {
12241266
currentFieldName = parser.currentName();
@@ -1317,6 +1359,9 @@ public static IndexMetaData fromXContent(XContentParser parser) throws IOExcepti
13171359
builder.state(State.fromString(parser.text()));
13181360
} else if (KEY_VERSION.equals(currentFieldName)) {
13191361
builder.version(parser.longValue());
1362+
} else if (KEY_MAPPING_VERSION.equals(currentFieldName)) {
1363+
mappingVersion = true;
1364+
builder.mappingVersion(parser.longValue());
13201365
} else if (KEY_ROUTING_NUM_SHARDS.equals(currentFieldName)) {
13211366
builder.setRoutingNumShards(parser.intValue());
13221367
} else {
@@ -1326,6 +1371,9 @@ public static IndexMetaData fromXContent(XContentParser parser) throws IOExcepti
13261371
throw new IllegalArgumentException("Unexpected token " + token);
13271372
}
13281373
}
1374+
if (Assertions.ENABLED && Version.indexCreated(builder.settings).onOrAfter(Version.V_6_5_0)) {
1375+
assert mappingVersion : "mapping version should be present for indices created on or after 6.5.0";
1376+
}
13291377
return builder.build();
13301378
}
13311379
}

server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,12 @@
2121

2222
import com.carrotsearch.hppc.cursors.ObjectCursor;
2323
import org.apache.logging.log4j.message.ParameterizedMessage;
24-
import org.elasticsearch.core.internal.io.IOUtils;
2524
import org.elasticsearch.action.ActionListener;
2625
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingClusterStateUpdateRequest;
2726
import org.elasticsearch.cluster.AckedClusterStateTaskListener;
28-
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
2927
import org.elasticsearch.cluster.ClusterState;
3028
import org.elasticsearch.cluster.ClusterStateTaskConfig;
29+
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
3130
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
3231
import org.elasticsearch.cluster.node.DiscoveryNode;
3332
import org.elasticsearch.cluster.service.ClusterService;
@@ -38,6 +37,7 @@
3837
import org.elasticsearch.common.inject.Inject;
3938
import org.elasticsearch.common.settings.Settings;
4039
import org.elasticsearch.common.unit.TimeValue;
40+
import org.elasticsearch.core.internal.io.IOUtils;
4141
import org.elasticsearch.index.Index;
4242
import org.elasticsearch.index.IndexService;
4343
import org.elasticsearch.index.mapper.DocumentMapper;
@@ -300,6 +300,7 @@ private ClusterState applyRequest(ClusterState currentState, PutMappingClusterSt
300300
MetaData.Builder builder = MetaData.builder(metaData);
301301
boolean updated = false;
302302
for (IndexMetaData indexMetaData : updateList) {
303+
boolean updatedMapping = false;
303304
// do the actual merge here on the master, and update the mapping source
304305
// we use the exact same indexService and metadata we used to validate above here to actually apply the update
305306
final Index index = indexMetaData.getIndex();
@@ -316,7 +317,7 @@ private ClusterState applyRequest(ClusterState currentState, PutMappingClusterSt
316317
if (existingSource.equals(updatedSource)) {
317318
// same source, no changes, ignore it
318319
} else {
319-
updated = true;
320+
updatedMapping = true;
320321
// use the merged mapping source
321322
if (logger.isDebugEnabled()) {
322323
logger.debug("{} update_mapping [{}] with source [{}]", index, mergedMapper.type(), updatedSource);
@@ -326,7 +327,7 @@ private ClusterState applyRequest(ClusterState currentState, PutMappingClusterSt
326327

327328
}
328329
} else {
329-
updated = true;
330+
updatedMapping = true;
330331
if (logger.isDebugEnabled()) {
331332
logger.debug("{} create_mapping [{}] with source [{}]", index, mappingType, updatedSource);
332333
} else if (logger.isInfoEnabled()) {
@@ -340,7 +341,16 @@ private ClusterState applyRequest(ClusterState currentState, PutMappingClusterSt
340341
for (DocumentMapper mapper : mapperService.docMappers(true)) {
341342
indexMetaDataBuilder.putMapping(new MappingMetaData(mapper.mappingSource()));
342343
}
344+
if (updatedMapping) {
345+
indexMetaDataBuilder.mappingVersion(1 + indexMetaDataBuilder.mappingVersion());
346+
}
347+
/*
348+
* This implicitly increments the index metadata version and builds the index metadata. This means that we need to have
349+
* already incremented the mapping version if necessary. Therefore, the mapping version increment must remain before this
350+
* statement.
351+
*/
343352
builder.put(indexMetaDataBuilder);
353+
updated |= updatedMapping;
344354
}
345355
if (updated) {
346356
return ClusterState.builder(currentState).metaData(builder).build();

server/src/main/java/org/elasticsearch/index/IndexService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -521,8 +521,8 @@ List<SearchOperationListener> getSearchOperationListener() { // pkg private for
521521
}
522522

523523
@Override
524-
public boolean updateMapping(IndexMetaData indexMetaData) throws IOException {
525-
return mapperService().updateMapping(indexMetaData);
524+
public boolean updateMapping(final IndexMetaData currentIndexMetaData, final IndexMetaData newIndexMetaData) throws IOException {
525+
return mapperService().updateMapping(currentIndexMetaData, newIndexMetaData);
526526
}
527527

528528
private class StoreCloseListener implements Store.OnClose {

0 commit comments

Comments
 (0)