diff --git a/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/BlobContainerWrapper.java b/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/BlobContainerWrapper.java index 8aafc16b2ccbb..8e19fbf2fe0db 100644 --- a/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/BlobContainerWrapper.java +++ b/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/BlobContainerWrapper.java @@ -45,6 +45,16 @@ public InputStream readBlob(String name) throws IOException { return delegate.readBlob(name); } + @Override + public InputStream readBlob(String blobName, long position, long length) throws IOException { + return delegate.readBlob(blobName, position, length); + } + + @Override + public long readBlobPreferredLength() { + return delegate.readBlobPreferredLength(); + } + @Override public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException { delegate.writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists); diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotDirectory.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotDirectory.java new file mode 100644 index 0000000000000..5a1950ab0b923 --- /dev/null +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotDirectory.java @@ -0,0 +1,105 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.index.store; + +import org.apache.lucene.store.BaseDirectory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.store.SingleInstanceLockFactory; +import org.elasticsearch.common.blobstore.BlobContainer; +import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; +import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Collection; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + +public abstract class BaseSearchableSnapshotDirectory extends BaseDirectory { + + protected final BlobStoreIndexShardSnapshot snapshot; + protected final BlobContainer blobContainer; + private final AtomicBoolean closed; + + public BaseSearchableSnapshotDirectory(BlobContainer blobContainer, BlobStoreIndexShardSnapshot snapshot) { + super(new SingleInstanceLockFactory()); + this.snapshot = Objects.requireNonNull(snapshot); + this.blobContainer = Objects.requireNonNull(blobContainer); + this.closed = new AtomicBoolean(false); + } + + protected final FileInfo fileInfo(final String name) throws FileNotFoundException { + return snapshot.indexFiles() + .stream() + .filter(fileInfo -> fileInfo.physicalName().equals(name)) + .findFirst() + .orElseThrow(() -> new FileNotFoundException(name)); + } + + @Override + public final String[] listAll() { + ensureOpen(); + return snapshot.indexFiles().stream().map(FileInfo::physicalName).sorted(String::compareTo).toArray(String[]::new); + } + + @Override + public final long fileLength(final String name) throws IOException { + ensureOpen(); + return fileInfo(name).length(); + } + + @Override + public Set getPendingDeletions() { + throw unsupportedException(); + } + + @Override + public void sync(Collection names) { + throw unsupportedException(); + } + + @Override + public void syncMetaData() { + throw unsupportedException(); + } + + @Override + public void deleteFile(String name) { + throw unsupportedException(); + } + + @Override + public IndexOutput createOutput(String name, IOContext context) { + throw unsupportedException(); + } + + @Override + public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) { + throw unsupportedException(); + } + + @Override + public void rename(String source, String dest) { + throw unsupportedException(); + } + + private static UnsupportedOperationException unsupportedException() { + assert false : "this operation is not supported and should have not be called"; + return new UnsupportedOperationException("Searchable snapshot directory does not support this operation"); + } + + @Override + public final void close() { + if (closed.compareAndSet(false, true)) { + isOpen = false; + innerClose(); + } + } + + protected void innerClose() {} +} diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java new file mode 100644 index 0000000000000..983a97534a4a1 --- /dev/null +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java @@ -0,0 +1,101 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.index.store; + +import org.apache.lucene.store.BufferedIndexInput; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.blobstore.BlobContainer; +import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; +import org.elasticsearch.index.snapshots.blobstore.SlicedInputStream; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Objects; + +public abstract class BaseSearchableSnapshotIndexInput extends BufferedIndexInput { + + protected final BlobContainer blobContainer; + protected final FileInfo fileInfo; + protected final IOContext context; + + public BaseSearchableSnapshotIndexInput(String resourceDesc, BlobContainer blobContainer, FileInfo fileInfo, IOContext context) { + super(resourceDesc, context); + this.blobContainer = Objects.requireNonNull(blobContainer); + this.fileInfo = Objects.requireNonNull(fileInfo); + this.context = Objects.requireNonNull(context); + } + + public BaseSearchableSnapshotIndexInput( + String resourceDesc, + BlobContainer blobContainer, + FileInfo fileInfo, + IOContext context, + int bufferSize + ) { + this(resourceDesc, blobContainer, fileInfo, context); + setBufferSize(bufferSize); + } + + protected InputStream openInputStream(final long position, final long length) throws IOException { + // TODO move this at the Directory level + if (fileInfo.metadata().hashEqualsContents()) { + // extract blob content from metadata hash + final BytesRef data = fileInfo.metadata().hash(); + if ((position < 0L) || (length < 0L) || (position + length > data.bytes.length)) { + throw new IllegalArgumentException( + "Invalid arguments (pos=" + position + ", length=" + length + ") for hash content (length=" + data.bytes.length + ')' + ); + } + return new ByteArrayInputStream(data.bytes, Math.toIntExact(position), Math.toIntExact(length)); + } + + final long startPart = getPartNumberForPosition(position); + final long endPart = getPartNumberForPosition(position + length); + if ((startPart == endPart) || fileInfo.numberOfParts() == 1L) { + return blobContainer.readBlob(fileInfo.partName(startPart), getRelativePositionInPart(position), length); + } else { + return new SlicedInputStream(endPart - startPart + 1L) { + @Override + protected InputStream openSlice(long slice) throws IOException { + final long currentPart = startPart + slice; + return blobContainer.readBlob( + fileInfo.partName(currentPart), + (currentPart == startPart) ? getRelativePositionInPart(position) : 0L, + (currentPart == endPart) ? getRelativePositionInPart(length) : getLengthOfPart(currentPart) + ); + } + }; + } + } + + private long getPartNumberForPosition(long position) { + ensureValidPosition(position); + final long part = position / fileInfo.partSize().getBytes(); + assert part <= fileInfo.numberOfParts() : "part number [" + part + "] exceeds number of parts: " + fileInfo.numberOfParts(); + assert part >= 0L : "part number [" + part + "] is negative"; + return part; + } + + private long getRelativePositionInPart(long position) { + ensureValidPosition(position); + final long pos = position % fileInfo.partSize().getBytes(); + assert pos < fileInfo.partBytes((int) getPartNumberForPosition(pos)) : "position in part [" + pos + "] exceeds part's length"; + assert pos >= 0L : "position in part [" + pos + "] is negative"; + return pos; + } + + private long getLengthOfPart(long part) { + return fileInfo.partBytes(Math.toIntExact(part)); + } + + private void ensureValidPosition(long position) { + if (position < 0L || position > fileInfo.length()) { + throw new IllegalArgumentException("Position [" + position + "] is invalid"); + } + } +} diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java index db7f5597da197..cfd5d8d8a16b2 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java @@ -5,18 +5,14 @@ */ package org.elasticsearch.index.store; -import org.apache.lucene.store.BaseDirectory; import org.apache.lucene.store.BufferedIndexInput; import org.apache.lucene.store.Directory; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; -import org.apache.lucene.store.IndexOutput; -import org.apache.lucene.store.SingleInstanceLockFactory; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.shard.ShardPath; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; -import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; @@ -25,12 +21,8 @@ import org.elasticsearch.xpack.searchablesnapshots.cache.CacheDirectory; import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService; -import java.io.FileNotFoundException; import java.io.IOException; import java.nio.file.Path; -import java.util.Collection; -import java.util.Objects; -import java.util.Set; import java.util.function.LongSupplier; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING; @@ -50,95 +42,24 @@ * shard files and what it stored in the snapshot the {@link BlobStoreIndexShardSnapshot} is used to map a physical file name as expected by * Lucene with the one (or the ones) corresponding blob(s) in the snapshot. */ -public class SearchableSnapshotDirectory extends BaseDirectory { - - private final BlobStoreIndexShardSnapshot snapshot; - private final BlobContainer blobContainer; +public class SearchableSnapshotDirectory extends BaseSearchableSnapshotDirectory { SearchableSnapshotDirectory(final BlobStoreIndexShardSnapshot snapshot, final BlobContainer blobContainer) { - super(new SingleInstanceLockFactory()); - this.snapshot = Objects.requireNonNull(snapshot); - this.blobContainer = Objects.requireNonNull(blobContainer); - } - - private FileInfo fileInfo(final String name) throws FileNotFoundException { - return snapshot.indexFiles().stream() - .filter(fileInfo -> fileInfo.physicalName().equals(name)) - .findFirst() - .orElseThrow(() -> new FileNotFoundException(name)); - } - - @Override - public String[] listAll() throws IOException { - ensureOpen(); - return snapshot.indexFiles().stream() - .map(FileInfo::physicalName) - .sorted(String::compareTo) - .toArray(String[]::new); - } - - @Override - public long fileLength(final String name) throws IOException { - ensureOpen(); - return fileInfo(name).length(); + super(blobContainer, snapshot); } @Override public IndexInput openInput(final String name, final IOContext context) throws IOException { ensureOpen(); - return new SearchableSnapshotIndexInput(blobContainer, fileInfo(name), blobContainer.readBlobPreferredLength(), + return new SearchableSnapshotIndexInput(blobContainer, fileInfo(name), context, blobContainer.readBlobPreferredLength(), BufferedIndexInput.BUFFER_SIZE); } - @Override - public void close() { - isOpen = false; - } - @Override public String toString() { return this.getClass().getSimpleName() + "@" + snapshot.snapshot() + " lockFactory=" + lockFactory; } - @Override - public Set getPendingDeletions() { - throw unsupportedException(); - } - - @Override - public void sync(Collection names) { - throw unsupportedException(); - } - - @Override - public void syncMetaData() { - throw unsupportedException(); - } - - @Override - public void deleteFile(String name) { - throw unsupportedException(); - } - - @Override - public IndexOutput createOutput(String name, IOContext context) { - throw unsupportedException(); - } - - @Override - public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) { - throw unsupportedException(); - } - - @Override - public void rename(String source, String dest) { - throw unsupportedException(); - } - - private static UnsupportedOperationException unsupportedException() { - return new UnsupportedOperationException("Searchable snapshot directory does not support this operation"); - } - public static Directory create(RepositoriesService repositories, CacheService cache, IndexSettings indexSettings, @@ -158,11 +79,13 @@ public static Directory create(RepositoriesService repositories, SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings.getSettings())); final BlobStoreIndexShardSnapshot snapshot = blobStoreRepository.loadShardSnapshot(blobContainer, snapshotId); - Directory directory = new SearchableSnapshotDirectory(snapshot, blobContainer); + final Directory directory; if (SNAPSHOT_CACHE_ENABLED_SETTING.get(indexSettings.getSettings())) { final Path cacheDir = shardPath.getDataPath().resolve("snapshots").resolve(snapshotId.getUUID()); - directory = new CacheDirectory(directory, cache, cacheDir, snapshotId, indexId, shardPath.getShardId(), + directory = new CacheDirectory(snapshot, blobContainer, cache, cacheDir, snapshotId, indexId, shardPath.getShardId(), currentTimeNanosSupplier); + } else { + directory = new SearchableSnapshotDirectory(snapshot, blobContainer); } return new InMemoryNoOpCommitDirectory(directory); } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotIndexInput.java index fe8aabf510b64..f2eb0324c88c9 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotIndexInput.java @@ -6,6 +6,7 @@ package org.elasticsearch.index.store; import org.apache.lucene.store.BufferedIndexInput; +import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; import org.apache.lucene.util.BytesRef; import org.elasticsearch.common.CheckedRunnable; @@ -45,10 +46,8 @@ * {@link InputStream} in {@code streamForSequentialReads}. Clones and slices, however, do not expect to be read sequentially and so make * a new request to the {@link BlobContainer} each time their internal buffer needs refilling. */ -public class SearchableSnapshotIndexInput extends BufferedIndexInput { +public class SearchableSnapshotIndexInput extends BaseSearchableSnapshotIndexInput { - private final BlobContainer blobContainer; - private final FileInfo fileInfo; private final long offset; private final long length; @@ -60,18 +59,16 @@ public class SearchableSnapshotIndexInput extends BufferedIndexInput { private long sequentialReadSize; private static final long NO_SEQUENTIAL_READ_OPTIMIZATION = 0L; - - SearchableSnapshotIndexInput(final BlobContainer blobContainer, final FileInfo fileInfo, long sequentialReadSize, int bufferSize) { - this("SearchableSnapshotIndexInput(" + fileInfo.physicalName() + ")", blobContainer, fileInfo, 0L, 0L, fileInfo.length(), + SearchableSnapshotIndexInput(BlobContainer blobContainer, FileInfo fileInfo, IOContext context, + long sequentialReadSize, int bufferSize) { + this("SearchableSnapshotIndexInput(" + fileInfo.physicalName() + ")", blobContainer, fileInfo, context, 0L, 0L, fileInfo.length(), sequentialReadSize, bufferSize); } - private SearchableSnapshotIndexInput(final String resourceDesc, final BlobContainer blobContainer, final FileInfo fileInfo, + private SearchableSnapshotIndexInput(String resourceDesc, BlobContainer blobContainer, FileInfo fileInfo, IOContext context, final long position, final long offset, final long length, final long sequentialReadSize, final int bufferSize) { - super(resourceDesc, bufferSize); - this.blobContainer = Objects.requireNonNull(blobContainer); - this.fileInfo = Objects.requireNonNull(fileInfo); + super(resourceDesc, blobContainer, fileInfo, context, bufferSize); this.offset = offset; this.length = length; this.position = position; @@ -220,7 +217,7 @@ protected void seekInternal(long pos) throws IOException { @Override public BufferedIndexInput clone() { - return new SearchableSnapshotIndexInput("clone(" + this + ")", blobContainer, fileInfo, position, offset, length, + return new SearchableSnapshotIndexInput("clone(" + this + ")", blobContainer, fileInfo, context, position, offset, length, // Clones might not be closed when they are no longer needed, but we must always close streamForSequentialReads. The simple // solution: do not optimize sequential reads on clones. NO_SEQUENTIAL_READ_OPTIMIZATION, @@ -230,8 +227,8 @@ public BufferedIndexInput clone() { @Override public IndexInput slice(String sliceDescription, long offset, long length) throws IOException { if ((offset >= 0L) && (length >= 0L) && (offset + length <= length())) { - final SearchableSnapshotIndexInput slice = new SearchableSnapshotIndexInput(sliceDescription, blobContainer, fileInfo, position, - this.offset + offset, length, + final SearchableSnapshotIndexInput slice = new SearchableSnapshotIndexInput(sliceDescription, blobContainer, fileInfo, context, + position, this.offset + offset, length, // Slices might not be closed when they are no longer needed, but we must always close streamForSequentialReads. The simple // solution: do not optimize sequential reads on slices. NO_SEQUENTIAL_READ_OPTIMIZATION, diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheDirectory.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheDirectory.java index b1aa6bc6836a7..dc55b46b43593 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheDirectory.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheDirectory.java @@ -9,22 +9,26 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.store.AlreadyClosedException; -import org.apache.lucene.store.BufferedIndexInput; import org.apache.lucene.store.Directory; -import org.apache.lucene.store.FilterDirectory; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.io.Channels; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ReleasableLock; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; +import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; +import org.elasticsearch.index.store.BaseSearchableSnapshotDirectory; +import org.elasticsearch.index.store.BaseSearchableSnapshotIndexInput; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.snapshots.SnapshotId; import java.io.EOFException; import java.io.IOException; +import java.io.InputStream; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.file.Files; @@ -39,7 +43,7 @@ /** * {@link CacheDirectory} uses a {@link CacheService} to cache Lucene files provided by another {@link Directory}. */ -public class CacheDirectory extends FilterDirectory { +public class CacheDirectory extends BaseSearchableSnapshotDirectory { private static final Logger logger = LogManager.getLogger(CacheDirectory.class); private static final int COPY_BUFFER_SIZE = 8192; @@ -52,10 +56,11 @@ public class CacheDirectory extends FilterDirectory { private final Path cacheDir; private final LongSupplier currentTimeNanosSupplier; - public CacheDirectory(Directory in, CacheService cacheService, Path cacheDir, SnapshotId snapshotId, IndexId indexId, ShardId shardId, + public CacheDirectory(final BlobStoreIndexShardSnapshot snapshot, final BlobContainer blobContainer, + CacheService cacheService, Path cacheDir, SnapshotId snapshotId, IndexId indexId, ShardId shardId, LongSupplier currentTimeNanosSupplier) throws IOException { - super(in); + super(blobContainer, snapshot); this.stats = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(); this.cacheService = Objects.requireNonNull(cacheService); this.cacheDir = Files.createDirectories(cacheDir); @@ -96,8 +101,8 @@ IndexInputStats createIndexInputStats(final long fileLength) { return new IndexInputStats(fileLength); } - public void close() throws IOException { - super.close(); + @Override + protected void innerClose() { // Ideally we could let the cache evict/remove cached files by itself after the // directory has been closed. clearCache(); @@ -110,8 +115,9 @@ public void clearCache() { @Override public IndexInput openInput(final String name, final IOContext context) throws IOException { ensureOpen(); - final long fileLength = fileLength(name); - return new CacheBufferedIndexInput(name, fileLength, context, stats.computeIfAbsent(name, n -> createIndexInputStats(fileLength))); + final FileInfo fileInfo = fileInfo(name); + final IndexInputStats inputStats = stats.computeIfAbsent(name, n -> createIndexInputStats(fileInfo.length())); + return new CacheBufferedIndexInput(blobContainer, fileInfo, context, inputStats); } private class CacheFileReference implements CacheFile.EvictionListener { @@ -180,9 +186,8 @@ public String toString() { } } - public class CacheBufferedIndexInput extends BufferedIndexInput { + public class CacheBufferedIndexInput extends BaseSearchableSnapshotIndexInput { - private final IOContext ioContext; private final long offset; private final long end; private final CacheFileReference cacheFileReference; @@ -197,16 +202,17 @@ public class CacheBufferedIndexInput extends BufferedIndexInput { // last seek position is kept around in order to detect forward/backward seeks for stats private long lastSeekPosition; - CacheBufferedIndexInput(String fileName, long fileLength, IOContext ioContext, IndexInputStats stats) { - this(new CacheFileReference(fileName, fileLength), ioContext, stats, - "CachedBufferedIndexInput(" + fileName + ")", 0L, fileLength, false); + CacheBufferedIndexInput(BlobContainer blobContainer, FileInfo fileInfo, IOContext context, IndexInputStats stats) { + this("CachedBufferedIndexInput(" + fileInfo.physicalName() + ")", blobContainer, fileInfo, context, + new CacheFileReference(fileInfo.physicalName(), fileInfo.length()), stats, + 0L, fileInfo.length(), false); stats.incrementOpenCount(); } - private CacheBufferedIndexInput(CacheFileReference cacheFileReference, IOContext ioContext, IndexInputStats stats, - String desc, long offset, long length, boolean isClone) { - super(desc, ioContext); - this.ioContext = ioContext; + private CacheBufferedIndexInput(String resourceDesc, BlobContainer blobContainer, FileInfo fileInfo, IOContext context, + CacheFileReference cacheFileReference, IndexInputStats stats, + long offset, long length, boolean isClone) { + super(resourceDesc, blobContainer, fileInfo, context); this.offset = offset; this.cacheFileReference = cacheFileReference; this.stats = stats; @@ -287,23 +293,21 @@ int readCacheFile(FileChannel fc, long end, long position, byte[] buffer, int of @SuppressForbidden(reason = "Use positional writes on purpose") void writeCacheFile(FileChannel fc, long start, long end) throws IOException { assert assertFileChannelOpen(fc); - final byte[] copyBuffer = new byte[Math.toIntExact(Math.min(COPY_BUFFER_SIZE, end - start))]; + final long length = end - start; + final byte[] copyBuffer = new byte[Math.toIntExact(Math.min(COPY_BUFFER_SIZE, length))]; logger.trace(() -> new ParameterizedMessage("writing range [{}-{}] to cache file [{}]", start, end, cacheFileReference)); int bytesCopied = 0; - try (IndexInput input = in.openInput(cacheFileReference.getFileName(), ioContext)) { + final long startTimeNanos = currentTimeNanosSupplier.getAsLong(); + try (InputStream input = openInputStream(start, length)) { stats.incrementInnerOpenCount(); - final long startTimeNanos = currentTimeNanosSupplier.getAsLong(); - if (start > 0) { - input.seek(start); - } long remaining = end - start; while (remaining > 0) { - final int size = (remaining < copyBuffer.length) ? Math.toIntExact(remaining) : copyBuffer.length; - input.readBytes(copyBuffer, 0, size); - fc.write(ByteBuffer.wrap(copyBuffer, 0, size), start + bytesCopied); - bytesCopied += size; - remaining -= size; + final int len = (remaining < copyBuffer.length) ? Math.toIntExact(remaining) : copyBuffer.length; + int bytesRead = input.read(copyBuffer, 0, len); + fc.write(ByteBuffer.wrap(copyBuffer, 0, bytesRead), start + bytesCopied); + bytesCopied += bytesRead; + remaining -= bytesRead; } final long endTimeNanos = currentTimeNanosSupplier.getAsLong(); stats.addCachedBytesWritten(bytesCopied, endTimeNanos - startTimeNanos); @@ -336,8 +340,8 @@ public IndexInput slice(String sliceDescription, long offset, long length) { throw new IllegalArgumentException("slice() " + sliceDescription + " out of bounds: offset=" + offset + ",length=" + length + ",fileLength=" + this.length() + ": " + this); } - return new CacheBufferedIndexInput(cacheFileReference, ioContext, stats, - getFullSliceDescription(sliceDescription), this.offset + offset, length, true); + return new CacheBufferedIndexInput(getFullSliceDescription(sliceDescription), blobContainer, fileInfo, context, + cacheFileReference, stats, this.offset + offset, length, true); } @Override @@ -352,24 +356,22 @@ public String toString() { } private int readDirectly(long start, long end, byte[] buffer, int offset) throws IOException { - final byte[] copyBuffer = new byte[Math.toIntExact(Math.min(COPY_BUFFER_SIZE, end - start))]; + final long length = end - start; + final byte[] copyBuffer = new byte[Math.toIntExact(Math.min(COPY_BUFFER_SIZE, length))]; logger.trace(() -> new ParameterizedMessage("direct reading of range [{}-{}] for cache file [{}]", start, end, cacheFileReference)); int bytesCopied = 0; - try (IndexInput input = in.openInput(cacheFileReference.getFileName(), ioContext)) { + final long startTimeNanos = currentTimeNanosSupplier.getAsLong(); + try (InputStream input = openInputStream(start, length)) { stats.incrementInnerOpenCount(); - final long startTimeNanos = currentTimeNanosSupplier.getAsLong(); - if (start > 0) { - input.seek(start); - } long remaining = end - start; while (remaining > 0) { final int len = (remaining < copyBuffer.length) ? (int) remaining : copyBuffer.length; - input.readBytes(copyBuffer, 0, len); + int bytesRead = input.read(copyBuffer, 0, len); System.arraycopy(copyBuffer, 0, buffer, offset + bytesCopied, len); - bytesCopied += len; - remaining -= len; + bytesCopied += bytesRead; + remaining -= bytesRead; } final long endTimeNanos = currentTimeNanosSupplier.getAsLong(); stats.addDirectBytesRead(bytesCopied, endTimeNanos - startTimeNanos); diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotIndexInputTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotIndexInputTests.java index fbb3d6b94dd56..0cb4b3e46ba38 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotIndexInputTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotIndexInputTests.java @@ -93,7 +93,7 @@ public int read(byte[] b, int off, int len) throws IOException { }; } }); - return new SearchableSnapshotIndexInput(blobContainer, fileInfo, minimumReadSize, + return new SearchableSnapshotIndexInput(blobContainer, fileInfo, newIOContext(random()), minimumReadSize, randomBoolean() ? BufferedIndexInput.BUFFER_SIZE : between(BufferedIndexInput.MIN_BUFFER_SIZE, BufferedIndexInput.BUFFER_SIZE)); } diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheBufferedIndexInputStatsTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheBufferedIndexInputStatsTests.java index 518c980792915..2447aae026780 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheBufferedIndexInputStatsTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheBufferedIndexInputStatsTests.java @@ -6,28 +6,33 @@ package org.elasticsearch.xpack.searchablesnapshots.cache; import org.apache.lucene.store.BufferedIndexInput; -import org.apache.lucene.store.Directory; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; -import org.apache.lucene.store.IndexOutput; +import org.elasticsearch.Version; import org.elasticsearch.common.TriConsumer; +import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.lucene.store.ESIndexInputTestCase; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; +import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; +import org.elasticsearch.index.store.StoreFileMetaData; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.snapshots.SnapshotId; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.List; import java.util.concurrent.atomic.AtomicLong; import static org.elasticsearch.xpack.searchablesnapshots.cache.TestUtils.assertCounter; import static org.elasticsearch.xpack.searchablesnapshots.cache.TestUtils.createCacheService; import static org.elasticsearch.xpack.searchablesnapshots.cache.TestUtils.numberOfRanges; import static org.elasticsearch.xpack.searchablesnapshots.cache.TestUtils.randomCacheRangeSize; +import static org.elasticsearch.xpack.searchablesnapshots.cache.TestUtils.singleBlobContainer; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -430,10 +435,15 @@ private static void executeTestCase(CacheService cacheService, TriConsumer files = List.of(new FileInfo(blobName, metaData, new ByteSizeValue(fileContent.length))); + final BlobStoreIndexShardSnapshot snapshot = new BlobStoreIndexShardSnapshot(snapshotId.getName(), 0L, files, 0L, 0L, 0, 0L); + try (CacheService ignored = cacheService; - Directory directory = newDirectory(); CacheDirectory cacheDirectory = - new CacheDirectory(directory, cacheService, createTempDir(), snapshotId, indexId, shardId, + new CacheDirectory(snapshot, blobContainer, cacheService, createTempDir(), snapshotId, indexId, shardId, () -> fakeClock.addAndGet(FAKE_CLOCK_ADVANCE_NANOS)) { @Override IndexInputStats createIndexInputStats(long fileLength) { @@ -447,10 +457,6 @@ IndexInputStats createIndexInputStats(long fileLength) { cacheService.start(); assertThat(cacheDirectory.getStats(fileName), nullValue()); - final IndexOutput indexOutput = directory.createOutput(fileName, newIOContext(random())); - indexOutput.writeBytes(fileContent, fileContent.length); - indexOutput.close(); - test.apply(fileName, fileContent, cacheDirectory); } } diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheBufferedIndexInputTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheBufferedIndexInputTests.java index 479dac8ee0d73..dc918997d84ce 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheBufferedIndexInputTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheBufferedIndexInputTests.java @@ -5,25 +5,30 @@ */ package org.elasticsearch.xpack.searchablesnapshots.cache; -import org.apache.lucene.store.Directory; -import org.apache.lucene.store.FilterDirectory; -import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; -import org.elasticsearch.common.lucene.store.ByteArrayIndexInput; +import org.elasticsearch.Version; +import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.lucene.store.ESIndexInputTestCase; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; +import org.elasticsearch.index.store.StoreFileMetaData; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.snapshots.mockstore.BlobContainerWrapper; -import java.io.FileNotFoundException; +import java.io.FilterInputStream; import java.io.IOException; +import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.nio.file.Path; +import java.util.List; import java.util.Objects; import java.util.concurrent.atomic.LongAdder; import static org.elasticsearch.xpack.searchablesnapshots.cache.TestUtils.createCacheService; import static org.elasticsearch.xpack.searchablesnapshots.cache.TestUtils.numberOfRanges; +import static org.elasticsearch.xpack.searchablesnapshots.cache.TestUtils.singleBlobContainer; import static org.hamcrest.Matchers.equalTo; public class CacheBufferedIndexInputTests extends ESIndexInputTestCase { @@ -40,14 +45,19 @@ public void testRandomReads() throws IOException { final String fileName = randomAlphaOfLength(10); final byte[] input = randomUnicodeOfLength(randomIntBetween(1, 100_000)).getBytes(StandardCharsets.UTF_8); - Directory directory = new SingleFileDirectory(fileName, input); + final String blobName = randomUnicodeOfLength(10); + final StoreFileMetaData metaData = new StoreFileMetaData(fileName, input.length, "_na", Version.CURRENT.luceneVersion); + final BlobStoreIndexShardSnapshot snapshot = new BlobStoreIndexShardSnapshot(snapshotId.getName(), 0L, + List.of(new BlobStoreIndexShardSnapshot.FileInfo(blobName, metaData, new ByteSizeValue(input.length))), 0L, 0L, 0, 0L); + + BlobContainer blobContainer = singleBlobContainer(blobName, input); if (input.length <= cacheService.getCacheSize()) { - directory = new CountingDirectory(directory, cacheService.getRangeSize()); + blobContainer = new CountingBlobContainer(blobContainer, cacheService.getRangeSize()); } final Path cacheDir = createTempDir(); try (CacheDirectory cacheDirectory - = new CacheDirectory(directory, cacheService, cacheDir, snapshotId, indexId, shardId, () -> 0L)) { + = new CacheDirectory(snapshot, blobContainer, cacheService, cacheDir, snapshotId, indexId, shardId, () -> 0L)) { try (IndexInput indexInput = cacheDirectory.openInput(fileName, newIOContext(random()))) { assertEquals(input.length, indexInput.length()); assertEquals(0, indexInput.getFilePointer()); @@ -56,145 +66,102 @@ public void testRandomReads() throws IOException { } } - if (directory instanceof CountingDirectory) { + if (blobContainer instanceof CountingBlobContainer) { long numberOfRanges = numberOfRanges(input.length, cacheService.getRangeSize()); assertThat("Expected " + numberOfRanges + " ranges fetched from the source", - ((CountingDirectory) directory).totalOpens.sum(), equalTo(numberOfRanges)); + ((CountingBlobContainer) blobContainer).totalOpens.sum(), equalTo(numberOfRanges)); assertThat("All bytes should have been read from source", - ((CountingDirectory) directory).totalBytes.sum(), equalTo((long) input.length)); + ((CountingBlobContainer) blobContainer).totalBytes.sum(), equalTo((long) input.length)); } - - directory.close(); } } } - /** - * FilterDirectory that provides a single IndexInput with a given name and content. - */ - private static class SingleFileDirectory extends FilterDirectory { - - private final String fileName; - private final byte[] fileContent; - - SingleFileDirectory(final String fileName, final byte[] fileContent) { - super(null); - this.fileName = Objects.requireNonNull(fileName); - this.fileContent = Objects.requireNonNull(fileContent); - } - - @Override - public String[] listAll() { - return new String[]{fileName}; - } - - @Override - public long fileLength(String name) throws IOException { - if (name.equals(fileName)) { - return fileContent.length; - } - throw new FileNotFoundException(name); - } - - @Override - public IndexInput openInput(String name, IOContext context) throws IOException { - if (name.equals(fileName)) { - return new ByteArrayIndexInput(fileName, fileContent); - } - throw new FileNotFoundException(name); - } - - @Override - public void close() { - } - } /** - * FilterDirectory that counts the number of IndexInput it opens, as well as the + * BlobContainer that counts the number of {@link java.io.InputStream} it opens, as well as the * total number of bytes read from them. */ - private static class CountingDirectory extends FilterDirectory { + private static class CountingBlobContainer extends BlobContainerWrapper { private final LongAdder totalBytes = new LongAdder(); private final LongAdder totalOpens = new LongAdder(); private final int rangeSize; - CountingDirectory(Directory in, int rangeSize) { + CountingBlobContainer(BlobContainer in, int rangeSize) { super(in); this.rangeSize = rangeSize; } @Override - public IndexInput openInput(String name, IOContext context) throws IOException { - return new CountingIndexInput(this, super.openInput(name, context), rangeSize); + public InputStream readBlob(String blobName, long position, long length) throws IOException { + return new CountingInputStream(this, super.readBlob(blobName, position, length), length, rangeSize); + } + + @Override + public InputStream readBlob(String name) { + assert false : "this method should never be called"; + throw new UnsupportedOperationException(); } } /** - * IndexInput that counts the number of bytes read from it, as well as the positions + * InputStream that counts the number of bytes read from it, as well as the positions * where read operations start and finish. */ - private static class CountingIndexInput extends IndexInput { + private static class CountingInputStream extends FilterInputStream { - private final CountingDirectory dir; - private final IndexInput in; + private final CountingBlobContainer container; private final int rangeSize; + private final long length; private long bytesRead = 0L; + private long position = 0L; private long start = Long.MAX_VALUE; private long end = Long.MIN_VALUE; - CountingIndexInput(CountingDirectory directory, IndexInput input, int rangeSize) { - super("CountingIndexInput(" + input + ")"); - this.dir = Objects.requireNonNull(directory); - this.in = Objects.requireNonNull(input); + CountingInputStream(CountingBlobContainer container, InputStream input, long length, int rangeSize) { + super(input); + this.container = Objects.requireNonNull(container); this.rangeSize = rangeSize; - dir.totalOpens.increment(); + this.length = length; + this.container.totalOpens.increment(); } @Override - public void readBytes(byte[] b, int offset, int len) throws IOException { - if (getFilePointer() < start) { - start = getFilePointer(); + public int read() throws IOException { + if (position < start) { + start = position; } - in.readBytes(b, offset, len); - bytesRead += len; - - if (getFilePointer() > end) { - end = getFilePointer(); + final int result = in.read(); + if (result == -1) { + return result; } - } - - @Override - public byte readByte() { - throw new UnsupportedOperationException(); - } - - @Override - public long getFilePointer() { - return in.getFilePointer(); - } + bytesRead += 1L; + position += 1L; - @Override - public void seek(long pos) throws IOException { - in.seek(pos); + if (position > end) { + end = position; + } + return result; } @Override - public long length() { - return in.length(); - } + public int read(byte[] b, int offset, int len) throws IOException { + if (position < start) { + start = position; + } - @Override - public IndexInput slice(String sliceDescription, long offset, long length) throws IOException { - return new CountingIndexInput(dir, in.slice(sliceDescription, offset, length), rangeSize); - } + final int result = in.read(b, offset, len); + bytesRead += len; + position += len; - @Override - public IndexInput clone() { - return new CountingIndexInput(dir, in.clone(), rangeSize); + if (position > end) { + end = position; + } + return result; } @Override @@ -204,27 +171,27 @@ public void close() throws IOException { throw new AssertionError("Read operation should start at the beginning of a range"); } if (end % rangeSize != 0) { - if (end != in.length()) { + if (end != length) { throw new AssertionError("Read operation should finish at the end of a range or the end of the file"); } } - if (in.length() <= rangeSize) { - if (bytesRead != in.length()) { - throw new AssertionError("All [" + in.length() + "] bytes should have been read, no more no less but got:" + bytesRead); + if (length <= rangeSize) { + if (bytesRead != length) { + throw new AssertionError("All [" + length + "] bytes should have been read, no more no less but got:" + bytesRead); } } else { if (bytesRead != rangeSize) { - if (end != in.length()) { + if (end != length) { throw new AssertionError("Expecting [" + rangeSize + "] bytes to be read but got:" + bytesRead); } - final long remaining = in.length() % rangeSize; + final long remaining = length % rangeSize; if (bytesRead != remaining) { throw new AssertionError("Expecting [" + remaining + "] bytes to be read but got:" + bytesRead); } } } - dir.totalBytes.add(bytesRead); + this.container.totalBytes.add(bytesRead); } } diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheDirectoryTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheDirectoryTests.java index 57b428dd2aa2f..f43493831a043 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheDirectoryTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheDirectoryTests.java @@ -5,11 +5,17 @@ */ package org.elasticsearch.xpack.searchablesnapshots.cache; -import org.apache.lucene.store.Directory; import org.apache.lucene.store.IndexInput; -import org.apache.lucene.store.IndexOutput; +import org.elasticsearch.Version; +import org.elasticsearch.common.blobstore.BlobContainer; +import org.elasticsearch.common.blobstore.BlobPath; +import org.elasticsearch.common.blobstore.fs.FsBlobContainer; +import org.elasticsearch.common.blobstore.fs.FsBlobStore; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; +import org.elasticsearch.index.store.StoreFileMetaData; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.test.ESTestCase; @@ -22,8 +28,11 @@ import java.nio.file.Files; import java.nio.file.NoSuchFileException; import java.nio.file.Path; +import java.nio.file.StandardOpenOption; import java.nio.file.attribute.BasicFileAttributes; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import static org.hamcrest.Matchers.allOf; @@ -36,50 +45,55 @@ public class CacheDirectoryTests extends ESTestCase { public void testClearCache() throws Exception { try (CacheService cacheService = new CacheService(Settings.EMPTY)) { cacheService.start(); - try (Directory directory = newDirectory()) { - final int nbRandomFiles = randomIntBetween(3, 10); - final Map randomFiles = new HashMap<>(nbRandomFiles); - for (int i = 0; i < nbRandomFiles; i++) { - final String fileName = randomAlphaOfLength(10); - final byte[] fileContent = randomUnicodeOfLength(randomIntBetween(1, 100_000)).getBytes(StandardCharsets.UTF_8); + final int nbRandomFiles = randomIntBetween(3, 10); + final List randomFiles = new ArrayList<>(nbRandomFiles); - final IndexOutput indexOutput = directory.createOutput(fileName, newIOContext(random())); - indexOutput.writeBytes(fileContent, fileContent.length); - indexOutput.close(); - randomFiles.put(fileName, fileContent.length); - } + final Path shardSnapshotDir = createTempDir(); + for (int i = 0; i < nbRandomFiles; i++) { + final String fileName = "file_" + randomAlphaOfLength(10); + final byte[] fileContent = randomUnicodeOfLength(randomIntBetween(1, 100_000)).getBytes(StandardCharsets.UTF_8); + final String blobName = randomAlphaOfLength(15); + Files.write(shardSnapshotDir.resolve(blobName), fileContent, StandardOpenOption.CREATE_NEW); + randomFiles.add(new BlobStoreIndexShardSnapshot.FileInfo(blobName, + new StoreFileMetaData(fileName, fileContent.length, "_check", Version.CURRENT.luceneVersion), + new ByteSizeValue(fileContent.length))); + } - final Path cacheDir = createTempDir(); - try (CacheDirectory cacheDirectory = newCacheDirectory(directory, cacheService, cacheDir)) { - final byte[] buffer = new byte[1024]; - for (int i = 0; i < randomIntBetween(10, 50); i++) { - final String fileName = randomFrom(randomFiles.keySet()); - final int fileLength = randomFiles.get(fileName); + final BlobStoreIndexShardSnapshot snapshot = new BlobStoreIndexShardSnapshot("_snapshot", 0L, randomFiles, 0L, 0L, 0, 0L); + final BlobContainer blobContainer = new FsBlobContainer(new FsBlobStore(Settings.EMPTY, shardSnapshotDir, true), + BlobPath.cleanPath(), shardSnapshotDir); - try (IndexInput input = cacheDirectory.openInput(fileName, newIOContext(random()))) { - assertThat(input.length(), equalTo((long) fileLength)); - final int start = between(0, fileLength - 1); - final int end = between(start + 1, fileLength); + final Path cacheDir = createTempDir(); + try (CacheDirectory cacheDirectory = newCacheDirectory(snapshot, blobContainer, cacheService, cacheDir)) { + final byte[] buffer = new byte[1024]; + for (int i = 0; i < randomIntBetween(10, 50); i++) { + final BlobStoreIndexShardSnapshot.FileInfo fileInfo = randomFrom(randomFiles); + final int fileLength = Math.toIntExact(fileInfo.length()); - input.seek(start); - while (input.getFilePointer() < end) { - input.readBytes(buffer, 0, Math.toIntExact(Math.min(buffer.length, end - input.getFilePointer()))); - } - } - assertListOfFiles(cacheDir, allOf(greaterThan(0), lessThanOrEqualTo(nbRandomFiles)), greaterThan(0L)); - if (randomBoolean()) { - cacheDirectory.clearCache(); - assertListOfFiles(cacheDir, equalTo(0), equalTo(0L)); + try (IndexInput input = cacheDirectory.openInput(fileInfo.physicalName(), newIOContext(random()))) { + assertThat(input.length(), equalTo((long) fileLength)); + final int start = between(0, fileLength - 1); + final int end = between(start + 1, fileLength); + + input.seek(start); + while (input.getFilePointer() < end) { + input.readBytes(buffer, 0, Math.toIntExact(Math.min(buffer.length, end - input.getFilePointer()))); } } + assertListOfFiles(cacheDir, allOf(greaterThan(0), lessThanOrEqualTo(nbRandomFiles)), greaterThan(0L)); + if (randomBoolean()) { + cacheDirectory.clearCache(); + assertListOfFiles(cacheDir, equalTo(0), equalTo(0L)); + } } } } } - private CacheDirectory newCacheDirectory(Directory directory, CacheService cacheService, Path cacheDir) throws IOException { - return new CacheDirectory(directory, cacheService, cacheDir, new SnapshotId("_na","_na"), new IndexId("_na", "_na"), + private CacheDirectory newCacheDirectory(BlobStoreIndexShardSnapshot snapshot, BlobContainer container, + CacheService cacheService, Path cacheDir) throws IOException { + return new CacheDirectory(snapshot, container, cacheService, cacheDir, new SnapshotId("_na","_na"), new IndexId("_na", "_na"), new ShardId("_na", "_na", 0), () -> 0L); } diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/TestUtils.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/TestUtils.java index f3023a0a8969d..bae63733b5ab8 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/TestUtils.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/TestUtils.java @@ -5,10 +5,20 @@ */ package org.elasticsearch.xpack.searchablesnapshots.cache; +import org.elasticsearch.common.blobstore.BlobContainer; +import org.elasticsearch.common.blobstore.BlobMetaData; +import org.elasticsearch.common.blobstore.BlobPath; +import org.elasticsearch.common.blobstore.DeleteResult; +import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; +import java.io.ByteArrayInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; import java.util.List; +import java.util.Map; import java.util.Random; import static com.carrotsearch.randomizedtesting.generators.RandomNumbers.randomIntBetween; @@ -52,4 +62,71 @@ static void assertCounter(IndexInputStats.Counter counter, long total, long coun assertThat(counter.min(), equalTo(min)); assertThat(counter.max(), equalTo(max)); } + + /** + * A {@link BlobContainer} that can read a single in-memory blob. + * Any attempt to read a different blob will throw a {@link FileNotFoundException} + */ + static BlobContainer singleBlobContainer(final String blobName, final byte[] blobContent) { + return new BlobContainer() { + + @Override + public InputStream readBlob(String name, long position, long length) throws IOException { + if (blobName.equals(name) == false) { + throw new FileNotFoundException("Blob not found: " + name); + } + return Streams.limitStream(new ByteArrayInputStream(blobContent, Math.toIntExact(position), blobContent.length), length); + } + + @Override + public Map listBlobs() { + throw unsupportedException(); + } + + @Override + public BlobPath path() { + throw unsupportedException(); + } + + @Override + public InputStream readBlob(String blobName) { + throw unsupportedException(); + } + + @Override + public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) { + throw unsupportedException(); + } + + @Override + public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) { + throw unsupportedException(); + } + + @Override + public DeleteResult delete() { + throw unsupportedException(); + } + + @Override + public void deleteBlobsIgnoringIfNotExists(List blobNames) { + throw unsupportedException(); + } + + @Override + public Map children() { + throw unsupportedException(); + } + + @Override + public Map listBlobsByPrefix(String blobNamePrefix) { + throw unsupportedException(); + } + + private UnsupportedOperationException unsupportedException() { + assert false : "this operation is not supported and should have not be called"; + return new UnsupportedOperationException("This operation is not supported"); + } + }; + } }