Skip to content

Commit c829079

Browse files
authored
Only link fd* files during source-only snapshot (#53463)
Source-only snapshots currently create a second full source-only copy of the shard on disk to support incrementality during upload. Given that stored fields occupy a substantial part of a shard's storage, this means that clusters with source-only snapshots can require up to 50% more local storage. Ideally we would only generate source-only parts of the shard for the things that need to be uploaded (i.e. do incrementality checks on the original files instead of the trimmed-down source-only versions), but that requires much bigger changes to the snapshot infrastructure. This change is an attempt to dramatically cut down on the storage used by the source-only copy of the shard by soft-linking the stored-fields files (fd*) instead of copying them. Relates #50231
1 parent 108f9ca commit c829079

File tree

3 files changed

+281
-78
lines changed

3 files changed

+281
-78
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshot.java

+238-55
Original file line numberDiff line numberDiff line change
@@ -30,25 +30,34 @@
3030
import org.apache.lucene.search.Scorer;
3131
import org.apache.lucene.search.Weight;
3232
import org.apache.lucene.store.Directory;
33+
import org.apache.lucene.store.FilterDirectory;
3334
import org.apache.lucene.store.IOContext;
35+
import org.apache.lucene.store.IndexInput;
3436
import org.apache.lucene.store.IndexOutput;
3537
import org.apache.lucene.store.Lock;
3638
import org.apache.lucene.store.TrackingDirectoryWrapper;
3739
import org.apache.lucene.util.Bits;
3840
import org.apache.lucene.util.BytesRef;
3941
import org.apache.lucene.util.FixedBitSet;
42+
import org.elasticsearch.common.Strings;
4043
import org.elasticsearch.common.lucene.Lucene;
4144
import org.elasticsearch.core.internal.io.IOUtils;
4245

4346
import java.io.ByteArrayOutputStream;
47+
import java.io.FileNotFoundException;
4448
import java.io.IOException;
4549
import java.io.PrintStream;
50+
import java.nio.file.NoSuchFileException;
4651
import java.util.ArrayList;
4752
import java.util.Arrays;
53+
import java.util.Collection;
4854
import java.util.Collections;
4955
import java.util.HashMap;
56+
import java.util.HashSet;
5057
import java.util.List;
5158
import java.util.Map;
59+
import java.util.Set;
60+
import java.util.concurrent.atomic.AtomicInteger;
5261
import java.util.function.Supplier;
5362

5463
import static org.apache.lucene.codecs.compressing.CompressingStoredFieldsWriter.FIELDS_EXTENSION;
@@ -60,23 +69,23 @@ public class SourceOnlySnapshot {
6069

6170
private static final String FIELDS_INDEX_EXTENSION = INDEX_EXTENSION_PREFIX + FIELDS_INDEX_EXTENSION_SUFFIX;
6271
private static final String FIELDS_META_EXTENSION = INDEX_EXTENSION_PREFIX + FIELDS_META_EXTENSION_SUFFIX;
63-
private final Directory targetDirectory;
72+
private final LinkedFilesDirectory targetDirectory;
6473
private final Supplier<Query> deleteByQuerySupplier;
6574

66-
public SourceOnlySnapshot(Directory targetDirectory, Supplier<Query> deleteByQuerySupplier) {
75+
public SourceOnlySnapshot(LinkedFilesDirectory targetDirectory, Supplier<Query> deleteByQuerySupplier) {
6776
this.targetDirectory = targetDirectory;
6877
this.deleteByQuerySupplier = deleteByQuerySupplier;
6978
}
7079

71-
public SourceOnlySnapshot(Directory targetDirectory) {
80+
public SourceOnlySnapshot(LinkedFilesDirectory targetDirectory) {
7281
this(targetDirectory, null);
7382
}
7483

7584
public synchronized List<String> syncSnapshot(IndexCommit commit) throws IOException {
7685
long generation;
7786
Map<BytesRef, SegmentCommitInfo> existingSegments = new HashMap<>();
78-
if (Lucene.indexExists(targetDirectory)) {
79-
SegmentInfos existingsSegmentInfos = Lucene.readSegmentInfos(targetDirectory);
87+
if (Lucene.indexExists(targetDirectory.getWrapped())) {
88+
SegmentInfos existingsSegmentInfos = Lucene.readSegmentInfos(targetDirectory.getWrapped());
8089
for (SegmentCommitInfo info : existingsSegmentInfos) {
8190
existingSegments.put(new BytesRef(info.info.getId()), info);
8291
}
@@ -191,63 +200,78 @@ DirectoryReader wrapReader(DirectoryReader reader) throws IOException {
191200

192201
private SegmentCommitInfo syncSegment(SegmentCommitInfo segmentCommitInfo, LiveDocs liveDocs, FieldInfos fieldInfos,
193202
Map<BytesRef, SegmentCommitInfo> existingSegments, List<String> createdFiles) throws IOException {
194-
SegmentInfo si = segmentCommitInfo.info;
195-
Codec codec = si.getCodec();
196-
final String segmentSuffix = "";
197-
SegmentCommitInfo newInfo;
198-
final TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(targetDirectory);
199-
BytesRef segmentId = new BytesRef(si.getId());
200-
boolean exists = existingSegments.containsKey(segmentId);
201-
if (exists == false) {
202-
SegmentInfo newSegmentInfo = new SegmentInfo(si.dir, si.getVersion(), si.getMinVersion(), si.name, si.maxDoc(), false,
203-
si.getCodec(), si.getDiagnostics(), si.getId(), si.getAttributes(), null);
204-
// we drop the sort on purpose since the field we sorted on doesn't exist in the target index anymore.
205-
newInfo = new SegmentCommitInfo(newSegmentInfo, 0, 0, -1, -1, -1);
206-
List<FieldInfo> fieldInfoCopy = new ArrayList<>(fieldInfos.size());
207-
for (FieldInfo fieldInfo : fieldInfos) {
208-
fieldInfoCopy.add(new FieldInfo(fieldInfo.name, fieldInfo.number,
209-
false, false, false, IndexOptions.NONE, DocValuesType.NONE, -1, fieldInfo.attributes(), 0, 0, 0,
210-
fieldInfo.isSoftDeletesField()));
211-
}
212-
FieldInfos newFieldInfos = new FieldInfos(fieldInfoCopy.toArray(new FieldInfo[0]));
213-
codec.fieldInfosFormat().write(trackingDir, newSegmentInfo, segmentSuffix, newFieldInfos, IOContext.DEFAULT);
214-
newInfo.setFieldInfosFiles(trackingDir.getCreatedFiles());
215-
String idxFile = IndexFileNames.segmentFileName(newSegmentInfo.name, segmentSuffix, FIELDS_INDEX_EXTENSION);
216-
String dataFile = IndexFileNames.segmentFileName(newSegmentInfo.name, segmentSuffix, FIELDS_EXTENSION);
217-
String metaFile = IndexFileNames.segmentFileName(newSegmentInfo.name, segmentSuffix, FIELDS_META_EXTENSION);
218-
Directory sourceDir = newSegmentInfo.dir;
203+
Directory toClose = null;
204+
try {
205+
SegmentInfo si = segmentCommitInfo.info;
206+
Codec codec = si.getCodec();
207+
Directory sourceDir = si.dir;
219208
if (si.getUseCompoundFile()) {
220-
sourceDir = codec.compoundFormat().getCompoundReader(sourceDir, si, IOContext.DEFAULT);
209+
sourceDir = new LinkedFilesDirectory.CloseMePleaseWrapper(
210+
codec.compoundFormat().getCompoundReader(sourceDir, si, IOContext.DEFAULT));
211+
toClose = sourceDir;
212+
}
213+
final String segmentSuffix = "";
214+
SegmentCommitInfo newInfo;
215+
final TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(targetDirectory);
216+
BytesRef segmentId = new BytesRef(si.getId());
217+
boolean exists = existingSegments.containsKey(segmentId);
218+
if (exists == false) {
219+
SegmentInfo newSegmentInfo = new SegmentInfo(targetDirectory, si.getVersion(), si.getMinVersion(), si.name, si.maxDoc(),
220+
false, si.getCodec(), si.getDiagnostics(), si.getId(), si.getAttributes(), null);
221+
// we drop the sort on purpose since the field we sorted on doesn't exist in the target index anymore.
222+
newInfo = new SegmentCommitInfo(newSegmentInfo, 0, 0, -1, -1, -1);
223+
List<FieldInfo> fieldInfoCopy = new ArrayList<>(fieldInfos.size());
224+
for (FieldInfo fieldInfo : fieldInfos) {
225+
fieldInfoCopy.add(new FieldInfo(fieldInfo.name, fieldInfo.number,
226+
false, false, false, IndexOptions.NONE, DocValuesType.NONE, -1, fieldInfo.attributes(), 0, 0, 0,
227+
fieldInfo.isSoftDeletesField()));
228+
}
229+
FieldInfos newFieldInfos = new FieldInfos(fieldInfoCopy.toArray(new FieldInfo[0]));
230+
codec.fieldInfosFormat().write(trackingDir, newSegmentInfo, segmentSuffix, newFieldInfos, IOContext.DEFAULT);
231+
newInfo.setFieldInfosFiles(trackingDir.getCreatedFiles());
232+
} else {
233+
newInfo = existingSegments.get(segmentId);
234+
assert newInfo.info.getUseCompoundFile() == false;
221235
}
236+
237+
// link files for stored fields to target directory
238+
final String idxFile = IndexFileNames.segmentFileName(newInfo.info.name, segmentSuffix, FIELDS_INDEX_EXTENSION);
239+
final String dataFile = IndexFileNames.segmentFileName(newInfo.info.name, segmentSuffix, FIELDS_EXTENSION);
240+
final String metaFile = IndexFileNames.segmentFileName(newInfo.info.name, segmentSuffix, FIELDS_META_EXTENSION);
222241
trackingDir.copyFrom(sourceDir, idxFile, idxFile, IOContext.DEFAULT);
242+
assert targetDirectory.linkedFiles.containsKey(idxFile);
243+
assert trackingDir.getCreatedFiles().contains(idxFile);
223244
trackingDir.copyFrom(sourceDir, dataFile, dataFile, IOContext.DEFAULT);
224-
if (Arrays.asList(sourceDir.listAll()).contains(metaFile)) { // only exists for Lucene 8.5+ indices
225-
trackingDir.copyFrom(sourceDir, metaFile, metaFile, IOContext.DEFAULT);
226-
}
227-
if (sourceDir != newSegmentInfo.dir) {
228-
sourceDir.close();
245+
assert targetDirectory.linkedFiles.containsKey(dataFile);
246+
assert trackingDir.getCreatedFiles().contains(dataFile);
247+
if (Arrays.asList(sourceDir.listAll()).contains(metaFile)) { // only exists for Lucene 8.5+ indices
248+
trackingDir.copyFrom(sourceDir, metaFile, metaFile, IOContext.DEFAULT);
249+
assert targetDirectory.linkedFiles.containsKey(metaFile);
250+
assert trackingDir.getCreatedFiles().contains(metaFile);
229251
}
230-
} else {
231-
newInfo = existingSegments.get(segmentId);
232-
assert newInfo.info.getUseCompoundFile() == false;
233-
}
234-
if (liveDocs.bits != null && liveDocs.numDeletes != 0 && liveDocs.numDeletes != newInfo.getDelCount()) {
235-
if (newInfo.getDelCount() != 0) {
236-
assert assertLiveDocs(liveDocs.bits, liveDocs.numDeletes);
252+
253+
if (liveDocs.bits != null && liveDocs.numDeletes != 0 && liveDocs.numDeletes != newInfo.getDelCount()) {
254+
assert newInfo.getDelCount() == 0 || assertLiveDocs(liveDocs.bits, liveDocs.numDeletes);
255+
codec.liveDocsFormat().writeLiveDocs(liveDocs.bits, trackingDir, newInfo, liveDocs.numDeletes - newInfo.getDelCount(),
256+
IOContext.DEFAULT);
257+
SegmentCommitInfo info = new SegmentCommitInfo(newInfo.info, liveDocs.numDeletes, 0, newInfo.getNextDelGen(), -1, -1);
258+
info.setFieldInfosFiles(newInfo.getFieldInfosFiles());
259+
info.info.setFiles(trackingDir.getCreatedFiles());
260+
newInfo = info;
237261
}
238-
codec.liveDocsFormat().writeLiveDocs(liveDocs.bits, trackingDir, newInfo, liveDocs.numDeletes - newInfo.getDelCount(),
239-
IOContext.DEFAULT);
240-
SegmentCommitInfo info = new SegmentCommitInfo(newInfo.info, liveDocs.numDeletes, 0, newInfo.getNextDelGen(), -1, -1);
241-
info.setFieldInfosFiles(newInfo.getFieldInfosFiles());
242-
info.info.setFiles(trackingDir.getCreatedFiles());
243-
newInfo = info;
244-
}
245-
if (exists == false) {
246-
newInfo.info.setFiles(trackingDir.getCreatedFiles());
247-
codec.segmentInfoFormat().write(trackingDir, newInfo.info, IOContext.DEFAULT);
262+
if (exists == false) {
263+
newInfo.info.setFiles(trackingDir.getCreatedFiles());
264+
codec.segmentInfoFormat().write(trackingDir, newInfo.info, IOContext.DEFAULT);
265+
}
266+
final Set<String> createdFilesForThisSegment = trackingDir.getCreatedFiles();
267+
createdFilesForThisSegment.remove(idxFile);
268+
createdFilesForThisSegment.remove(dataFile);
269+
createdFilesForThisSegment.remove(metaFile);
270+
createdFiles.addAll(createdFilesForThisSegment);
271+
return newInfo;
272+
} finally {
273+
IOUtils.close(toClose);
248274
}
249-
createdFiles.addAll(trackingDir.getCreatedFiles());
250-
return newInfo;
251275
}
252276

253277
private boolean assertLiveDocs(Bits liveDocs, int deletes) {
@@ -270,4 +294,163 @@ private static class LiveDocs {
270294
this.bits = bits;
271295
}
272296
}
297+
298+
public static class LinkedFilesDirectory extends Directory {
299+
300+
private final Directory wrapped;
301+
private final Map<String, Directory> linkedFiles = new HashMap<>();
302+
303+
public LinkedFilesDirectory(Directory wrapped) {
304+
this.wrapped = wrapped;
305+
}
306+
307+
public Directory getWrapped() {
308+
return wrapped;
309+
}
310+
311+
@Override
312+
public String[] listAll() throws IOException {
313+
Set<String> files = new HashSet<>();
314+
Collections.addAll(files, wrapped.listAll());
315+
files.addAll(linkedFiles.keySet());
316+
String[] result = files.toArray(Strings.EMPTY_ARRAY);
317+
Arrays.sort(result);
318+
return result;
319+
}
320+
321+
@Override
322+
public void deleteFile(String name) throws IOException {
323+
final Directory directory = linkedFiles.remove(name);
324+
if (directory == null) {
325+
wrapped.deleteFile(name);
326+
} else {
327+
try (directory) {
328+
wrapped.deleteFile(name);
329+
} catch (NoSuchFileException | FileNotFoundException e) {
330+
// ignore
331+
}
332+
}
333+
}
334+
335+
@Override
336+
public long fileLength(String name) throws IOException {
337+
final Directory linkedDir = linkedFiles.get(name);
338+
if (linkedDir != null) {
339+
return linkedDir.fileLength(name);
340+
} else {
341+
return wrapped.fileLength(name);
342+
}
343+
}
344+
345+
@Override
346+
public IndexOutput createOutput(String name, IOContext context) throws IOException {
347+
if (linkedFiles.containsKey(name)) {
348+
throw new IllegalArgumentException("file cannot be created as linked file with name " + name + " already exists");
349+
} else {
350+
return wrapped.createOutput(name, context);
351+
}
352+
}
353+
354+
@Override
355+
public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) throws IOException {
356+
return wrapped.createTempOutput(prefix, suffix, context);
357+
}
358+
359+
@Override
360+
public void sync(Collection<String> names) throws IOException {
361+
final List<String> primaryNames = new ArrayList<>();
362+
363+
for (String name : names) {
364+
if (linkedFiles.containsKey(name) == false) {
365+
primaryNames.add(name);
366+
}
367+
}
368+
369+
if (primaryNames.isEmpty() == false) {
370+
wrapped.sync(primaryNames);
371+
}
372+
}
373+
374+
@Override
375+
public void syncMetaData() throws IOException {
376+
wrapped.syncMetaData();
377+
}
378+
379+
@Override
380+
public void rename(String source, String dest) throws IOException {
381+
if (linkedFiles.containsKey(source) || linkedFiles.containsKey(dest)) {
382+
throw new IllegalArgumentException("file cannot be renamed as linked file with name " + source + " or " + dest +
383+
" already exists");
384+
} else {
385+
wrapped.rename(source, dest);
386+
}
387+
}
388+
389+
@Override
390+
public IndexInput openInput(String name, IOContext context) throws IOException {
391+
final Directory linkedDir = linkedFiles.get(name);
392+
if (linkedDir != null) {
393+
return linkedDir.openInput(name, context);
394+
} else {
395+
return wrapped.openInput(name, context);
396+
}
397+
}
398+
399+
@Override
400+
public Lock obtainLock(String name) throws IOException {
401+
return wrapped.obtainLock(name);
402+
}
403+
404+
@Override
405+
public void close() throws IOException {
406+
IOUtils.close(() -> IOUtils.close(linkedFiles.values()), linkedFiles::clear, wrapped);
407+
}
408+
409+
@Override
410+
public void copyFrom(Directory from, String src, String dest, IOContext context) throws IOException {
411+
if (src.equals(dest) == false) {
412+
throw new IllegalArgumentException();
413+
} else {
414+
final Directory previous;
415+
if (from instanceof CloseMePleaseWrapper) {
416+
((CloseMePleaseWrapper) from).incRef();
417+
previous = linkedFiles.put(src, from);
418+
} else {
419+
previous = linkedFiles.put(src, new FilterDirectory(from) {
420+
@Override
421+
public void close() {
422+
// ignore
423+
}
424+
});
425+
}
426+
IOUtils.close(previous);
427+
}
428+
}
429+
430+
static class CloseMePleaseWrapper extends FilterDirectory {
431+
432+
private final AtomicInteger refCount = new AtomicInteger(1);
433+
434+
CloseMePleaseWrapper(Directory in) {
435+
super(in);
436+
}
437+
438+
public void incRef() {
439+
int ref = refCount.incrementAndGet();
440+
assert ref > 1;
441+
}
442+
443+
@Override
444+
public void close() throws IOException {
445+
if (refCount.decrementAndGet() == 0) {
446+
in.close();
447+
}
448+
}
449+
}
450+
451+
@Override
452+
public Set<String> getPendingDeletions() throws IOException {
453+
return wrapped.getPendingDeletions();
454+
}
455+
}
273456
}

x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java

+5-4
Original file line numberDiff line numberDiff line change
@@ -138,17 +138,18 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s
138138
Path snapPath = dataPath.resolve(SNAPSHOT_DIR_NAME);
139139
final List<Closeable> toClose = new ArrayList<>(3);
140140
try {
141-
FSDirectory directory = new SimpleFSDirectory(snapPath);
142-
toClose.add(directory);
143-
Store tempStore = new Store(store.shardId(), store.indexSettings(), directory, new ShardLock(store.shardId()) {
141+
SourceOnlySnapshot.LinkedFilesDirectory overlayDir = new SourceOnlySnapshot.LinkedFilesDirectory(
142+
new SimpleFSDirectory(snapPath));
143+
toClose.add(overlayDir);
144+
Store tempStore = new Store(store.shardId(), store.indexSettings(), overlayDir, new ShardLock(store.shardId()) {
144145
@Override
145146
protected void closeInternal() {
146147
// do nothing;
147148
}
148149
}, Store.OnClose.EMPTY);
149150
Supplier<Query> querySupplier = mapperService.hasNested() ? Queries::newNestedFilter : null;
150151
// SourceOnlySnapshot will take care of soft- and hard-deletes no special casing needed here
151-
SourceOnlySnapshot snapshot = new SourceOnlySnapshot(tempStore.directory(), querySupplier);
152+
SourceOnlySnapshot snapshot = new SourceOnlySnapshot(overlayDir, querySupplier);
152153
snapshot.syncSnapshot(snapshotIndexCommit);
153154
// we will use the lucene doc ID as the seq ID so we set the local checkpoint to maxDoc with a new index UUID
154155
SegmentInfos segmentInfos = tempStore.readLastCommittedSegmentsInfo();

0 commit comments

Comments (0)