Skip to content

Commit cd91992

Browse files
authored
Only fetch mapping updates when necessary (#33182)
Today we fetch the mapping from the leader and apply it as a mapping update whenever the index metadata version on the leader changes. Yet, the index metadata can change for many reasons other than a mapping update (e.g., settings updates, adding an alias, or a replica being promoted to a primary among many other reasons). This commit builds on the addition of a mapping version to the index metadata to only fetch mapping updates when the mapping version increases. This reduces the number of these fetches and application of mappings on the follower to the bare minimum.
1 parent 5b11df9 commit cd91992

File tree

9 files changed

+87
-88
lines changed

9 files changed

+87
-88
lines changed

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -161,10 +161,10 @@ public String toString() {
161161

162162
public static final class Response extends ActionResponse {
163163

164-
private long indexMetadataVersion;
164+
private long mappingVersion;
165165

166-
public long getIndexMetadataVersion() {
167-
return indexMetadataVersion;
166+
public long getMappingVersion() {
167+
return mappingVersion;
168168
}
169169

170170
private long globalCheckpoint;
@@ -188,8 +188,8 @@ public Translog.Operation[] getOperations() {
188188
Response() {
189189
}
190190

191-
Response(final long indexMetadataVersion, final long globalCheckpoint, final long maxSeqNo, final Translog.Operation[] operations) {
192-
this.indexMetadataVersion = indexMetadataVersion;
191+
Response(final long mappingVersion, final long globalCheckpoint, final long maxSeqNo, final Translog.Operation[] operations) {
192+
this.mappingVersion = mappingVersion;
193193
this.globalCheckpoint = globalCheckpoint;
194194
this.maxSeqNo = maxSeqNo;
195195
this.operations = operations;
@@ -198,7 +198,7 @@ public Translog.Operation[] getOperations() {
198198
@Override
199199
public void readFrom(final StreamInput in) throws IOException {
200200
super.readFrom(in);
201-
indexMetadataVersion = in.readVLong();
201+
mappingVersion = in.readVLong();
202202
globalCheckpoint = in.readZLong();
203203
maxSeqNo = in.readZLong();
204204
operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new);
@@ -207,7 +207,7 @@ public void readFrom(final StreamInput in) throws IOException {
207207
@Override
208208
public void writeTo(final StreamOutput out) throws IOException {
209209
super.writeTo(out);
210-
out.writeVLong(indexMetadataVersion);
210+
out.writeVLong(mappingVersion);
211211
out.writeZLong(globalCheckpoint);
212212
out.writeZLong(maxSeqNo);
213213
out.writeArray(Translog.Operation::writeOperation, operations);
@@ -218,15 +218,15 @@ public boolean equals(final Object o) {
218218
if (this == o) return true;
219219
if (o == null || getClass() != o.getClass()) return false;
220220
final Response that = (Response) o;
221-
return indexMetadataVersion == that.indexMetadataVersion &&
221+
return mappingVersion == that.mappingVersion &&
222222
globalCheckpoint == that.globalCheckpoint &&
223223
maxSeqNo == that.maxSeqNo &&
224224
Arrays.equals(operations, that.operations);
225225
}
226226

227227
@Override
228228
public int hashCode() {
229-
return Objects.hash(indexMetadataVersion, globalCheckpoint, maxSeqNo, Arrays.hashCode(operations));
229+
return Objects.hash(mappingVersion, globalCheckpoint, maxSeqNo, Arrays.hashCode(operations));
230230
}
231231
}
232232

@@ -252,15 +252,15 @@ protected Response shardOperation(Request request, ShardId shardId) throws IOExc
252252
IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex());
253253
IndexShard indexShard = indexService.getShard(request.getShard().id());
254254
final SeqNoStats seqNoStats = indexShard.seqNoStats();
255-
final long indexMetaDataVersion = clusterService.state().metaData().index(shardId.getIndex()).getVersion();
255+
final long mappingVersion = clusterService.state().metaData().index(shardId.getIndex()).getMappingVersion();
256256

257257
final Translog.Operation[] operations = getOperations(
258258
indexShard,
259259
seqNoStats.getGlobalCheckpoint(),
260260
request.fromSeqNo,
261261
request.maxOperationCount,
262262
request.maxOperationSizeInBytes);
263-
return new Response(indexMetaDataVersion, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), operations);
263+
return new Response(mappingVersion, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), operations);
264264
}
265265

266266
@Override

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java

Lines changed: 28 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
8080
private long followerMaxSeqNo = 0;
8181
private int numConcurrentReads = 0;
8282
private int numConcurrentWrites = 0;
83-
private long currentIndexMetadataVersion = 0;
83+
private long currentMappingVersion = 0;
8484
private long totalFetchTimeMillis = 0;
8585
private long numberOfSuccessfulFetches = 0;
8686
private long numberOfFailedFetches = 0;
@@ -131,14 +131,13 @@ void start(
131131
this.lastRequestedSeqNo = followerGlobalCheckpoint;
132132
}
133133

134-
// Forcefully updates follower mapping, this gets us the leader imd version and
135-
// makes sure that leader and follower mapping are identical.
136-
updateMapping(imdVersion -> {
134+
// updates follower mapping, this gets us the leader mapping version and makes sure that leader and follower mapping are identical
135+
updateMapping(mappingVersion -> {
137136
synchronized (ShardFollowNodeTask.this) {
138-
currentIndexMetadataVersion = imdVersion;
137+
currentMappingVersion = mappingVersion;
139138
}
140-
LOGGER.info("{} Started to follow leader shard {}, followGlobalCheckPoint={}, indexMetaDataVersion={}",
141-
params.getFollowShardId(), params.getLeaderShardId(), followerGlobalCheckpoint, imdVersion);
139+
LOGGER.info("{} Started to follow leader shard {}, followGlobalCheckPoint={}, mappingVersion={}",
140+
params.getFollowShardId(), params.getLeaderShardId(), followerGlobalCheckpoint, mappingVersion);
142141
coordinateReads();
143142
});
144143
}
@@ -258,7 +257,7 @@ private void sendShardChangesRequest(long from, int maxOperationCount, long maxR
258257
}
259258

260259
void handleReadResponse(long from, long maxRequiredSeqNo, ShardChangesAction.Response response) {
261-
maybeUpdateMapping(response.getIndexMetadataVersion(), () -> innerHandleReadResponse(from, maxRequiredSeqNo, response));
260+
maybeUpdateMapping(response.getMappingVersion(), () -> innerHandleReadResponse(from, maxRequiredSeqNo, response));
262261
}
263262

264263
/** Called when some operations are fetched from the leading */
@@ -344,16 +343,16 @@ private synchronized void handleWriteResponse(final BulkShardOperationsResponse
344343
coordinateReads();
345344
}
346345

347-
private synchronized void maybeUpdateMapping(Long minimumRequiredIndexMetadataVersion, Runnable task) {
348-
if (currentIndexMetadataVersion >= minimumRequiredIndexMetadataVersion) {
349-
LOGGER.trace("{} index metadata version [{}] is higher or equal than minimum required index metadata version [{}]",
350-
params.getFollowShardId(), currentIndexMetadataVersion, minimumRequiredIndexMetadataVersion);
346+
private synchronized void maybeUpdateMapping(Long minimumRequiredMappingVersion, Runnable task) {
347+
if (currentMappingVersion >= minimumRequiredMappingVersion) {
348+
LOGGER.trace("{} mapping version [{}] is higher or equal than minimum required mapping version [{}]",
349+
params.getFollowShardId(), currentMappingVersion, minimumRequiredMappingVersion);
351350
task.run();
352351
} else {
353-
LOGGER.trace("{} updating mapping, index metadata version [{}] is lower than minimum required index metadata version [{}]",
354-
params.getFollowShardId(), currentIndexMetadataVersion, minimumRequiredIndexMetadataVersion);
355-
updateMapping(imdVersion -> {
356-
currentIndexMetadataVersion = imdVersion;
352+
LOGGER.trace("{} updating mapping, mapping version [{}] is lower than minimum required mapping version [{}]",
353+
params.getFollowShardId(), currentMappingVersion, minimumRequiredMappingVersion);
354+
updateMapping(mappingVersion -> {
355+
currentMappingVersion = mappingVersion;
357356
task.run();
358357
});
359358
}
@@ -422,7 +421,7 @@ public synchronized Status getStatus() {
422421
numConcurrentReads,
423422
numConcurrentWrites,
424423
buffer.size(),
425-
currentIndexMetadataVersion,
424+
currentMappingVersion,
426425
totalFetchTimeMillis,
427426
numberOfSuccessfulFetches,
428427
numberOfFailedFetches,
@@ -448,7 +447,7 @@ public static class Status implements Task.Status {
448447
static final ParseField NUMBER_OF_CONCURRENT_READS_FIELD = new ParseField("number_of_concurrent_reads");
449448
static final ParseField NUMBER_OF_CONCURRENT_WRITES_FIELD = new ParseField("number_of_concurrent_writes");
450449
static final ParseField NUMBER_OF_QUEUED_WRITES_FIELD = new ParseField("number_of_queued_writes");
451-
static final ParseField INDEX_METADATA_VERSION_FIELD = new ParseField("index_metadata_version");
450+
static final ParseField MAPPING_VERSION_FIELD = new ParseField("mapping_version");
452451
static final ParseField TOTAL_FETCH_TIME_MILLIS_FIELD = new ParseField("total_fetch_time_millis");
453452
static final ParseField NUMBER_OF_SUCCESSFUL_FETCHES_FIELD = new ParseField("number_of_successful_fetches");
454453
static final ParseField NUMBER_OF_FAILED_FETCHES_FIELD = new ParseField("number_of_failed_fetches");
@@ -504,7 +503,7 @@ public static class Status implements Task.Status {
504503
STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_CONCURRENT_READS_FIELD);
505504
STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_CONCURRENT_WRITES_FIELD);
506505
STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_QUEUED_WRITES_FIELD);
507-
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), INDEX_METADATA_VERSION_FIELD);
506+
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), MAPPING_VERSION_FIELD);
508507
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_FETCH_TIME_MILLIS_FIELD);
509508
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_SUCCESSFUL_FETCHES_FIELD);
510509
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_FETCHES_FIELD);
@@ -582,10 +581,10 @@ public int numberOfQueuedWrites() {
582581
return numberOfQueuedWrites;
583582
}
584583

585-
private final long indexMetadataVersion;
584+
private final long mappingVersion;
586585

587-
public long indexMetadataVersion() {
588-
return indexMetadataVersion;
586+
public long mappingVersion() {
587+
return mappingVersion;
589588
}
590589

591590
private final long totalFetchTimeMillis;
@@ -658,7 +657,7 @@ public NavigableMap<Long, ElasticsearchException> fetchExceptions() {
658657
final int numberOfConcurrentReads,
659658
final int numberOfConcurrentWrites,
660659
final int numberOfQueuedWrites,
661-
final long indexMetadataVersion,
660+
final long mappingVersion,
662661
final long totalFetchTimeMillis,
663662
final long numberOfSuccessfulFetches,
664663
final long numberOfFailedFetches,
@@ -678,7 +677,7 @@ public NavigableMap<Long, ElasticsearchException> fetchExceptions() {
678677
this.numberOfConcurrentReads = numberOfConcurrentReads;
679678
this.numberOfConcurrentWrites = numberOfConcurrentWrites;
680679
this.numberOfQueuedWrites = numberOfQueuedWrites;
681-
this.indexMetadataVersion = indexMetadataVersion;
680+
this.mappingVersion = mappingVersion;
682681
this.totalFetchTimeMillis = totalFetchTimeMillis;
683682
this.numberOfSuccessfulFetches = numberOfSuccessfulFetches;
684683
this.numberOfFailedFetches = numberOfFailedFetches;
@@ -701,7 +700,7 @@ public Status(final StreamInput in) throws IOException {
701700
this.numberOfConcurrentReads = in.readVInt();
702701
this.numberOfConcurrentWrites = in.readVInt();
703702
this.numberOfQueuedWrites = in.readVInt();
704-
this.indexMetadataVersion = in.readVLong();
703+
this.mappingVersion = in.readVLong();
705704
this.totalFetchTimeMillis = in.readVLong();
706705
this.numberOfSuccessfulFetches = in.readVLong();
707706
this.numberOfFailedFetches = in.readVLong();
@@ -730,7 +729,7 @@ public void writeTo(final StreamOutput out) throws IOException {
730729
out.writeVInt(numberOfConcurrentReads);
731730
out.writeVInt(numberOfConcurrentWrites);
732731
out.writeVInt(numberOfQueuedWrites);
733-
out.writeVLong(indexMetadataVersion);
732+
out.writeVLong(mappingVersion);
734733
out.writeVLong(totalFetchTimeMillis);
735734
out.writeVLong(numberOfSuccessfulFetches);
736735
out.writeVLong(numberOfFailedFetches);
@@ -756,7 +755,7 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa
756755
builder.field(NUMBER_OF_CONCURRENT_READS_FIELD.getPreferredName(), numberOfConcurrentReads);
757756
builder.field(NUMBER_OF_CONCURRENT_WRITES_FIELD.getPreferredName(), numberOfConcurrentWrites);
758757
builder.field(NUMBER_OF_QUEUED_WRITES_FIELD.getPreferredName(), numberOfQueuedWrites);
759-
builder.field(INDEX_METADATA_VERSION_FIELD.getPreferredName(), indexMetadataVersion);
758+
builder.field(MAPPING_VERSION_FIELD.getPreferredName(), mappingVersion);
760759
builder.humanReadableField(
761760
TOTAL_FETCH_TIME_MILLIS_FIELD.getPreferredName(),
762761
"total_fetch_time",
@@ -815,7 +814,7 @@ public boolean equals(final Object o) {
815814
numberOfConcurrentReads == that.numberOfConcurrentReads &&
816815
numberOfConcurrentWrites == that.numberOfConcurrentWrites &&
817816
numberOfQueuedWrites == that.numberOfQueuedWrites &&
818-
indexMetadataVersion == that.indexMetadataVersion &&
817+
mappingVersion == that.mappingVersion &&
819818
totalFetchTimeMillis == that.totalFetchTimeMillis &&
820819
numberOfSuccessfulFetches == that.numberOfSuccessfulFetches &&
821820
numberOfFailedFetches == that.numberOfFailedFetches &&
@@ -837,7 +836,7 @@ public int hashCode() {
837836
numberOfConcurrentReads,
838837
numberOfConcurrentWrites,
839838
numberOfQueuedWrites,
840-
indexMetadataVersion,
839+
mappingVersion,
841840
totalFetchTimeMillis,
842841
numberOfSuccessfulFetches,
843842
numberOfFailedFetches,

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ protected void innerUpdateMapping(LongConsumer handler, Consumer<Exception> erro
115115
putMappingRequest.type(mappingMetaData.type());
116116
putMappingRequest.source(mappingMetaData.source().string(), XContentType.JSON);
117117
followerClient.admin().indices().putMapping(putMappingRequest, ActionListener.wrap(
118-
putMappingResponse -> handler.accept(indexMetaData.getVersion()),
118+
putMappingResponse -> handler.accept(indexMetaData.getMappingVersion()),
119119
errorHandler));
120120
}, errorHandler));
121121
}

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,15 @@ public class ShardChangesResponseTests extends AbstractStreamableTestCase<ShardC
1212

1313
@Override
1414
protected ShardChangesAction.Response createTestInstance() {
15-
final long indexMetadataVersion = randomNonNegativeLong();
15+
final long mappingVersion = randomNonNegativeLong();
1616
final long leaderGlobalCheckpoint = randomNonNegativeLong();
1717
final long leaderMaxSeqNo = randomLongBetween(leaderGlobalCheckpoint, Long.MAX_VALUE);
1818
final int numOps = randomInt(8);
1919
final Translog.Operation[] operations = new Translog.Operation[numOps];
2020
for (int i = 0; i < numOps; i++) {
2121
operations[i] = new Translog.NoOp(i, 0, "test");
2222
}
23-
return new ShardChangesAction.Response(indexMetadataVersion, leaderGlobalCheckpoint, leaderMaxSeqNo, operations);
23+
return new ShardChangesAction.Response(mappingVersion, leaderGlobalCheckpoint, leaderMaxSeqNo, operations);
2424
}
2525

2626
@Override

0 commit comments

Comments
 (0)