Skip to content

Merge CacheDirectory into SearchableSnapshotDirectory #53917

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,42 @@
*/
package org.elasticsearch.index.store;

import org.apache.lucene.store.BaseDirectory;
import org.apache.lucene.store.BufferedIndexInput;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.SingleInstanceLockFactory;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.xpack.searchablesnapshots.cache.CacheDirectory;
import org.elasticsearch.xpack.searchablesnapshots.cache.CacheBufferedIndexInput;
import org.elasticsearch.xpack.searchablesnapshots.cache.CacheFile;
import org.elasticsearch.xpack.searchablesnapshots.cache.CacheKey;
import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService;
import org.elasticsearch.xpack.searchablesnapshots.cache.IndexInputStats;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongSupplier;

import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING;
Expand All @@ -42,17 +60,175 @@
* shard files and what it stored in the snapshot the {@link BlobStoreIndexShardSnapshot} is used to map a physical file name as expected by
* Lucene with the one (or the ones) corresponding blob(s) in the snapshot.
*/
public class SearchableSnapshotDirectory extends BaseSearchableSnapshotDirectory {
public class SearchableSnapshotDirectory extends BaseDirectory {

SearchableSnapshotDirectory(final BlobStoreIndexShardSnapshot snapshot, final BlobContainer blobContainer) {
super(blobContainer, snapshot);
private final BlobStoreIndexShardSnapshot snapshot;
private final BlobContainer blobContainer;
private final SnapshotId snapshotId;
private final IndexId indexId;
private final ShardId shardId;
private final LongSupplier statsCurrentTimeNanosSupplier;
private final Map<String, IndexInputStats> stats;
private final CacheService cacheService;
private final boolean useCache;
private final Path cacheDir;
private final AtomicBoolean closed;

public SearchableSnapshotDirectory(
BlobContainer blobContainer,
BlobStoreIndexShardSnapshot snapshot,
SnapshotId snapshotId,
IndexId indexId,
ShardId shardId,
Settings indexSettings,
LongSupplier currentTimeNanosSupplier,
CacheService cacheService,
Path cacheDir
) {
super(new SingleInstanceLockFactory());
this.snapshot = Objects.requireNonNull(snapshot);
this.blobContainer = Objects.requireNonNull(blobContainer);
this.snapshotId = Objects.requireNonNull(snapshotId);
this.indexId = Objects.requireNonNull(indexId);
this.shardId = Objects.requireNonNull(shardId);
this.stats = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
this.statsCurrentTimeNanosSupplier = Objects.requireNonNull(currentTimeNanosSupplier);
this.cacheService = Objects.requireNonNull(cacheService);
this.cacheDir = Objects.requireNonNull(cacheDir);
this.closed = new AtomicBoolean(false);
this.useCache = SNAPSHOT_CACHE_ENABLED_SETTING.get(indexSettings);
}

public BlobContainer blobContainer() {
return blobContainer;
}

public SnapshotId getSnapshotId() {
return snapshotId;
}

public IndexId getIndexId() {
return indexId;
}

public ShardId getShardId() {
return shardId;
}

public Map<String, IndexInputStats> getStats() {
return Collections.unmodifiableMap(stats);
}

@Nullable
public IndexInputStats getStats(String fileName) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comes from the now removed CacheDirectory and made public as cache classes are in a different package

return stats.get(fileName);
}

public long statsCurrentTimeNanos() {
return statsCurrentTimeNanosSupplier.getAsLong();
}

private BlobStoreIndexShardSnapshot.FileInfo fileInfo(final String name) throws FileNotFoundException {
return snapshot.indexFiles()
.stream()
.filter(fileInfo -> fileInfo.physicalName().equals(name))
.findFirst()
.orElseThrow(() -> new FileNotFoundException(name));
}

@Override
public final String[] listAll() {
ensureOpen();
return snapshot.indexFiles()
.stream()
.map(BlobStoreIndexShardSnapshot.FileInfo::physicalName)
.sorted(String::compareTo)
.toArray(String[]::new);
}

@Override
public final long fileLength(final String name) throws IOException {
ensureOpen();
return fileInfo(name).length();
}

@Override
public Set<String> getPendingDeletions() {
throw unsupportedException();
}

@Override
public void sync(Collection<String> names) {
throw unsupportedException();
}

@Override
public void syncMetaData() {
throw unsupportedException();
}

@Override
public void deleteFile(String name) {
throw unsupportedException();
}

@Override
public IndexOutput createOutput(String name, IOContext context) {
throw unsupportedException();
}

@Override
public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) {
throw unsupportedException();
}

@Override
public void rename(String source, String dest) {
throw unsupportedException();
}

private static UnsupportedOperationException unsupportedException() {
assert false : "this operation is not supported and should have not be called";
return new UnsupportedOperationException("Searchable snapshot directory does not support this operation");
}

@Override
public final void close() {
if (closed.compareAndSet(false, true)) {
isOpen = false;
// Ideally we could let the cache evict/remove cached files by itself after the
// directory has been closed.
clearCache();
}
}

public void clearCache() {
cacheService.removeFromCache(cacheKey -> cacheKey.belongsTo(snapshotId, indexId, shardId));
}

protected IndexInputStats createIndexInputStats(final long fileLength) {
return new IndexInputStats(fileLength);
}

public CacheKey createCacheKey(String fileName) {
return new CacheKey(snapshotId, indexId, shardId, fileName);
}

public CacheFile getCacheFile(CacheKey cacheKey, long fileLength) throws Exception {
return cacheService.get(cacheKey, fileLength, cacheDir);
}

@Override
public IndexInput openInput(final String name, final IOContext context) throws IOException {
ensureOpen();
return new SearchableSnapshotIndexInput(blobContainer, fileInfo(name), context, blobContainer.readBlobPreferredLength(),
BufferedIndexInput.BUFFER_SIZE);
final BlobStoreIndexShardSnapshot.FileInfo fileInfo = fileInfo(name);
final IndexInputStats inputStats = stats.computeIfAbsent(name, n -> createIndexInputStats(fileInfo.length()));
if (useCache) {
return new CacheBufferedIndexInput(this, fileInfo, context, inputStats);
} else {
long preferredLength = blobContainer.readBlobPreferredLength();
return new SearchableSnapshotIndexInput(blobContainer, fileInfo, context, preferredLength, BufferedIndexInput.BUFFER_SIZE);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to rename this one to DirectBufferedIndexInput and to clean it up a bit in a follow up PR

}
}

@Override
Expand All @@ -75,18 +251,27 @@ public static Directory create(RepositoriesService repositories,
final IndexId indexId = new IndexId(indexSettings.getIndex().getName(), SNAPSHOT_INDEX_ID_SETTING.get(indexSettings.getSettings()));
final BlobContainer blobContainer = blobStoreRepository.shardContainer(indexId, shardPath.getShardId().id());

final SnapshotId snapshotId = new SnapshotId(SNAPSHOT_SNAPSHOT_NAME_SETTING.get(indexSettings.getSettings()),
SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings.getSettings()));
final SnapshotId snapshotId = new SnapshotId(
SNAPSHOT_SNAPSHOT_NAME_SETTING.get(indexSettings.getSettings()),
SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings.getSettings())
);
final BlobStoreIndexShardSnapshot snapshot = blobStoreRepository.loadShardSnapshot(blobContainer, snapshotId);

final Directory directory;
if (SNAPSHOT_CACHE_ENABLED_SETTING.get(indexSettings.getSettings())) {
final Path cacheDir = shardPath.getDataPath().resolve("snapshots").resolve(snapshotId.getUUID());
directory = new CacheDirectory(snapshot, blobContainer, cache, cacheDir, snapshotId, indexId, shardPath.getShardId(),
currentTimeNanosSupplier);
} else {
directory = new SearchableSnapshotDirectory(snapshot, blobContainer);
}
return new InMemoryNoOpCommitDirectory(directory);
final Path cacheDir = shardPath.getDataPath().resolve("snapshots").resolve(snapshotId.getUUID());
Files.createDirectories(cacheDir);

return new InMemoryNoOpCommitDirectory(
new SearchableSnapshotDirectory(
blobContainer,
snapshot,
snapshotId,
indexId,
shardPath.getShardId(),
indexSettings.getSettings(),
currentTimeNanosSupplier,
cache,
cacheDir
)
);
}
}
Loading