Skip to content

Only link fd* files during source-only snapshot #53463

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Mar 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,34 @@
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.Weight;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.Lock;
import org.apache.lucene.store.TrackingDirectoryWrapper;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.FixedBitSet;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.core.internal.io.IOUtils;

import java.io.ByteArrayOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

import static org.apache.lucene.codecs.compressing.CompressingStoredFieldsWriter.FIELDS_EXTENSION;
Expand All @@ -60,23 +69,23 @@ public class SourceOnlySnapshot {

private static final String FIELDS_INDEX_EXTENSION = INDEX_EXTENSION_PREFIX + FIELDS_INDEX_EXTENSION_SUFFIX;
private static final String FIELDS_META_EXTENSION = INDEX_EXTENSION_PREFIX + FIELDS_META_EXTENSION_SUFFIX;
private final Directory targetDirectory;
private final LinkedFilesDirectory targetDirectory;
private final Supplier<Query> deleteByQuerySupplier;

public SourceOnlySnapshot(Directory targetDirectory, Supplier<Query> deleteByQuerySupplier) {
public SourceOnlySnapshot(LinkedFilesDirectory targetDirectory, Supplier<Query> deleteByQuerySupplier) {
this.targetDirectory = targetDirectory;
this.deleteByQuerySupplier = deleteByQuerySupplier;
}

public SourceOnlySnapshot(Directory targetDirectory) {
public SourceOnlySnapshot(LinkedFilesDirectory targetDirectory) {
this(targetDirectory, null);
}

public synchronized List<String> syncSnapshot(IndexCommit commit) throws IOException {
long generation;
Map<BytesRef, SegmentCommitInfo> existingSegments = new HashMap<>();
if (Lucene.indexExists(targetDirectory)) {
SegmentInfos existingsSegmentInfos = Lucene.readSegmentInfos(targetDirectory);
if (Lucene.indexExists(targetDirectory.getWrapped())) {
SegmentInfos existingsSegmentInfos = Lucene.readSegmentInfos(targetDirectory.getWrapped());
for (SegmentCommitInfo info : existingsSegmentInfos) {
existingSegments.put(new BytesRef(info.info.getId()), info);
}
Expand Down Expand Up @@ -191,63 +200,78 @@ DirectoryReader wrapReader(DirectoryReader reader) throws IOException {

private SegmentCommitInfo syncSegment(SegmentCommitInfo segmentCommitInfo, LiveDocs liveDocs, FieldInfos fieldInfos,
Map<BytesRef, SegmentCommitInfo> existingSegments, List<String> createdFiles) throws IOException {
SegmentInfo si = segmentCommitInfo.info;
Codec codec = si.getCodec();
final String segmentSuffix = "";
SegmentCommitInfo newInfo;
final TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(targetDirectory);
BytesRef segmentId = new BytesRef(si.getId());
boolean exists = existingSegments.containsKey(segmentId);
if (exists == false) {
SegmentInfo newSegmentInfo = new SegmentInfo(si.dir, si.getVersion(), si.getMinVersion(), si.name, si.maxDoc(), false,
si.getCodec(), si.getDiagnostics(), si.getId(), si.getAttributes(), null);
// we drop the sort on purpose since the field we sorted on doesn't exist in the target index anymore.
newInfo = new SegmentCommitInfo(newSegmentInfo, 0, 0, -1, -1, -1);
List<FieldInfo> fieldInfoCopy = new ArrayList<>(fieldInfos.size());
for (FieldInfo fieldInfo : fieldInfos) {
fieldInfoCopy.add(new FieldInfo(fieldInfo.name, fieldInfo.number,
false, false, false, IndexOptions.NONE, DocValuesType.NONE, -1, fieldInfo.attributes(), 0, 0, 0,
fieldInfo.isSoftDeletesField()));
}
FieldInfos newFieldInfos = new FieldInfos(fieldInfoCopy.toArray(new FieldInfo[0]));
codec.fieldInfosFormat().write(trackingDir, newSegmentInfo, segmentSuffix, newFieldInfos, IOContext.DEFAULT);
newInfo.setFieldInfosFiles(trackingDir.getCreatedFiles());
String idxFile = IndexFileNames.segmentFileName(newSegmentInfo.name, segmentSuffix, FIELDS_INDEX_EXTENSION);
String dataFile = IndexFileNames.segmentFileName(newSegmentInfo.name, segmentSuffix, FIELDS_EXTENSION);
String metaFile = IndexFileNames.segmentFileName(newSegmentInfo.name, segmentSuffix, FIELDS_META_EXTENSION);
Directory sourceDir = newSegmentInfo.dir;
Directory toClose = null;
try {
SegmentInfo si = segmentCommitInfo.info;
Codec codec = si.getCodec();
Directory sourceDir = si.dir;
if (si.getUseCompoundFile()) {
sourceDir = codec.compoundFormat().getCompoundReader(sourceDir, si, IOContext.DEFAULT);
sourceDir = new LinkedFilesDirectory.CloseMePleaseWrapper(
codec.compoundFormat().getCompoundReader(sourceDir, si, IOContext.DEFAULT));
toClose = sourceDir;
}
final String segmentSuffix = "";
SegmentCommitInfo newInfo;
final TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(targetDirectory);
BytesRef segmentId = new BytesRef(si.getId());
boolean exists = existingSegments.containsKey(segmentId);
if (exists == false) {
SegmentInfo newSegmentInfo = new SegmentInfo(targetDirectory, si.getVersion(), si.getMinVersion(), si.name, si.maxDoc(),
false, si.getCodec(), si.getDiagnostics(), si.getId(), si.getAttributes(), null);
// we drop the sort on purpose since the field we sorted on doesn't exist in the target index anymore.
newInfo = new SegmentCommitInfo(newSegmentInfo, 0, 0, -1, -1, -1);
List<FieldInfo> fieldInfoCopy = new ArrayList<>(fieldInfos.size());
for (FieldInfo fieldInfo : fieldInfos) {
fieldInfoCopy.add(new FieldInfo(fieldInfo.name, fieldInfo.number,
false, false, false, IndexOptions.NONE, DocValuesType.NONE, -1, fieldInfo.attributes(), 0, 0, 0,
fieldInfo.isSoftDeletesField()));
}
FieldInfos newFieldInfos = new FieldInfos(fieldInfoCopy.toArray(new FieldInfo[0]));
codec.fieldInfosFormat().write(trackingDir, newSegmentInfo, segmentSuffix, newFieldInfos, IOContext.DEFAULT);
newInfo.setFieldInfosFiles(trackingDir.getCreatedFiles());
} else {
newInfo = existingSegments.get(segmentId);
assert newInfo.info.getUseCompoundFile() == false;
}

// link files for stored fields to target directory
final String idxFile = IndexFileNames.segmentFileName(newInfo.info.name, segmentSuffix, FIELDS_INDEX_EXTENSION);
final String dataFile = IndexFileNames.segmentFileName(newInfo.info.name, segmentSuffix, FIELDS_EXTENSION);
final String metaFile = IndexFileNames.segmentFileName(newInfo.info.name, segmentSuffix, FIELDS_META_EXTENSION);
trackingDir.copyFrom(sourceDir, idxFile, idxFile, IOContext.DEFAULT);
assert targetDirectory.linkedFiles.containsKey(idxFile);
assert trackingDir.getCreatedFiles().contains(idxFile);
trackingDir.copyFrom(sourceDir, dataFile, dataFile, IOContext.DEFAULT);
if (Arrays.asList(sourceDir.listAll()).contains(metaFile)) { // only exists for Lucene 8.5+ indices
trackingDir.copyFrom(sourceDir, metaFile, metaFile, IOContext.DEFAULT);
}
if (sourceDir != newSegmentInfo.dir) {
sourceDir.close();
assert targetDirectory.linkedFiles.containsKey(dataFile);
assert trackingDir.getCreatedFiles().contains(dataFile);
if (Arrays.asList(sourceDir.listAll()).contains(metaFile)) { // only exists for Lucene 8.5+ indices
trackingDir.copyFrom(sourceDir, metaFile, metaFile, IOContext.DEFAULT);
assert targetDirectory.linkedFiles.containsKey(metaFile);
assert trackingDir.getCreatedFiles().contains(metaFile);
}
} else {
newInfo = existingSegments.get(segmentId);
assert newInfo.info.getUseCompoundFile() == false;
}
if (liveDocs.bits != null && liveDocs.numDeletes != 0 && liveDocs.numDeletes != newInfo.getDelCount()) {
if (newInfo.getDelCount() != 0) {
assert assertLiveDocs(liveDocs.bits, liveDocs.numDeletes);

if (liveDocs.bits != null && liveDocs.numDeletes != 0 && liveDocs.numDeletes != newInfo.getDelCount()) {
assert newInfo.getDelCount() == 0 || assertLiveDocs(liveDocs.bits, liveDocs.numDeletes);
codec.liveDocsFormat().writeLiveDocs(liveDocs.bits, trackingDir, newInfo, liveDocs.numDeletes - newInfo.getDelCount(),
IOContext.DEFAULT);
SegmentCommitInfo info = new SegmentCommitInfo(newInfo.info, liveDocs.numDeletes, 0, newInfo.getNextDelGen(), -1, -1);
info.setFieldInfosFiles(newInfo.getFieldInfosFiles());
info.info.setFiles(trackingDir.getCreatedFiles());
newInfo = info;
}
codec.liveDocsFormat().writeLiveDocs(liveDocs.bits, trackingDir, newInfo, liveDocs.numDeletes - newInfo.getDelCount(),
IOContext.DEFAULT);
SegmentCommitInfo info = new SegmentCommitInfo(newInfo.info, liveDocs.numDeletes, 0, newInfo.getNextDelGen(), -1, -1);
info.setFieldInfosFiles(newInfo.getFieldInfosFiles());
info.info.setFiles(trackingDir.getCreatedFiles());
newInfo = info;
}
if (exists == false) {
newInfo.info.setFiles(trackingDir.getCreatedFiles());
codec.segmentInfoFormat().write(trackingDir, newInfo.info, IOContext.DEFAULT);
if (exists == false) {
newInfo.info.setFiles(trackingDir.getCreatedFiles());
codec.segmentInfoFormat().write(trackingDir, newInfo.info, IOContext.DEFAULT);
}
final Set<String> createdFilesForThisSegment = trackingDir.getCreatedFiles();
createdFilesForThisSegment.remove(idxFile);
createdFilesForThisSegment.remove(dataFile);
createdFilesForThisSegment.remove(metaFile);
createdFiles.addAll(createdFilesForThisSegment);
return newInfo;
} finally {
IOUtils.close(toClose);
}
createdFiles.addAll(trackingDir.getCreatedFiles());
return newInfo;
}

private boolean assertLiveDocs(Bits liveDocs, int deletes) {
Expand All @@ -270,4 +294,163 @@ private static class LiveDocs {
this.bits = bits;
}
}

/**
 * A {@link Directory} that overlays a wrapped directory with "linked" files.
 *
 * <p>Calling {@link #copyFrom} does not copy any bytes: it only records that the given file name
 * is served by the directory it came from. {@link #openInput} and {@link #fileLength} for a linked
 * name are answered by that source directory; every other name is answered by the wrapped
 * directory. Linked names cannot be created or renamed, and are skipped by {@link #sync}.
 *
 * <p>The link table is a plain {@link HashMap} and this class does no synchronization of its own.
 */
public static class LinkedFilesDirectory extends Directory {

    private final Directory wrapped;
    // file name -> directory that actually holds the file
    private final Map<String, Directory> linkedFiles = new HashMap<>();

    /**
     * @param wrapped the directory that receives all non-linked files
     */
    public LinkedFilesDirectory(Directory wrapped) {
        this.wrapped = wrapped;
    }

    /**
     * @return the wrapped directory, i.e. the view without any linked files
     */
    public Directory getWrapped() {
        return wrapped;
    }

    /**
     * Lists the union of the wrapped directory's files and the linked file names.
     *
     * @return the de-duplicated file names in sorted order
     */
    @Override
    public String[] listAll() throws IOException {
        Set<String> files = new HashSet<>();
        Collections.addAll(files, wrapped.listAll());
        files.addAll(linkedFiles.keySet());
        String[] result = files.toArray(Strings.EMPTY_ARRAY);
        Arrays.sort(result);
        return result;
    }

    /**
     * Deletes {@code name}. For a linked file the link is dropped and the directory registered
     * for it is closed; a deletion from the wrapped directory is still attempted, but a missing
     * file there is not treated as an error.
     */
    @Override
    public void deleteFile(String name) throws IOException {
        final Directory directory = linkedFiles.remove(name);
        if (directory == null) {
            wrapped.deleteFile(name);
        } else {
            // try-with-resources closes the link's directory once the delete attempt is done
            try (directory) {
                wrapped.deleteFile(name);
            } catch (NoSuchFileException | FileNotFoundException e) {
                // ignore
            }
        }
    }

    /**
     * @return the length of {@code name}, taken from the link's source directory if the name is
     *         linked and from the wrapped directory otherwise
     */
    @Override
    public long fileLength(String name) throws IOException {
        final Directory linkedDir = linkedFiles.get(name);
        if (linkedDir != null) {
            return linkedDir.fileLength(name);
        } else {
            return wrapped.fileLength(name);
        }
    }

    /**
     * Creates an output in the wrapped directory.
     *
     * @throws IllegalArgumentException if {@code name} is currently a linked file
     */
    @Override
    public IndexOutput createOutput(String name, IOContext context) throws IOException {
        if (linkedFiles.containsKey(name)) {
            throw new IllegalArgumentException("file cannot be created as linked file with name " + name + " already exists");
        } else {
            return wrapped.createOutput(name, context);
        }
    }

    /** Delegates to the wrapped directory; linked files are not consulted. */
    @Override
    public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) throws IOException {
        return wrapped.createTempOutput(prefix, suffix, context);
    }

    /**
     * Syncs only those of {@code names} that are not linked; the wrapped directory is not called
     * at all when every name is linked.
     */
    @Override
    public void sync(Collection<String> names) throws IOException {
        final List<String> primaryNames = new ArrayList<>();

        for (String name : names) {
            if (linkedFiles.containsKey(name) == false) {
                primaryNames.add(name);
            }
        }

        if (primaryNames.isEmpty() == false) {
            wrapped.sync(primaryNames);
        }
    }

    /** Delegates to the wrapped directory. */
    @Override
    public void syncMetaData() throws IOException {
        wrapped.syncMetaData();
    }

    /**
     * Renames a file in the wrapped directory.
     *
     * @throws IllegalArgumentException if either {@code source} or {@code dest} is a linked file
     */
    @Override
    public void rename(String source, String dest) throws IOException {
        if (linkedFiles.containsKey(source) || linkedFiles.containsKey(dest)) {
            throw new IllegalArgumentException("file cannot be renamed as linked file with name " + source + " or " + dest +
                " already exists");
        } else {
            wrapped.rename(source, dest);
        }
    }

    /**
     * Opens {@code name} from the link's source directory if the name is linked, otherwise from
     * the wrapped directory.
     */
    @Override
    public IndexInput openInput(String name, IOContext context) throws IOException {
        final Directory linkedDir = linkedFiles.get(name);
        if (linkedDir != null) {
            return linkedDir.openInput(name, context);
        } else {
            return wrapped.openInput(name, context);
        }
    }

    /** Delegates to the wrapped directory. */
    @Override
    public Lock obtainLock(String name) throws IOException {
        return wrapped.obtainLock(name);
    }

    /**
     * Hands three closeables to {@code IOUtils.close}, in this order: one that closes every linked
     * directory, one that clears the link table, and the wrapped directory itself.
     */
    @Override
    public void close() throws IOException {
        IOUtils.close(() -> IOUtils.close(linkedFiles.values()), linkedFiles::clear, wrapped);
    }

    /**
     * Links {@code src} from {@code from} into this directory instead of copying it. Any directory
     * previously linked under the same name is closed.
     *
     * @param from    directory that holds the file
     * @param src     name of the file in {@code from}
     * @param dest    name in this directory; must equal {@code src}
     * @param context unused, as no bytes are copied
     * @throws IllegalArgumentException if {@code src} and {@code dest} differ
     */
    @Override
    public void copyFrom(Directory from, String src, String dest, IOContext context) throws IOException {
        if (src.equals(dest) == false) {
            throw new IllegalArgumentException(
                "linked files must keep their name but got src [" + src + "] and dest [" + dest + "]");
        }
        final Directory previous;
        if (from instanceof CloseMePleaseWrapper) {
            // the link holds its own reference; it is released when the link is closed
            ((CloseMePleaseWrapper) from).incRef();
            previous = linkedFiles.put(src, from);
        } else {
            // closing this link must not close the caller's directory, so close() is a no-op
            previous = linkedFiles.put(src, new FilterDirectory(from) {
                @Override
                public void close() {
                    // ignore
                }
            });
        }
        IOUtils.close(previous);
    }

    /**
     * A reference-counted {@link FilterDirectory}: the count starts at 1, {@link #incRef()} adds a
     * reference, and {@link #close()} releases one, closing the delegate when the count hits zero.
     */
    static class CloseMePleaseWrapper extends FilterDirectory {

        private final AtomicInteger refCount = new AtomicInteger(1);

        CloseMePleaseWrapper(Directory in) {
            super(in);
        }

        /** Adds one reference; asserts that the wrapper had not already been fully released. */
        public void incRef() {
            int ref = refCount.incrementAndGet();
            assert ref > 1;
        }

        /** Releases one reference and closes the delegate only when the count reaches zero. */
        @Override
        public void close() throws IOException {
            if (refCount.decrementAndGet() == 0) {
                in.close();
            }
        }
    }

    /** Delegates to the wrapped directory; linked files are not included. */
    @Override
    public Set<String> getPendingDeletions() throws IOException {
        return wrapped.getPendingDeletions();
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,17 +138,18 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s
Path snapPath = dataPath.resolve(SNAPSHOT_DIR_NAME);
final List<Closeable> toClose = new ArrayList<>(3);
try {
FSDirectory directory = new SimpleFSDirectory(snapPath);
toClose.add(directory);
Store tempStore = new Store(store.shardId(), store.indexSettings(), directory, new ShardLock(store.shardId()) {
SourceOnlySnapshot.LinkedFilesDirectory overlayDir = new SourceOnlySnapshot.LinkedFilesDirectory(
new SimpleFSDirectory(snapPath));
toClose.add(overlayDir);
Store tempStore = new Store(store.shardId(), store.indexSettings(), overlayDir, new ShardLock(store.shardId()) {
@Override
protected void closeInternal() {
// do nothing;
}
}, Store.OnClose.EMPTY);
Supplier<Query> querySupplier = mapperService.hasNested() ? Queries::newNestedFilter : null;
// SourceOnlySnapshot will take care of soft- and hard-deletes no special casing needed here
SourceOnlySnapshot snapshot = new SourceOnlySnapshot(tempStore.directory(), querySupplier);
SourceOnlySnapshot snapshot = new SourceOnlySnapshot(overlayDir, querySupplier);
snapshot.syncSnapshot(snapshotIndexCommit);
// we will use the lucene doc ID as the seq ID so we set the local checkpoint to maxDoc with a new index UUID
SegmentInfos segmentInfos = tempStore.readLastCommittedSegmentsInfo();
Expand Down
Loading