Skip to content

Remove Redundant Loading of RepositoryData during Restore #51977

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@
package org.elasticsearch.cluster.routing;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.snapshots.Snapshot;

import java.io.IOException;
Expand Down Expand Up @@ -213,21 +215,25 @@ public String toString() {
public static class SnapshotRecoverySource extends RecoverySource {
private final String restoreUUID;
private final Snapshot snapshot;
private final String index;
private final IndexId index;
private final Version version;

public SnapshotRecoverySource(String restoreUUID, Snapshot snapshot, Version version, String index) {
public SnapshotRecoverySource(String restoreUUID, Snapshot snapshot, Version version, IndexId indexId) {
this.restoreUUID = restoreUUID;
this.snapshot = Objects.requireNonNull(snapshot);
this.version = Objects.requireNonNull(version);
this.index = Objects.requireNonNull(index);
this.index = Objects.requireNonNull(indexId);
}

SnapshotRecoverySource(StreamInput in) throws IOException {
restoreUUID = in.readString();
snapshot = new Snapshot(in);
version = Version.readVersion(in);
index = in.readString();
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the approach here does not allow us to later switch to just reading directly IndexId from stream after backport to 7.x.

Can you change things so that you conditionally do in.readString(); or new IndexId(in)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tell me if I'm missing something here. But doesn't this break in the edge case of:

  1. Running mixed cluster with old version master
  2. Fail over to new version master
  3. New version master sends the existing recovery source over the wire because it still has null for the uuid because the old master didn't add that when creating the RecoverySource? (am I missing a spot where this is upgraded/recreated during master fail-over?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should differentiate the in.readString() / new IndexId(in) serialization part and make the logic only rely on IndexId, both on master and 7.x, and INDEX_UUID_NA_VALUE to resolve the index id later when needed?

index = new IndexId(in);
} else {
index = new IndexId(in.readString(), IndexMetaData.INDEX_UUID_NA_VALUE);
}
}

public String restoreUUID() {
Expand All @@ -238,7 +244,13 @@ public Snapshot snapshot() {
return snapshot;
}

public String index() {
/**
* Gets the {@link IndexId} of the recovery source. May contain {@link IndexMetaData#INDEX_UUID_NA_VALUE} as the index uuid if it
* was created by an older version master in a mixed version cluster.
*
* @return IndexId
*/
public IndexId index() {
return index;
}

Expand All @@ -251,7 +263,11 @@ protected void writeAdditionalFields(StreamOutput out) throws IOException {
out.writeString(restoreUUID);
snapshot.writeTo(out);
Version.writeVersion(version, out);
out.writeString(index);
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
index.writeTo(out);
} else {
out.writeString(index.getName());
}
}

@Override
Expand All @@ -264,7 +280,7 @@ public void addAdditionalFields(XContentBuilder builder, ToXContent.Params param
builder.field("repository", snapshot.getRepository())
.field("snapshot", snapshot.getSnapshotId().getName())
.field("version", version.toString())
.field("index", index)
.field("index", index.getName())
.field("restoreUUID", restoreUUID);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.lucene.store.IndexInput;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.StepListener;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.routing.RecoverySource;
Expand Down Expand Up @@ -473,20 +474,24 @@ private void restore(IndexShard indexShard, Repository repository, SnapshotRecov
translogState.totalOperationsOnStart(0);
indexShard.prepareForIndexRecovery();
final ShardId snapshotShardId;
final String indexName = restoreSource.index();
if (!shardId.getIndexName().equals(indexName)) {
snapshotShardId = new ShardId(indexName, IndexMetaData.INDEX_UUID_NA_VALUE, shardId.id());
} else {
final IndexId indexId = restoreSource.index();
if (shardId.getIndexName().equals(indexId.getName())) {
snapshotShardId = shardId;
} else {
snapshotShardId = new ShardId(indexId.getName(), IndexMetaData.INDEX_UUID_NA_VALUE, shardId.id());
}
final StepListener<IndexId> indexIdListener = new StepListener<>();
// If the index UUID was not found in the recovery source we will have to load RepositoryData and resolve it by index name
if (indexId.getId().equals(IndexMetaData.INDEX_UUID_NA_VALUE)) {
// BwC path, running against an old version master that did not add the IndexId to the recovery source
repository.getRepositoryData(ActionListener.map(
indexIdListener, repositoryData -> repositoryData.resolveIndexId(indexId.getName())));
} else {
indexIdListener.onResponse(indexId);
}
repository.getRepositoryData(ActionListener.wrap(
repositoryData -> {
final IndexId indexId = repositoryData.resolveIndexId(indexName);
assert indexShard.getEngineOrNull() == null;
repository.restoreShard(indexShard.store(), restoreSource.snapshot().getSnapshotId(), indexId, snapshotShardId,
indexShard.recoveryState(), restoreListener);
}, restoreListener::onFailure
));
assert indexShard.getEngineOrNull() == null;
indexIdListener.whenComplete(idx -> repository.restoreShard(indexShard.store(), restoreSource.snapshot().getSnapshotId(),
idx, snapshotShardId, indexShard.recoveryState(), restoreListener), restoreListener::onFailure);
} catch (Exception e) {
restoreListener.onFailure(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,8 +250,8 @@ public ClusterState execute(ClusterState currentState) {
for (Map.Entry<String, String> indexEntry : indices.entrySet()) {
String index = indexEntry.getValue();
boolean partial = checkPartial(index);
SnapshotRecoverySource recoverySource =
new SnapshotRecoverySource(restoreUUID, snapshot, snapshotInfo.version(), index);
SnapshotRecoverySource recoverySource = new SnapshotRecoverySource(restoreUUID, snapshot,
snapshotInfo.version(), repositoryData.resolveIndexId(index));
String renamedIndexName = indexEntry.getKey();
IndexMetaData snapshotIndexMetaData = metaData.index(index);
snapshotIndexMetaData = updateIndexSettings(snapshotIndexMetaData,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.snapshots.Snapshot;
Expand Down Expand Up @@ -165,7 +166,8 @@ public void testEqualsIgnoringVersion() {
otherRouting = new ShardRouting(otherRouting.shardId(), otherRouting.currentNodeId(),
otherRouting.relocatingNodeId(), otherRouting.primary(), otherRouting.state(),
new RecoverySource.SnapshotRecoverySource(UUIDs.randomBase64UUID(), new Snapshot("test",
new SnapshotId("s1", UUIDs.randomBase64UUID())), Version.CURRENT, "test"),
new SnapshotId("s1", UUIDs.randomBase64UUID())), Version.CURRENT, new IndexId("test",
UUIDs.randomBase64UUID(random()))),
otherRouting.unassignedInfo(), otherRouting.allocationId(), otherRouting.getExpectedShardSize());
}
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotId;

Expand Down Expand Up @@ -151,7 +152,8 @@ public void testNewIndexRestored() {
.metaData(metaData)
.routingTable(RoutingTable.builder().addAsNewRestore(metaData.index("test"), new SnapshotRecoverySource(
UUIDs.randomBase64UUID(),
new Snapshot("rep1", new SnapshotId("snp1", UUIDs.randomBase64UUID())), Version.CURRENT, "test"),
new Snapshot("rep1", new SnapshotId("snp1", UUIDs.randomBase64UUID())), Version.CURRENT,
new IndexId("test", UUIDs.randomBase64UUID(random()))),
new IntHashSet()).build()).build();
for (ShardRouting shard : clusterState.getRoutingNodes().shardsWithState(UNASSIGNED)) {
assertThat(shard.unassignedInfo().getReason(), equalTo(UnassignedInfo.Reason.NEW_INDEX_RESTORED));
Expand All @@ -168,7 +170,8 @@ public void testExistingIndexRestored() {
.routingTable(RoutingTable.builder().addAsRestore(metaData.index("test"),
new SnapshotRecoverySource(
UUIDs.randomBase64UUID(), new Snapshot("rep1",
new SnapshotId("snp1", UUIDs.randomBase64UUID())), Version.CURRENT, "test")).build()).build();
new SnapshotId("snp1", UUIDs.randomBase64UUID())), Version.CURRENT,
new IndexId("test", UUIDs.randomBase64UUID(random())))).build()).build();
for (ShardRouting shard : clusterState.getRoutingNodes().shardsWithState(UNASSIGNED)) {
assertThat(shard.unassignedInfo().getReason(), equalTo(UnassignedInfo.Reason.EXISTING_INDEX_RESTORED));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.test.VersionUtils;
Expand Down Expand Up @@ -366,7 +367,7 @@ public void testRestoreDoesNotAllocateSnapshotOnOlderNodes() {
new SnapshotRecoverySource(
UUIDs.randomBase64UUID(),
new Snapshot("rep1", new SnapshotId("snp1", UUIDs.randomBase64UUID())),
Version.CURRENT, "test")).build())
Version.CURRENT, new IndexId("test", UUIDs.randomBase64UUID(random())))).build())
.nodes(DiscoveryNodes.builder().add(newNode).add(oldNode1).add(oldNode2)).build();
AllocationDeciders allocationDeciders = new AllocationDeciders(Arrays.asList(
new ReplicaAfterPrimaryActiveAllocationDecider(),
Expand Down Expand Up @@ -480,14 +481,15 @@ public void testMessages() {
assertThat(decision.getExplanation(), is("cannot relocate primary shard from a node with version [" +
newNode.node().getVersion() + "] to a node with older version [" + oldNode.node().getVersion() + "]"));

final IndexId indexId = new IndexId("test", UUIDs.randomBase64UUID(random()));
final SnapshotRecoverySource newVersionSnapshot = new SnapshotRecoverySource(
UUIDs.randomBase64UUID(),
new Snapshot("rep1", new SnapshotId("snp1", UUIDs.randomBase64UUID())),
newNode.node().getVersion(), "test");
newNode.node().getVersion(), indexId);
final SnapshotRecoverySource oldVersionSnapshot = new SnapshotRecoverySource(
UUIDs.randomBase64UUID(),
new Snapshot("rep1", new SnapshotId("snp1", UUIDs.randomBase64UUID())),
oldNode.node().getVersion(), "test");
oldNode.node().getVersion(), indexId);

decision = allocationDecider.canAllocate(ShardRoutingHelper.newWithRestoreSource(primaryShard, newVersionSnapshot),
oldNode, routingAllocation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.test.gateway.TestGatewayAllocator;
Expand Down Expand Up @@ -360,13 +361,15 @@ private ClusterState createRecoveryStateAndInitalizeAllocations(MetaData metaDat
snapshotIndices.add(index.getName());
routingTableBuilder.addAsNewRestore(indexMetaData,
new SnapshotRecoverySource(
restoreUUID, snapshot, Version.CURRENT, indexMetaData.getIndex().getName()), new IntHashSet());
restoreUUID, snapshot, Version.CURRENT,
new IndexId(indexMetaData.getIndex().getName(), UUIDs.randomBase64UUID(random()))), new IntHashSet());
break;
case 4:
snapshotIndices.add(index.getName());
routingTableBuilder.addAsRestore(indexMetaData,
new SnapshotRecoverySource(
restoreUUID, snapshot, Version.CURRENT, indexMetaData.getIndex().getName()));
restoreUUID, snapshot, Version.CURRENT,
new IndexId(indexMetaData.getIndex().getName(), UUIDs.randomBase64UUID(random()))));
break;
case 5:
routingTableBuilder.addAsNew(indexMetaData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotId;

Expand Down Expand Up @@ -207,6 +208,7 @@ private Decision executeAllocation(final ClusterState clusterState, final ShardR

private RecoverySource.SnapshotRecoverySource createSnapshotRecoverySource(final String snapshotName) {
Snapshot snapshot = new Snapshot("_repository", new SnapshotId(snapshotName, "_uuid"));
return new RecoverySource.SnapshotRecoverySource(UUIDs.randomBase64UUID(), snapshot, Version.CURRENT, "test");
return new RecoverySource.SnapshotRecoverySource(UUIDs.randomBase64UUID(), snapshot, Version.CURRENT,
new IndexId("test", UUIDs.randomBase64UUID(random())));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.env.ShardLockObtainFailedException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotId;
import org.junit.Before;
Expand Down Expand Up @@ -392,7 +393,8 @@ private RoutingAllocation getRestoreRoutingAllocation(AllocationDeciders allocat
final Snapshot snapshot = new Snapshot("test", new SnapshotId("test", UUIDs.randomBase64UUID()));
RoutingTable routingTable = RoutingTable.builder()
.addAsRestore(metaData.index(shardId.getIndex()),
new SnapshotRecoverySource(UUIDs.randomBase64UUID(), snapshot, Version.CURRENT, shardId.getIndexName()))
new SnapshotRecoverySource(UUIDs.randomBase64UUID(), snapshot, Version.CURRENT,
new IndexId(shardId.getIndexName(), UUIDs.randomBase64UUID(random()))))
.build();
ClusterState state = ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.metaData(metaData)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2327,7 +2327,8 @@ public void testRestoreShard() throws IOException {
RecoverySource.ExistingStoreRecoverySource.INSTANCE);
final Snapshot snapshot = new Snapshot("foo", new SnapshotId("bar", UUIDs.randomBase64UUID()));
routing = ShardRoutingHelper.newWithRestoreSource(routing,
new RecoverySource.SnapshotRecoverySource(UUIDs.randomBase64UUID(), snapshot, Version.CURRENT, "test"));
new RecoverySource.SnapshotRecoverySource(UUIDs.randomBase64UUID(), snapshot, Version.CURRENT,
new IndexId("test", UUIDs.randomBase64UUID(random()))));
target = reinitShard(target, routing);
Store sourceStore = source.store();
Store targetStore = target.store();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.lucene.analysis.TokenStream;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
Expand Down Expand Up @@ -85,6 +86,9 @@
import org.elasticsearch.plugins.AnalysisPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.BackgroundIndexer;
Expand All @@ -97,6 +101,7 @@
import org.elasticsearch.test.store.MockFSIndexStore;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.test.transport.StubbableTransport;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportRequest;
Expand Down Expand Up @@ -651,6 +656,10 @@ public void testSnapshotRecovery() throws Exception {
logger.info("--> request recoveries");
RecoveryResponse response = client().admin().indices().prepareRecoveries(INDEX_NAME).execute().actionGet();

ThreadPool threadPool = internalCluster().getMasterNodeInstance(ThreadPool.class);
Repository repository = internalCluster().getMasterNodeInstance(RepositoriesService.class).repository(REPO_NAME);
final RepositoryData repositoryData = PlainActionFuture.get(f ->
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(f, repository::getRepositoryData)));
for (Map.Entry<String, List<RecoveryState>> indexRecoveryStates : response.shardRecoveryStates().entrySet()) {

assertThat(indexRecoveryStates.getKey(), equalTo(INDEX_NAME));
Expand All @@ -661,7 +670,7 @@ public void testSnapshotRecovery() throws Exception {
SnapshotRecoverySource recoverySource = new SnapshotRecoverySource(
((SnapshotRecoverySource)recoveryState.getRecoverySource()).restoreUUID(),
new Snapshot(REPO_NAME, createSnapshotResponse.getSnapshotInfo().snapshotId()),
Version.CURRENT, INDEX_NAME);
Version.CURRENT, repositoryData.resolveIndexId(INDEX_NAME));
assertRecoveryState(recoveryState, 0, recoverySource, true, Stage.DONE, null, nodeA);
validateIndexRecoveryState(recoveryState.getIndex());
}
Expand Down
Loading