Skip to content

Track Snapshot Version in RepositoryData (#50930) #50989

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,8 @@ private void resetFailedAllocationCounter(RoutingAllocation allocation) {
* @param <T> The list element type.
* @return A comma-separated string of the first few elements.
*/
static <T> String firstListElementsToCommaDelimitedString(List<T> elements, Function<T, String> formatter, boolean isDebugEnabled) {
public static <T> String firstListElementsToCommaDelimitedString(List<T> elements, Function<T, String> formatter,
boolean isDebugEnabled) {
final int maxNumberOfElements = 10;
if (isDebugEnabled || elements.size() <= maxNumberOfElements) {
return elements.stream().map(formatter).collect(Collectors.joining(", "));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public final class RepositoryData {
* An instance initialized for an empty repository.
*/
public static final RepositoryData EMPTY = new RepositoryData(EMPTY_REPO_GEN,
Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), ShardGenerations.EMPTY);
Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), ShardGenerations.EMPTY);

/**
* The generational id of the index file from which the repository data was read.
Expand All @@ -92,26 +92,44 @@ public final class RepositoryData {
*/
private final Map<IndexId, Set<SnapshotId>> indexSnapshots;

private final Map<String, Version> snapshotVersions;

/**
* Shard generations.
*/
private final ShardGenerations shardGenerations;

public RepositoryData(long genId, Map<String, SnapshotId> snapshotIds, Map<String, SnapshotState> snapshotStates,
Map<IndexId, Set<SnapshotId>> indexSnapshots, ShardGenerations shardGenerations) {
Map<String, Version> snapshotVersions, Map<IndexId, Set<SnapshotId>> indexSnapshots,
ShardGenerations shardGenerations) {
this.genId = genId;
this.snapshotIds = Collections.unmodifiableMap(snapshotIds);
this.snapshotStates = Collections.unmodifiableMap(snapshotStates);
this.indices = Collections.unmodifiableMap(indexSnapshots.keySet().stream()
.collect(Collectors.toMap(IndexId::getName, Function.identity())));
this.indexSnapshots = Collections.unmodifiableMap(indexSnapshots);
this.shardGenerations = Objects.requireNonNull(shardGenerations);
this.snapshotVersions = snapshotVersions;
assert indices.values().containsAll(shardGenerations.indices()) : "ShardGenerations contained indices "
+ shardGenerations.indices() + " but snapshots only reference indices " + indices.values();
}

protected RepositoryData copy() {
return new RepositoryData(genId, snapshotIds, snapshotStates, indexSnapshots, shardGenerations);
return new RepositoryData(genId, snapshotIds, snapshotStates, snapshotVersions, indexSnapshots, shardGenerations);
}

/**
* Creates a copy of this instance that contains updated version data.
* @param versions map of snapshot versions
* @return copy with updated version data
*/
public RepositoryData withVersions(Map<SnapshotId, Version> versions) {
if (versions.isEmpty()) {
return this;
}
final Map<String, Version> newVersions = new HashMap<>(snapshotVersions);
versions.forEach((id, version) -> newVersions.put(id.getUUID(), version));
return new RepositoryData(genId, snapshotIds, snapshotStates, newVersions, indexSnapshots, shardGenerations);
}

public ShardGenerations shardGenerations() {
Expand Down Expand Up @@ -141,6 +159,14 @@ public SnapshotState getSnapshotState(final SnapshotId snapshotId) {
return snapshotStates.get(snapshotId.getUUID());
}

/**
* Returns the {@link Version} for the given snapshot or {@code null} if unknown.
*/
@Nullable
public Version getVersion(SnapshotId snapshotId) {
return snapshotVersions.get(snapshotId.getUUID());
}

/**
* Returns an unmodifiable map of the index names to {@link IndexId} in the repository.
*/
Expand Down Expand Up @@ -173,6 +199,7 @@ public List<IndexId> indicesToUpdateAfterRemovingSnapshot(SnapshotId snapshotId)
*/
public RepositoryData addSnapshot(final SnapshotId snapshotId,
final SnapshotState snapshotState,
final Version version,
final ShardGenerations shardGenerations) {
if (snapshotIds.containsKey(snapshotId.getUUID())) {
// if the snapshot id already exists in the repository data, it means an old master
Expand All @@ -184,11 +211,13 @@ public RepositoryData addSnapshot(final SnapshotId snapshotId,
snapshots.put(snapshotId.getUUID(), snapshotId);
Map<String, SnapshotState> newSnapshotStates = new HashMap<>(snapshotStates);
newSnapshotStates.put(snapshotId.getUUID(), snapshotState);
Map<String, Version> newSnapshotVersions = new HashMap<>(snapshotVersions);
newSnapshotVersions.put(snapshotId.getUUID(), version);
Map<IndexId, Set<SnapshotId>> allIndexSnapshots = new HashMap<>(indexSnapshots);
for (final IndexId indexId : shardGenerations.indices()) {
allIndexSnapshots.computeIfAbsent(indexId, k -> new LinkedHashSet<>()).add(snapshotId);
}
return new RepositoryData(genId, snapshots, newSnapshotStates, allIndexSnapshots,
return new RepositoryData(genId, snapshots, newSnapshotStates, newSnapshotVersions, allIndexSnapshots,
ShardGenerations.builder().putAll(this.shardGenerations).putAll(shardGenerations).build());
}

Expand All @@ -202,7 +231,7 @@ public RepositoryData withGenId(long newGeneration) {
if (newGeneration == genId) {
return this;
}
return new RepositoryData(newGeneration, this.snapshotIds, this.snapshotStates, this.indexSnapshots, this.shardGenerations);
return new RepositoryData(newGeneration, snapshotIds, snapshotStates, snapshotVersions, indexSnapshots, shardGenerations);
}

/**
Expand All @@ -222,6 +251,8 @@ public RepositoryData removeSnapshot(final SnapshotId snapshotId, final ShardGen
}
Map<String, SnapshotState> newSnapshotStates = new HashMap<>(snapshotStates);
newSnapshotStates.remove(snapshotId.getUUID());
final Map<String, Version> newSnapshotVersions = new HashMap<>(snapshotVersions);
newSnapshotVersions.remove(snapshotId.getUUID());
Map<IndexId, Set<SnapshotId>> indexSnapshots = new HashMap<>();
for (final IndexId indexId : indices.values()) {
Set<SnapshotId> set;
Expand All @@ -241,7 +272,7 @@ public RepositoryData removeSnapshot(final SnapshotId snapshotId, final ShardGen
indexSnapshots.put(indexId, set);
}

return new RepositoryData(genId, newSnapshotIds, newSnapshotStates, indexSnapshots,
return new RepositoryData(genId, newSnapshotIds, newSnapshotStates, newSnapshotVersions, indexSnapshots,
ShardGenerations.builder().putAll(shardGenerations).putAll(updatedShardGenerations)
.retainIndicesAndPruneDeletes(indexSnapshots.keySet()).build()
);
Expand Down Expand Up @@ -269,14 +300,15 @@ public boolean equals(Object obj) {
RepositoryData that = (RepositoryData) obj;
return snapshotIds.equals(that.snapshotIds)
&& snapshotStates.equals(that.snapshotStates)
&& snapshotVersions.equals(that.snapshotVersions)
&& indices.equals(that.indices)
&& indexSnapshots.equals(that.indexSnapshots)
&& shardGenerations.equals(that.shardGenerations);
}

@Override
public int hashCode() {
return Objects.hash(snapshotIds, snapshotStates, indices, indexSnapshots, shardGenerations);
return Objects.hash(snapshotIds, snapshotStates, snapshotVersions, indices, indexSnapshots, shardGenerations);
}

/**
Expand Down Expand Up @@ -323,6 +355,7 @@ public List<IndexId> resolveNewIndices(final List<String> indicesToResolve) {
private static final String NAME = "name";
private static final String UUID = "uuid";
private static final String STATE = "state";
private static final String VERSION = "version";
private static final String MIN_VERSION = "min_version";

/**
Expand All @@ -339,6 +372,9 @@ public XContentBuilder snapshotsToXContent(final XContentBuilder builder, final
if (snapshotStates.containsKey(snapshot.getUUID())) {
builder.field(STATE, snapshotStates.get(snapshot.getUUID()).value());
}
if (snapshotVersions.containsKey(snapshot.getUUID())) {
builder.field(VERSION, snapshotVersions.get(snapshot.getUUID()).toString());
}
builder.endObject();
}
builder.endArray();
Expand Down Expand Up @@ -378,6 +414,7 @@ public XContentBuilder snapshotsToXContent(final XContentBuilder builder, final
public static RepositoryData snapshotsFromXContent(final XContentParser parser, long genId) throws IOException {
final Map<String, SnapshotId> snapshots = new HashMap<>();
final Map<String, SnapshotState> snapshotStates = new HashMap<>();
final Map<String, Version> snapshotVersions = new HashMap<>();
final Map<IndexId, Set<SnapshotId>> indexSnapshots = new HashMap<>();
final ShardGenerations.Builder shardGenerations = ShardGenerations.builder();

Expand All @@ -390,6 +427,7 @@ public static RepositoryData snapshotsFromXContent(final XContentParser parser,
String name = null;
String uuid = null;
SnapshotState state = null;
Version version = null;
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String currentFieldName = parser.currentName();
parser.nextToken();
Expand All @@ -399,12 +437,17 @@ public static RepositoryData snapshotsFromXContent(final XContentParser parser,
uuid = parser.text();
} else if (STATE.equals(currentFieldName)) {
state = SnapshotState.fromValue(parser.numberValue().byteValue());
} else if (VERSION.equals(currentFieldName)) {
version = Version.fromString(parser.text());
}
}
final SnapshotId snapshotId = new SnapshotId(name, uuid);
if (state != null) {
snapshotStates.put(uuid, state);
}
if (version != null) {
snapshotVersions.put(uuid, version);
}
snapshots.put(snapshotId.getUUID(), snapshotId);
}
} else {
Expand Down Expand Up @@ -488,7 +531,7 @@ public static RepositoryData snapshotsFromXContent(final XContentParser parser,
} else {
throw new ElasticsearchParseException("start object expected");
}
return new RepositoryData(genId, snapshots, snapshotStates, indexSnapshots, shardGenerations.build());
return new RepositoryData(genId, snapshots, snapshotStates, snapshotVersions, indexSnapshots, shardGenerations.build());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.lucene.store.RateLimiter;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.StepListener;
Expand All @@ -46,6 +47,7 @@
import org.elasticsearch.cluster.metadata.RepositoriesMetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Numbers;
Expand Down Expand Up @@ -122,6 +124,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -886,7 +889,7 @@ public void finalizeSnapshot(final SnapshotId snapshotId,
final SnapshotInfo snapshotInfo = snapshotInfos.iterator().next();
getRepositoryData(ActionListener.wrap(existingRepositoryData -> {
final RepositoryData updatedRepositoryData =
existingRepositoryData.addSnapshot(snapshotId, snapshotInfo.state(), shardGenerations);
existingRepositoryData.addSnapshot(snapshotId, snapshotInfo.state(), Version.CURRENT, shardGenerations);
writeIndexGen(updatedRepositoryData, repositoryStateId, writeShardGens, ActionListener.wrap(v -> {
if (writeShardGens) {
cleanupOldShardGens(existingRepositoryData, updatedRepositoryData);
Expand Down Expand Up @@ -1265,8 +1268,42 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
}
});

final StepListener<RepositoryData> filterRepositoryDataStep = new StepListener<>();

// Step 2: Write new index-N blob to repository and update index.latest
setPendingStep.whenComplete(newGen -> threadPool().executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> {
// BwC logic: Load snapshot version information if any snapshot is missing a version in RepositoryData so that the new
// RepositoryData contains a version for every snapshot
final List<SnapshotId> snapshotIdsWithoutVersion = repositoryData.getSnapshotIds().stream().filter(
snapshotId -> repositoryData.getVersion(snapshotId) == null).collect(Collectors.toList());
if (snapshotIdsWithoutVersion.isEmpty() == false) {
final Map<SnapshotId, Version> updatedVersionMap = new ConcurrentHashMap<>();
final GroupedActionListener<Void> loadAllVersionsListener = new GroupedActionListener<>(
ActionListener.runAfter(
new ActionListener<Collection<Void>>() {
@Override
public void onResponse(Collection<Void> voids) {
logger.info("Successfully loaded all snapshot's version information for {} from snapshot metadata",
AllocationService.firstListElementsToCommaDelimitedString(
snapshotIdsWithoutVersion, SnapshotId::toString, logger.isDebugEnabled()));
}

@Override
public void onFailure(Exception e) {
logger.warn("Failure when trying to load missing version information from snapshot metadata", e);
}
}, () -> filterRepositoryDataStep.onResponse(repositoryData.withVersions(updatedVersionMap))),
snapshotIdsWithoutVersion.size());
for (SnapshotId snapshotId : snapshotIdsWithoutVersion) {
threadPool().executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.run(loadAllVersionsListener, () ->
updatedVersionMap.put(snapshotId, getSnapshotInfo(snapshotId).version())));
}
} else {
filterRepositoryDataStep.onResponse(repositoryData);
}
})), listener::onFailure);
filterRepositoryDataStep.whenComplete(filteredRepositoryData -> {
final long newGen = setPendingStep.result();
if (latestKnownRepoGen.get() >= newGen) {
throw new IllegalArgumentException(
"Tried writing generation [" + newGen + "] but repository is at least at generation [" + latestKnownRepoGen.get()
Expand All @@ -1276,7 +1313,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
final String indexBlob = INDEX_FILE_PREFIX + Long.toString(newGen);
logger.debug("Repository [{}] writing new index generational blob [{}]", metadata.name(), indexBlob);
writeAtomic(indexBlob,
BytesReference.bytes(repositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), writeShardGens)), true);
BytesReference.bytes(filteredRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), writeShardGens)), true);
// write the current generation to the index-latest file
final BytesReference genBytes;
try (BytesStreamOutput bStream = new BytesStreamOutput()) {
Expand Down Expand Up @@ -1310,13 +1347,13 @@ public ClusterState execute(ClusterState currentState) {

@Override
public void onFailure(String source, Exception e) {
l.onFailure(
listener.onFailure(
new RepositoryException(metadata.name(), "Failed to execute cluster state update [" + source + "]", e));
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.run(l, () -> {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.run(listener, () -> {
// Delete all now outdated index files up to 1000 blobs back from the new generation.
// If there are more than 1000 dangling index-N cleanup functionality on repo delete will take care of them.
// Deleting one older than the current expectedGen is done for BwC reasons as older versions used to keep
Expand All @@ -1333,7 +1370,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
}));
}
});
})), listener::onFailure);
}, listener::onFailure);
}

private RepositoryMetaData getRepoMetaData(ClusterState state) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,9 +371,12 @@ public boolean hasOldVersionSnapshots(String repositoryName, RepositoryData repo
} else {
try {
final Repository repository = repositoriesService.repository(repositoryName);
hasOldFormatSnapshots = snapshotIds.stream().map(repository::getSnapshotInfo).anyMatch(
snapshotInfo -> (excluded == null || snapshotInfo.snapshotId().equals(excluded) == false)
&& snapshotInfo.version().before(SHARD_GEN_IN_REPO_DATA_VERSION));
hasOldFormatSnapshots = snapshotIds.stream().filter(snapshotId -> snapshotId.equals(excluded) == false).anyMatch(
snapshotId -> {
final Version known = repositoryData.getVersion(snapshotId);
return (known == null ? repository.getSnapshotInfo(snapshotId).version() : known)
.before(SHARD_GEN_IN_REPO_DATA_VERSION);
});
} catch (SnapshotMissingException e) {
logger.warn("Failed to load snapshot metadata, assuming repository is in old format", e);
return true;
Expand Down
Loading