-
Notifications
You must be signed in to change notification settings - Fork 25.2k
Uncouple CacheDirectory from SearchableSnapshotDirectory #53860
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
tlrx
merged 3 commits into
elastic:feature/searchable-snapshots
from
tlrx:cache-directory-use-blob-container
Mar 20, 2020
Merged
Changes from all commits
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
105 changes: 105 additions & 0 deletions
105
...napshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotDirectory.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License; | ||
* you may not use this file except in compliance with the Elastic License. | ||
*/ | ||
package org.elasticsearch.index.store; | ||
|
||
import org.apache.lucene.store.BaseDirectory; | ||
import org.apache.lucene.store.IOContext; | ||
import org.apache.lucene.store.IndexOutput; | ||
import org.apache.lucene.store.SingleInstanceLockFactory; | ||
import org.elasticsearch.common.blobstore.BlobContainer; | ||
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; | ||
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; | ||
|
||
import java.io.FileNotFoundException; | ||
import java.io.IOException; | ||
import java.util.Collection; | ||
import java.util.Objects; | ||
import java.util.Set; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
|
||
public abstract class BaseSearchableSnapshotDirectory extends BaseDirectory { | ||
|
||
protected final BlobStoreIndexShardSnapshot snapshot; | ||
protected final BlobContainer blobContainer; | ||
private final AtomicBoolean closed; | ||
|
||
public BaseSearchableSnapshotDirectory(BlobContainer blobContainer, BlobStoreIndexShardSnapshot snapshot) { | ||
super(new SingleInstanceLockFactory()); | ||
this.snapshot = Objects.requireNonNull(snapshot); | ||
this.blobContainer = Objects.requireNonNull(blobContainer); | ||
this.closed = new AtomicBoolean(false); | ||
} | ||
|
||
protected final FileInfo fileInfo(final String name) throws FileNotFoundException { | ||
return snapshot.indexFiles() | ||
.stream() | ||
.filter(fileInfo -> fileInfo.physicalName().equals(name)) | ||
.findFirst() | ||
.orElseThrow(() -> new FileNotFoundException(name)); | ||
} | ||
|
||
@Override | ||
public final String[] listAll() { | ||
ensureOpen(); | ||
return snapshot.indexFiles().stream().map(FileInfo::physicalName).sorted(String::compareTo).toArray(String[]::new); | ||
} | ||
|
||
@Override | ||
public final long fileLength(final String name) throws IOException { | ||
ensureOpen(); | ||
return fileInfo(name).length(); | ||
} | ||
|
||
@Override | ||
public Set<String> getPendingDeletions() { | ||
throw unsupportedException(); | ||
} | ||
|
||
@Override | ||
public void sync(Collection<String> names) { | ||
throw unsupportedException(); | ||
} | ||
|
||
@Override | ||
public void syncMetaData() { | ||
throw unsupportedException(); | ||
} | ||
|
||
@Override | ||
public void deleteFile(String name) { | ||
throw unsupportedException(); | ||
} | ||
|
||
@Override | ||
public IndexOutput createOutput(String name, IOContext context) { | ||
throw unsupportedException(); | ||
} | ||
|
||
@Override | ||
public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) { | ||
throw unsupportedException(); | ||
} | ||
|
||
@Override | ||
public void rename(String source, String dest) { | ||
throw unsupportedException(); | ||
} | ||
|
||
private static UnsupportedOperationException unsupportedException() { | ||
assert false : "this operation is not supported and should have not be called"; | ||
return new UnsupportedOperationException("Searchable snapshot directory does not support this operation"); | ||
tlrx marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
@Override | ||
public final void close() { | ||
if (closed.compareAndSet(false, true)) { | ||
isOpen = false; | ||
innerClose(); | ||
} | ||
} | ||
|
||
protected void innerClose() {} | ||
} |
101 changes: 101 additions & 0 deletions
101
...apshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License; | ||
* you may not use this file except in compliance with the Elastic License. | ||
*/ | ||
package org.elasticsearch.index.store; | ||
|
||
import org.apache.lucene.store.BufferedIndexInput; | ||
import org.apache.lucene.store.IOContext; | ||
import org.apache.lucene.util.BytesRef; | ||
import org.elasticsearch.common.blobstore.BlobContainer; | ||
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; | ||
import org.elasticsearch.index.snapshots.blobstore.SlicedInputStream; | ||
|
||
import java.io.ByteArrayInputStream; | ||
import java.io.IOException; | ||
import java.io.InputStream; | ||
import java.util.Objects; | ||
|
||
public abstract class BaseSearchableSnapshotIndexInput extends BufferedIndexInput { | ||
|
||
protected final BlobContainer blobContainer; | ||
protected final FileInfo fileInfo; | ||
protected final IOContext context; | ||
|
||
public BaseSearchableSnapshotIndexInput(String resourceDesc, BlobContainer blobContainer, FileInfo fileInfo, IOContext context) { | ||
super(resourceDesc, context); | ||
this.blobContainer = Objects.requireNonNull(blobContainer); | ||
this.fileInfo = Objects.requireNonNull(fileInfo); | ||
this.context = Objects.requireNonNull(context); | ||
} | ||
|
||
public BaseSearchableSnapshotIndexInput( | ||
String resourceDesc, | ||
BlobContainer blobContainer, | ||
FileInfo fileInfo, | ||
IOContext context, | ||
int bufferSize | ||
) { | ||
this(resourceDesc, blobContainer, fileInfo, context); | ||
setBufferSize(bufferSize); | ||
} | ||
|
||
protected InputStream openInputStream(final long position, final long length) throws IOException { | ||
// TODO move this at the Directory level | ||
DaveCTurner marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if (fileInfo.metadata().hashEqualsContents()) { | ||
// extract blob content from metadata hash | ||
final BytesRef data = fileInfo.metadata().hash(); | ||
if ((position < 0L) || (length < 0L) || (position + length > data.bytes.length)) { | ||
throw new IllegalArgumentException( | ||
"Invalid arguments (pos=" + position + ", length=" + length + ") for hash content (length=" + data.bytes.length + ')' | ||
); | ||
} | ||
return new ByteArrayInputStream(data.bytes, Math.toIntExact(position), Math.toIntExact(length)); | ||
} | ||
|
||
final long startPart = getPartNumberForPosition(position); | ||
final long endPart = getPartNumberForPosition(position + length); | ||
if ((startPart == endPart) || fileInfo.numberOfParts() == 1L) { | ||
return blobContainer.readBlob(fileInfo.partName(startPart), getRelativePositionInPart(position), length); | ||
} else { | ||
return new SlicedInputStream(endPart - startPart + 1L) { | ||
@Override | ||
protected InputStream openSlice(long slice) throws IOException { | ||
final long currentPart = startPart + slice; | ||
return blobContainer.readBlob( | ||
fileInfo.partName(currentPart), | ||
(currentPart == startPart) ? getRelativePositionInPart(position) : 0L, | ||
(currentPart == endPart) ? getRelativePositionInPart(length) : getLengthOfPart(currentPart) | ||
); | ||
} | ||
}; | ||
} | ||
} | ||
|
||
private long getPartNumberForPosition(long position) { | ||
ensureValidPosition(position); | ||
final long part = position / fileInfo.partSize().getBytes(); | ||
assert part <= fileInfo.numberOfParts() : "part number [" + part + "] exceeds number of parts: " + fileInfo.numberOfParts(); | ||
assert part >= 0L : "part number [" + part + "] is negative"; | ||
return part; | ||
} | ||
|
||
private long getRelativePositionInPart(long position) { | ||
ensureValidPosition(position); | ||
final long pos = position % fileInfo.partSize().getBytes(); | ||
assert pos < fileInfo.partBytes((int) getPartNumberForPosition(pos)) : "position in part [" + pos + "] exceeds part's length"; | ||
assert pos >= 0L : "position in part [" + pos + "] is negative"; | ||
return pos; | ||
} | ||
|
||
private long getLengthOfPart(long part) { | ||
return fileInfo.partBytes(Math.toIntExact(part)); | ||
} | ||
|
||
private void ensureValidPosition(long position) { | ||
if (position < 0L || position > fileInfo.length()) { | ||
throw new IllegalArgumentException("Position [" + position + "] is invalid"); | ||
} | ||
} | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,18 +5,14 @@ | |
*/ | ||
package org.elasticsearch.index.store; | ||
|
||
import org.apache.lucene.store.BaseDirectory; | ||
import org.apache.lucene.store.BufferedIndexInput; | ||
import org.apache.lucene.store.Directory; | ||
import org.apache.lucene.store.IOContext; | ||
import org.apache.lucene.store.IndexInput; | ||
import org.apache.lucene.store.IndexOutput; | ||
import org.apache.lucene.store.SingleInstanceLockFactory; | ||
import org.elasticsearch.common.blobstore.BlobContainer; | ||
import org.elasticsearch.index.IndexSettings; | ||
import org.elasticsearch.index.shard.ShardPath; | ||
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; | ||
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; | ||
import org.elasticsearch.repositories.IndexId; | ||
import org.elasticsearch.repositories.RepositoriesService; | ||
import org.elasticsearch.repositories.Repository; | ||
|
@@ -25,12 +21,8 @@ | |
import org.elasticsearch.xpack.searchablesnapshots.cache.CacheDirectory; | ||
import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService; | ||
|
||
import java.io.FileNotFoundException; | ||
import java.io.IOException; | ||
import java.nio.file.Path; | ||
import java.util.Collection; | ||
import java.util.Objects; | ||
import java.util.Set; | ||
import java.util.function.LongSupplier; | ||
|
||
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING; | ||
|
@@ -50,95 +42,24 @@ | |
* shard files and what it stored in the snapshot the {@link BlobStoreIndexShardSnapshot} is used to map a physical file name as expected by | ||
* Lucene with the one (or the ones) corresponding blob(s) in the snapshot. | ||
*/ | ||
public class SearchableSnapshotDirectory extends BaseDirectory { | ||
|
||
private final BlobStoreIndexShardSnapshot snapshot; | ||
private final BlobContainer blobContainer; | ||
public class SearchableSnapshotDirectory extends BaseSearchableSnapshotDirectory { | ||
DaveCTurner marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
SearchableSnapshotDirectory(final BlobStoreIndexShardSnapshot snapshot, final BlobContainer blobContainer) { | ||
super(new SingleInstanceLockFactory()); | ||
this.snapshot = Objects.requireNonNull(snapshot); | ||
this.blobContainer = Objects.requireNonNull(blobContainer); | ||
} | ||
|
||
private FileInfo fileInfo(final String name) throws FileNotFoundException { | ||
return snapshot.indexFiles().stream() | ||
.filter(fileInfo -> fileInfo.physicalName().equals(name)) | ||
.findFirst() | ||
.orElseThrow(() -> new FileNotFoundException(name)); | ||
} | ||
|
||
@Override | ||
public String[] listAll() throws IOException { | ||
ensureOpen(); | ||
return snapshot.indexFiles().stream() | ||
.map(FileInfo::physicalName) | ||
.sorted(String::compareTo) | ||
.toArray(String[]::new); | ||
} | ||
|
||
@Override | ||
public long fileLength(final String name) throws IOException { | ||
ensureOpen(); | ||
return fileInfo(name).length(); | ||
super(blobContainer, snapshot); | ||
} | ||
|
||
@Override | ||
public IndexInput openInput(final String name, final IOContext context) throws IOException { | ||
ensureOpen(); | ||
return new SearchableSnapshotIndexInput(blobContainer, fileInfo(name), blobContainer.readBlobPreferredLength(), | ||
return new SearchableSnapshotIndexInput(blobContainer, fileInfo(name), context, blobContainer.readBlobPreferredLength(), | ||
BufferedIndexInput.BUFFER_SIZE); | ||
} | ||
|
||
@Override | ||
public void close() { | ||
isOpen = false; | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return this.getClass().getSimpleName() + "@" + snapshot.snapshot() + " lockFactory=" + lockFactory; | ||
} | ||
|
||
@Override | ||
public Set<String> getPendingDeletions() { | ||
throw unsupportedException(); | ||
} | ||
|
||
@Override | ||
public void sync(Collection<String> names) { | ||
throw unsupportedException(); | ||
} | ||
|
||
@Override | ||
public void syncMetaData() { | ||
throw unsupportedException(); | ||
} | ||
|
||
@Override | ||
public void deleteFile(String name) { | ||
throw unsupportedException(); | ||
} | ||
|
||
@Override | ||
public IndexOutput createOutput(String name, IOContext context) { | ||
throw unsupportedException(); | ||
} | ||
|
||
@Override | ||
public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) { | ||
throw unsupportedException(); | ||
} | ||
|
||
@Override | ||
public void rename(String source, String dest) { | ||
throw unsupportedException(); | ||
} | ||
|
||
private static UnsupportedOperationException unsupportedException() { | ||
return new UnsupportedOperationException("Searchable snapshot directory does not support this operation"); | ||
} | ||
|
||
public static Directory create(RepositoriesService repositories, | ||
CacheService cache, | ||
IndexSettings indexSettings, | ||
|
@@ -158,11 +79,13 @@ public static Directory create(RepositoriesService repositories, | |
SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings.getSettings())); | ||
final BlobStoreIndexShardSnapshot snapshot = blobStoreRepository.loadShardSnapshot(blobContainer, snapshotId); | ||
|
||
Directory directory = new SearchableSnapshotDirectory(snapshot, blobContainer); | ||
final Directory directory; | ||
if (SNAPSHOT_CACHE_ENABLED_SETTING.get(indexSettings.getSettings())) { | ||
final Path cacheDir = shardPath.getDataPath().resolve("snapshots").resolve(snapshotId.getUUID()); | ||
directory = new CacheDirectory(directory, cache, cacheDir, snapshotId, indexId, shardPath.getShardId(), | ||
directory = new CacheDirectory(snapshot, blobContainer, cache, cacheDir, snapshotId, indexId, shardPath.getShardId(), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. With this pull request, CacheDirectory reads blobs using the BlobContainer and does not need to wrap the SearchableSnapshotDirectory anymore. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
currentTimeNanosSupplier); | ||
} else { | ||
directory = new SearchableSnapshotDirectory(snapshot, blobContainer); | ||
} | ||
return new InMemoryNoOpCommitDirectory(directory); | ||
} | ||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This abstract class has been introduced in order to contain common attributes of existing directories and in order to unify the existing constructors. It should become a concrete
SearchableSnapshotDirectory
when the cache logic and the searchable snapshot logic will be merged together.