Skip to content

Commit cecfa5b

Browse files
authored
Tighten mapping syncing in ccr remote restore (#38071)
There are two issues regarding the way that we sync mapping from leader to follower when a ccr restore is completed: 1. The returned mapping from a cluster service might not be up to date as the mapping of the restored index commit. 2. We should not compare the mapping version of the follower and the leader. They are not related to one another. Moreover, I think we should only ensure that once the restore is done, the mapping on the follower should be at least the mapping of the copied index commit. We don't have to sync the mapping which is updated after we have opened a session. Relates #36879 Closes #37887
1 parent 5a33816 commit cecfa5b

File tree

5 files changed

+79
-70
lines changed

5 files changed

+79
-70
lines changed

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CcrRequests.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,23 @@
66
package org.elasticsearch.xpack.ccr.action;
77

88
import org.elasticsearch.ElasticsearchStatusException;
9+
import org.elasticsearch.action.ActionListener;
910
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
1011
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
1112
import org.elasticsearch.action.admin.indices.mapping.put.MappingRequestValidator;
13+
import org.elasticsearch.client.Client;
1214
import org.elasticsearch.cluster.metadata.IndexMetaData;
1315
import org.elasticsearch.cluster.metadata.MappingMetaData;
16+
import org.elasticsearch.cluster.metadata.MetaData;
17+
import org.elasticsearch.common.unit.TimeValue;
1418
import org.elasticsearch.common.xcontent.XContentType;
1519
import org.elasticsearch.index.Index;
1620
import org.elasticsearch.rest.RestStatus;
1721
import org.elasticsearch.xpack.ccr.CcrSettings;
1822

1923
import java.util.Arrays;
2024
import java.util.List;
25+
import java.util.function.Supplier;
2126
import java.util.stream.Collectors;
2227

2328
public final class CcrRequests {
@@ -40,6 +45,39 @@ public static PutMappingRequest putMappingRequest(String followerIndex, MappingM
4045
return putMappingRequest;
4146
}
4247

48+
/**
49+
* Gets an {@link IndexMetaData} of the given index. The mapping version and metadata version of the returned {@link IndexMetaData}
50+
* must be at least the provided {@code mappingVersion} and {@code metadataVersion} respectively.
51+
*/
52+
public static void getIndexMetadata(Client client, Index index, long mappingVersion, long metadataVersion,
53+
Supplier<TimeValue> timeoutSupplier, ActionListener<IndexMetaData> listener) {
54+
final ClusterStateRequest request = CcrRequests.metaDataRequest(index.getName());
55+
if (metadataVersion > 0) {
56+
request.waitForMetaDataVersion(metadataVersion).waitForTimeout(timeoutSupplier.get());
57+
}
58+
client.admin().cluster().state(request, ActionListener.wrap(
59+
response -> {
60+
if (response.getState() == null) {
61+
assert metadataVersion > 0 : metadataVersion;
62+
throw new IllegalStateException("timeout to get cluster state with" +
63+
" metadata version [" + metadataVersion + "], mapping version [" + mappingVersion + "]");
64+
}
65+
final MetaData metaData = response.getState().metaData();
66+
final IndexMetaData indexMetaData = metaData.getIndexSafe(index);
67+
if (indexMetaData.getMappingVersion() >= mappingVersion) {
68+
listener.onResponse(indexMetaData);
69+
return;
70+
}
71+
if (timeoutSupplier.get().nanos() < 0) {
72+
throw new IllegalStateException("timeout to get cluster state with mapping version [" + mappingVersion + "]");
73+
}
74+
// ask for the next version.
75+
getIndexMetadata(client, index, mappingVersion, metaData.version() + 1, timeoutSupplier, listener);
76+
},
77+
listener::onFailure
78+
));
79+
}
80+
4381
public static final MappingRequestValidator CCR_PUT_MAPPING_REQUEST_VALIDATOR = (request, state, indices) -> {
4482
if (request.origin() == null) {
4583
return null; // a put-mapping-request on old versions does not have origin.

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java

Lines changed: 4 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.elasticsearch.cluster.ClusterState;
2525
import org.elasticsearch.cluster.metadata.IndexMetaData;
2626
import org.elasticsearch.cluster.metadata.MappingMetaData;
27-
import org.elasticsearch.cluster.metadata.MetaData;
2827
import org.elasticsearch.cluster.routing.IndexRoutingTable;
2928
import org.elasticsearch.cluster.service.ClusterService;
3029
import org.elasticsearch.common.CheckedConsumer;
@@ -59,6 +58,7 @@
5958
import java.util.function.BiConsumer;
6059
import java.util.function.Consumer;
6160
import java.util.function.LongConsumer;
61+
import java.util.function.Supplier;
6262

6363
import static org.elasticsearch.xpack.ccr.CcrLicenseChecker.wrapClient;
6464
import static org.elasticsearch.xpack.ccr.action.TransportResumeFollowAction.extractLeaderShardHistoryUUIDs;
@@ -111,7 +111,9 @@ protected AllocatedPersistentTask createTask(long id, String type, String action
111111
@Override
112112
protected void innerUpdateMapping(long minRequiredMappingVersion, LongConsumer handler, Consumer<Exception> errorHandler) {
113113
final Index followerIndex = params.getFollowShardId().getIndex();
114-
getIndexMetadata(minRequiredMappingVersion, 0L, params, ActionListener.wrap(
114+
final Index leaderIndex = params.getLeaderShardId().getIndex();
115+
final Supplier<TimeValue> timeout = () -> isStopped() ? TimeValue.MINUS_ONE : waitForMetadataTimeOut;
116+
CcrRequests.getIndexMetadata(remoteClient(params), leaderIndex, minRequiredMappingVersion, 0L, timeout, ActionListener.wrap(
115117
indexMetaData -> {
116118
if (indexMetaData.getMappings().isEmpty()) {
117119
assert indexMetaData.getMappingVersion() == 1;
@@ -246,39 +248,6 @@ private Client remoteClient(ShardFollowTask params) {
246248
return wrapClient(client.getRemoteClusterClient(params.getRemoteCluster()), params.getHeaders());
247249
}
248250

249-
private void getIndexMetadata(long minRequiredMappingVersion, long minRequiredMetadataVersion,
250-
ShardFollowTask params, ActionListener<IndexMetaData> listener) {
251-
final Index leaderIndex = params.getLeaderShardId().getIndex();
252-
final ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex.getName());
253-
if (minRequiredMetadataVersion > 0) {
254-
clusterStateRequest.waitForMetaDataVersion(minRequiredMetadataVersion).waitForTimeout(waitForMetadataTimeOut);
255-
}
256-
try {
257-
remoteClient(params).admin().cluster().state(clusterStateRequest, ActionListener.wrap(
258-
r -> {
259-
// if wait_for_metadata_version timeout, the response is empty
260-
if (r.getState() == null) {
261-
assert minRequiredMetadataVersion > 0;
262-
getIndexMetadata(minRequiredMappingVersion, minRequiredMetadataVersion, params, listener);
263-
return;
264-
}
265-
final MetaData metaData = r.getState().metaData();
266-
final IndexMetaData indexMetaData = metaData.getIndexSafe(leaderIndex);
267-
if (indexMetaData.getMappingVersion() < minRequiredMappingVersion) {
268-
// ask for the next version.
269-
getIndexMetadata(minRequiredMappingVersion, metaData.version() + 1, params, listener);
270-
} else {
271-
assert metaData.version() >= minRequiredMetadataVersion : metaData.version() + " < " + minRequiredMetadataVersion;
272-
listener.onResponse(indexMetaData);
273-
}
274-
},
275-
listener::onFailure
276-
));
277-
} catch (Exception e) {
278-
listener.onFailure(e);
279-
}
280-
}
281-
282251
interface FollowerStatsInfoHandler {
283252
void accept(String followerHistoryUUID, long globalCheckpoint, long maxSeqNo);
284253
}

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,8 @@ protected PutCcrRestoreSessionResponse shardOperation(PutCcrRestoreSessionReques
7272
throw new ShardNotFoundException(shardId);
7373
}
7474
Store.MetadataSnapshot storeFileMetaData = ccrRestoreService.openSession(request.getSessionUUID(), indexShard);
75-
return new PutCcrRestoreSessionResponse(clusterService.localNode(), storeFileMetaData);
75+
long mappingVersion = indexShard.indexSettings().getIndexMetaData().getMappingVersion();
76+
return new PutCcrRestoreSessionResponse(clusterService.localNode(), storeFileMetaData, mappingVersion);
7677
}
7778

7879
@Override
@@ -97,33 +98,38 @@ public static class PutCcrRestoreSessionResponse extends ActionResponse {
9798

9899
private DiscoveryNode node;
99100
private Store.MetadataSnapshot storeFileMetaData;
101+
private long mappingVersion;
100102

101103
PutCcrRestoreSessionResponse() {
102104
}
103105

104-
PutCcrRestoreSessionResponse(DiscoveryNode node, Store.MetadataSnapshot storeFileMetaData) {
106+
PutCcrRestoreSessionResponse(DiscoveryNode node, Store.MetadataSnapshot storeFileMetaData, long mappingVersion) {
105107
this.node = node;
106108
this.storeFileMetaData = storeFileMetaData;
109+
this.mappingVersion = mappingVersion;
107110
}
108111

109112
PutCcrRestoreSessionResponse(StreamInput in) throws IOException {
110113
super(in);
111114
node = new DiscoveryNode(in);
112115
storeFileMetaData = new Store.MetadataSnapshot(in);
116+
mappingVersion = in.readVLong();
113117
}
114118

115119
@Override
116120
public void readFrom(StreamInput in) throws IOException {
117121
super.readFrom(in);
118122
node = new DiscoveryNode(in);
119123
storeFileMetaData = new Store.MetadataSnapshot(in);
124+
mappingVersion = in.readVLong();
120125
}
121126

122127
@Override
123128
public void writeTo(StreamOutput out) throws IOException {
124129
super.writeTo(out);
125130
node.writeTo(out);
126131
storeFileMetaData.writeTo(out);
132+
out.writeVLong(mappingVersion);
127133
}
128134

129135
public DiscoveryNode getNode() {
@@ -133,5 +139,9 @@ public DiscoveryNode getNode() {
133139
public Store.MetadataSnapshot getStoreFileMetaData() {
134140
return storeFileMetaData;
135141
}
142+
143+
public long getMappingVersion() {
144+
return mappingVersion;
145+
}
136146
}
137147
}

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java

Lines changed: 23 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,9 @@
2929
import org.elasticsearch.common.metrics.CounterMetric;
3030
import org.elasticsearch.common.settings.Settings;
3131
import org.elasticsearch.common.unit.ByteSizeValue;
32+
import org.elasticsearch.common.unit.TimeValue;
3233
import org.elasticsearch.common.util.CombinedRateLimiter;
3334
import org.elasticsearch.index.Index;
34-
import org.elasticsearch.index.IndexSettings;
3535
import org.elasticsearch.index.engine.EngineException;
3636
import org.elasticsearch.index.shard.IndexShard;
3737
import org.elasticsearch.index.shard.IndexShardRecoveryException;
@@ -72,6 +72,8 @@
7272
import java.util.Map;
7373
import java.util.Set;
7474
import java.util.function.LongConsumer;
75+
import java.util.function.Supplier;
76+
7577

7678
/**
7779
* This repository relies on a remote cluster for Ccr restores. It is read-only so it can only be used to
@@ -288,30 +290,31 @@ public void restoreShard(IndexShard indexShard, SnapshotId snapshotId, Version v
288290
String name = metadata.name();
289291
try (RestoreSession restoreSession = openSession(name, remoteClient, leaderShardId, indexShard, recoveryState)) {
290292
restoreSession.restoreFiles();
293+
updateMappings(remoteClient, leaderIndex, restoreSession.mappingVersion, client, indexShard.routingEntry().index());
291294
} catch (Exception e) {
292295
throw new IndexShardRestoreFailedException(indexShard.shardId(), "failed to restore snapshot [" + snapshotId + "]", e);
293296
}
294-
295-
maybeUpdateMappings(client, remoteClient, leaderIndex, indexShard.indexSettings());
296297
}
297298

298299
@Override
299300
public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId leaderShardId) {
300301
throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
301302
}
302303

303-
private void maybeUpdateMappings(Client localClient, Client remoteClient, Index leaderIndex, IndexSettings followerIndexSettings) {
304-
ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex.getName());
305-
ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest)
306-
.actionGet(ccrSettings.getRecoveryActionTimeout());
307-
IndexMetaData leaderIndexMetadata = clusterState.getState().metaData().getIndexSafe(leaderIndex);
308-
long leaderMappingVersion = leaderIndexMetadata.getMappingVersion();
309-
310-
if (leaderMappingVersion > followerIndexSettings.getIndexMetaData().getMappingVersion()) {
311-
Index followerIndex = followerIndexSettings.getIndex();
312-
MappingMetaData mappingMetaData = leaderIndexMetadata.mapping();
313-
PutMappingRequest putMappingRequest = CcrRequests.putMappingRequest(followerIndex.getName(), mappingMetaData);
314-
localClient.admin().indices().putMapping(putMappingRequest).actionGet(ccrSettings.getRecoveryActionTimeout());
304+
private void updateMappings(Client leaderClient, Index leaderIndex, long leaderMappingVersion,
305+
Client followerClient, Index followerIndex) {
306+
final PlainActionFuture<IndexMetaData> indexMetadataFuture = new PlainActionFuture<>();
307+
final long startTimeInNanos = System.nanoTime();
308+
final Supplier<TimeValue> timeout = () -> {
309+
final long elapsedInNanos = System.nanoTime() - startTimeInNanos;
310+
return TimeValue.timeValueNanos(ccrSettings.getRecoveryActionTimeout().nanos() - elapsedInNanos);
311+
};
312+
CcrRequests.getIndexMetadata(leaderClient, leaderIndex, leaderMappingVersion, 0L, timeout, indexMetadataFuture);
313+
final IndexMetaData leaderIndexMetadata = indexMetadataFuture.actionGet(ccrSettings.getRecoveryActionTimeout());
314+
final MappingMetaData mappingMetaData = leaderIndexMetadata.mapping();
315+
if (mappingMetaData != null) {
316+
final PutMappingRequest putMappingRequest = CcrRequests.putMappingRequest(followerIndex.getName(), mappingMetaData);
317+
followerClient.admin().indices().putMapping(putMappingRequest).actionGet(ccrSettings.getRecoveryActionTimeout());
315318
}
316319
}
317320

@@ -321,7 +324,7 @@ private RestoreSession openSession(String repositoryName, Client remoteClient, S
321324
PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse response = remoteClient.execute(PutCcrRestoreSessionAction.INSTANCE,
322325
new PutCcrRestoreSessionRequest(sessionUUID, leaderShardId)).actionGet(ccrSettings.getRecoveryActionTimeout());
323326
return new RestoreSession(repositoryName, remoteClient, sessionUUID, response.getNode(), indexShard, recoveryState,
324-
response.getStoreFileMetaData(), ccrSettings, throttledTime::inc);
327+
response.getStoreFileMetaData(), response.getMappingVersion(), ccrSettings, throttledTime::inc);
325328
}
326329

327330
private static class RestoreSession extends FileRestoreContext implements Closeable {
@@ -332,17 +335,19 @@ private static class RestoreSession extends FileRestoreContext implements Closea
332335
private final String sessionUUID;
333336
private final DiscoveryNode node;
334337
private final Store.MetadataSnapshot sourceMetaData;
338+
private final long mappingVersion;
335339
private final CcrSettings ccrSettings;
336340
private final LongConsumer throttleListener;
337341

338342
RestoreSession(String repositoryName, Client remoteClient, String sessionUUID, DiscoveryNode node, IndexShard indexShard,
339-
RecoveryState recoveryState, Store.MetadataSnapshot sourceMetaData, CcrSettings ccrSettings,
340-
LongConsumer throttleListener) {
343+
RecoveryState recoveryState, Store.MetadataSnapshot sourceMetaData, long mappingVersion,
344+
CcrSettings ccrSettings, LongConsumer throttleListener) {
341345
super(repositoryName, indexShard, SNAPSHOT_ID, recoveryState, BUFFER_SIZE);
342346
this.remoteClient = remoteClient;
343347
this.sessionUUID = sessionUUID;
344348
this.node = node;
345349
this.sourceMetaData = sourceMetaData;
350+
this.mappingVersion = mappingVersion;
346351
this.ccrSettings = ccrSettings;
347352
this.throttleListener = throttleListener;
348353
}

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -390,7 +390,6 @@ public void testIndividualActionsTimeout() throws Exception {
390390
assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet());
391391
}
392392

393-
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/37887")
394393
public void testFollowerMappingIsUpdated() throws IOException {
395394
String leaderClusterRepoName = CcrRepository.NAME_PREFIX + "leader_cluster";
396395
String leaderIndex = "index1";
@@ -413,16 +412,8 @@ public void testFollowerMappingIsUpdated() throws IOException {
413412
.renameReplacement(followerIndex).masterNodeTimeout(new TimeValue(1L, TimeUnit.HOURS))
414413
.indexSettings(settingsBuilder);
415414

416-
// TODO: Eventually when the file recovery work is complete, we should test updated mappings by
417-
// indexing to the leader while the recovery is happening. However, into order to that test mappings
418-
// are updated prior to that work, we index documents in the clear session callback. This will
419-
// ensure a mapping change prior to the final mapping check on the follower side.
420-
for (CcrRestoreSourceService restoreSourceService : getLeaderCluster().getDataNodeInstances(CcrRestoreSourceService.class)) {
421-
restoreSourceService.addCloseSessionListener(s -> {
422-
final String source = String.format(Locale.ROOT, "{\"k\":%d}", 1);
423-
leaderClient().prepareIndex("index1", "doc", Long.toString(1)).setSource(source, XContentType.JSON).get();
424-
});
425-
}
415+
final String source = String.format(Locale.ROOT, "{\"k\":%d}", 1);
416+
leaderClient().prepareIndex("index1", "doc", Long.toString(1)).setSource(source, XContentType.JSON).get();
426417

427418
PlainActionFuture<RestoreInfo> future = PlainActionFuture.newFuture();
428419
restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future));
@@ -435,10 +426,6 @@ public void testFollowerMappingIsUpdated() throws IOException {
435426
clusterStateRequest.clear();
436427
clusterStateRequest.metaData(true);
437428
clusterStateRequest.indices(followerIndex);
438-
ClusterStateResponse clusterState = followerClient().admin().cluster().state(clusterStateRequest).actionGet();
439-
IndexMetaData followerIndexMetadata = clusterState.getState().metaData().index(followerIndex);
440-
assertEquals(2, followerIndexMetadata.getMappingVersion());
441-
442429
MappingMetaData mappingMetaData = followerClient().admin().indices().prepareGetMappings("index2").get().getMappings()
443430
.get("index2").get("doc");
444431
assertThat(XContentMapValues.extractValue("properties.k.type", mappingMetaData.sourceAsMap()), equalTo("long"));

0 commit comments

Comments
 (0)