Skip to content

Commit 4cd5705

Browse files
authored
Update index mappings when ccr restore complete (#36879)
This is related to #35975. When the shard restore process is complete, the index mappings need to be updated to ensure that the data in the files restores is compatible with the follower mappings. This commit implements a mapping update as the final step in a shard restore.
1 parent 2f80d6f commit 4cd5705

File tree

4 files changed

+122
-31
lines changed

4 files changed

+122
-31
lines changed
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.ccr.action;
7+
8+
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
9+
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
10+
import org.elasticsearch.cluster.metadata.MappingMetaData;
11+
import org.elasticsearch.common.xcontent.XContentType;
12+
13+
public final class CcrRequests {
14+
15+
private CcrRequests() {}
16+
17+
public static ClusterStateRequest metaDataRequest(String leaderIndex) {
18+
ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
19+
clusterStateRequest.clear();
20+
clusterStateRequest.metaData(true);
21+
clusterStateRequest.indices(leaderIndex);
22+
return clusterStateRequest;
23+
}
24+
25+
public static PutMappingRequest putMappingRequest(String followerIndex, MappingMetaData mappingMetaData) {
26+
PutMappingRequest putMappingRequest = new PutMappingRequest(followerIndex);
27+
putMappingRequest.type(mappingMetaData.type());
28+
putMappingRequest.source(mappingMetaData.source().string(), XContentType.JSON);
29+
return putMappingRequest;
30+
}
31+
}

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.elasticsearch.common.settings.Settings;
3232
import org.elasticsearch.common.unit.TimeValue;
3333
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
34-
import org.elasticsearch.common.xcontent.XContentType;
3534
import org.elasticsearch.index.Index;
3635
import org.elasticsearch.index.IndexNotFoundException;
3736
import org.elasticsearch.index.engine.CommitStats;
@@ -123,10 +122,7 @@ protected void innerUpdateMapping(LongConsumer handler, Consumer<Exception> erro
123122
Index leaderIndex = params.getLeaderShardId().getIndex();
124123
Index followIndex = params.getFollowShardId().getIndex();
125124

126-
ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
127-
clusterStateRequest.clear();
128-
clusterStateRequest.metaData(true);
129-
clusterStateRequest.indices(leaderIndex.getName());
125+
ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex.getName());
130126

131127
remoteClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(clusterStateResponse -> {
132128
IndexMetaData indexMetaData = clusterStateResponse.getState().metaData().getIndexSafe(leaderIndex);
@@ -140,9 +136,7 @@ protected void innerUpdateMapping(LongConsumer handler, Consumer<Exception> erro
140136
indexMetaData.getMappings().size() + "]";
141137
MappingMetaData mappingMetaData = indexMetaData.getMappings().iterator().next().value;
142138

143-
PutMappingRequest putMappingRequest = new PutMappingRequest(followIndex.getName());
144-
putMappingRequest.type(mappingMetaData.type());
145-
putMappingRequest.source(mappingMetaData.source().string(), XContentType.JSON);
139+
PutMappingRequest putMappingRequest = CcrRequests.putMappingRequest(followIndex.getName(), mappingMetaData);
146140
followerClient.admin().indices().putMapping(putMappingRequest, ActionListener.wrap(
147141
putMappingResponse -> handler.accept(indexMetaData.getMappingVersion()),
148142
errorHandler));
@@ -154,10 +148,7 @@ protected void innerUpdateSettings(final LongConsumer finalHandler, final Consum
154148
final Index leaderIndex = params.getLeaderShardId().getIndex();
155149
final Index followIndex = params.getFollowShardId().getIndex();
156150

157-
final ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
158-
clusterStateRequest.clear();
159-
clusterStateRequest.metaData(true);
160-
clusterStateRequest.indices(leaderIndex.getName());
151+
ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex.getName());
161152

162153
CheckedConsumer<ClusterStateResponse, Exception> onResponse = clusterStateResponse -> {
163154
final IndexMetaData leaderIMD = clusterStateResponse.getState().metaData().getIndexSafe(leaderIndex);

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java

Lines changed: 29 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,13 @@
88

99
import org.apache.lucene.index.IndexCommit;
1010
import org.elasticsearch.Version;
11+
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
1112
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
13+
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
1214
import org.elasticsearch.action.support.PlainActionFuture;
1315
import org.elasticsearch.client.Client;
1416
import org.elasticsearch.cluster.metadata.IndexMetaData;
17+
import org.elasticsearch.cluster.metadata.MappingMetaData;
1518
import org.elasticsearch.cluster.metadata.MetaData;
1619
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
1720
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -21,6 +24,7 @@
2124
import org.elasticsearch.common.component.AbstractLifecycleComponent;
2225
import org.elasticsearch.common.settings.Settings;
2326
import org.elasticsearch.index.Index;
27+
import org.elasticsearch.index.IndexSettings;
2428
import org.elasticsearch.index.engine.EngineException;
2529
import org.elasticsearch.index.shard.IndexShard;
2630
import org.elasticsearch.index.shard.IndexShardRecoveryException;
@@ -37,6 +41,7 @@
3741
import org.elasticsearch.snapshots.SnapshotState;
3842
import org.elasticsearch.xpack.ccr.Ccr;
3943
import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
44+
import org.elasticsearch.xpack.ccr.action.CcrRequests;
4045
import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionAction;
4146
import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionRequest;
4247
import org.elasticsearch.xpack.ccr.action.repositories.PutCcrRestoreSessionAction;
@@ -111,15 +116,10 @@ public SnapshotInfo getSnapshotInfo(SnapshotId snapshotId) {
111116
public MetaData getSnapshotGlobalMetaData(SnapshotId snapshotId) {
112117
assert SNAPSHOT_ID.equals(snapshotId) : "RemoteClusterRepository only supports " + SNAPSHOT_ID + " as the SnapshotId";
113118
Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias);
114-
ClusterStateResponse response = remoteClient
115-
.admin()
116-
.cluster()
117-
.prepareState()
118-
.clear()
119-
.setMetaData(true)
120-
.setIndices("dummy_index_name") // We set a single dummy index name to avoid fetching all the index data
121-
.get();
122-
return response.getState().metaData();
119+
// We set a single dummy index name to avoid fetching all the index data
120+
ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest("dummy_index_name");
121+
ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest).actionGet();
122+
return clusterState.getState().metaData();
123123
}
124124

125125
@Override
@@ -128,18 +128,12 @@ public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId ind
128128
String leaderIndex = index.getName();
129129
Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias);
130130

131-
ClusterStateResponse response = remoteClient
132-
.admin()
133-
.cluster()
134-
.prepareState()
135-
.clear()
136-
.setMetaData(true)
137-
.setIndices(leaderIndex)
138-
.get();
131+
ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex);
132+
ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest).actionGet();
139133

140134
// Validates whether the leader cluster has been configured properly:
141135
PlainActionFuture<String[]> future = PlainActionFuture.newFuture();
142-
IndexMetaData leaderIndexMetaData = response.getState().metaData().index(leaderIndex);
136+
IndexMetaData leaderIndexMetaData = clusterState.getState().metaData().index(leaderIndex);
143137
ccrLicenseChecker.fetchLeaderHistoryUUIDs(remoteClient, leaderIndexMetaData, future::onFailure, future::onResponse);
144138
String[] leaderHistoryUUIDs = future.actionGet();
145139

@@ -252,7 +246,8 @@ public void restoreShard(IndexShard indexShard, SnapshotId snapshotId, Version v
252246

253247
Map<String, String> ccrMetaData = indexShard.indexSettings().getIndexMetaData().getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY);
254248
String leaderUUID = ccrMetaData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY);
255-
ShardId leaderShardId = new ShardId(shardId.getIndexName(), leaderUUID, shardId.getId());
249+
Index leaderIndex = new Index(shardId.getIndexName(), leaderUUID);
250+
ShardId leaderShardId = new ShardId(leaderIndex, shardId.getId());
256251

257252
Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias);
258253
String sessionUUID = UUIDs.randomBase64UUID();
@@ -261,13 +256,28 @@ public void restoreShard(IndexShard indexShard, SnapshotId snapshotId, Version v
261256
String nodeId = response.getNodeId();
262257
// TODO: Implement file restore
263258
closeSession(remoteClient, nodeId, sessionUUID);
259+
maybeUpdateMappings(client, remoteClient, leaderIndex, indexShard.indexSettings());
264260
}
265261

266262
@Override
267263
public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId leaderShardId) {
268264
throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
269265
}
270266

267+
private void maybeUpdateMappings(Client localClient, Client remoteClient, Index leaderIndex, IndexSettings followerIndexSettings) {
268+
ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex.getName());
269+
ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest).actionGet();
270+
IndexMetaData leaderIndexMetadata = clusterState.getState().metaData().getIndexSafe(leaderIndex);
271+
long leaderMappingVersion = leaderIndexMetadata.getMappingVersion();
272+
273+
if (leaderMappingVersion > followerIndexSettings.getIndexMetaData().getMappingVersion()) {
274+
Index followerIndex = followerIndexSettings.getIndex();
275+
MappingMetaData mappingMetaData = leaderIndexMetadata.mapping();
276+
PutMappingRequest putMappingRequest = CcrRequests.putMappingRequest(followerIndex.getName(), mappingMetaData);
277+
localClient.admin().indices().putMapping(putMappingRequest).actionGet();
278+
}
279+
}
280+
271281
private void closeSession(Client remoteClient, String nodeId, String sessionUUID) {
272282
ClearCcrRestoreSessionRequest clearRequest = new ClearCcrRestoreSessionRequest(nodeId,
273283
new ClearCcrRestoreSessionRequest.Request(nodeId, sessionUUID));

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,19 +8,22 @@
88

99
import org.elasticsearch.action.ActionListener;
1010
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
11+
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
1112
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
1213
import org.elasticsearch.action.support.IndicesOptions;
1314
import org.elasticsearch.action.support.PlainActionFuture;
1415
import org.elasticsearch.cluster.ClusterChangedEvent;
1516
import org.elasticsearch.cluster.ClusterStateListener;
1617
import org.elasticsearch.cluster.RestoreInProgress;
1718
import org.elasticsearch.cluster.metadata.IndexMetaData;
19+
import org.elasticsearch.cluster.metadata.MappingMetaData;
1820
import org.elasticsearch.cluster.service.ClusterService;
1921
import org.elasticsearch.common.collect.ImmutableOpenMap;
2022
import org.elasticsearch.common.settings.Settings;
2123
import org.elasticsearch.common.unit.TimeValue;
2224
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
2325
import org.elasticsearch.common.xcontent.XContentType;
26+
import org.elasticsearch.common.xcontent.support.XContentMapValues;
2427
import org.elasticsearch.index.IndexSettings;
2528
import org.elasticsearch.index.shard.ShardId;
2629
import org.elasticsearch.repositories.RepositoriesService;
@@ -35,13 +38,15 @@
3538
import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService;
3639

3740
import java.io.IOException;
41+
import java.util.Locale;
3842
import java.util.Map;
3943
import java.util.Set;
4044
import java.util.concurrent.TimeUnit;
4145

4246
import static java.util.Collections.singletonMap;
4347
import static org.elasticsearch.snapshots.RestoreService.restoreInProgress;
4448
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
49+
import static org.hamcrest.Matchers.equalTo;
4550

4651
// TODO: Fold this integration test into a more expansive integration test as more bootstrap from remote work
4752
// TODO: is completed.
@@ -195,6 +200,60 @@ public void testThatSessionIsRegisteredWithPrimaryShard() throws IOException {
195200
assertEquals(0, restoreInfo.failedShards());
196201
}
197202

203+
public void testFollowerMappingIsUpdated() throws IOException {
204+
String leaderClusterRepoName = CcrRepository.NAME_PREFIX + "leader_cluster";
205+
String leaderIndex = "index1";
206+
String followerIndex = "index2";
207+
208+
final int numberOfPrimaryShards = randomIntBetween(1, 3);
209+
final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1),
210+
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
211+
assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON));
212+
ensureLeaderGreen(leaderIndex);
213+
214+
final RestoreService restoreService = getFollowerCluster().getCurrentMasterNodeInstance(RestoreService.class);
215+
final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class);
216+
217+
Settings.Builder settingsBuilder = Settings.builder()
218+
.put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, followerIndex)
219+
.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true);
220+
RestoreService.RestoreRequest restoreRequest = new RestoreService.RestoreRequest(leaderClusterRepoName,
221+
CcrRepository.LATEST, new String[]{leaderIndex}, indicesOptions,
222+
"^(.*)$", followerIndex, Settings.EMPTY, new TimeValue(1, TimeUnit.HOURS), false,
223+
false, true, settingsBuilder.build(), new String[0],
224+
"restore_snapshot[" + leaderClusterRepoName + ":" + leaderIndex + "]");
225+
226+
// TODO: Eventually when the file recovery work is complete, we should test updated mappings by
227+
// indexing to the leader while the recovery is happening. However, into order to that test mappings
228+
// are updated prior to that work, we index documents in the clear session callback. This will
229+
// ensure a mapping change prior to the final mapping check on the follower side.
230+
for (CcrRestoreSourceService restoreSourceService : getLeaderCluster().getDataNodeInstances(CcrRestoreSourceService.class)) {
231+
restoreSourceService.addCloseSessionListener(s -> {
232+
final String source = String.format(Locale.ROOT, "{\"k\":%d}", 1);
233+
leaderClient().prepareIndex("index1", "doc", Long.toString(1)).setSource(source, XContentType.JSON).get();
234+
});
235+
}
236+
237+
PlainActionFuture<RestoreInfo> future = PlainActionFuture.newFuture();
238+
restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future));
239+
RestoreInfo restoreInfo = future.actionGet();
240+
241+
assertEquals(restoreInfo.totalShards(), restoreInfo.successfulShards());
242+
assertEquals(0, restoreInfo.failedShards());
243+
244+
ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
245+
clusterStateRequest.clear();
246+
clusterStateRequest.metaData(true);
247+
clusterStateRequest.indices(followerIndex);
248+
ClusterStateResponse clusterState = followerClient().admin().cluster().state(clusterStateRequest).actionGet();
249+
IndexMetaData followerIndexMetadata = clusterState.getState().metaData().index(followerIndex);
250+
assertEquals(2, followerIndexMetadata.getMappingVersion());
251+
252+
MappingMetaData mappingMetaData = followerClient().admin().indices().prepareGetMappings("index2").get().getMappings()
253+
.get("index2").get("doc");
254+
assertThat(XContentMapValues.extractValue("properties.k.type", mappingMetaData.sourceAsMap()), equalTo("long"));
255+
}
256+
198257
private ActionListener<RestoreService.RestoreCompletionResponse> waitForRestore(ClusterService clusterService,
199258
ActionListener<RestoreInfo> listener) {
200259
return new ActionListener<RestoreService.RestoreCompletionResponse>() {

0 commit comments

Comments
 (0)