Skip to content

Commit e79e6d9

Browse files
Remove Redundant Loading of RepositoryData during Restore (#51977)
We can just put the `IndexId` instead of just the index name into the recovery soruce and save one load of `RepositoryData` on each shard restore that way.
1 parent e63ef39 commit e79e6d9

File tree

17 files changed

+88
-40
lines changed

17 files changed

+88
-40
lines changed

server/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,14 @@
2020
package org.elasticsearch.cluster.routing;
2121

2222
import org.elasticsearch.Version;
23+
import org.elasticsearch.cluster.metadata.IndexMetaData;
2324
import org.elasticsearch.common.io.stream.StreamInput;
2425
import org.elasticsearch.common.io.stream.StreamOutput;
2526
import org.elasticsearch.common.io.stream.Writeable;
2627
import org.elasticsearch.common.xcontent.ToXContent;
2728
import org.elasticsearch.common.xcontent.ToXContentObject;
2829
import org.elasticsearch.common.xcontent.XContentBuilder;
30+
import org.elasticsearch.repositories.IndexId;
2931
import org.elasticsearch.snapshots.Snapshot;
3032

3133
import java.io.IOException;
@@ -213,21 +215,25 @@ public String toString() {
213215
public static class SnapshotRecoverySource extends RecoverySource {
214216
private final String restoreUUID;
215217
private final Snapshot snapshot;
216-
private final String index;
218+
private final IndexId index;
217219
private final Version version;
218220

219-
public SnapshotRecoverySource(String restoreUUID, Snapshot snapshot, Version version, String index) {
221+
public SnapshotRecoverySource(String restoreUUID, Snapshot snapshot, Version version, IndexId indexId) {
220222
this.restoreUUID = restoreUUID;
221223
this.snapshot = Objects.requireNonNull(snapshot);
222224
this.version = Objects.requireNonNull(version);
223-
this.index = Objects.requireNonNull(index);
225+
this.index = Objects.requireNonNull(indexId);
224226
}
225227

226228
SnapshotRecoverySource(StreamInput in) throws IOException {
227229
restoreUUID = in.readString();
228230
snapshot = new Snapshot(in);
229231
version = Version.readVersion(in);
230-
index = in.readString();
232+
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
233+
index = new IndexId(in);
234+
} else {
235+
index = new IndexId(in.readString(), IndexMetaData.INDEX_UUID_NA_VALUE);
236+
}
231237
}
232238

233239
public String restoreUUID() {
@@ -238,7 +244,13 @@ public Snapshot snapshot() {
238244
return snapshot;
239245
}
240246

241-
public String index() {
247+
/**
248+
* Gets the {@link IndexId} of the recovery source. May contain {@link IndexMetaData#INDEX_UUID_NA_VALUE} as the index uuid if it
249+
* was created by an older version master in a mixed version cluster.
250+
*
251+
* @return IndexId
252+
*/
253+
public IndexId index() {
242254
return index;
243255
}
244256

@@ -251,7 +263,11 @@ protected void writeAdditionalFields(StreamOutput out) throws IOException {
251263
out.writeString(restoreUUID);
252264
snapshot.writeTo(out);
253265
Version.writeVersion(version, out);
254-
out.writeString(index);
266+
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
267+
index.writeTo(out);
268+
} else {
269+
out.writeString(index.getName());
270+
}
255271
}
256272

257273
@Override
@@ -264,7 +280,7 @@ public void addAdditionalFields(XContentBuilder builder, ToXContent.Params param
264280
builder.field("repository", snapshot.getRepository())
265281
.field("snapshot", snapshot.getSnapshotId().getName())
266282
.field("version", version.toString())
267-
.field("index", index)
283+
.field("index", index.getName())
268284
.field("restoreUUID", restoreUUID);
269285
}
270286

server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.lucene.store.IndexInput;
3232
import org.elasticsearch.ExceptionsHelper;
3333
import org.elasticsearch.action.ActionListener;
34+
import org.elasticsearch.action.StepListener;
3435
import org.elasticsearch.cluster.metadata.IndexMetaData;
3536
import org.elasticsearch.cluster.metadata.MappingMetaData;
3637
import org.elasticsearch.cluster.routing.RecoverySource;
@@ -473,20 +474,24 @@ private void restore(IndexShard indexShard, Repository repository, SnapshotRecov
473474
translogState.totalOperationsOnStart(0);
474475
indexShard.prepareForIndexRecovery();
475476
final ShardId snapshotShardId;
476-
final String indexName = restoreSource.index();
477-
if (!shardId.getIndexName().equals(indexName)) {
478-
snapshotShardId = new ShardId(indexName, IndexMetaData.INDEX_UUID_NA_VALUE, shardId.id());
479-
} else {
477+
final IndexId indexId = restoreSource.index();
478+
if (shardId.getIndexName().equals(indexId.getName())) {
480479
snapshotShardId = shardId;
480+
} else {
481+
snapshotShardId = new ShardId(indexId.getName(), IndexMetaData.INDEX_UUID_NA_VALUE, shardId.id());
482+
}
483+
final StepListener<IndexId> indexIdListener = new StepListener<>();
484+
// If the index UUID was not found in the recovery source we will have to load RepositoryData and resolve it by index name
485+
if (indexId.getId().equals(IndexMetaData.INDEX_UUID_NA_VALUE)) {
486+
// BwC path, running against an old version master that did not add the IndexId to the recovery source
487+
repository.getRepositoryData(ActionListener.map(
488+
indexIdListener, repositoryData -> repositoryData.resolveIndexId(indexId.getName())));
489+
} else {
490+
indexIdListener.onResponse(indexId);
481491
}
482-
repository.getRepositoryData(ActionListener.wrap(
483-
repositoryData -> {
484-
final IndexId indexId = repositoryData.resolveIndexId(indexName);
485-
assert indexShard.getEngineOrNull() == null;
486-
repository.restoreShard(indexShard.store(), restoreSource.snapshot().getSnapshotId(), indexId, snapshotShardId,
487-
indexShard.recoveryState(), restoreListener);
488-
}, restoreListener::onFailure
489-
));
492+
assert indexShard.getEngineOrNull() == null;
493+
indexIdListener.whenComplete(idx -> repository.restoreShard(indexShard.store(), restoreSource.snapshot().getSnapshotId(),
494+
idx, snapshotShardId, indexShard.recoveryState(), restoreListener), restoreListener::onFailure);
490495
} catch (Exception e) {
491496
restoreListener.onFailure(e);
492497
}

server/src/main/java/org/elasticsearch/snapshots/RestoreService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -250,8 +250,8 @@ public ClusterState execute(ClusterState currentState) {
250250
for (Map.Entry<String, String> indexEntry : indices.entrySet()) {
251251
String index = indexEntry.getValue();
252252
boolean partial = checkPartial(index);
253-
SnapshotRecoverySource recoverySource =
254-
new SnapshotRecoverySource(restoreUUID, snapshot, snapshotInfo.version(), index);
253+
SnapshotRecoverySource recoverySource = new SnapshotRecoverySource(restoreUUID, snapshot,
254+
snapshotInfo.version(), repositoryData.resolveIndexId(index));
255255
String renamedIndexName = indexEntry.getKey();
256256
IndexMetaData snapshotIndexMetaData = metaData.index(index);
257257
snapshotIndexMetaData = updateIndexSettings(snapshotIndexMetaData,

server/src/test/java/org/elasticsearch/cluster/routing/ShardRoutingTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.common.UUIDs;
2424
import org.elasticsearch.index.Index;
2525
import org.elasticsearch.index.shard.ShardId;
26+
import org.elasticsearch.repositories.IndexId;
2627
import org.elasticsearch.snapshots.SnapshotId;
2728
import org.elasticsearch.common.io.stream.BytesStreamOutput;
2829
import org.elasticsearch.snapshots.Snapshot;
@@ -165,7 +166,8 @@ public void testEqualsIgnoringVersion() {
165166
otherRouting = new ShardRouting(otherRouting.shardId(), otherRouting.currentNodeId(),
166167
otherRouting.relocatingNodeId(), otherRouting.primary(), otherRouting.state(),
167168
new RecoverySource.SnapshotRecoverySource(UUIDs.randomBase64UUID(), new Snapshot("test",
168-
new SnapshotId("s1", UUIDs.randomBase64UUID())), Version.CURRENT, "test"),
169+
new SnapshotId("s1", UUIDs.randomBase64UUID())), Version.CURRENT, new IndexId("test",
170+
UUIDs.randomBase64UUID(random()))),
169171
otherRouting.unassignedInfo(), otherRouting.allocationId(), otherRouting.getExpectedShardSize());
170172
}
171173
break;

server/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.elasticsearch.common.settings.Settings;
3939
import org.elasticsearch.common.unit.TimeValue;
4040
import org.elasticsearch.index.Index;
41+
import org.elasticsearch.repositories.IndexId;
4142
import org.elasticsearch.snapshots.Snapshot;
4243
import org.elasticsearch.snapshots.SnapshotId;
4344

@@ -151,7 +152,8 @@ public void testNewIndexRestored() {
151152
.metaData(metaData)
152153
.routingTable(RoutingTable.builder().addAsNewRestore(metaData.index("test"), new SnapshotRecoverySource(
153154
UUIDs.randomBase64UUID(),
154-
new Snapshot("rep1", new SnapshotId("snp1", UUIDs.randomBase64UUID())), Version.CURRENT, "test"),
155+
new Snapshot("rep1", new SnapshotId("snp1", UUIDs.randomBase64UUID())), Version.CURRENT,
156+
new IndexId("test", UUIDs.randomBase64UUID(random()))),
155157
new IntHashSet()).build()).build();
156158
for (ShardRouting shard : clusterState.getRoutingNodes().shardsWithState(UNASSIGNED)) {
157159
assertThat(shard.unassignedInfo().getReason(), equalTo(UnassignedInfo.Reason.NEW_INDEX_RESTORED));
@@ -168,7 +170,8 @@ public void testExistingIndexRestored() {
168170
.routingTable(RoutingTable.builder().addAsRestore(metaData.index("test"),
169171
new SnapshotRecoverySource(
170172
UUIDs.randomBase64UUID(), new Snapshot("rep1",
171-
new SnapshotId("snp1", UUIDs.randomBase64UUID())), Version.CURRENT, "test")).build()).build();
173+
new SnapshotId("snp1", UUIDs.randomBase64UUID())), Version.CURRENT,
174+
new IndexId("test", UUIDs.randomBase64UUID(random())))).build()).build();
172175
for (ShardRouting shard : clusterState.getRoutingNodes().shardsWithState(UNASSIGNED)) {
173176
assertThat(shard.unassignedInfo().getReason(), equalTo(UnassignedInfo.Reason.EXISTING_INDEX_RESTORED));
174177
}

server/src/test/java/org/elasticsearch/cluster/routing/allocation/NodeVersionAllocationDeciderTests.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import org.elasticsearch.common.settings.Settings;
5454
import org.elasticsearch.common.util.set.Sets;
5555
import org.elasticsearch.index.shard.ShardId;
56+
import org.elasticsearch.repositories.IndexId;
5657
import org.elasticsearch.snapshots.Snapshot;
5758
import org.elasticsearch.snapshots.SnapshotId;
5859
import org.elasticsearch.test.VersionUtils;
@@ -366,7 +367,7 @@ public void testRestoreDoesNotAllocateSnapshotOnOlderNodes() {
366367
new SnapshotRecoverySource(
367368
UUIDs.randomBase64UUID(),
368369
new Snapshot("rep1", new SnapshotId("snp1", UUIDs.randomBase64UUID())),
369-
Version.CURRENT, "test")).build())
370+
Version.CURRENT, new IndexId("test", UUIDs.randomBase64UUID(random())))).build())
370371
.nodes(DiscoveryNodes.builder().add(newNode).add(oldNode1).add(oldNode2)).build();
371372
AllocationDeciders allocationDeciders = new AllocationDeciders(Arrays.asList(
372373
new ReplicaAfterPrimaryActiveAllocationDecider(),
@@ -480,14 +481,15 @@ public void testMessages() {
480481
assertThat(decision.getExplanation(), is("cannot relocate primary shard from a node with version [" +
481482
newNode.node().getVersion() + "] to a node with older version [" + oldNode.node().getVersion() + "]"));
482483

484+
final IndexId indexId = new IndexId("test", UUIDs.randomBase64UUID(random()));
483485
final SnapshotRecoverySource newVersionSnapshot = new SnapshotRecoverySource(
484486
UUIDs.randomBase64UUID(),
485487
new Snapshot("rep1", new SnapshotId("snp1", UUIDs.randomBase64UUID())),
486-
newNode.node().getVersion(), "test");
488+
newNode.node().getVersion(), indexId);
487489
final SnapshotRecoverySource oldVersionSnapshot = new SnapshotRecoverySource(
488490
UUIDs.randomBase64UUID(),
489491
new Snapshot("rep1", new SnapshotId("snp1", UUIDs.randomBase64UUID())),
490-
oldNode.node().getVersion(), "test");
492+
oldNode.node().getVersion(), indexId);
491493

492494
decision = allocationDecider.canAllocate(ShardRoutingHelper.newWithRestoreSource(primaryShard, newVersionSnapshot),
493495
oldNode, routingAllocation);

server/src/test/java/org/elasticsearch/cluster/routing/allocation/ThrottlingAllocationTests.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.elasticsearch.common.settings.Settings;
4848
import org.elasticsearch.index.Index;
4949
import org.elasticsearch.index.shard.ShardId;
50+
import org.elasticsearch.repositories.IndexId;
5051
import org.elasticsearch.snapshots.Snapshot;
5152
import org.elasticsearch.snapshots.SnapshotId;
5253
import org.elasticsearch.test.gateway.TestGatewayAllocator;
@@ -360,13 +361,15 @@ private ClusterState createRecoveryStateAndInitalizeAllocations(MetaData metaDat
360361
snapshotIndices.add(index.getName());
361362
routingTableBuilder.addAsNewRestore(indexMetaData,
362363
new SnapshotRecoverySource(
363-
restoreUUID, snapshot, Version.CURRENT, indexMetaData.getIndex().getName()), new IntHashSet());
364+
restoreUUID, snapshot, Version.CURRENT,
365+
new IndexId(indexMetaData.getIndex().getName(), UUIDs.randomBase64UUID(random()))), new IntHashSet());
364366
break;
365367
case 4:
366368
snapshotIndices.add(index.getName());
367369
routingTableBuilder.addAsRestore(indexMetaData,
368370
new SnapshotRecoverySource(
369-
restoreUUID, snapshot, Version.CURRENT, indexMetaData.getIndex().getName()));
371+
restoreUUID, snapshot, Version.CURRENT,
372+
new IndexId(indexMetaData.getIndex().getName(), UUIDs.randomBase64UUID(random()))));
370373
break;
371374
case 5:
372375
routingTableBuilder.addAsNew(indexMetaData);

server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDeciderTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.elasticsearch.common.UUIDs;
4242
import org.elasticsearch.common.collect.ImmutableOpenMap;
4343
import org.elasticsearch.index.shard.ShardId;
44+
import org.elasticsearch.repositories.IndexId;
4445
import org.elasticsearch.snapshots.Snapshot;
4546
import org.elasticsearch.snapshots.SnapshotId;
4647

@@ -207,6 +208,7 @@ private Decision executeAllocation(final ClusterState clusterState, final ShardR
207208

208209
private RecoverySource.SnapshotRecoverySource createSnapshotRecoverySource(final String snapshotName) {
209210
Snapshot snapshot = new Snapshot("_repository", new SnapshotId(snapshotName, "_uuid"));
210-
return new RecoverySource.SnapshotRecoverySource(UUIDs.randomBase64UUID(), snapshot, Version.CURRENT, "test");
211+
return new RecoverySource.SnapshotRecoverySource(UUIDs.randomBase64UUID(), snapshot, Version.CURRENT,
212+
new IndexId("test", UUIDs.randomBase64UUID(random())));
211213
}
212214
}

server/src/test/java/org/elasticsearch/gateway/PrimaryShardAllocatorTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.elasticsearch.common.util.set.Sets;
4949
import org.elasticsearch.env.ShardLockObtainFailedException;
5050
import org.elasticsearch.index.shard.ShardId;
51+
import org.elasticsearch.repositories.IndexId;
5152
import org.elasticsearch.snapshots.Snapshot;
5253
import org.elasticsearch.snapshots.SnapshotId;
5354
import org.junit.Before;
@@ -392,7 +393,8 @@ private RoutingAllocation getRestoreRoutingAllocation(AllocationDeciders allocat
392393
final Snapshot snapshot = new Snapshot("test", new SnapshotId("test", UUIDs.randomBase64UUID()));
393394
RoutingTable routingTable = RoutingTable.builder()
394395
.addAsRestore(metaData.index(shardId.getIndex()),
395-
new SnapshotRecoverySource(UUIDs.randomBase64UUID(), snapshot, Version.CURRENT, shardId.getIndexName()))
396+
new SnapshotRecoverySource(UUIDs.randomBase64UUID(), snapshot, Version.CURRENT,
397+
new IndexId(shardId.getIndexName(), UUIDs.randomBase64UUID(random()))))
396398
.build();
397399
ClusterState state = ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
398400
.metaData(metaData)

server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2327,7 +2327,8 @@ public void testRestoreShard() throws IOException {
23272327
RecoverySource.ExistingStoreRecoverySource.INSTANCE);
23282328
final Snapshot snapshot = new Snapshot("foo", new SnapshotId("bar", UUIDs.randomBase64UUID()));
23292329
routing = ShardRoutingHelper.newWithRestoreSource(routing,
2330-
new RecoverySource.SnapshotRecoverySource(UUIDs.randomBase64UUID(), snapshot, Version.CURRENT, "test"));
2330+
new RecoverySource.SnapshotRecoverySource(UUIDs.randomBase64UUID(), snapshot, Version.CURRENT,
2331+
new IndexId("test", UUIDs.randomBase64UUID(random()))));
23312332
target = reinitShard(target, routing);
23322333
Store sourceStore = source.store();
23332334
Store targetStore = target.store();

server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.lucene.analysis.TokenStream;
2323
import org.elasticsearch.ElasticsearchException;
2424
import org.elasticsearch.Version;
25+
import org.elasticsearch.action.ActionRunnable;
2526
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
2627
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
2728
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
@@ -85,6 +86,9 @@
8586
import org.elasticsearch.plugins.AnalysisPlugin;
8687
import org.elasticsearch.plugins.Plugin;
8788
import org.elasticsearch.plugins.PluginsService;
89+
import org.elasticsearch.repositories.RepositoriesService;
90+
import org.elasticsearch.repositories.Repository;
91+
import org.elasticsearch.repositories.RepositoryData;
8892
import org.elasticsearch.snapshots.Snapshot;
8993
import org.elasticsearch.snapshots.SnapshotState;
9094
import org.elasticsearch.test.BackgroundIndexer;
@@ -97,6 +101,7 @@
97101
import org.elasticsearch.test.store.MockFSIndexStore;
98102
import org.elasticsearch.test.transport.MockTransportService;
99103
import org.elasticsearch.test.transport.StubbableTransport;
104+
import org.elasticsearch.threadpool.ThreadPool;
100105
import org.elasticsearch.transport.ConnectTransportException;
101106
import org.elasticsearch.transport.Transport;
102107
import org.elasticsearch.transport.TransportRequest;
@@ -651,6 +656,10 @@ public void testSnapshotRecovery() throws Exception {
651656
logger.info("--> request recoveries");
652657
RecoveryResponse response = client().admin().indices().prepareRecoveries(INDEX_NAME).execute().actionGet();
653658

659+
ThreadPool threadPool = internalCluster().getMasterNodeInstance(ThreadPool.class);
660+
Repository repository = internalCluster().getMasterNodeInstance(RepositoriesService.class).repository(REPO_NAME);
661+
final RepositoryData repositoryData = PlainActionFuture.get(f ->
662+
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(f, repository::getRepositoryData)));
654663
for (Map.Entry<String, List<RecoveryState>> indexRecoveryStates : response.shardRecoveryStates().entrySet()) {
655664

656665
assertThat(indexRecoveryStates.getKey(), equalTo(INDEX_NAME));
@@ -661,7 +670,7 @@ public void testSnapshotRecovery() throws Exception {
661670
SnapshotRecoverySource recoverySource = new SnapshotRecoverySource(
662671
((SnapshotRecoverySource)recoveryState.getRecoverySource()).restoreUUID(),
663672
new Snapshot(REPO_NAME, createSnapshotResponse.getSnapshotInfo().snapshotId()),
664-
Version.CURRENT, INDEX_NAME);
673+
Version.CURRENT, repositoryData.resolveIndexId(INDEX_NAME));
665674
assertRecoveryState(recoveryState, 0, recoverySource, true, Stage.DONE, null, nodeA);
666675
validateIndexRecoveryState(recoveryState.getIndex());
667676
}

0 commit comments

Comments
 (0)