Skip to content

Commit 7e1879f

Browse files
Track Snapshot Version in RepositoryData (elastic#50930)
Add tracking of snapshot versions to RepositoryData to make BwC logic more efficient. Follow up to elastic#50853
1 parent 6e8ea7a commit 7e1879f

File tree

9 files changed

+148
-27
lines changed

9 files changed

+148
-27
lines changed

server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,8 @@ private void resetFailedAllocationCounter(RoutingAllocation allocation) {
331331
* @param <T> The list element type.
332332
* @return A comma-separated string of the first few elements.
333333
*/
334-
static <T> String firstListElementsToCommaDelimitedString(List<T> elements, Function<T, String> formatter, boolean isDebugEnabled) {
334+
public static <T> String firstListElementsToCommaDelimitedString(List<T> elements, Function<T, String> formatter,
335+
boolean isDebugEnabled) {
335336
final int maxNumberOfElements = 10;
336337
if (isDebugEnabled || elements.size() <= maxNumberOfElements) {
337338
return elements.stream().map(formatter).collect(Collectors.joining(", "));

server/src/main/java/org/elasticsearch/repositories/RepositoryData.java

+51-8
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public final class RepositoryData {
6969
* An instance initialized for an empty repository.
7070
*/
7171
public static final RepositoryData EMPTY = new RepositoryData(EMPTY_REPO_GEN,
72-
Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), ShardGenerations.EMPTY);
72+
Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), ShardGenerations.EMPTY);
7373

7474
/**
7575
* The generational id of the index file from which the repository data was read.
@@ -92,26 +92,44 @@ public final class RepositoryData {
9292
*/
9393
private final Map<IndexId, Set<SnapshotId>> indexSnapshots;
9494

95+
private final Map<String, Version> snapshotVersions;
96+
9597
/**
9698
* Shard generations.
9799
*/
98100
private final ShardGenerations shardGenerations;
99101

100102
public RepositoryData(long genId, Map<String, SnapshotId> snapshotIds, Map<String, SnapshotState> snapshotStates,
101-
Map<IndexId, Set<SnapshotId>> indexSnapshots, ShardGenerations shardGenerations) {
103+
Map<String, Version> snapshotVersions, Map<IndexId, Set<SnapshotId>> indexSnapshots,
104+
ShardGenerations shardGenerations) {
102105
this.genId = genId;
103106
this.snapshotIds = Collections.unmodifiableMap(snapshotIds);
104107
this.snapshotStates = Collections.unmodifiableMap(snapshotStates);
105108
this.indices = Collections.unmodifiableMap(indexSnapshots.keySet().stream()
106109
.collect(Collectors.toMap(IndexId::getName, Function.identity())));
107110
this.indexSnapshots = Collections.unmodifiableMap(indexSnapshots);
108111
this.shardGenerations = Objects.requireNonNull(shardGenerations);
112+
this.snapshotVersions = snapshotVersions;
109113
assert indices.values().containsAll(shardGenerations.indices()) : "ShardGenerations contained indices "
110114
+ shardGenerations.indices() + " but snapshots only reference indices " + indices.values();
111115
}
112116

113117
protected RepositoryData copy() {
114-
return new RepositoryData(genId, snapshotIds, snapshotStates, indexSnapshots, shardGenerations);
118+
return new RepositoryData(genId, snapshotIds, snapshotStates, snapshotVersions, indexSnapshots, shardGenerations);
119+
}
120+
121+
/**
122+
* Creates a copy of this instance that contains updated version data.
123+
* @param versions map of snapshot versions
124+
* @return copy with updated version data
125+
*/
126+
public RepositoryData withVersions(Map<SnapshotId, Version> versions) {
127+
if (versions.isEmpty()) {
128+
return this;
129+
}
130+
final Map<String, Version> newVersions = new HashMap<>(snapshotVersions);
131+
versions.forEach((id, version) -> newVersions.put(id.getUUID(), version));
132+
return new RepositoryData(genId, snapshotIds, snapshotStates, newVersions, indexSnapshots, shardGenerations);
115133
}
116134

117135
public ShardGenerations shardGenerations() {
@@ -141,6 +159,14 @@ public SnapshotState getSnapshotState(final SnapshotId snapshotId) {
141159
return snapshotStates.get(snapshotId.getUUID());
142160
}
143161

162+
/**
163+
* Returns the {@link Version} for the given snapshot or {@code null} if unknown.
164+
*/
165+
@Nullable
166+
public Version getVersion(SnapshotId snapshotId) {
167+
return snapshotVersions.get(snapshotId.getUUID());
168+
}
169+
144170
/**
145171
* Returns an unmodifiable map of the index names to {@link IndexId} in the repository.
146172
*/
@@ -173,6 +199,7 @@ public List<IndexId> indicesToUpdateAfterRemovingSnapshot(SnapshotId snapshotId)
173199
*/
174200
public RepositoryData addSnapshot(final SnapshotId snapshotId,
175201
final SnapshotState snapshotState,
202+
final Version version,
176203
final ShardGenerations shardGenerations) {
177204
if (snapshotIds.containsKey(snapshotId.getUUID())) {
178205
// if the snapshot id already exists in the repository data, it means an old master
@@ -184,11 +211,13 @@ public RepositoryData addSnapshot(final SnapshotId snapshotId,
184211
snapshots.put(snapshotId.getUUID(), snapshotId);
185212
Map<String, SnapshotState> newSnapshotStates = new HashMap<>(snapshotStates);
186213
newSnapshotStates.put(snapshotId.getUUID(), snapshotState);
214+
Map<String, Version> newSnapshotVersions = new HashMap<>(snapshotVersions);
215+
newSnapshotVersions.put(snapshotId.getUUID(), version);
187216
Map<IndexId, Set<SnapshotId>> allIndexSnapshots = new HashMap<>(indexSnapshots);
188217
for (final IndexId indexId : shardGenerations.indices()) {
189218
allIndexSnapshots.computeIfAbsent(indexId, k -> new LinkedHashSet<>()).add(snapshotId);
190219
}
191-
return new RepositoryData(genId, snapshots, newSnapshotStates, allIndexSnapshots,
220+
return new RepositoryData(genId, snapshots, newSnapshotStates, newSnapshotVersions, allIndexSnapshots,
192221
ShardGenerations.builder().putAll(this.shardGenerations).putAll(shardGenerations).build());
193222
}
194223

@@ -202,7 +231,7 @@ public RepositoryData withGenId(long newGeneration) {
202231
if (newGeneration == genId) {
203232
return this;
204233
}
205-
return new RepositoryData(newGeneration, this.snapshotIds, this.snapshotStates, this.indexSnapshots, this.shardGenerations);
234+
return new RepositoryData(newGeneration, snapshotIds, snapshotStates, snapshotVersions, indexSnapshots, shardGenerations);
206235
}
207236

208237
/**
@@ -222,6 +251,8 @@ public RepositoryData removeSnapshot(final SnapshotId snapshotId, final ShardGen
222251
}
223252
Map<String, SnapshotState> newSnapshotStates = new HashMap<>(snapshotStates);
224253
newSnapshotStates.remove(snapshotId.getUUID());
254+
final Map<String, Version> newSnapshotVersions = new HashMap<>(snapshotVersions);
255+
newSnapshotVersions.remove(snapshotId.getUUID());
225256
Map<IndexId, Set<SnapshotId>> indexSnapshots = new HashMap<>();
226257
for (final IndexId indexId : indices.values()) {
227258
Set<SnapshotId> set;
@@ -241,7 +272,7 @@ public RepositoryData removeSnapshot(final SnapshotId snapshotId, final ShardGen
241272
indexSnapshots.put(indexId, set);
242273
}
243274

244-
return new RepositoryData(genId, newSnapshotIds, newSnapshotStates, indexSnapshots,
275+
return new RepositoryData(genId, newSnapshotIds, newSnapshotStates, newSnapshotVersions, indexSnapshots,
245276
ShardGenerations.builder().putAll(shardGenerations).putAll(updatedShardGenerations)
246277
.retainIndicesAndPruneDeletes(indexSnapshots.keySet()).build()
247278
);
@@ -269,14 +300,15 @@ public boolean equals(Object obj) {
269300
RepositoryData that = (RepositoryData) obj;
270301
return snapshotIds.equals(that.snapshotIds)
271302
&& snapshotStates.equals(that.snapshotStates)
303+
&& snapshotVersions.equals(that.snapshotVersions)
272304
&& indices.equals(that.indices)
273305
&& indexSnapshots.equals(that.indexSnapshots)
274306
&& shardGenerations.equals(that.shardGenerations);
275307
}
276308

277309
@Override
278310
public int hashCode() {
279-
return Objects.hash(snapshotIds, snapshotStates, indices, indexSnapshots, shardGenerations);
311+
return Objects.hash(snapshotIds, snapshotStates, snapshotVersions, indices, indexSnapshots, shardGenerations);
280312
}
281313

282314
/**
@@ -323,6 +355,7 @@ public List<IndexId> resolveNewIndices(final List<String> indicesToResolve) {
323355
private static final String NAME = "name";
324356
private static final String UUID = "uuid";
325357
private static final String STATE = "state";
358+
private static final String VERSION = "version";
326359
private static final String MIN_VERSION = "min_version";
327360

328361
/**
@@ -339,6 +372,9 @@ public XContentBuilder snapshotsToXContent(final XContentBuilder builder, final
339372
if (snapshotStates.containsKey(snapshot.getUUID())) {
340373
builder.field(STATE, snapshotStates.get(snapshot.getUUID()).value());
341374
}
375+
if (snapshotVersions.containsKey(snapshot.getUUID())) {
376+
builder.field(VERSION, snapshotVersions.get(snapshot.getUUID()).toString());
377+
}
342378
builder.endObject();
343379
}
344380
builder.endArray();
@@ -378,6 +414,7 @@ public XContentBuilder snapshotsToXContent(final XContentBuilder builder, final
378414
public static RepositoryData snapshotsFromXContent(final XContentParser parser, long genId) throws IOException {
379415
final Map<String, SnapshotId> snapshots = new HashMap<>();
380416
final Map<String, SnapshotState> snapshotStates = new HashMap<>();
417+
final Map<String, Version> snapshotVersions = new HashMap<>();
381418
final Map<IndexId, Set<SnapshotId>> indexSnapshots = new HashMap<>();
382419
final ShardGenerations.Builder shardGenerations = ShardGenerations.builder();
383420

@@ -390,6 +427,7 @@ public static RepositoryData snapshotsFromXContent(final XContentParser parser,
390427
String name = null;
391428
String uuid = null;
392429
SnapshotState state = null;
430+
Version version = null;
393431
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
394432
String currentFieldName = parser.currentName();
395433
parser.nextToken();
@@ -399,12 +437,17 @@ public static RepositoryData snapshotsFromXContent(final XContentParser parser,
399437
uuid = parser.text();
400438
} else if (STATE.equals(currentFieldName)) {
401439
state = SnapshotState.fromValue(parser.numberValue().byteValue());
440+
} else if (VERSION.equals(currentFieldName)) {
441+
version = Version.fromString(parser.text());
402442
}
403443
}
404444
final SnapshotId snapshotId = new SnapshotId(name, uuid);
405445
if (state != null) {
406446
snapshotStates.put(uuid, state);
407447
}
448+
if (version != null) {
449+
snapshotVersions.put(uuid, version);
450+
}
408451
snapshots.put(snapshotId.getUUID(), snapshotId);
409452
}
410453
} else {
@@ -488,7 +531,7 @@ public static RepositoryData snapshotsFromXContent(final XContentParser parser,
488531
} else {
489532
throw new ElasticsearchParseException("start object expected");
490533
}
491-
return new RepositoryData(genId, snapshots, snapshotStates, indexSnapshots, shardGenerations.build());
534+
return new RepositoryData(genId, snapshots, snapshotStates, snapshotVersions, indexSnapshots, shardGenerations.build());
492535
}
493536

494537
}

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

+42-5
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.lucene.store.RateLimiter;
3333
import org.apache.lucene.util.SetOnce;
3434
import org.elasticsearch.ExceptionsHelper;
35+
import org.elasticsearch.Version;
3536
import org.elasticsearch.action.ActionListener;
3637
import org.elasticsearch.action.ActionRunnable;
3738
import org.elasticsearch.action.StepListener;
@@ -46,6 +47,7 @@
4647
import org.elasticsearch.cluster.metadata.RepositoriesMetaData;
4748
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
4849
import org.elasticsearch.cluster.node.DiscoveryNode;
50+
import org.elasticsearch.cluster.routing.allocation.AllocationService;
4951
import org.elasticsearch.cluster.service.ClusterService;
5052
import org.elasticsearch.common.Nullable;
5153
import org.elasticsearch.common.Numbers;
@@ -122,6 +124,7 @@
122124
import java.util.Map;
123125
import java.util.Set;
124126
import java.util.concurrent.BlockingQueue;
127+
import java.util.concurrent.ConcurrentHashMap;
125128
import java.util.concurrent.Executor;
126129
import java.util.concurrent.LinkedBlockingQueue;
127130
import java.util.concurrent.TimeUnit;
@@ -886,7 +889,7 @@ public void finalizeSnapshot(final SnapshotId snapshotId,
886889
final SnapshotInfo snapshotInfo = snapshotInfos.iterator().next();
887890
getRepositoryData(ActionListener.wrap(existingRepositoryData -> {
888891
final RepositoryData updatedRepositoryData =
889-
existingRepositoryData.addSnapshot(snapshotId, snapshotInfo.state(), shardGenerations);
892+
existingRepositoryData.addSnapshot(snapshotId, snapshotInfo.state(), Version.CURRENT, shardGenerations);
890893
writeIndexGen(updatedRepositoryData, repositoryStateId, writeShardGens, ActionListener.wrap(v -> {
891894
if (writeShardGens) {
892895
cleanupOldShardGens(existingRepositoryData, updatedRepositoryData);
@@ -1265,8 +1268,42 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
12651268
}
12661269
});
12671270

1271+
final StepListener<RepositoryData> filterRepositoryDataStep = new StepListener<>();
1272+
12681273
// Step 2: Write new index-N blob to repository and update index.latest
12691274
setPendingStep.whenComplete(newGen -> threadPool().executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> {
1275+
// BwC logic: Load snapshot version information if any snapshot is missing a version in RepositoryData so that the new
1276+
// RepositoryData contains a version for every snapshot
1277+
final List<SnapshotId> snapshotIdsWithoutVersion = repositoryData.getSnapshotIds().stream().filter(
1278+
snapshotId -> repositoryData.getVersion(snapshotId) == null).collect(Collectors.toList());
1279+
if (snapshotIdsWithoutVersion.isEmpty() == false) {
1280+
final Map<SnapshotId, Version> updatedVersionMap = new ConcurrentHashMap<>();
1281+
final GroupedActionListener<Void> loadAllVersionsListener = new GroupedActionListener<>(
1282+
ActionListener.runAfter(
1283+
new ActionListener<>() {
1284+
@Override
1285+
public void onResponse(Collection<Void> voids) {
1286+
logger.info("Successfully loaded all snapshot's version information for {} from snapshot metadata",
1287+
AllocationService.firstListElementsToCommaDelimitedString(
1288+
snapshotIdsWithoutVersion, SnapshotId::toString, logger.isDebugEnabled()));
1289+
}
1290+
1291+
@Override
1292+
public void onFailure(Exception e) {
1293+
logger.warn("Failure when trying to load missing version information from snapshot metadata", e);
1294+
}
1295+
}, () -> filterRepositoryDataStep.onResponse(repositoryData.withVersions(updatedVersionMap))),
1296+
snapshotIdsWithoutVersion.size());
1297+
for (SnapshotId snapshotId : snapshotIdsWithoutVersion) {
1298+
threadPool().executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.run(loadAllVersionsListener, () ->
1299+
updatedVersionMap.put(snapshotId, getSnapshotInfo(snapshotId).version())));
1300+
}
1301+
} else {
1302+
filterRepositoryDataStep.onResponse(repositoryData);
1303+
}
1304+
})), listener::onFailure);
1305+
filterRepositoryDataStep.whenComplete(filteredRepositoryData -> {
1306+
final long newGen = setPendingStep.result();
12701307
if (latestKnownRepoGen.get() >= newGen) {
12711308
throw new IllegalArgumentException(
12721309
"Tried writing generation [" + newGen + "] but repository is at least at generation [" + latestKnownRepoGen.get()
@@ -1276,7 +1313,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
12761313
final String indexBlob = INDEX_FILE_PREFIX + Long.toString(newGen);
12771314
logger.debug("Repository [{}] writing new index generational blob [{}]", metadata.name(), indexBlob);
12781315
writeAtomic(indexBlob,
1279-
BytesReference.bytes(repositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), writeShardGens)), true);
1316+
BytesReference.bytes(filteredRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), writeShardGens)), true);
12801317
// write the current generation to the index-latest file
12811318
final BytesReference genBytes;
12821319
try (BytesStreamOutput bStream = new BytesStreamOutput()) {
@@ -1310,13 +1347,13 @@ public ClusterState execute(ClusterState currentState) {
13101347

13111348
@Override
13121349
public void onFailure(String source, Exception e) {
1313-
l.onFailure(
1350+
listener.onFailure(
13141351
new RepositoryException(metadata.name(), "Failed to execute cluster state update [" + source + "]", e));
13151352
}
13161353

13171354
@Override
13181355
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
1319-
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.run(l, () -> {
1356+
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.run(listener, () -> {
13201357
// Delete all now outdated index files up to 1000 blobs back from the new generation.
13211358
// If there are more than 1000 dangling index-N cleanup functionality on repo delete will take care of them.
13221359
// Deleting one older than the current expectedGen is done for BwC reasons as older versions used to keep
@@ -1333,7 +1370,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
13331370
}));
13341371
}
13351372
});
1336-
})), listener::onFailure);
1373+
}, listener::onFailure);
13371374
}
13381375

13391376
private RepositoryMetaData getRepoMetaData(ClusterState state) {

server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -371,9 +371,12 @@ public boolean hasOldVersionSnapshots(String repositoryName, RepositoryData repo
371371
} else {
372372
try {
373373
final Repository repository = repositoriesService.repository(repositoryName);
374-
hasOldFormatSnapshots = snapshotIds.stream().map(repository::getSnapshotInfo).anyMatch(
375-
snapshotInfo -> (excluded == null || snapshotInfo.snapshotId().equals(excluded) == false)
376-
&& snapshotInfo.version().before(SHARD_GEN_IN_REPO_DATA_VERSION));
374+
hasOldFormatSnapshots = snapshotIds.stream().filter(snapshotId -> snapshotId.equals(excluded) == false).anyMatch(
375+
snapshotId -> {
376+
final Version known = repositoryData.getVersion(snapshotId);
377+
return (known == null ? repository.getSnapshotInfo(snapshotId).version() : known)
378+
.before(SHARD_GEN_IN_REPO_DATA_VERSION);
379+
});
377380
} catch (SnapshotMissingException e) {
378381
logger.warn("Failed to load snapshot metadata, assuming repository is in old format", e);
379382
return true;

0 commit comments

Comments
 (0)