Commit 97d5a75

fcofdez and DaveCTurner authored
Respect generational files in recoveryDiff (#78707)
Today `MetadataSnapshot#recoveryDiff` considers `.liv` files as per-commit rather than per-segment and often transfers them during peer recoveries and snapshot restores. It also considers differences in `.fnm`, `.dvd` and `.dvm` files as indicating a difference in the whole segment, even though these files may be adjusted without changing the segment itself. This commit adjusts this logic to attach these generational files to the segments themselves, allowing Elasticsearch to transfer them only if they are genuinely needed.

Closes #55142

Backport of #77695

Co-authored-by: David Turner <[email protected]>
1 parent c11118a commit 97d5a75
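
For background, the sketch below (plain Java, not Elasticsearch code; the parsing helpers mirror the conventions of Lucene's IndexFileNames, and the file names are illustrative) shows the three groups this commit distinguishes: per-commit files (segments_N), nongenerational per-segment files written together with the segment, and generational per-segment files rewritten by later commits, which carry a base-36 generation suffix as in _0_1.liv.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LuceneFileGroups {

    // "_0_1.liv" -> "_0"; "_0.cfs" -> "_0" (segment name is everything up to the
    // second underscore or the extension, whichever comes first)
    static String parseSegmentName(String name) {
        int end = name.indexOf('_', 1);
        if (end == -1) {
            end = name.indexOf('.');
        }
        return end == -1 ? name : name.substring(0, end);
    }

    // After stripping the extension, "_seg", "_seg_gen", "_seg_codec_suffix" and
    // "_seg_gen_codec_suffix" are the possible shapes; the base-36 generation is
    // present when there are 2 or 4 underscore-separated parts. Generation 0 means
    // the file was written together with its segment (nongenerational).
    static long parseGeneration(String name) {
        int dot = name.indexOf('.');
        String[] parts = (dot == -1 ? name : name.substring(0, dot)).substring(1).split("_");
        return (parts.length == 2 || parts.length == 4) ? Long.parseLong(parts[1], Character.MAX_RADIX) : 0;
    }

    public static void main(String[] args) {
        List<String> perCommit = new ArrayList<>();                  // segments_N
        Map<String, List<String>> nonGenerational = new HashMap<>(); // written with the segment
        Map<String, List<String>> generational = new HashMap<>();    // updated by later commits

        for (String file : List.of("segments_5", "_0.si", "_0.cfs", "_0_1.liv", "_0_2.fnm")) {
            if (file.startsWith("_") == false) {
                perCommit.add(file);
            } else {
                String segment = parseSegmentName(file);
                Map<String, List<String>> bucket = parseGeneration(file) > 0 ? generational : nonGenerational;
                bucket.computeIfAbsent(segment, k -> new ArrayList<>()).add(file);
            }
        }

        // prints: per-commit=[segments_5] nonGen={_0=[_0.si, _0.cfs]} gen={_0=[_0_1.liv, _0_2.fnm]}
        System.out.println("per-commit=" + perCommit + " nonGen=" + nonGenerational + " gen=" + generational);
    }
}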

9 files changed: +387 -134 lines changed


server/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshot.java

+21 -4
@@ -28,6 +28,8 @@
 import java.util.Objects;
 import java.util.stream.IntStream;
 
+import static org.elasticsearch.index.store.StoreFileMetadata.UNAVAILABLE_WRITER_UUID;
+
 /**
  * Shard snapshot metadata
  */
@@ -37,6 +39,7 @@ public class BlobStoreIndexShardSnapshot implements ToXContentFragment {
      * Information about snapshotted file
      */
     public static class FileInfo implements Writeable {
+        public static final String SERIALIZE_WRITER_UUID = "serialize_writer_uuid";
 
         private final String name;
         private final ByteSizeValue partSize;
@@ -239,6 +242,7 @@ public boolean isSame(FileInfo fileInfo) {
         static final String PART_SIZE = "part_size";
         static final String WRITTEN_BY = "written_by";
         static final String META_HASH = "meta_hash";
+        static final String WRITER_UUID = "writer_uuid";
 
         /**
          * Serializes file info into JSON
@@ -261,10 +265,19 @@ public static void toXContent(FileInfo file, XContentBuilder builder, ToXContent
                 builder.field(WRITTEN_BY, file.metadata.writtenBy());
             }
 
-            if (file.metadata.hash() != null && file.metadata().hash().length > 0) {
-                BytesRef br = file.metadata.hash();
-                builder.field(META_HASH, br.bytes, br.offset, br.length);
+            final BytesRef hash = file.metadata.hash();
+            if (hash != null && hash.length > 0) {
+                builder.field(META_HASH, hash.bytes, hash.offset, hash.length);
+            }
+
+            final BytesRef writerUuid = file.metadata.writerUuid();
+            // We serialize by default when SERIALIZE_WRITER_UUID is not present since in deletes/clones
+            // we read the serialized files from the blob store and we enforce the version invariants when
+            // the snapshot was done
+            if (writerUuid.length > 0 && params.paramAsBoolean(SERIALIZE_WRITER_UUID, true)) {
+                builder.field(WRITER_UUID, writerUuid.bytes, writerUuid.offset, writerUuid.length);
             }
+
             builder.endObject();
         }
 
@@ -283,6 +296,7 @@ public static FileInfo fromXContent(XContentParser parser) throws IOException {
             ByteSizeValue partSize = null;
             String writtenBy = null;
             BytesRef metaHash = new BytesRef();
+            BytesRef writerUuid = UNAVAILABLE_WRITER_UUID;
             XContentParserUtils.ensureExpectedToken(token, XContentParser.Token.START_OBJECT, parser);
             while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
                 if (token == XContentParser.Token.FIELD_NAME) {
@@ -305,6 +319,9 @@ public static FileInfo fromXContent(XContentParser parser) throws IOException {
                         metaHash.bytes = parser.binaryValue();
                         metaHash.offset = 0;
                         metaHash.length = metaHash.bytes.length;
+                    } else if (WRITER_UUID.equals(currentFieldName)) {
+                        writerUuid = new BytesRef(parser.binaryValue());
+                        assert writerUuid.length > 0;
                     } else {
                         XContentParserUtils.throwUnknownField(currentFieldName, parser.getTokenLocation());
                     }
@@ -328,7 +345,7 @@ public static FileInfo fromXContent(XContentParser parser) throws IOException {
             } else if (checksum == null) {
                 throw new ElasticsearchParseException("missing checksum for name [" + name + "]");
             }
-            return new FileInfo(name, new StoreFileMetadata(physicalName, length, checksum, writtenBy, metaHash), partSize);
+            return new FileInfo(name, new StoreFileMetadata(physicalName, length, checksum, writtenBy, metaHash, writerUuid), partSize);
         }
 
         @Override
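
The change above records a writer UUID in the snapshot metadata so that both ends of a recovery or restore can compare it. A hypothetical sketch of the idea (plain Java; this is not the actual StoreFileMetadata.isSame implementation, and the FileIdentity type is invented for illustration):

import java.util.Arrays;

final class FileIdentity {
    final long length;
    final String checksum;
    final byte[] writerUuid; // empty when unavailable, e.g. metadata written by an older version

    FileIdentity(long length, String checksum, byte[] writerUuid) {
        this.length = length;
        this.checksum = checksum;
        this.writerUuid = writerUuid;
    }

    boolean isSame(FileIdentity other) {
        if (length != other.length || checksum.equals(other.checksum) == false) {
            return false;
        }
        if (writerUuid.length > 0 && other.writerUuid.length > 0) {
            // both sides recorded the ID of the writer that produced the bytes, so two
            // same-named files from different writers can no longer be confused
            return Arrays.equals(writerUuid, other.writerUuid);
        }
        return true; // fall back to length + checksum when either UUID is unavailable
    }
}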

server/src/main/java/org/elasticsearch/index/store/Store.java

+98 -67
@@ -52,10 +52,10 @@
 import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Setting.Property;
-import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.core.AbstractRefCounted;
 import org.elasticsearch.core.RefCounted;
-import org.elasticsearch.common.util.iterable.Iterables;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.core.Tuple;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.env.ShardLock;
@@ -83,13 +83,14 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Consumer;
 import java.util.function.LongUnaryOperator;
+import java.util.function.Predicate;
 import java.util.zip.CRC32;
 import java.util.zip.Checksum;
 
@@ -850,16 +851,22 @@ static LoadedMetadata loadMetadata(IndexCommit commit, Directory directory, Logg
                 if (version.onOrAfter(maxVersion)) {
                     maxVersion = version;
                 }
+
+                final BytesRef segmentInfoId = StoreFileMetadata.toWriterUuid(info.info.getId());
+                final BytesRef segmentCommitInfoId = StoreFileMetadata.toWriterUuid(info.getId());
+
                 for (String file : info.files()) {
                     checksumFromLuceneFile(directory, file, builder, logger, version.toString(),
-                        SEGMENT_INFO_EXTENSION.equals(IndexFileNames.getExtension(file)));
+                        SEGMENT_INFO_EXTENSION.equals(IndexFileNames.getExtension(file)),
+                        IndexFileNames.parseGeneration(file) == 0 ? segmentInfoId : segmentCommitInfoId);
                 }
             }
             if (maxVersion == null) {
                 maxVersion = org.elasticsearch.Version.CURRENT.minimumIndexCompatibilityVersion().luceneVersion;
             }
             final String segmentsFile = segmentCommitInfos.getSegmentsFileName();
-            checksumFromLuceneFile(directory, segmentsFile, builder, logger, maxVersion.toString(), true);
+            checksumFromLuceneFile(directory, segmentsFile, builder, logger, maxVersion.toString(), true,
+                StoreFileMetadata.toWriterUuid(segmentCommitInfos.getId()));
         } catch (CorruptIndexException | IndexNotFoundException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
             // we either know the index is corrupted or it's just not there
             throw ex;
@@ -885,7 +892,7 @@ static LoadedMetadata loadMetadata(IndexCommit commit, Directory directory, Logg
        }
 
        private static void checksumFromLuceneFile(Directory directory, String file, Map<String, StoreFileMetadata> builder,
-                                                  Logger logger, String version, boolean readFileAsHash) throws IOException {
+                                                  Logger logger, String version, boolean readFileAsHash, BytesRef writerUuid) throws IOException {
            final String checksum;
            final BytesRefBuilder fileHash = new BytesRefBuilder();
            try (IndexInput in = directory.openInput(file, READONCE_CHECKSUM)) {
@@ -910,7 +917,7 @@ private static void checksumFromLuceneFile(Directory directory, String file, Map
                    logger.debug(() -> new ParameterizedMessage("Can retrieve checksum from file [{}]", file), ex);
                    throw ex;
                }
-               builder.put(file, new StoreFileMetadata(file, length, checksum, version, fileHash.get()));
+               builder.put(file, new StoreFileMetadata(file, length, checksum, version, fileHash.get(), writerUuid));
            }
        }
 
@@ -939,8 +946,6 @@ public Map<String, StoreFileMetadata> asMap() {
            return metadata;
        }
 
-       private static final String DEL_FILE_EXTENSION = "del"; // legacy delete file
-       private static final String LIV_FILE_EXTENSION = "liv"; // lucene 5 delete file
        private static final String SEGMENT_INFO_EXTENSION = "si";
 
        /**
@@ -951,80 +956,107 @@ public Map<String, StoreFileMetadata> asMap() {
         * <li>different: they exist in both snapshots but their they are not identical</li>
         * <li>missing: files that exist in the source but not in the target</li>
         * </ul>
-        * This method groups file into per-segment files and per-commit files. A file is treated as
-        * identical if and on if all files in it's group are identical. On a per-segment level files for a segment are treated
-        * as identical iff:
-        * <ul>
-        * <li>all files in this segment have the same checksum</li>
-        * <li>all files in this segment have the same length</li>
-        * <li>the segments {@code .si} files hashes are byte-identical Note: This is a using a perfect hash function,
-        * The metadata transfers the {@code .si} file content as it's hash</li>
-        * </ul>
         * <p>
-        * The {@code .si} file contains a lot of diagnostics including a timestamp etc. in the future there might be
-        * unique segment identifiers in there hardening this method further.
+        * Individual files are compared by name, length, checksum and (if present) a UUID that was assigned when the file was originally
+        * written. The segment info ({@code *.si}) files and the segments file ({@code segments_N}) are also checked to be a byte-for-byte
+        * match.
         * <p>
-        * The per-commit files handles very similar. A commit is composed of the {@code segments_N} files as well as generational files
-        * like deletes ({@code _x_y.del}) or field-info ({@code _x_y.fnm}) files. On a per-commit level files for a commit are treated
-        * as identical iff:
-        * <ul>
-        * <li>all files belonging to this commit have the same checksum</li>
-        * <li>all files belonging to this commit have the same length</li>
-        * <li>the segments file {@code segments_N} files hashes are byte-identical Note: This is a using a perfect hash function,
-        * The metadata transfers the {@code segments_N} file content as it's hash</li>
-        * </ul>
+        * Files are collected together into a group for each segment plus one group of "per-commit" ({@code segments_N}) files. Each
+        * per-segment group is subdivided into a nongenerational group (most of them) and a generational group (e.g. {@code *.liv},
+        * {@code *.fnm}, {@code *.dvm}, {@code *.dvd} that have been updated by subsequent commits).
        * <p>
-        * NOTE: this diff will not contain the {@code segments.gen} file. This file is omitted on recovery.
+        * For each segment, if any nongenerational files are different then the whole segment is considered to be different and will be
+        * recovered in full. If all the nongenerational files are the same but any generational files are different then all the
+        * generational files are considered to be different and will be recovered in full, but the nongenerational files are left alone.
+        * Finally, if any file is different then all the per-commit files are recovered too.
        */
-       public RecoveryDiff recoveryDiff(MetadataSnapshot recoveryTargetSnapshot) {
+       public RecoveryDiff recoveryDiff(final MetadataSnapshot targetSnapshot) {
+           final List<StoreFileMetadata> perCommitSourceFiles = new ArrayList<>();
+           final Map<String, Tuple<List<StoreFileMetadata>, List<StoreFileMetadata>>> perSegmentSourceFiles = new HashMap<>();
+           // per segment, a tuple of <<non-generational files, generational files>>
+
+           for (StoreFileMetadata sourceFile : this) {
+               if (sourceFile.name().startsWith("_")) {
+                   final String segmentId = IndexFileNames.parseSegmentName(sourceFile.name());
+                   final boolean isGenerationalFile = IndexFileNames.parseGeneration(sourceFile.name()) > 0L;
+                   final Tuple<List<StoreFileMetadata>, List<StoreFileMetadata>> perSegmentTuple = perSegmentSourceFiles
+                       .computeIfAbsent(segmentId, k -> Tuple.tuple(new ArrayList<>(), new ArrayList<>()));
+                   (isGenerationalFile ? perSegmentTuple.v2() : perSegmentTuple.v1()).add(sourceFile);
+               } else {
+                   assert sourceFile.name().startsWith(IndexFileNames.SEGMENTS + "_") : "unexpected " + sourceFile;
+                   perCommitSourceFiles.add(sourceFile);
+               }
+           }
+
           final List<StoreFileMetadata> identical = new ArrayList<>();
           final List<StoreFileMetadata> different = new ArrayList<>();
           final List<StoreFileMetadata> missing = new ArrayList<>();
-          final Map<String, List<StoreFileMetadata>> perSegment = new HashMap<>();
-          final List<StoreFileMetadata> perCommitStoreFiles = new ArrayList<>();
 
-          for (StoreFileMetadata meta : this) {
-              if (IndexFileNames.OLD_SEGMENTS_GEN.equals(meta.name())) { // legacy
-                  continue; // we don't need that file at all
+          final List<StoreFileMetadata> tmpIdentical = new ArrayList<>(); // confirm whole group is identical before adding to 'identical'
+          final Predicate<List<StoreFileMetadata>> groupComparer = sourceGroup -> {
+              assert tmpIdentical.isEmpty() : "not cleaned up: " + tmpIdentical;
+              boolean groupIdentical = true;
+              for (StoreFileMetadata sourceFile : sourceGroup) {
+                  final StoreFileMetadata targetFile = targetSnapshot.get(sourceFile.name());
+                  if (targetFile == null) {
+                      groupIdentical = false;
+                      missing.add(sourceFile);
+                  } else if (groupIdentical && targetFile.isSame(sourceFile)) {
+                      tmpIdentical.add(sourceFile);
+                  } else {
+                      groupIdentical = false;
+                      different.add(sourceFile);
+                  }
              }
-              final String segmentId = IndexFileNames.parseSegmentName(meta.name());
-              final String extension = IndexFileNames.getExtension(meta.name());
-              if (IndexFileNames.SEGMENTS.equals(segmentId) ||
-                  DEL_FILE_EXTENSION.equals(extension) || LIV_FILE_EXTENSION.equals(extension)) {
-                  // only treat del files as per-commit files fnm files are generational but only for upgradable DV
-                  perCommitStoreFiles.add(meta);
+              if (groupIdentical) {
+                  identical.addAll(tmpIdentical);
              } else {
-                  perSegment.computeIfAbsent(segmentId, k -> new ArrayList<>()).add(meta);
+                  different.addAll(tmpIdentical);
              }
-          }
-          final ArrayList<StoreFileMetadata> identicalFiles = new ArrayList<>();
-          for (List<StoreFileMetadata> segmentFiles : Iterables.concat(perSegment.values(), Collections.singleton(perCommitStoreFiles))) {
-              identicalFiles.clear();
-              boolean consistent = true;
-              for (StoreFileMetadata meta : segmentFiles) {
-                  StoreFileMetadata storeFileMetadata = recoveryTargetSnapshot.get(meta.name());
-                  if (storeFileMetadata == null) {
-                      consistent = false;
-                      missing.add(meta);
-                  } else if (storeFileMetadata.isSame(meta) == false) {
-                      consistent = false;
-                      different.add(meta);
+              tmpIdentical.clear();
+              return groupIdentical;
+          };
+          final Consumer<List<StoreFileMetadata>> allDifferent = sourceGroup -> {
+              for (StoreFileMetadata sourceFile : sourceGroup) {
+                  final StoreFileMetadata targetFile = targetSnapshot.get(sourceFile.name());
+                  if (targetFile == null) {
+                      missing.add(sourceFile);
                  } else {
-                      identicalFiles.add(meta);
+                      different.add(sourceFile);
                  }
              }
-              if (consistent) {
-                  identical.addAll(identicalFiles);
+          };
+
+          boolean segmentsIdentical = true;
+
+          for (Tuple<List<StoreFileMetadata>, List<StoreFileMetadata>> segmentFiles : perSegmentSourceFiles.values()) {
+              final List<StoreFileMetadata> nonGenerationalFiles = segmentFiles.v1();
+              final List<StoreFileMetadata> generationalFiles = segmentFiles.v2();
+
+              if (groupComparer.test(nonGenerationalFiles)) {
+                  // non-generational files are identical, now check the generational files
+                  segmentsIdentical = groupComparer.test(generationalFiles) && segmentsIdentical;
              } else {
-                  // make sure all files are added - this can happen if only the deletes are different
-                  different.addAll(identicalFiles);
+                  // non-generational files were different, so consider the whole segment as different
+                  segmentsIdentical = false;
+                  allDifferent.accept(generationalFiles);
              }
          }
-          RecoveryDiff recoveryDiff = new RecoveryDiff(Collections.unmodifiableList(identical),
-              Collections.unmodifiableList(different), Collections.unmodifiableList(missing));
-          assert recoveryDiff.size() == this.metadata.size() - (metadata.containsKey(IndexFileNames.OLD_SEGMENTS_GEN) ? 1 : 0)
-              : "some files are missing recoveryDiff size: [" + recoveryDiff.size() + "] metadata size: [" +
-              this.metadata.size() + "] contains segments.gen: [" + metadata.containsKey(IndexFileNames.OLD_SEGMENTS_GEN) + "]";
+
+          if (segmentsIdentical) {
+              // segments were the same, check the per-commit files
+              groupComparer.test(perCommitSourceFiles);
+          } else {
+              // at least one segment was different, so treat all the per-commit files as different too
+              allDifferent.accept(perCommitSourceFiles);
+          }
+
+          final RecoveryDiff recoveryDiff = new RecoveryDiff(
+              Collections.unmodifiableList(identical),
+              Collections.unmodifiableList(different),
+              Collections.unmodifiableList(missing));
+          assert recoveryDiff.size() == metadata.size() : "some files are missing: recoveryDiff is [" + recoveryDiff
+              + "] comparing: [" + metadata + "] to [" + targetSnapshot.metadata + "]";
          return recoveryDiff;
      }
 
@@ -1128,7 +1160,6 @@ public String toString() {
        }
    }
 
-
    /**
     * Returns true if the file is auto-generated by the store and shouldn't be deleted during cleanup.
     * This includes write lock files
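
To make the new grouping rules concrete, here is a minimal, self-contained sketch of the decision logic described in the new recoveryDiff Javadoc above (plain Java; files are reduced to name -> checksum maps, and the class and method names are invented for illustration, not the Store.MetadataSnapshot API):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class RecoveryDiffRules {
    final List<String> identical = new ArrayList<>();
    final List<String> different = new ArrayList<>();
    final List<String> missing = new ArrayList<>();
    final Map<String, String> source;
    final Map<String, String> target;

    RecoveryDiffRules(Map<String, String> source, Map<String, String> target) {
        this.source = source;
        this.target = target;
    }

    // A group only counts as identical if every file in it matches the target; a file
    // that matches individually is still transferred when its group does not match.
    boolean compareGroup(List<String> group) {
        boolean groupIdentical = true;
        List<String> matched = new ArrayList<>();
        for (String file : group) {
            String targetChecksum = target.get(file);
            if (targetChecksum == null) {
                missing.add(file);
                groupIdentical = false;
            } else if (targetChecksum.equals(source.get(file))) {
                matched.add(file);
            } else {
                different.add(file);
                groupIdentical = false;
            }
        }
        (groupIdentical ? identical : different).addAll(matched);
        return groupIdentical;
    }

    // Recover a group wholesale, still distinguishing files absent from the target.
    void forceRecover(List<String> group) {
        for (String file : group) {
            (target.containsKey(file) ? different : missing).add(file);
        }
    }

    void diff(List<List<String>> nonGenerationalBySegment, List<List<String>> generationalBySegment,
              List<String> perCommit) {
        boolean segmentsIdentical = true;
        for (int i = 0; i < nonGenerationalBySegment.size(); i++) {
            if (compareGroup(nonGenerationalBySegment.get(i))) {
                // segment core unchanged: ship the generational files only if they changed
                segmentsIdentical &= compareGroup(generationalBySegment.get(i));
            } else {
                // segment core changed: the whole segment, generational files included, is different
                segmentsIdentical = false;
                forceRecover(generationalBySegment.get(i));
            }
        }
        // any difference at all forces the per-commit segments_N files across as well
        if (segmentsIdentical) {
            compareGroup(perCommit);
        } else {
            forceRecover(perCommit);
        }
    }

    public static void main(String[] args) {
        RecoveryDiffRules rules = new RecoveryDiffRules(
            Map.of("_0.si", "s1", "_0.cfs", "c1", "_0_2.liv", "l2", "segments_6", "g6"),
            Map.of("_0.si", "s1", "_0.cfs", "c1", "_0_1.liv", "l1", "segments_5", "g5"));
        rules.diff(List.of(List.of("_0.si", "_0.cfs")), List.of(List.of("_0_2.liv")), List.of("segments_6"));
        // prints: identical=[_0.si, _0.cfs] different=[] missing=[_0_2.liv, segments_6]
        System.out.println("identical=" + rules.identical + " different=" + rules.different + " missing=" + rules.missing);
    }
}

In this example only the segment's live-docs generation changed, so the segment's core files stay in 'identical' and just the generational group plus the per-commit segments_N file need to be transferred, which is exactly the saving this commit is after.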
