diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotDirectory.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotDirectory.java deleted file mode 100644 index 40f697b0876be..0000000000000 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotDirectory.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.index.store; - -import org.apache.lucene.store.BaseDirectory; -import org.apache.lucene.store.IOContext; -import org.apache.lucene.store.IndexOutput; -import org.apache.lucene.store.SingleInstanceLockFactory; -import org.elasticsearch.common.blobstore.BlobContainer; -import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; -import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.Collection; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; - -public abstract class BaseSearchableSnapshotDirectory extends BaseDirectory { - - protected final BlobStoreIndexShardSnapshot snapshot; - protected final BlobContainer blobContainer; - private final AtomicBoolean closed; - - public BaseSearchableSnapshotDirectory(BlobContainer blobContainer, BlobStoreIndexShardSnapshot snapshot) { - super(new SingleInstanceLockFactory()); - this.snapshot = Objects.requireNonNull(snapshot); - this.blobContainer = Objects.requireNonNull(blobContainer); - this.closed = new AtomicBoolean(false); - } - - public BlobContainer blobContainer() { - return blobContainer; - } - - protected final FileInfo fileInfo(final String name) throws FileNotFoundException { - return snapshot.indexFiles() - .stream() - .filter(fileInfo -> fileInfo.physicalName().equals(name)) - .findFirst() - .orElseThrow(() -> new FileNotFoundException(name)); - } - - @Override - public final String[] listAll() { - ensureOpen(); - return snapshot.indexFiles().stream().map(FileInfo::physicalName).sorted(String::compareTo).toArray(String[]::new); - } - - @Override - public final long fileLength(final String name) throws IOException { - ensureOpen(); - return fileInfo(name).length(); - } - - @Override - public Set getPendingDeletions() { - throw unsupportedException(); - } - - @Override - public void sync(Collection names) { - throw unsupportedException(); - } - - @Override - public void syncMetaData() { - throw unsupportedException(); - } - - @Override - public void deleteFile(String name) { - throw unsupportedException(); - } - - @Override - public IndexOutput createOutput(String name, IOContext context) { - throw unsupportedException(); - } - - @Override - public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) { - throw unsupportedException(); - } - - @Override - public void rename(String source, String dest) { - throw unsupportedException(); - } - - private static UnsupportedOperationException unsupportedException() { - assert false : "this operation is not supported and should have not be called"; - return new UnsupportedOperationException("Searchable snapshot directory does not support this operation"); - } - - @Override - public final void close() { - if (closed.compareAndSet(false, true)) { - isOpen = false; - innerClose(); - } - } - - protected void innerClose() {} -} diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java index cfd5d8d8a16b2..7c677254f0bfb 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java @@ -5,12 +5,19 @@ */ package org.elasticsearch.index.store; +import org.apache.lucene.store.BaseDirectory; import org.apache.lucene.store.BufferedIndexInput; import org.apache.lucene.store.Directory; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.store.SingleInstanceLockFactory; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.blobstore.BlobContainer; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardPath; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; import org.elasticsearch.repositories.IndexId; @@ -18,11 +25,22 @@ import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import org.elasticsearch.snapshots.SnapshotId; -import org.elasticsearch.xpack.searchablesnapshots.cache.CacheDirectory; +import org.elasticsearch.xpack.searchablesnapshots.cache.CacheBufferedIndexInput; +import org.elasticsearch.xpack.searchablesnapshots.cache.CacheFile; +import org.elasticsearch.xpack.searchablesnapshots.cache.CacheKey; import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService; +import org.elasticsearch.xpack.searchablesnapshots.cache.IndexInputStats; +import java.io.FileNotFoundException; import java.io.IOException; +import java.nio.file.Files; import java.nio.file.Path; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.LongSupplier; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING; @@ -42,17 +60,175 @@ * shard files and what it stored in the snapshot the {@link BlobStoreIndexShardSnapshot} is used to map a physical file name as expected by * Lucene with the one (or the ones) corresponding blob(s) in the snapshot. */ -public class SearchableSnapshotDirectory extends BaseSearchableSnapshotDirectory { +public class SearchableSnapshotDirectory extends BaseDirectory { - SearchableSnapshotDirectory(final BlobStoreIndexShardSnapshot snapshot, final BlobContainer blobContainer) { - super(blobContainer, snapshot); + private final BlobStoreIndexShardSnapshot snapshot; + private final BlobContainer blobContainer; + private final SnapshotId snapshotId; + private final IndexId indexId; + private final ShardId shardId; + private final LongSupplier statsCurrentTimeNanosSupplier; + private final Map stats; + private final CacheService cacheService; + private final boolean useCache; + private final Path cacheDir; + private final AtomicBoolean closed; + + public SearchableSnapshotDirectory( + BlobContainer blobContainer, + BlobStoreIndexShardSnapshot snapshot, + SnapshotId snapshotId, + IndexId indexId, + ShardId shardId, + Settings indexSettings, + LongSupplier currentTimeNanosSupplier, + CacheService cacheService, + Path cacheDir + ) { + super(new SingleInstanceLockFactory()); + this.snapshot = Objects.requireNonNull(snapshot); + this.blobContainer = Objects.requireNonNull(blobContainer); + this.snapshotId = Objects.requireNonNull(snapshotId); + this.indexId = Objects.requireNonNull(indexId); + this.shardId = Objects.requireNonNull(shardId); + this.stats = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(); + this.statsCurrentTimeNanosSupplier = Objects.requireNonNull(currentTimeNanosSupplier); + this.cacheService = Objects.requireNonNull(cacheService); + this.cacheDir = Objects.requireNonNull(cacheDir); + this.closed = new AtomicBoolean(false); + this.useCache = SNAPSHOT_CACHE_ENABLED_SETTING.get(indexSettings); + } + + public BlobContainer blobContainer() { + return blobContainer; + } + + public SnapshotId getSnapshotId() { + return snapshotId; + } + + public IndexId getIndexId() { + return indexId; + } + + public ShardId getShardId() { + return shardId; + } + + public Map getStats() { + return Collections.unmodifiableMap(stats); + } + + @Nullable + public IndexInputStats getStats(String fileName) { + return stats.get(fileName); + } + + public long statsCurrentTimeNanos() { + return statsCurrentTimeNanosSupplier.getAsLong(); + } + + private BlobStoreIndexShardSnapshot.FileInfo fileInfo(final String name) throws FileNotFoundException { + return snapshot.indexFiles() + .stream() + .filter(fileInfo -> fileInfo.physicalName().equals(name)) + .findFirst() + .orElseThrow(() -> new FileNotFoundException(name)); + } + + @Override + public final String[] listAll() { + ensureOpen(); + return snapshot.indexFiles() + .stream() + .map(BlobStoreIndexShardSnapshot.FileInfo::physicalName) + .sorted(String::compareTo) + .toArray(String[]::new); + } + + @Override + public final long fileLength(final String name) throws IOException { + ensureOpen(); + return fileInfo(name).length(); + } + + @Override + public Set getPendingDeletions() { + throw unsupportedException(); + } + + @Override + public void sync(Collection names) { + throw unsupportedException(); + } + + @Override + public void syncMetaData() { + throw unsupportedException(); + } + + @Override + public void deleteFile(String name) { + throw unsupportedException(); + } + + @Override + public IndexOutput createOutput(String name, IOContext context) { + throw unsupportedException(); + } + + @Override + public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) { + throw unsupportedException(); + } + + @Override + public void rename(String source, String dest) { + throw unsupportedException(); + } + + private static UnsupportedOperationException unsupportedException() { + assert false : "this operation is not supported and should have not be called"; + return new UnsupportedOperationException("Searchable snapshot directory does not support this operation"); + } + + @Override + public final void close() { + if (closed.compareAndSet(false, true)) { + isOpen = false; + // Ideally we could let the cache evict/remove cached files by itself after the + // directory has been closed. + clearCache(); + } + } + + public void clearCache() { + cacheService.removeFromCache(cacheKey -> cacheKey.belongsTo(snapshotId, indexId, shardId)); + } + + protected IndexInputStats createIndexInputStats(final long fileLength) { + return new IndexInputStats(fileLength); + } + + public CacheKey createCacheKey(String fileName) { + return new CacheKey(snapshotId, indexId, shardId, fileName); + } + + public CacheFile getCacheFile(CacheKey cacheKey, long fileLength) throws Exception { + return cacheService.get(cacheKey, fileLength, cacheDir); } @Override public IndexInput openInput(final String name, final IOContext context) throws IOException { ensureOpen(); - return new SearchableSnapshotIndexInput(blobContainer, fileInfo(name), context, blobContainer.readBlobPreferredLength(), - BufferedIndexInput.BUFFER_SIZE); + final BlobStoreIndexShardSnapshot.FileInfo fileInfo = fileInfo(name); + final IndexInputStats inputStats = stats.computeIfAbsent(name, n -> createIndexInputStats(fileInfo.length())); + if (useCache) { + return new CacheBufferedIndexInput(this, fileInfo, context, inputStats); + } else { + long preferredLength = blobContainer.readBlobPreferredLength(); + return new SearchableSnapshotIndexInput(blobContainer, fileInfo, context, preferredLength, BufferedIndexInput.BUFFER_SIZE); + } } @Override @@ -75,18 +251,27 @@ public static Directory create(RepositoriesService repositories, final IndexId indexId = new IndexId(indexSettings.getIndex().getName(), SNAPSHOT_INDEX_ID_SETTING.get(indexSettings.getSettings())); final BlobContainer blobContainer = blobStoreRepository.shardContainer(indexId, shardPath.getShardId().id()); - final SnapshotId snapshotId = new SnapshotId(SNAPSHOT_SNAPSHOT_NAME_SETTING.get(indexSettings.getSettings()), - SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings.getSettings())); + final SnapshotId snapshotId = new SnapshotId( + SNAPSHOT_SNAPSHOT_NAME_SETTING.get(indexSettings.getSettings()), + SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings.getSettings()) + ); final BlobStoreIndexShardSnapshot snapshot = blobStoreRepository.loadShardSnapshot(blobContainer, snapshotId); - final Directory directory; - if (SNAPSHOT_CACHE_ENABLED_SETTING.get(indexSettings.getSettings())) { - final Path cacheDir = shardPath.getDataPath().resolve("snapshots").resolve(snapshotId.getUUID()); - directory = new CacheDirectory(snapshot, blobContainer, cache, cacheDir, snapshotId, indexId, shardPath.getShardId(), - currentTimeNanosSupplier); - } else { - directory = new SearchableSnapshotDirectory(snapshot, blobContainer); - } - return new InMemoryNoOpCommitDirectory(directory); + final Path cacheDir = shardPath.getDataPath().resolve("snapshots").resolve(snapshotId.getUUID()); + Files.createDirectories(cacheDir); + + return new InMemoryNoOpCommitDirectory( + new SearchableSnapshotDirectory( + blobContainer, + snapshot, + snapshotId, + indexId, + shardPath.getShardId(), + indexSettings.getSettings(), + currentTimeNanosSupplier, + cache, + cacheDir + ) + ); } } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/AbstractTransportSearchableSnapshotsAction.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/AbstractTransportSearchableSnapshotsAction.java index 6ceefbba1d9a1..bca015716271b 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/AbstractTransportSearchableSnapshotsAction.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/AbstractTransportSearchableSnapshotsAction.java @@ -25,9 +25,9 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.store.InMemoryNoOpCommitDirectory; +import org.elasticsearch.index.store.SearchableSnapshotDirectory; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.searchablesnapshots.cache.CacheDirectory; import java.io.IOException; import java.util.ArrayList; @@ -91,20 +91,21 @@ protected ShardsIterator shards(ClusterState state, Request request, String[] co @Override protected ShardOperationResult shardOperation(Request request, ShardRouting shardRouting) throws IOException { final IndexShard indexShard = indicesService.indexServiceSafe(shardRouting.index()).getShard(shardRouting.id()); - final CacheDirectory cacheDirectory = unwrapCacheDirectory(indexShard.store().directory()); - assert cacheDirectory != null; - assert cacheDirectory.getShardId().equals(shardRouting.shardId()); - return executeShardOperation(request, shardRouting, cacheDirectory); + final SearchableSnapshotDirectory directory = unwrapDirectory(indexShard.store().directory()); + assert directory != null; + assert directory.getShardId().equals(shardRouting.shardId()); + return executeShardOperation(request, shardRouting, directory); } - protected abstract ShardOperationResult executeShardOperation(Request request, ShardRouting shardRouting, - CacheDirectory cacheDirectory) throws IOException; + protected abstract ShardOperationResult executeShardOperation(Request request, + ShardRouting shardRouting, + SearchableSnapshotDirectory directory) throws IOException; @Nullable - private static CacheDirectory unwrapCacheDirectory(Directory dir) { + private static SearchableSnapshotDirectory unwrapDirectory(Directory dir) { while (dir != null) { - if (dir instanceof CacheDirectory) { - return (CacheDirectory) dir; + if (dir instanceof SearchableSnapshotDirectory) { + return (SearchableSnapshotDirectory) dir; } else if (dir instanceof InMemoryNoOpCommitDirectory) { dir = ((InMemoryNoOpCommitDirectory) dir).getRealDirectory(); } else if (dir instanceof FilterDirectory) { diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/TransportClearSearchableSnapshotsCacheAction.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/TransportClearSearchableSnapshotsCacheAction.java index 5a8ceab0fb7bf..a608832c746b8 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/TransportClearSearchableSnapshotsCacheAction.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/TransportClearSearchableSnapshotsCacheAction.java @@ -14,10 +14,10 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.index.store.SearchableSnapshotDirectory; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.searchablesnapshots.cache.CacheDirectory; import java.io.IOException; import java.util.List; @@ -53,9 +53,10 @@ protected ClearSearchableSnapshotsCacheRequest readRequestFrom(StreamInput in) t } @Override - protected EmptyResult executeShardOperation(ClearSearchableSnapshotsCacheRequest request, ShardRouting shardRouting, - CacheDirectory cacheDirectory) { - cacheDirectory.clearCache(); + protected EmptyResult executeShardOperation(ClearSearchableSnapshotsCacheRequest request, + ShardRouting shardRouting, + SearchableSnapshotDirectory directory) { + directory.clearCache(); return EmptyResult.INSTANCE; } } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/TransportSearchableSnapshotsStatsAction.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/TransportSearchableSnapshotsStatsAction.java index 34cdf668536ac..518d6dc2399fd 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/TransportSearchableSnapshotsStatsAction.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/TransportSearchableSnapshotsStatsAction.java @@ -13,6 +13,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.index.store.SearchableSnapshotDirectory; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -20,7 +21,6 @@ import org.elasticsearch.xpack.core.searchablesnapshots.SearchableSnapshotShardStats.CacheIndexInputStats; import org.elasticsearch.xpack.core.searchablesnapshots.SearchableSnapshotShardStats.Counter; import org.elasticsearch.xpack.core.searchablesnapshots.SearchableSnapshotShardStats.TimedCounter; -import org.elasticsearch.xpack.searchablesnapshots.cache.CacheDirectory; import org.elasticsearch.xpack.searchablesnapshots.cache.IndexInputStats; import java.io.IOException; @@ -58,10 +58,11 @@ protected SearchableSnapshotsStatsRequest readRequestFrom(StreamInput in) throws } @Override - protected SearchableSnapshotShardStats executeShardOperation(SearchableSnapshotsStatsRequest request, ShardRouting shardRouting, - CacheDirectory cacheDirectory) { - return new SearchableSnapshotShardStats(shardRouting, cacheDirectory.getSnapshotId(), cacheDirectory.getIndexId(), - cacheDirectory.getStats().entrySet().stream() + protected SearchableSnapshotShardStats executeShardOperation(SearchableSnapshotsStatsRequest request, + ShardRouting shardRouting, + SearchableSnapshotDirectory directory) { + return new SearchableSnapshotShardStats(shardRouting, directory.getSnapshotId(), directory.getIndexId(), + directory.getStats().entrySet().stream() .map(entry -> toCacheIndexInputStats(entry.getKey(), entry.getValue())) .collect(Collectors.toList())); } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheBufferedIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheBufferedIndexInput.java index 01126d589dfea..6205d64c1030e 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheBufferedIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheBufferedIndexInput.java @@ -18,6 +18,7 @@ import org.elasticsearch.common.util.concurrent.ReleasableLock; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; import org.elasticsearch.index.store.BaseSearchableSnapshotIndexInput; +import org.elasticsearch.index.store.SearchableSnapshotDirectory; import java.io.EOFException; import java.io.IOException; @@ -32,7 +33,7 @@ public class CacheBufferedIndexInput extends BaseSearchableSnapshotIndexInput { private static final Logger logger = LogManager.getLogger(CacheBufferedIndexInput.class); private static final int COPY_BUFFER_SIZE = 8192; - private final CacheDirectory directory; + private final SearchableSnapshotDirectory directory; private final long offset; private final long end; private final CacheFileReference cacheFileReference; @@ -47,13 +48,13 @@ public class CacheBufferedIndexInput extends BaseSearchableSnapshotIndexInput { // last seek position is kept around in order to detect forward/backward seeks for stats private long lastSeekPosition; - CacheBufferedIndexInput(CacheDirectory directory, FileInfo fileInfo, IOContext context, IndexInputStats stats) { + public CacheBufferedIndexInput(SearchableSnapshotDirectory directory, FileInfo fileInfo, IOContext context, IndexInputStats stats) { this("CachedBufferedIndexInput(" + fileInfo.physicalName() + ")", directory, fileInfo, context, stats, 0L, fileInfo.length(), false, new CacheFileReference(directory, fileInfo.physicalName(), fileInfo.length())); stats.incrementOpenCount(); } - private CacheBufferedIndexInput(String resourceDesc, CacheDirectory directory, FileInfo fileInfo, IOContext context, + private CacheBufferedIndexInput(String resourceDesc, SearchableSnapshotDirectory directory, FileInfo fileInfo, IOContext context, IndexInputStats stats, long offset, long length, boolean isClone, CacheFileReference cacheFileReference) { super(resourceDesc, directory.blobContainer(), fileInfo, context); @@ -228,10 +229,10 @@ private static class CacheFileReference implements CacheFile.EvictionListener { private final long fileLength; private final CacheKey cacheKey; - private final CacheDirectory directory; + private final SearchableSnapshotDirectory directory; private final AtomicReference cacheFile = new AtomicReference<>(); // null if evicted or not yet acquired - private CacheFileReference(CacheDirectory directory, String fileName, long fileLength) { + private CacheFileReference(SearchableSnapshotDirectory directory, String fileName, long fileLength) { this.cacheKey = directory.createCacheKey(fileName); this.fileLength = fileLength; this.directory = directory; diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheDirectory.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheDirectory.java deleted file mode 100644 index 4b4d926d0066f..0000000000000 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheDirectory.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.searchablesnapshots.cache; - -import org.apache.lucene.store.Directory; -import org.apache.lucene.store.IOContext; -import org.apache.lucene.store.IndexInput; -import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.blobstore.BlobContainer; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; -import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; -import org.elasticsearch.index.store.BaseSearchableSnapshotDirectory; -import org.elasticsearch.repositories.IndexId; -import org.elasticsearch.snapshots.SnapshotId; - -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.Collections; -import java.util.Map; -import java.util.Objects; -import java.util.function.LongSupplier; - -/** - * {@link CacheDirectory} uses a {@link CacheService} to cache Lucene files provided by another {@link Directory}. - */ -public class CacheDirectory extends BaseSearchableSnapshotDirectory { - - private final Map stats; - private final CacheService cacheService; - private final SnapshotId snapshotId; - private final IndexId indexId; - private final ShardId shardId; - private final Path cacheDir; - private final LongSupplier currentTimeNanosSupplier; - - public CacheDirectory(final BlobStoreIndexShardSnapshot snapshot, final BlobContainer blobContainer, - CacheService cacheService, Path cacheDir, SnapshotId snapshotId, IndexId indexId, ShardId shardId, - LongSupplier currentTimeNanosSupplier) - throws IOException { - super(blobContainer, snapshot); - this.stats = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(); - this.cacheService = Objects.requireNonNull(cacheService); - this.cacheDir = Files.createDirectories(cacheDir); - this.snapshotId = Objects.requireNonNull(snapshotId); - this.indexId = Objects.requireNonNull(indexId); - this.shardId = Objects.requireNonNull(shardId); - this.currentTimeNanosSupplier = Objects.requireNonNull(currentTimeNanosSupplier); - } - - CacheKey createCacheKey(String fileName) { - return new CacheKey(snapshotId, indexId, shardId, fileName); - } - - CacheFile getCacheFile(CacheKey cacheKey, long fileLength) throws Exception { - return cacheService.get(cacheKey, fileLength, cacheDir); - } - - public SnapshotId getSnapshotId() { - return snapshotId; - } - - public IndexId getIndexId() { - return indexId; - } - - public ShardId getShardId() { - return shardId; - } - - public Map getStats() { - return Collections.unmodifiableMap(stats); - } - - // pkg private for tests - @Nullable - IndexInputStats getStats(String name) { - return stats.get(name); - } - - // pkg private so tests can override - IndexInputStats createIndexInputStats(final long fileLength) { - return new IndexInputStats(fileLength); - } - - long statsCurrentTimeNanos() { - return currentTimeNanosSupplier.getAsLong(); - } - - @Override - protected void innerClose() { - // Ideally we could let the cache evict/remove cached files by itself after the - // directory has been closed. - clearCache(); - } - - public void clearCache() { - cacheService.removeFromCache(cacheKey -> cacheKey.belongsTo(snapshotId, indexId, shardId)); - } - - @Override - public IndexInput openInput(final String name, final IOContext context) throws IOException { - ensureOpen(); - final FileInfo fileInfo = fileInfo(name); - final IndexInputStats inputStats = stats.computeIfAbsent(name, n -> createIndexInputStats(fileInfo.length())); - return new CacheBufferedIndexInput(this, fileInfo, context, inputStats); - } -} diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheFile.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheFile.java index 5d418169baf66..f3711a7a3eff9 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheFile.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheFile.java @@ -27,7 +27,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.locks.ReentrantReadWriteLock; -class CacheFile { +public class CacheFile { @FunctionalInterface public interface EvictionListener { diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheKey.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheKey.java index ebbdc8e72d350..47894738c6069 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheKey.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheKey.java @@ -18,7 +18,7 @@ public class CacheKey { private final ShardId shardId; private final String fileName; - CacheKey(SnapshotId snapshotId, IndexId indexId, ShardId shardId, String fileName) { + public CacheKey(SnapshotId snapshotId, IndexId indexId, ShardId shardId, String fileName) { this.snapshotId = Objects.requireNonNull(snapshotId); this.indexId = Objects.requireNonNull(indexId); this.shardId = Objects.requireNonNull(shardId); @@ -71,7 +71,7 @@ public String toString() { ']'; } - boolean belongsTo(SnapshotId snapshotId, IndexId indexId, ShardId shardId) { + public boolean belongsTo(SnapshotId snapshotId, IndexId indexId, ShardId shardId) { return Objects.equals(this.snapshotId, snapshotId) && Objects.equals(this.indexId, indexId) && Objects.equals(this.shardId, shardId); diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java index 946e3fff254d6..19b7247615065 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java @@ -22,7 +22,8 @@ import java.util.function.Predicate; /** - * {@link CacheService} maintains a cache entry for all files read from cached searchable snapshot directories (see {@link CacheDirectory}) + * {@link CacheService} maintains a cache entry for all files read from searchable snapshot directories ( + * see {@link org.elasticsearch.index.store.SearchableSnapshotDirectory}) */ public class CacheService extends AbstractLifecycleComponent { @@ -115,7 +116,7 @@ public CacheFile get(final CacheKey cacheKey, final long fileLength, final Path * * @param predicate the predicate to evaluate */ - void removeFromCache(final Predicate predicate) { + public void removeFromCache(final Predicate predicate) { for (CacheKey cacheKey : cache.keys()) { if (predicate.test(cacheKey)) { cache.invalidate(cacheKey); diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java index 11fdd5c34daab..74d8cd0f1b151 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java @@ -40,15 +40,20 @@ import org.elasticsearch.common.CheckedFunction; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.blobstore.BlobContainer; +import org.elasticsearch.common.blobstore.BlobPath; +import org.elasticsearch.common.blobstore.fs.FsBlobContainer; +import org.elasticsearch.common.blobstore.fs.FsBlobStore; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.lucene.BytesRefs; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.Environment; +import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.ShardId; @@ -65,11 +70,20 @@ import org.elasticsearch.test.IndexSettingsModule; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService; +import org.hamcrest.Matcher; import java.io.Closeable; import java.io.EOFException; +import java.io.FileNotFoundException; import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.DirectoryStream; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.nio.file.attribute.BasicFileAttributes; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -77,8 +91,12 @@ import java.util.Map; import static java.util.Collections.emptyMap; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING; +import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.lessThanOrEqualTo; public class SearchableSnapshotDirectoryTests extends ESTestCase { @@ -350,7 +368,14 @@ private void testDirectories(final CheckedBiConsumer 0L, + cacheService, cacheDir)) { consumer.accept(directory, snapshotDirectory); } } finally { @@ -380,10 +405,86 @@ private void testIndexInputs(final CheckedBiConsumer randomFiles = new ArrayList<>(nbRandomFiles); + + final Path shardSnapshotDir = createTempDir(); + for (int i = 0; i < nbRandomFiles; i++) { + final String fileName = "file_" + randomAlphaOfLength(10); + final byte[] fileContent = randomUnicodeOfLength(randomIntBetween(1, 100_000)).getBytes(StandardCharsets.UTF_8); + final String blobName = randomAlphaOfLength(15); + Files.write(shardSnapshotDir.resolve(blobName), fileContent, StandardOpenOption.CREATE_NEW); + randomFiles.add(new BlobStoreIndexShardSnapshot.FileInfo(blobName, + new StoreFileMetaData(fileName, fileContent.length, "_check", Version.CURRENT.luceneVersion), + new ByteSizeValue(fileContent.length))); + } + + final BlobStoreIndexShardSnapshot snapshot = new BlobStoreIndexShardSnapshot("_snapshot", 0L, randomFiles, 0L, 0L, 0, 0L); + final BlobContainer blobContainer = new FsBlobContainer(new FsBlobStore(Settings.EMPTY, shardSnapshotDir, true), + BlobPath.cleanPath(), shardSnapshotDir); + + final SnapshotId snapshotId = new SnapshotId("_name", "_uuid"); + final IndexId indexId = new IndexId("_id", "_uuid"); + final ShardId shardId = new ShardId(new Index("_name", "_id"), 0); + + final Path cacheDir = createTempDir(); + try (SearchableSnapshotDirectory directory = new SearchableSnapshotDirectory(blobContainer, snapshot, snapshotId, indexId, + shardId, Settings.builder().put(SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), true).build(), () -> 0L, cacheService, cacheDir)) { + + final byte[] buffer = new byte[1024]; + for (int i = 0; i < randomIntBetween(10, 50); i++) { + final BlobStoreIndexShardSnapshot.FileInfo fileInfo = randomFrom(randomFiles); + final int fileLength = Math.toIntExact(fileInfo.length()); + + try (IndexInput input = directory.openInput(fileInfo.physicalName(), newIOContext(random()))) { + assertThat(input.length(), equalTo((long) fileLength)); + final int start = between(0, fileLength - 1); + final int end = between(start + 1, fileLength); + + input.seek(start); + while (input.getFilePointer() < end) { + input.readBytes(buffer, 0, Math.toIntExact(Math.min(buffer.length, end - input.getFilePointer()))); + } + } + assertListOfFiles(cacheDir, allOf(greaterThan(0), lessThanOrEqualTo(nbRandomFiles)), greaterThan(0L)); + if (randomBoolean()) { + directory.clearCache(); + assertListOfFiles(cacheDir, equalTo(0), equalTo(0L)); + } + } + } + } + } private static void assertThat(String reason, IndexInput actual, IndexInput expected, CheckedFunction eval) throws IOException { assertThat(reason + "\n\t actual index input: " + actual.toString() + "\n\texpected index input: " + expected.toString(), eval.apply(actual), equalTo(eval.apply(expected))); } + + private void assertListOfFiles(Path cacheDir, Matcher matchNumberOfFiles, Matcher matchSizeOfFiles) throws IOException { + final Map files = new HashMap<>(); + try (DirectoryStream stream = Files.newDirectoryStream(cacheDir)) { + for (Path file : stream) { + final String fileName = file.getFileName().toString(); + if (fileName.equals("write.lock") || fileName.startsWith("extra")) { + continue; + } + try { + if (Files.isRegularFile(file)) { + final BasicFileAttributes fileAttributes = Files.readAttributes(file, BasicFileAttributes.class); + files.put(fileName, fileAttributes.size()); + } + } catch (FileNotFoundException | NoSuchFileException e) { + // ignoring as the cache file might be evicted + } + } + } + assertThat("Number of files (" + files.size() + ") mismatch, got : " + files.keySet(), files.size(), matchNumberOfFiles); + assertThat("Sum of file sizes mismatch, got: " + files, files.values().stream().mapToLong(Long::longValue).sum(), matchSizeOfFiles); + } } diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheBufferedIndexInputStatsTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheBufferedIndexInputStatsTests.java index 2447aae026780..a65d3a4c48710 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheBufferedIndexInputStatsTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheBufferedIndexInputStatsTests.java @@ -19,6 +19,7 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; +import org.elasticsearch.index.store.SearchableSnapshotDirectory; import org.elasticsearch.index.store.StoreFileMetaData; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.snapshots.SnapshotId; @@ -28,6 +29,7 @@ import java.util.List; import java.util.concurrent.atomic.AtomicLong; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.cache.TestUtils.assertCounter; import static org.elasticsearch.xpack.searchablesnapshots.cache.TestUtils.createCacheService; import static org.elasticsearch.xpack.searchablesnapshots.cache.TestUtils.numberOfRanges; @@ -425,7 +427,11 @@ public void testBackwardSeeks() throws Exception { }); } - private static void executeTestCase(CacheService cacheService, TriConsumer test) throws Exception { + private static void executeTestCase( + final CacheService cacheService, + final TriConsumer test + ) throws Exception { + final byte[] fileContent = randomUnicodeOfLength(randomIntBetween(10, MAX_FILE_LENGTH)).getBytes(StandardCharsets.UTF_8); final String fileName = randomAlphaOfLength(10); final SnapshotId snapshotId = new SnapshotId("_name", "_uuid"); @@ -440,13 +446,14 @@ private static void executeTestCase(CacheService cacheService, TriConsumer files = List.of(new FileInfo(blobName, metaData, new ByteSizeValue(fileContent.length))); final BlobStoreIndexShardSnapshot snapshot = new BlobStoreIndexShardSnapshot(snapshotId.getName(), 0L, files, 0L, 0L, 0, 0L); + final Settings indexSettings = Settings.builder().put(SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), true).build(); try (CacheService ignored = cacheService; - CacheDirectory cacheDirectory = - new CacheDirectory(snapshot, blobContainer, cacheService, createTempDir(), snapshotId, indexId, shardId, - () -> fakeClock.addAndGet(FAKE_CLOCK_ADVANCE_NANOS)) { + SearchableSnapshotDirectory directory = + new SearchableSnapshotDirectory(blobContainer, snapshot, snapshotId, indexId, shardId, indexSettings, + () -> fakeClock.addAndGet(FAKE_CLOCK_ADVANCE_NANOS), cacheService, createTempDir()) { @Override - IndexInputStats createIndexInputStats(long fileLength) { + protected IndexInputStats createIndexInputStats(long fileLength) { if (seekingThreshold == null) { return super.createIndexInputStats(fileLength); } @@ -455,9 +462,9 @@ IndexInputStats createIndexInputStats(long fileLength) { } ) { cacheService.start(); - assertThat(cacheDirectory.getStats(fileName), nullValue()); + assertThat(directory.getStats(fileName), nullValue()); - test.apply(fileName, fileContent, cacheDirectory); + test.apply(fileName, fileContent, directory); } } } diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheBufferedIndexInputTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheBufferedIndexInputTests.java index dc918997d84ce..61087152feaf2 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheBufferedIndexInputTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheBufferedIndexInputTests.java @@ -9,9 +9,11 @@ import org.elasticsearch.Version; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.lucene.store.ESIndexInputTestCase; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; +import org.elasticsearch.index.store.SearchableSnapshotDirectory; import org.elasticsearch.index.store.StoreFileMetaData; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.snapshots.SnapshotId; @@ -26,6 +28,7 @@ import java.util.Objects; import java.util.concurrent.atomic.LongAdder; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.cache.TestUtils.createCacheService; import static org.elasticsearch.xpack.searchablesnapshots.cache.TestUtils.numberOfRanges; import static org.elasticsearch.xpack.searchablesnapshots.cache.TestUtils.singleBlobContainer; @@ -56,9 +59,10 @@ public void testRandomReads() throws IOException { } final Path cacheDir = createTempDir(); - try (CacheDirectory cacheDirectory - = new CacheDirectory(snapshot, blobContainer, cacheService, cacheDir, snapshotId, indexId, shardId, () -> 0L)) { - try (IndexInput indexInput = cacheDirectory.openInput(fileName, newIOContext(random()))) { + try (SearchableSnapshotDirectory directory = new SearchableSnapshotDirectory(blobContainer, snapshot, snapshotId, indexId, + shardId, Settings.builder().put(SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), true).build(), () -> 0L, cacheService, cacheDir + )) { + try (IndexInput indexInput = directory.openInput(fileName, newIOContext(random()))) { assertEquals(input.length, indexInput.length()); assertEquals(0, indexInput.getFilePointer()); byte[] output = randomReadAndSlice(indexInput, input.length); diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheDirectoryTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheDirectoryTests.java deleted file mode 100644 index f43493831a043..0000000000000 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheDirectoryTests.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.searchablesnapshots.cache; - -import org.apache.lucene.store.IndexInput; -import org.elasticsearch.Version; -import org.elasticsearch.common.blobstore.BlobContainer; -import org.elasticsearch.common.blobstore.BlobPath; -import org.elasticsearch.common.blobstore.fs.FsBlobContainer; -import org.elasticsearch.common.blobstore.fs.FsBlobStore; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; -import org.elasticsearch.index.store.StoreFileMetaData; -import org.elasticsearch.repositories.IndexId; -import org.elasticsearch.snapshots.SnapshotId; -import org.elasticsearch.test.ESTestCase; -import org.hamcrest.Matcher; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.nio.file.DirectoryStream; -import java.nio.file.Files; -import java.nio.file.NoSuchFileException; -import java.nio.file.Path; -import java.nio.file.StandardOpenOption; -import java.nio.file.attribute.BasicFileAttributes; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.hamcrest.Matchers.allOf; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.lessThanOrEqualTo; - -public class CacheDirectoryTests extends ESTestCase { - - public void testClearCache() throws Exception { - try (CacheService cacheService = new CacheService(Settings.EMPTY)) { - cacheService.start(); - - final int nbRandomFiles = randomIntBetween(3, 10); - final List randomFiles = new ArrayList<>(nbRandomFiles); - - final Path shardSnapshotDir = createTempDir(); - for (int i = 0; i < nbRandomFiles; i++) { - final String fileName = "file_" + randomAlphaOfLength(10); - final byte[] fileContent = randomUnicodeOfLength(randomIntBetween(1, 100_000)).getBytes(StandardCharsets.UTF_8); - final String blobName = randomAlphaOfLength(15); - Files.write(shardSnapshotDir.resolve(blobName), fileContent, StandardOpenOption.CREATE_NEW); - randomFiles.add(new BlobStoreIndexShardSnapshot.FileInfo(blobName, - new StoreFileMetaData(fileName, fileContent.length, "_check", Version.CURRENT.luceneVersion), - new ByteSizeValue(fileContent.length))); - } - - final BlobStoreIndexShardSnapshot snapshot = new BlobStoreIndexShardSnapshot("_snapshot", 0L, randomFiles, 0L, 0L, 0, 0L); - final BlobContainer blobContainer = new FsBlobContainer(new FsBlobStore(Settings.EMPTY, shardSnapshotDir, true), - BlobPath.cleanPath(), shardSnapshotDir); - - final Path cacheDir = createTempDir(); - try (CacheDirectory cacheDirectory = newCacheDirectory(snapshot, blobContainer, cacheService, cacheDir)) { - final byte[] buffer = new byte[1024]; - for (int i = 0; i < randomIntBetween(10, 50); i++) { - final BlobStoreIndexShardSnapshot.FileInfo fileInfo = randomFrom(randomFiles); - final int fileLength = Math.toIntExact(fileInfo.length()); - - try (IndexInput input = cacheDirectory.openInput(fileInfo.physicalName(), newIOContext(random()))) { - assertThat(input.length(), equalTo((long) fileLength)); - final int start = between(0, fileLength - 1); - final int end = between(start + 1, fileLength); - - input.seek(start); - while (input.getFilePointer() < end) { - input.readBytes(buffer, 0, Math.toIntExact(Math.min(buffer.length, end - input.getFilePointer()))); - } - } - assertListOfFiles(cacheDir, allOf(greaterThan(0), lessThanOrEqualTo(nbRandomFiles)), greaterThan(0L)); - if (randomBoolean()) { - cacheDirectory.clearCache(); - assertListOfFiles(cacheDir, equalTo(0), equalTo(0L)); - } - } - } - } - } - - private CacheDirectory newCacheDirectory(BlobStoreIndexShardSnapshot snapshot, BlobContainer container, - CacheService cacheService, Path cacheDir) throws IOException { - return new CacheDirectory(snapshot, container, cacheService, cacheDir, new SnapshotId("_na","_na"), new IndexId("_na", "_na"), - new ShardId("_na", "_na", 0), () -> 0L); - } - - private void assertListOfFiles(Path cacheDir, Matcher matchNumberOfFiles, Matcher matchSizeOfFiles) throws IOException { - final Map files = new HashMap<>(); - try (DirectoryStream stream = Files.newDirectoryStream(cacheDir)) { - for (Path file : stream) { - final String fileName = file.getFileName().toString(); - if (fileName.equals("write.lock") || fileName.startsWith("extra")) { - continue; - } - try { - if (Files.isRegularFile(file)) { - final BasicFileAttributes fileAttributes = Files.readAttributes(file, BasicFileAttributes.class); - files.put(fileName, fileAttributes.size()); - } - } catch (FileNotFoundException | NoSuchFileException e) { - // ignoring as the cache file might be evicted - } - } - } - assertThat("Number of files (" + files.size() + ") mismatch, got : " + files.keySet(), files.size(), matchNumberOfFiles); - assertThat("Sum of file sizes mismatch, got: " + files, files.values().stream().mapToLong(Long::longValue).sum(), matchSizeOfFiles); - } -}