Skip to content

Commit 90eb6a0

Browse files
Remove Redundant Loading of RepositoryData during Restore (#51977) (#52108)
We can just put the `IndexId` instead of just the index name into the recovery soruce and save one load of `RepositoryData` on each shard restore that way.
1 parent 3e7f939 commit 90eb6a0

File tree

17 files changed

+88
-40
lines changed

17 files changed

+88
-40
lines changed

server/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java

+23-7
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,14 @@
2121

2222
import org.elasticsearch.Version;
2323
import org.elasticsearch.cluster.RestoreInProgress;
24+
import org.elasticsearch.cluster.metadata.IndexMetaData;
2425
import org.elasticsearch.common.io.stream.StreamInput;
2526
import org.elasticsearch.common.io.stream.StreamOutput;
2627
import org.elasticsearch.common.io.stream.Writeable;
2728
import org.elasticsearch.common.xcontent.ToXContent;
2829
import org.elasticsearch.common.xcontent.ToXContentObject;
2930
import org.elasticsearch.common.xcontent.XContentBuilder;
31+
import org.elasticsearch.repositories.IndexId;
3032
import org.elasticsearch.snapshots.Snapshot;
3133

3234
import java.io.IOException;
@@ -220,14 +222,14 @@ public String toString() {
220222
public static class SnapshotRecoverySource extends RecoverySource {
221223
private final String restoreUUID;
222224
private final Snapshot snapshot;
223-
private final String index;
225+
private final IndexId index;
224226
private final Version version;
225227

226-
public SnapshotRecoverySource(String restoreUUID, Snapshot snapshot, Version version, String index) {
228+
public SnapshotRecoverySource(String restoreUUID, Snapshot snapshot, Version version, IndexId indexId) {
227229
this.restoreUUID = restoreUUID;
228230
this.snapshot = Objects.requireNonNull(snapshot);
229231
this.version = Objects.requireNonNull(version);
230-
this.index = Objects.requireNonNull(index);
232+
this.index = Objects.requireNonNull(indexId);
231233
}
232234

233235
SnapshotRecoverySource(StreamInput in) throws IOException {
@@ -238,7 +240,11 @@ public SnapshotRecoverySource(String restoreUUID, Snapshot snapshot, Version ver
238240
}
239241
snapshot = new Snapshot(in);
240242
version = Version.readVersion(in);
241-
index = in.readString();
243+
if (in.getVersion().onOrAfter(Version.V_7_7_0)) {
244+
index = new IndexId(in);
245+
} else {
246+
index = new IndexId(in.readString(), IndexMetaData.INDEX_UUID_NA_VALUE);
247+
}
242248
}
243249

244250
public String restoreUUID() {
@@ -249,7 +255,13 @@ public Snapshot snapshot() {
249255
return snapshot;
250256
}
251257

252-
public String index() {
258+
/**
259+
* Gets the {@link IndexId} of the recovery source. May contain {@link IndexMetaData#INDEX_UUID_NA_VALUE} as the index uuid if it
260+
* was created by an older version master in a mixed version cluster.
261+
*
262+
* @return IndexId
263+
*/
264+
public IndexId index() {
253265
return index;
254266
}
255267

@@ -264,7 +276,11 @@ protected void writeAdditionalFields(StreamOutput out) throws IOException {
264276
}
265277
snapshot.writeTo(out);
266278
Version.writeVersion(version, out);
267-
out.writeString(index);
279+
if (out.getVersion().onOrAfter(Version.V_7_7_0)) {
280+
index.writeTo(out);
281+
} else {
282+
out.writeString(index.getName());
283+
}
268284
}
269285

270286
@Override
@@ -277,7 +293,7 @@ public void addAdditionalFields(XContentBuilder builder, ToXContent.Params param
277293
builder.field("repository", snapshot.getRepository())
278294
.field("snapshot", snapshot.getSnapshotId().getName())
279295
.field("version", version.toString())
280-
.field("index", index)
296+
.field("index", index.getName())
281297
.field("restoreUUID", restoreUUID);
282298
}
283299

server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java

+17-12
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.elasticsearch.ExceptionsHelper;
3434
import org.elasticsearch.Version;
3535
import org.elasticsearch.action.ActionListener;
36+
import org.elasticsearch.action.StepListener;
3637
import org.elasticsearch.cluster.metadata.IndexMetaData;
3738
import org.elasticsearch.cluster.metadata.MappingMetaData;
3839
import org.elasticsearch.cluster.routing.RecoverySource;
@@ -484,20 +485,24 @@ private void restore(IndexShard indexShard, Repository repository, SnapshotRecov
484485
translogState.totalOperationsOnStart(0);
485486
indexShard.prepareForIndexRecovery();
486487
final ShardId snapshotShardId;
487-
final String indexName = restoreSource.index();
488-
if (!shardId.getIndexName().equals(indexName)) {
489-
snapshotShardId = new ShardId(indexName, IndexMetaData.INDEX_UUID_NA_VALUE, shardId.id());
490-
} else {
488+
final IndexId indexId = restoreSource.index();
489+
if (shardId.getIndexName().equals(indexId.getName())) {
491490
snapshotShardId = shardId;
491+
} else {
492+
snapshotShardId = new ShardId(indexId.getName(), IndexMetaData.INDEX_UUID_NA_VALUE, shardId.id());
493+
}
494+
final StepListener<IndexId> indexIdListener = new StepListener<>();
495+
// If the index UUID was not found in the recovery source we will have to load RepositoryData and resolve it by index name
496+
if (indexId.getId().equals(IndexMetaData.INDEX_UUID_NA_VALUE)) {
497+
// BwC path, running against an old version master that did not add the IndexId to the recovery source
498+
repository.getRepositoryData(ActionListener.map(
499+
indexIdListener, repositoryData -> repositoryData.resolveIndexId(indexId.getName())));
500+
} else {
501+
indexIdListener.onResponse(indexId);
492502
}
493-
repository.getRepositoryData(ActionListener.wrap(
494-
repositoryData -> {
495-
final IndexId indexId = repositoryData.resolveIndexId(indexName);
496-
assert indexShard.getEngineOrNull() == null;
497-
repository.restoreShard(indexShard.store(), restoreSource.snapshot().getSnapshotId(), indexId, snapshotShardId,
498-
indexShard.recoveryState(), restoreListener);
499-
}, restoreListener::onFailure
500-
));
503+
assert indexShard.getEngineOrNull() == null;
504+
indexIdListener.whenComplete(idx -> repository.restoreShard(indexShard.store(), restoreSource.snapshot().getSnapshotId(),
505+
idx, snapshotShardId, indexShard.recoveryState(), restoreListener), restoreListener::onFailure);
501506
} catch (Exception e) {
502507
restoreListener.onFailure(e);
503508
}

server/src/main/java/org/elasticsearch/snapshots/RestoreService.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -258,8 +258,8 @@ public ClusterState execute(ClusterState currentState) {
258258
for (Map.Entry<String, String> indexEntry : indices.entrySet()) {
259259
String index = indexEntry.getValue();
260260
boolean partial = checkPartial(index);
261-
SnapshotRecoverySource recoverySource =
262-
new SnapshotRecoverySource(restoreUUID, snapshot, snapshotInfo.version(), index);
261+
SnapshotRecoverySource recoverySource = new SnapshotRecoverySource(restoreUUID, snapshot,
262+
snapshotInfo.version(), repositoryData.resolveIndexId(index));
263263
String renamedIndexName = indexEntry.getKey();
264264
IndexMetaData snapshotIndexMetaData = metaData.index(index);
265265
snapshotIndexMetaData = updateIndexSettings(snapshotIndexMetaData,

server/src/test/java/org/elasticsearch/cluster/routing/ShardRoutingTests.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.common.UUIDs;
2424
import org.elasticsearch.index.Index;
2525
import org.elasticsearch.index.shard.ShardId;
26+
import org.elasticsearch.repositories.IndexId;
2627
import org.elasticsearch.snapshots.SnapshotId;
2728
import org.elasticsearch.common.io.stream.BytesStreamOutput;
2829
import org.elasticsearch.snapshots.Snapshot;
@@ -165,7 +166,8 @@ public void testEqualsIgnoringVersion() {
165166
otherRouting = new ShardRouting(otherRouting.shardId(), otherRouting.currentNodeId(),
166167
otherRouting.relocatingNodeId(), otherRouting.primary(), otherRouting.state(),
167168
new RecoverySource.SnapshotRecoverySource(UUIDs.randomBase64UUID(), new Snapshot("test",
168-
new SnapshotId("s1", UUIDs.randomBase64UUID())), Version.CURRENT, "test"),
169+
new SnapshotId("s1", UUIDs.randomBase64UUID())), Version.CURRENT, new IndexId("test",
170+
UUIDs.randomBase64UUID(random()))),
169171
otherRouting.unassignedInfo(), otherRouting.allocationId(), otherRouting.getExpectedShardSize());
170172
}
171173
break;

server/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.elasticsearch.common.settings.Settings;
4040
import org.elasticsearch.common.unit.TimeValue;
4141
import org.elasticsearch.index.Index;
42+
import org.elasticsearch.repositories.IndexId;
4243
import org.elasticsearch.snapshots.Snapshot;
4344
import org.elasticsearch.snapshots.SnapshotId;
4445
import org.elasticsearch.test.VersionUtils;
@@ -175,7 +176,8 @@ public void testNewIndexRestored() {
175176
.metaData(metaData)
176177
.routingTable(RoutingTable.builder().addAsNewRestore(metaData.index("test"), new SnapshotRecoverySource(
177178
UUIDs.randomBase64UUID(),
178-
new Snapshot("rep1", new SnapshotId("snp1", UUIDs.randomBase64UUID())), Version.CURRENT, "test"),
179+
new Snapshot("rep1", new SnapshotId("snp1", UUIDs.randomBase64UUID())), Version.CURRENT,
180+
new IndexId("test", UUIDs.randomBase64UUID(random()))),
179181
new IntHashSet()).build()).build();
180182
for (ShardRouting shard : clusterState.getRoutingNodes().shardsWithState(UNASSIGNED)) {
181183
assertThat(shard.unassignedInfo().getReason(), equalTo(UnassignedInfo.Reason.NEW_INDEX_RESTORED));
@@ -192,7 +194,8 @@ public void testExistingIndexRestored() {
192194
.routingTable(RoutingTable.builder().addAsRestore(metaData.index("test"),
193195
new SnapshotRecoverySource(
194196
UUIDs.randomBase64UUID(), new Snapshot("rep1",
195-
new SnapshotId("snp1", UUIDs.randomBase64UUID())), Version.CURRENT, "test")).build()).build();
197+
new SnapshotId("snp1", UUIDs.randomBase64UUID())), Version.CURRENT,
198+
new IndexId("test", UUIDs.randomBase64UUID(random())))).build()).build();
196199
for (ShardRouting shard : clusterState.getRoutingNodes().shardsWithState(UNASSIGNED)) {
197200
assertThat(shard.unassignedInfo().getReason(), equalTo(UnassignedInfo.Reason.EXISTING_INDEX_RESTORED));
198201
}

server/src/test/java/org/elasticsearch/cluster/routing/allocation/NodeVersionAllocationDeciderTests.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import org.elasticsearch.common.settings.Settings;
5454
import org.elasticsearch.common.util.set.Sets;
5555
import org.elasticsearch.index.shard.ShardId;
56+
import org.elasticsearch.repositories.IndexId;
5657
import org.elasticsearch.snapshots.Snapshot;
5758
import org.elasticsearch.snapshots.SnapshotId;
5859
import org.elasticsearch.test.VersionUtils;
@@ -366,7 +367,7 @@ public void testRestoreDoesNotAllocateSnapshotOnOlderNodes() {
366367
new SnapshotRecoverySource(
367368
UUIDs.randomBase64UUID(),
368369
new Snapshot("rep1", new SnapshotId("snp1", UUIDs.randomBase64UUID())),
369-
Version.CURRENT, "test")).build())
370+
Version.CURRENT, new IndexId("test", UUIDs.randomBase64UUID(random())))).build())
370371
.nodes(DiscoveryNodes.builder().add(newNode).add(oldNode1).add(oldNode2)).build();
371372
AllocationDeciders allocationDeciders = new AllocationDeciders(Arrays.asList(
372373
new ReplicaAfterPrimaryActiveAllocationDecider(),
@@ -480,14 +481,15 @@ public void testMessages() {
480481
assertThat(decision.getExplanation(), is("cannot relocate primary shard from a node with version [" +
481482
newNode.node().getVersion() + "] to a node with older version [" + oldNode.node().getVersion() + "]"));
482483

484+
final IndexId indexId = new IndexId("test", UUIDs.randomBase64UUID(random()));
483485
final SnapshotRecoverySource newVersionSnapshot = new SnapshotRecoverySource(
484486
UUIDs.randomBase64UUID(),
485487
new Snapshot("rep1", new SnapshotId("snp1", UUIDs.randomBase64UUID())),
486-
newNode.node().getVersion(), "test");
488+
newNode.node().getVersion(), indexId);
487489
final SnapshotRecoverySource oldVersionSnapshot = new SnapshotRecoverySource(
488490
UUIDs.randomBase64UUID(),
489491
new Snapshot("rep1", new SnapshotId("snp1", UUIDs.randomBase64UUID())),
490-
oldNode.node().getVersion(), "test");
492+
oldNode.node().getVersion(), indexId);
491493

492494
decision = allocationDecider.canAllocate(ShardRoutingHelper.newWithRestoreSource(primaryShard, newVersionSnapshot),
493495
oldNode, routingAllocation);

server/src/test/java/org/elasticsearch/cluster/routing/allocation/ThrottlingAllocationTests.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.elasticsearch.common.settings.Settings;
4848
import org.elasticsearch.index.Index;
4949
import org.elasticsearch.index.shard.ShardId;
50+
import org.elasticsearch.repositories.IndexId;
5051
import org.elasticsearch.snapshots.Snapshot;
5152
import org.elasticsearch.snapshots.SnapshotId;
5253
import org.elasticsearch.test.gateway.TestGatewayAllocator;
@@ -360,13 +361,15 @@ private ClusterState createRecoveryStateAndInitalizeAllocations(MetaData metaDat
360361
snapshotIndices.add(index.getName());
361362
routingTableBuilder.addAsNewRestore(indexMetaData,
362363
new SnapshotRecoverySource(
363-
restoreUUID, snapshot, Version.CURRENT, indexMetaData.getIndex().getName()), new IntHashSet());
364+
restoreUUID, snapshot, Version.CURRENT,
365+
new IndexId(indexMetaData.getIndex().getName(), UUIDs.randomBase64UUID(random()))), new IntHashSet());
364366
break;
365367
case 4:
366368
snapshotIndices.add(index.getName());
367369
routingTableBuilder.addAsRestore(indexMetaData,
368370
new SnapshotRecoverySource(
369-
restoreUUID, snapshot, Version.CURRENT, indexMetaData.getIndex().getName()));
371+
restoreUUID, snapshot, Version.CURRENT,
372+
new IndexId(indexMetaData.getIndex().getName(), UUIDs.randomBase64UUID(random()))));
370373
break;
371374
case 5:
372375
routingTableBuilder.addAsNew(indexMetaData);

server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDeciderTests.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.elasticsearch.common.UUIDs;
4242
import org.elasticsearch.common.collect.ImmutableOpenMap;
4343
import org.elasticsearch.index.shard.ShardId;
44+
import org.elasticsearch.repositories.IndexId;
4445
import org.elasticsearch.snapshots.Snapshot;
4546
import org.elasticsearch.snapshots.SnapshotId;
4647

@@ -205,6 +206,7 @@ private Decision executeAllocation(final ClusterState clusterState, final ShardR
205206

206207
private RecoverySource.SnapshotRecoverySource createSnapshotRecoverySource(final String snapshotName) {
207208
Snapshot snapshot = new Snapshot("_repository", new SnapshotId(snapshotName, "_uuid"));
208-
return new RecoverySource.SnapshotRecoverySource(UUIDs.randomBase64UUID(), snapshot, Version.CURRENT, "test");
209+
return new RecoverySource.SnapshotRecoverySource(UUIDs.randomBase64UUID(), snapshot, Version.CURRENT,
210+
new IndexId("test", UUIDs.randomBase64UUID(random())));
209211
}
210212
}

server/src/test/java/org/elasticsearch/gateway/PrimaryShardAllocatorTests.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.elasticsearch.common.util.set.Sets;
4949
import org.elasticsearch.env.ShardLockObtainFailedException;
5050
import org.elasticsearch.index.shard.ShardId;
51+
import org.elasticsearch.repositories.IndexId;
5152
import org.elasticsearch.snapshots.Snapshot;
5253
import org.elasticsearch.snapshots.SnapshotId;
5354
import org.junit.Before;
@@ -392,7 +393,8 @@ private RoutingAllocation getRestoreRoutingAllocation(AllocationDeciders allocat
392393
final Snapshot snapshot = new Snapshot("test", new SnapshotId("test", UUIDs.randomBase64UUID()));
393394
RoutingTable routingTable = RoutingTable.builder()
394395
.addAsRestore(metaData.index(shardId.getIndex()),
395-
new SnapshotRecoverySource(UUIDs.randomBase64UUID(), snapshot, Version.CURRENT, shardId.getIndexName()))
396+
new SnapshotRecoverySource(UUIDs.randomBase64UUID(), snapshot, Version.CURRENT,
397+
new IndexId(shardId.getIndexName(), UUIDs.randomBase64UUID(random()))))
396398
.build();
397399
ClusterState state = ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
398400
.metaData(metaData)

server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -2342,7 +2342,8 @@ public void testRestoreShard() throws IOException {
23422342
RecoverySource.ExistingStoreRecoverySource.INSTANCE);
23432343
final Snapshot snapshot = new Snapshot("foo", new SnapshotId("bar", UUIDs.randomBase64UUID()));
23442344
routing = ShardRoutingHelper.newWithRestoreSource(routing,
2345-
new RecoverySource.SnapshotRecoverySource(UUIDs.randomBase64UUID(), snapshot, Version.CURRENT, "test"));
2345+
new RecoverySource.SnapshotRecoverySource(UUIDs.randomBase64UUID(), snapshot, Version.CURRENT,
2346+
new IndexId("test", UUIDs.randomBase64UUID(random()))));
23462347
target = reinitShard(target, routing);
23472348
Store sourceStore = source.store();
23482349
Store targetStore = target.store();

server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java

+10-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.lucene.analysis.TokenStream;
2323
import org.elasticsearch.ElasticsearchException;
2424
import org.elasticsearch.Version;
25+
import org.elasticsearch.action.ActionRunnable;
2526
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
2627
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
2728
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
@@ -84,6 +85,9 @@
8485
import org.elasticsearch.plugins.AnalysisPlugin;
8586
import org.elasticsearch.plugins.Plugin;
8687
import org.elasticsearch.plugins.PluginsService;
88+
import org.elasticsearch.repositories.RepositoriesService;
89+
import org.elasticsearch.repositories.Repository;
90+
import org.elasticsearch.repositories.RepositoryData;
8791
import org.elasticsearch.snapshots.Snapshot;
8892
import org.elasticsearch.snapshots.SnapshotState;
8993
import org.elasticsearch.test.BackgroundIndexer;
@@ -96,6 +100,7 @@
96100
import org.elasticsearch.test.store.MockFSIndexStore;
97101
import org.elasticsearch.test.transport.MockTransportService;
98102
import org.elasticsearch.test.transport.StubbableTransport;
103+
import org.elasticsearch.threadpool.ThreadPool;
99104
import org.elasticsearch.transport.ConnectTransportException;
100105
import org.elasticsearch.transport.Transport;
101106
import org.elasticsearch.transport.TransportRequest;
@@ -644,6 +649,10 @@ public void testSnapshotRecovery() throws Exception {
644649
logger.info("--> request recoveries");
645650
RecoveryResponse response = client().admin().indices().prepareRecoveries(INDEX_NAME).execute().actionGet();
646651

652+
ThreadPool threadPool = internalCluster().getMasterNodeInstance(ThreadPool.class);
653+
Repository repository = internalCluster().getMasterNodeInstance(RepositoriesService.class).repository(REPO_NAME);
654+
final RepositoryData repositoryData = PlainActionFuture.get(f ->
655+
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(f, repository::getRepositoryData)));
647656
for (Map.Entry<String, List<RecoveryState>> indexRecoveryStates : response.shardRecoveryStates().entrySet()) {
648657

649658
assertThat(indexRecoveryStates.getKey(), equalTo(INDEX_NAME));
@@ -654,7 +663,7 @@ public void testSnapshotRecovery() throws Exception {
654663
SnapshotRecoverySource recoverySource = new SnapshotRecoverySource(
655664
((SnapshotRecoverySource)recoveryState.getRecoverySource()).restoreUUID(),
656665
new Snapshot(REPO_NAME, createSnapshotResponse.getSnapshotInfo().snapshotId()),
657-
Version.CURRENT, INDEX_NAME);
666+
Version.CURRENT, repositoryData.resolveIndexId(INDEX_NAME));
658667
assertRecoveryState(recoveryState, 0, recoverySource, true, Stage.DONE, null, nodeA);
659668
validateIndexRecoveryState(recoveryState.getIndex());
660669
}

0 commit comments

Comments
 (0)