diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/searchablesnapshots/SearchableSnapshotShardStats.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/searchablesnapshots/SearchableSnapshotShardStats.java index c5b5024e20104..36ea9ba4c7303 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/searchablesnapshots/SearchableSnapshotShardStats.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/searchablesnapshots/SearchableSnapshotShardStats.java @@ -118,14 +118,12 @@ public int hashCode() { return Objects.hash(shardRouting, snapshotId, indexId, inputStats); } - public static class CacheIndexInputStats implements Writeable, ToXContentObject { private final String fileName; private final long fileLength; private final long openCount; - private final long innerCount; private final long closeCount; private final Counter forwardSmallSeeks; @@ -137,17 +135,17 @@ public static class CacheIndexInputStats implements Writeable, ToXContentObject private final Counter cachedBytesRead; private final TimedCounter cachedBytesWritten; private final TimedCounter directBytesRead; + private final TimedCounter optimizedBytesRead; - public CacheIndexInputStats(String fileName, long fileLength, long openCount, long innerCount, long closeCount, + public CacheIndexInputStats(String fileName, long fileLength, long openCount, long closeCount, Counter forwardSmallSeeks, Counter backwardSmallSeeks, Counter forwardLargeSeeks, Counter backwardLargeSeeks, Counter contiguousReads, Counter nonContiguousReads, Counter cachedBytesRead, TimedCounter cachedBytesWritten, - TimedCounter directBytesRead) { + TimedCounter directBytesRead, TimedCounter optimizedBytesRead) { this.fileName = fileName; this.fileLength = fileLength; this.openCount = openCount; - this.innerCount = innerCount; this.closeCount = closeCount; this.forwardSmallSeeks = forwardSmallSeeks; this.backwardSmallSeeks = backwardSmallSeeks; @@ -158,13 +156,13 @@ public CacheIndexInputStats(String fileName, long fileLength, long openCount, lo this.cachedBytesRead = cachedBytesRead; this.cachedBytesWritten = cachedBytesWritten; this.directBytesRead = directBytesRead; + this.optimizedBytesRead = optimizedBytesRead; } CacheIndexInputStats(final StreamInput in) throws IOException { this.fileName = in.readString(); this.fileLength = in.readVLong(); this.openCount = in.readVLong(); - this.innerCount = in.readVLong(); this.closeCount = in.readVLong(); this.forwardSmallSeeks = new Counter(in); this.backwardSmallSeeks = new Counter(in); @@ -175,6 +173,7 @@ public CacheIndexInputStats(String fileName, long fileLength, long openCount, lo this.cachedBytesRead = new Counter(in); this.cachedBytesWritten = new TimedCounter(in); this.directBytesRead = new TimedCounter(in); + this.optimizedBytesRead = new TimedCounter(in); } @Override @@ -182,7 +181,6 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(fileName); out.writeVLong(fileLength); out.writeVLong(openCount); - out.writeVLong(innerCount); out.writeVLong(closeCount); forwardSmallSeeks.writeTo(out); @@ -194,6 +192,7 @@ public void writeTo(StreamOutput out) throws IOException { cachedBytesRead.writeTo(out); cachedBytesWritten.writeTo(out); directBytesRead.writeTo(out); + optimizedBytesRead.writeTo(out); } public String getFileName() { @@ -208,10 +207,6 @@ public long getOpenCount() { return openCount; } - public long getInnerCount() { - return innerCount; - } - public long getCloseCount() { return closeCount; } @@ -252,6 +247,10 @@ public TimedCounter getDirectBytesRead() { return directBytesRead; } + public TimedCounter getOptimizedBytesRead() { + return optimizedBytesRead; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); @@ -259,13 +258,13 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field("name", getFileName()); builder.field("length", getFileLength()); builder.field("open_count", getOpenCount()); - builder.field("inner_count", getInnerCount()); builder.field("close_count", getCloseCount()); builder.field("contiguous_bytes_read", getContiguousReads()); builder.field("non_contiguous_bytes_read", getNonContiguousReads()); builder.field("cached_bytes_read", getCachedBytesRead()); builder.field("cached_bytes_written", getCachedBytesWritten()); builder.field("direct_bytes_read", getDirectBytesRead()); + builder.field("optimized_bytes_read", getOptimizedBytesRead()); { builder.startObject("forward_seeks"); builder.field("small", getForwardSmallSeeks()); @@ -293,7 +292,6 @@ public boolean equals(Object other) { CacheIndexInputStats stats = (CacheIndexInputStats) other; return fileLength == stats.fileLength && openCount == stats.openCount - && innerCount == stats.innerCount && closeCount == stats.closeCount && Objects.equals(fileName, stats.fileName) && Objects.equals(forwardSmallSeeks, stats.forwardSmallSeeks) @@ -304,17 +302,18 @@ public boolean equals(Object other) { && Objects.equals(nonContiguousReads, stats.nonContiguousReads) && Objects.equals(cachedBytesRead, stats.cachedBytesRead) && Objects.equals(cachedBytesWritten, stats.cachedBytesWritten) - && Objects.equals(directBytesRead, stats.directBytesRead); + && Objects.equals(directBytesRead, stats.directBytesRead) + && Objects.equals(optimizedBytesRead, stats.optimizedBytesRead); } @Override public int hashCode() { - return Objects.hash(fileName, fileLength, openCount, innerCount, closeCount, + return Objects.hash(fileName, fileLength, openCount, closeCount, forwardSmallSeeks, backwardSmallSeeks, forwardLargeSeeks, backwardLargeSeeks, contiguousReads, nonContiguousReads, cachedBytesRead, cachedBytesWritten, - directBytesRead); + directBytesRead, optimizedBytesRead); } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/searchablesnapshots/SearchableSnapshotShardStatsTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/searchablesnapshots/SearchableSnapshotShardStatsTests.java index d15d6b25e455e..dade0a0ca4204 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/searchablesnapshots/SearchableSnapshotShardStatsTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/searchablesnapshots/SearchableSnapshotShardStatsTests.java @@ -42,12 +42,12 @@ protected SearchableSnapshotShardStats createTestInstance() { private CacheIndexInputStats randomCacheIndexInputStats() { return new CacheIndexInputStats(randomAlphaOfLength(10), randomNonNegativeLong(), - randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), + randomNonNegativeLong(), randomNonNegativeLong(), randomCounter(), randomCounter(), randomCounter(), randomCounter(), randomCounter(), randomCounter(), randomCounter(), randomTimedCounter(), - randomTimedCounter()); + randomTimedCounter(), randomTimedCounter()); } private Counter randomCounter() { diff --git a/x-pack/plugin/searchable-snapshots/qa/rest/src/test/resources/rest-api-spec/test/stats.yml b/x-pack/plugin/searchable-snapshots/qa/rest/src/test/resources/rest-api-spec/test/stats.yml index c7d2277ac6e9a..34b537d568c83 100644 --- a/x-pack/plugin/searchable-snapshots/qa/rest/src/test/resources/rest-api-spec/test/stats.yml +++ b/x-pack/plugin/searchable-snapshots/qa/rest/src/test/resources/rest-api-spec/test/stats.yml @@ -147,7 +147,6 @@ teardown: - is_true: indices.docs.shards.0.0.files.0.name - gt: { indices.docs.shards.0.0.files.0.length: 0 } - gt: { indices.docs.shards.0.0.files.0.open_count: 0 } - - gte: { indices.docs.shards.0.0.files.0.inner_count: 0 } - gt: { indices.docs.shards.0.0.files.0.close_count: 0 } - gte: { indices.docs.shards.0.0.files.0.contiguous_bytes_read.count: 0 } @@ -179,6 +178,13 @@ teardown: - gte: { indices.docs.shards.0.0.files.0.direct_bytes_read.time_in_nanos: 0 } - is_false: indices.docs.shards.0.0.files.0.direct_bytes_read.time + - gte: { indices.docs.shards.0.0.files.0.optimized_bytes_read.count: 0 } + - gte: { indices.docs.shards.0.0.files.0.optimized_bytes_read.sum: 0 } + - gte: { indices.docs.shards.0.0.files.0.optimized_bytes_read.min: 0 } + - gte: { indices.docs.shards.0.0.files.0.optimized_bytes_read.max: 0 } + - gte: { indices.docs.shards.0.0.files.0.optimized_bytes_read.time_in_nanos: 0 } + - is_false: indices.docs.shards.0.0.files.0.optimized_bytes_read.time + - gte: { indices.docs.shards.0.0.files.0.forward_seeks.small.count: 0 } - gte: { indices.docs.shards.0.0.files.0.forward_seeks.small.sum: 0 } - gte: { indices.docs.shards.0.0.files.0.forward_seeks.small.min: 0 } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java index 140f92084ddc3..13c24b2fde0d1 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java @@ -16,20 +16,41 @@ import java.io.IOException; import java.io.InputStream; import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; public abstract class BaseSearchableSnapshotIndexInput extends BufferedIndexInput { protected final BlobContainer blobContainer; protected final FileInfo fileInfo; protected final IOContext context; + protected final IndexInputStats stats; + protected final long offset; + protected final long length; - public BaseSearchableSnapshotIndexInput(String resourceDesc, BlobContainer blobContainer, FileInfo fileInfo, IOContext context) { + // the following are only mutable so they can be adjusted after cloning/slicing + protected volatile boolean isClone; + private AtomicBoolean closed; + + public BaseSearchableSnapshotIndexInput( + String resourceDesc, + BlobContainer blobContainer, + FileInfo fileInfo, + IOContext context, + IndexInputStats stats, + long offset, + long length + ) { super(resourceDesc, context); this.blobContainer = Objects.requireNonNull(blobContainer); this.fileInfo = Objects.requireNonNull(fileInfo); this.context = Objects.requireNonNull(context); assert fileInfo.metadata().hashEqualsContents() == false : "this method should only be used with blobs that are NOT stored in metadata's hash field (fileInfo: " + fileInfo + ')'; + this.stats = Objects.requireNonNull(stats); + this.offset = offset; + this.length = length; + this.closed = new AtomicBoolean(false); + this.isClone = false; } public BaseSearchableSnapshotIndexInput( @@ -37,12 +58,46 @@ public BaseSearchableSnapshotIndexInput( BlobContainer blobContainer, FileInfo fileInfo, IOContext context, + IndexInputStats stats, + long offset, + long length, int bufferSize ) { - this(resourceDesc, blobContainer, fileInfo, context); + this(resourceDesc, blobContainer, fileInfo, context, stats, offset, length); setBufferSize(bufferSize); } + @Override + public final long length() { + return length; + } + + @Override + public BaseSearchableSnapshotIndexInput clone() { + final BaseSearchableSnapshotIndexInput clone = (BaseSearchableSnapshotIndexInput) super.clone(); + clone.closed = new AtomicBoolean(false); + clone.isClone = true; + return clone; + } + + protected void ensureOpen() throws IOException { + if (closed.get()) { + throw new IOException(toString() + " is closed"); + } + } + + @Override + public final void close() throws IOException { + if (closed.compareAndSet(false, true)) { + if (isClone == false) { + stats.incrementCloseCount(); + } + innerClose(); + } + } + + public abstract void innerClose() throws IOException; + protected InputStream openInputStream(final long position, final long length) throws IOException { assert assertCurrentThreadMayAccessBlobStore(); final long startPart = getPartNumberForPosition(position); diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/IndexInputStats.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/IndexInputStats.java index ec13d25ae64fc..6fa1604e98d2a 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/IndexInputStats.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/IndexInputStats.java @@ -13,6 +13,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder; import java.util.function.LongConsumer; +import java.util.function.LongSupplier; /** * {@link IndexInputStats} records stats for a given {@link CachedBlobContainerIndexInput}. @@ -24,9 +25,9 @@ public class IndexInputStats { private final long fileLength; private final long seekingThreshold; + private final LongSupplier currentTimeNanos; private final LongAdder opened = new LongAdder(); - private final LongAdder inner = new LongAdder(); private final LongAdder closed = new LongAdder(); private final Counter forwardSmallSeeks = new Counter(); @@ -39,25 +40,30 @@ public class IndexInputStats { private final Counter nonContiguousReads = new Counter(); private final TimedCounter directBytesRead = new TimedCounter(); + private final TimedCounter optimizedBytesRead = new TimedCounter(); private final Counter cachedBytesRead = new Counter(); private final TimedCounter cachedBytesWritten = new TimedCounter(); - public IndexInputStats(long fileLength) { - this(fileLength, SEEKING_THRESHOLD.getBytes()); + public IndexInputStats(long fileLength, LongSupplier currentTimeNanos) { + this(fileLength, SEEKING_THRESHOLD.getBytes(), currentTimeNanos); } - public IndexInputStats(long fileLength, long seekingThreshold) { + public IndexInputStats(long fileLength, long seekingThreshold, LongSupplier currentTimeNanos) { this.fileLength = fileLength; this.seekingThreshold = seekingThreshold; + this.currentTimeNanos = currentTimeNanos; } - public void incrementOpenCount() { - opened.increment(); + /** + * @return the current time in nanoseconds that should be used to measure statistics. + */ + public long currentTimeNanos() { + return currentTimeNanos.getAsLong(); } - public void incrementInnerOpenCount() { - inner.increment(); + public void incrementOpenCount() { + opened.increment(); } public void incrementCloseCount() { @@ -76,6 +82,10 @@ public void addDirectBytesRead(int bytesRead, long nanoseconds) { directBytesRead.add(bytesRead, nanoseconds); } + public void addOptimizedBytesRead(int bytesRead, long nanoseconds) { + optimizedBytesRead.add(bytesRead, nanoseconds); + } + public void incrementBytesRead(long previousPosition, long currentPosition, int bytesRead) { LongConsumer incBytesRead = (previousPosition == currentPosition) ? contiguousReads::add : nonContiguousReads::add; incBytesRead.accept(bytesRead); @@ -110,10 +120,6 @@ public LongAdder getOpened() { return opened; } - public LongAdder getInnerOpened() { - return inner; - } - public LongAdder getClosed() { return closed; } @@ -146,6 +152,10 @@ public TimedCounter getDirectBytesRead() { return directBytesRead; } + public TimedCounter getOptimizedBytesRead() { + return optimizedBytesRead; + } + public Counter getCachedBytesRead() { return cachedBytesRead; } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java index a7580555dd2cb..196f8e412cbef 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java @@ -7,7 +7,6 @@ import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.store.BaseDirectory; -import org.apache.lucene.store.BufferedIndexInput; import org.apache.lucene.store.Directory; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; @@ -49,8 +48,9 @@ import java.util.function.LongSupplier; import java.util.function.Supplier; -import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING; +import static org.apache.lucene.store.BufferedIndexInput.bufferSize; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_ID_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_REPOSITORY_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING; @@ -144,10 +144,6 @@ public IndexInputStats getStats(String fileName) { return stats.get(fileName); } - public long statsCurrentTimeNanos() { - return statsCurrentTimeNanosSupplier.getAsLong(); - } - private BlobStoreIndexShardSnapshot.FileInfo fileInfo(final String name) throws FileNotFoundException { return snapshot().indexFiles() .stream() @@ -227,7 +223,7 @@ public void clearCache() { } protected IndexInputStats createIndexInputStats(final long fileLength) { - return new IndexInputStats(fileLength); + return new IndexInputStats(fileLength, statsCurrentTimeNanosSupplier); } public CacheKey createCacheKey(String fileName) { @@ -253,7 +249,7 @@ public IndexInput openInput(final String name, final IOContext context) throws I return new CachedBlobContainerIndexInput(this, fileInfo, context, inputStats); } else { return new DirectBlobContainerIndexInput( - blobContainer(), fileInfo, context, getUncachedChunkSize(), BufferedIndexInput.BUFFER_SIZE); + blobContainer(), fileInfo, context, inputStats, getUncachedChunkSize(), bufferSize(context)); } } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java index 7be740728b4ec..8b0e33c7d5e54 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java @@ -18,8 +18,8 @@ import org.elasticsearch.common.util.concurrent.ReleasableLock; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; import org.elasticsearch.index.store.BaseSearchableSnapshotIndexInput; -import org.elasticsearch.index.store.SearchableSnapshotDirectory; import org.elasticsearch.index.store.IndexInputStats; +import org.elasticsearch.index.store.SearchableSnapshotDirectory; import java.io.EOFException; import java.io.IOException; @@ -27,7 +27,6 @@ import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.util.Locale; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexInput { @@ -36,14 +35,7 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn private static final int COPY_BUFFER_SIZE = 8192; private final SearchableSnapshotDirectory directory; - private final long offset; - private final long end; private final CacheFileReference cacheFileReference; - private final IndexInputStats stats; - - // the following are only mutable so they can be adjusted after cloning - private AtomicBoolean closed; - private boolean isClone; // last read position is kept around in order to detect (non)contiguous reads for stats private long lastReadPosition; @@ -57,37 +49,23 @@ public CachedBlobContainerIndexInput( IndexInputStats stats ) { this("CachedBlobContainerIndexInput(" + fileInfo.physicalName() + ")", directory, fileInfo, context, stats, 0L, fileInfo.length(), - false, new CacheFileReference(directory, fileInfo.physicalName(), fileInfo.length())); + new CacheFileReference(directory, fileInfo.physicalName(), fileInfo.length())); stats.incrementOpenCount(); } private CachedBlobContainerIndexInput(String resourceDesc, SearchableSnapshotDirectory directory, FileInfo fileInfo, IOContext context, - IndexInputStats stats, long offset, long length, boolean isClone, - CacheFileReference cacheFileReference) { - super(resourceDesc, directory.blobContainer(), fileInfo, context); + IndexInputStats stats, long offset, long length, CacheFileReference cacheFileReference) { + super(resourceDesc, directory.blobContainer(), fileInfo, context, stats, offset, length); this.directory = directory; - this.offset = offset; - this.stats = stats; - this.end = offset + length; - this.closed = new AtomicBoolean(false); - this.isClone = isClone; this.cacheFileReference = cacheFileReference; this.lastReadPosition = this.offset; this.lastSeekPosition = this.offset; } @Override - public long length() { - return end - offset; - } - - @Override - public void close() { - if (closed.compareAndSet(false, true)) { - if (isClone == false) { - stats.incrementCloseCount(); - cacheFileReference.releaseOnClose(); - } + public void innerClose() { + if (isClone == false) { + cacheFileReference.releaseOnClose(); } } @@ -151,9 +129,8 @@ private void writeCacheFile(FileChannel fc, long start, long end) throws IOExcep logger.trace(() -> new ParameterizedMessage("writing range [{}-{}] to cache file [{}]", start, end, cacheFileReference)); int bytesCopied = 0; - final long startTimeNanos = directory.statsCurrentTimeNanos(); + final long startTimeNanos = stats.currentTimeNanos(); try (InputStream input = openInputStream(start, length)) { - stats.incrementInnerOpenCount(); long remaining = end - start; while (remaining > 0) { final int len = (remaining < copyBuffer.length) ? Math.toIntExact(remaining) : copyBuffer.length; @@ -166,7 +143,7 @@ private void writeCacheFile(FileChannel fc, long start, long end) throws IOExcep bytesCopied += bytesRead; remaining -= bytesRead; } - final long endTimeNanos = directory.statsCurrentTimeNanos(); + final long endTimeNanos = stats.currentTimeNanos(); stats.addCachedBytesWritten(bytesCopied, endTimeNanos - startTimeNanos); } } @@ -185,20 +162,19 @@ protected void seekInternal(long pos) throws IOException { @Override public CachedBlobContainerIndexInput clone() { - final CachedBlobContainerIndexInput clone = (CachedBlobContainerIndexInput) super.clone(); - clone.closed = new AtomicBoolean(false); - clone.isClone = true; - return clone; + return (CachedBlobContainerIndexInput) super.clone(); } @Override public IndexInput slice(String sliceDescription, long offset, long length) { - if (offset < 0 || length < 0 || offset + length > this.length()) { + if (offset < 0 || length < 0 || offset + length > length()) { throw new IllegalArgumentException("slice() " + sliceDescription + " out of bounds: offset=" + offset - + ",length=" + length + ",fileLength=" + this.length() + ": " + this); + + ",length=" + length + ",fileLength=" + length() + ": " + this); } - return new CachedBlobContainerIndexInput(getFullSliceDescription(sliceDescription), directory, fileInfo, context, stats, - this.offset + offset, length, true, cacheFileReference); + final CachedBlobContainerIndexInput slice = new CachedBlobContainerIndexInput(getFullSliceDescription(sliceDescription), directory, + fileInfo, context, stats, this.offset + offset, length, cacheFileReference); + slice.isClone = true; + return slice; } @Override @@ -206,7 +182,6 @@ public String toString() { return "CachedBlobContainerIndexInput{" + "cacheFileReference=" + cacheFileReference + ", offset=" + offset + - ", end=" + end + ", length=" + length() + ", position=" + getFilePointer() + '}'; @@ -219,9 +194,8 @@ private int readDirectly(long start, long end, byte[] buffer, int offset) throws new ParameterizedMessage("direct reading of range [{}-{}] for cache file [{}]", start, end, cacheFileReference)); int bytesCopied = 0; - final long startTimeNanos = directory.statsCurrentTimeNanos(); + final long startTimeNanos = stats.currentTimeNanos(); try (InputStream input = openInputStream(start, length)) { - stats.incrementInnerOpenCount(); long remaining = end - start; while (remaining > 0) { final int len = (remaining < copyBuffer.length) ? (int) remaining : copyBuffer.length; @@ -234,7 +208,7 @@ private int readDirectly(long start, long end, byte[] buffer, int offset) throws bytesCopied += bytesRead; remaining -= bytesRead; } - final long endTimeNanos = directory.statsCurrentTimeNanos(); + final long endTimeNanos = stats.currentTimeNanos(); stats.addDirectBytesRead(bytesCopied, endTimeNanos - startTimeNanos); } return bytesCopied; diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInput.java index 0d1f077a34bbc..d705561fabaf3 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInput.java @@ -5,21 +5,24 @@ */ package org.elasticsearch.index.store.direct; -import org.apache.lucene.store.BufferedIndexInput; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; import org.elasticsearch.common.CheckedRunnable; +import org.elasticsearch.common.CheckedSupplier; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; import org.elasticsearch.index.store.BaseSearchableSnapshotIndexInput; +import org.elasticsearch.index.store.IndexInputStats; import java.io.Closeable; import java.io.EOFException; +import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; import java.util.Objects; +import java.util.concurrent.atomic.LongAdder; /** * A {@link DirectBlobContainerIndexInput} instance corresponds to a single file from a Lucene directory that has been snapshotted. Because @@ -47,45 +50,44 @@ */ public class DirectBlobContainerIndexInput extends BaseSearchableSnapshotIndexInput { - private final long offset; - private final long length; - private long position; - private volatile boolean closed; @Nullable // if not currently reading sequentially private StreamForSequentialReads streamForSequentialReads; private long sequentialReadSize; private static final long NO_SEQUENTIAL_READ_OPTIMIZATION = 0L; - public DirectBlobContainerIndexInput(BlobContainer blobContainer, FileInfo fileInfo, IOContext context, - long sequentialReadSize, int bufferSize) { - this("DirectBlobContainerIndexInput(" + fileInfo.physicalName() + ")", blobContainer, fileInfo, context, 0L, 0L, fileInfo.length(), - sequentialReadSize, bufferSize); + public DirectBlobContainerIndexInput( + BlobContainer blobContainer, + FileInfo fileInfo, + IOContext context, + IndexInputStats stats, + long sequentialReadSize, + int bufferSize + ) { + this("DirectBlobContainerIndexInput(" + fileInfo.physicalName() + ")", blobContainer, fileInfo, context, stats, 0L, 0L, + fileInfo.length(), sequentialReadSize, bufferSize); + stats.incrementOpenCount(); } - private DirectBlobContainerIndexInput(String resourceDesc, BlobContainer blobContainer, FileInfo fileInfo, IOContext context, - final long position, final long offset, final long length, final long sequentialReadSize, - final int bufferSize) { - super(resourceDesc, blobContainer, fileInfo, context, bufferSize); - this.offset = offset; - this.length = length; + private DirectBlobContainerIndexInput( + String resourceDesc, + BlobContainer blobContainer, + FileInfo fileInfo, + IOContext context, + IndexInputStats stats, + long position, + long offset, + long length, + long sequentialReadSize, + int bufferSize + ) { + super(resourceDesc, blobContainer, fileInfo, context, stats, offset, length, bufferSize); this.position = position; assert sequentialReadSize >= 0; this.sequentialReadSize = sequentialReadSize; - this.closed = false; } - @Override - public long length() { - return length; - } - - private void ensureOpen() throws IOException { - if (closed) { - throw new IOException(toString() + " is closed"); - } - } @Override protected void readInternal(byte[] b, int offset, int length) throws IOException { @@ -118,12 +120,15 @@ private void readInternalBytes(final int part, long pos, final byte[] b, int off if (optimizedReadSize < length) { // we did not read everything in an optimized fashion, so read the remainder directly + final long startTimeNanos = stats.currentTimeNanos(); try (InputStream inputStream = openBlobStream(part, pos + optimizedReadSize, length - optimizedReadSize)) { final int directReadSize = readFully(inputStream, b, offset + optimizedReadSize, length - optimizedReadSize, () -> { throw new EOFException("Read past EOF at [" + position + "] with length [" + fileInfo.partBytes(part) + "]"); }); assert optimizedReadSize + directReadSize == length : optimizedReadSize + " and " + directReadSize + " vs " + length; position += directReadSize; + final long endTimeNanos = stats.currentTimeNanos(); + stats.addDirectBytesRead(directReadSize, endTimeNanos - startTimeNanos); } } } @@ -193,7 +198,37 @@ private int readFromNewSequentialStream(int part, long pos, byte[] b, int offset // if we open a stream of length streamLength then it will not be completely consumed by this read, so it is worthwhile to open // it and keep it open for future reads final InputStream inputStream = openBlobStream(part, pos, streamLength); - streamForSequentialReads = new StreamForSequentialReads(inputStream, part, pos, streamLength); + streamForSequentialReads = new StreamForSequentialReads(new FilterInputStream(inputStream) { + private LongAdder bytesRead = new LongAdder(); + private LongAdder timeNanos = new LongAdder(); + + private int onOptimizedRead(CheckedSupplier read) throws IOException { + final long startTimeNanos = stats.currentTimeNanos(); + final int result = read.get(); + final long endTimeNanos = stats.currentTimeNanos(); + if (result != -1) { + bytesRead.add(result); + timeNanos.add(endTimeNanos - startTimeNanos); + } + return result; + } + + @Override + public int read() throws IOException { + return onOptimizedRead(super::read); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + return onOptimizedRead(() -> super.read(b, off, len)); + } + + @Override + public void close() throws IOException { + super.close(); + stats.addOptimizedBytesRead(Math.toIntExact(bytesRead.sumThenReset()), timeNanos.sumThenReset()); + } + }, part, pos, streamLength); final int read = streamForSequentialReads.read(b, offset, length); assert read == length : read + " vs " + length; @@ -203,8 +238,8 @@ private int readFromNewSequentialStream(int part, long pos, byte[] b, int offset @Override protected void seekInternal(long pos) throws IOException { - if (pos > length) { - throw new EOFException("Reading past end of file [position=" + pos + ", length=" + length + "] for " + toString()); + if (pos > length()) { + throw new EOFException("Reading past end of file [position=" + pos + ", length=" + length() + "] for " + toString()); } else if (pos < 0L) { throw new IOException("Seeking to negative position [" + pos + "] for " + toString()); } @@ -215,23 +250,27 @@ protected void seekInternal(long pos) throws IOException { } @Override - public BufferedIndexInput clone() { - return new DirectBlobContainerIndexInput("clone(" + this + ")", blobContainer, fileInfo, context, position, offset, length, + public DirectBlobContainerIndexInput clone() { + final DirectBlobContainerIndexInput clone = new DirectBlobContainerIndexInput("clone(" + this + ")", blobContainer, fileInfo, + context, stats, position, offset, length, // Clones might not be closed when they are no longer needed, but we must always close streamForSequentialReads. The simple // solution: do not optimize sequential reads on clones. NO_SEQUENTIAL_READ_OPTIMIZATION, getBufferSize()); + clone.isClone = true; + return clone; } @Override public IndexInput slice(String sliceDescription, long offset, long length) throws IOException { if ((offset >= 0L) && (length >= 0L) && (offset + length <= length())) { final DirectBlobContainerIndexInput slice = new DirectBlobContainerIndexInput(sliceDescription, blobContainer, fileInfo, - context, position, this.offset + offset, length, + context, stats, position, this.offset + offset, length, // Slices might not be closed when they are no longer needed, but we must always close streamForSequentialReads. The simple // solution: do not optimize sequential reads on slices. NO_SEQUENTIAL_READ_OPTIMIZATION, getBufferSize()); + slice.isClone = true; slice.seek(0L); return slice; } else { @@ -241,8 +280,7 @@ public IndexInput slice(String sliceDescription, long offset, long length) throw } @Override - public void close() throws IOException { - closed = true; + public void innerClose() throws IOException { closeStreamForSequentialReads(); } @@ -252,7 +290,7 @@ public String toString() { "resourceDesc=" + super.toString() + ", fileInfo=" + fileInfo + ", offset=" + offset + - ", length=" + length + + ", length=" + length() + ", position=" + position + '}'; } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/AbstractTransportSearchableSnapshotsAction.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/AbstractTransportSearchableSnapshotsAction.java index bca015716271b..74c8cbe203d2f 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/AbstractTransportSearchableSnapshotsAction.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/AbstractTransportSearchableSnapshotsAction.java @@ -34,7 +34,6 @@ import java.util.List; import static org.elasticsearch.index.IndexModule.INDEX_STORE_TYPE_SETTING; -import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_DIRECTORY_FACTORY_KEY; public abstract class AbstractTransportSearchableSnapshotsAction @@ -76,9 +75,7 @@ protected ShardsIterator shards(ClusterState state, Request request, String[] co if (indexMetaData != null) { Settings indexSettings = indexMetaData.getSettings(); if (INDEX_STORE_TYPE_SETTING.get(indexSettings).equals(SNAPSHOT_DIRECTORY_FACTORY_KEY)) { - if (SNAPSHOT_CACHE_ENABLED_SETTING.get(indexSettings)) { - searchableSnapshotIndices.add(concreteIndex); - } + searchableSnapshotIndices.add(concreteIndex); } } } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/TransportSearchableSnapshotsStatsAction.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/TransportSearchableSnapshotsStatsAction.java index 4d91dacd4a73b..f2b649f641801 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/TransportSearchableSnapshotsStatsAction.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/TransportSearchableSnapshotsStatsAction.java @@ -69,12 +69,12 @@ protected SearchableSnapshotShardStats executeShardOperation(SearchableSnapshots private static CacheIndexInputStats toCacheIndexInputStats(final String fileName, final IndexInputStats inputStats) { return new CacheIndexInputStats(fileName, inputStats.getFileLength(), - inputStats.getOpened().sum(), inputStats.getInnerOpened().sum(), inputStats.getClosed().sum(), + inputStats.getOpened().sum(), inputStats.getClosed().sum(), toCounter(inputStats.getForwardSmallSeeks()), toCounter(inputStats.getBackwardSmallSeeks()), toCounter(inputStats.getForwardLargeSeeks()), toCounter(inputStats.getBackwardLargeSeeks()), toCounter(inputStats.getContiguousReads()), toCounter(inputStats.getNonContiguousReads()), toCounter(inputStats.getCachedBytesRead()), toTimedCounter(inputStats.getCachedBytesWritten()), - toTimedCounter(inputStats.getDirectBytesRead())); + toTimedCounter(inputStats.getDirectBytesRead()), toTimedCounter(inputStats.getOptimizedBytesRead())); } private static Counter toCounter(final IndexInputStats.Counter counter) { diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/IndexInputStatsTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/IndexInputStatsTests.java index a93410e36268f..25eb6eb9f7dae 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/IndexInputStatsTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/IndexInputStatsTests.java @@ -7,14 +7,21 @@ import org.elasticsearch.test.ESTestCase; +import java.util.function.LongSupplier; + import static org.elasticsearch.index.store.IndexInputStats.SEEKING_THRESHOLD; import static org.elasticsearch.index.store.cache.TestUtils.assertCounter; public class IndexInputStatsTests extends ESTestCase { + private static LongSupplier FAKE_CLOCK = () -> { + assert false : "should not be called"; + return -1L; + }; + public void testReads() { final long fileLength = randomLongBetween(1L, 1_000L); - final IndexInputStats inputStats = new IndexInputStats(fileLength); + final IndexInputStats inputStats = new IndexInputStats(fileLength, FAKE_CLOCK); assertCounter(inputStats.getContiguousReads(), 0L, 0L, 0L, 0L); assertCounter(inputStats.getNonContiguousReads(), 0L, 0L, 0L, 0L); @@ -45,7 +52,7 @@ public void testReads() { public void testSeeks() { final long fileLength = randomLongBetween(1L, 1_000L); final long seekingThreshold = randomBoolean() ? randomLongBetween(1L, fileLength) : SEEKING_THRESHOLD.getBytes(); - final IndexInputStats inputStats = new IndexInputStats(fileLength, seekingThreshold); + final IndexInputStats inputStats = new IndexInputStats(fileLength, seekingThreshold, FAKE_CLOCK); assertCounter(inputStats.getForwardSmallSeeks(), 0L, 0L, 0L, 0L); assertCounter(inputStats.getForwardLargeSeeks(), 0L, 0L, 0L, 0L); @@ -84,7 +91,7 @@ public void testSeeks() { } public void testSeekToSamePosition() { - final IndexInputStats inputStats = new IndexInputStats(randomLongBetween(1L, 1_000L)); + final IndexInputStats inputStats = new IndexInputStats(randomLongBetween(1L, 1_000L), FAKE_CLOCK); final long position = randomLongBetween(0L, inputStats.getFileLength()); inputStats.incrementSeeks(position, position); diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInputStatsTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryStatsTests.java similarity index 66% rename from x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInputStatsTests.java rename to x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryStatsTests.java index 75e0de8da6d29..47371fea5b0ec 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInputStatsTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryStatsTests.java @@ -3,7 +3,7 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.index.store.cache; +package org.elasticsearch.index.store; import org.apache.lucene.store.BufferedIndexInput; import org.apache.lucene.store.IOContext; @@ -19,9 +19,7 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; -import org.elasticsearch.index.store.IndexInputStats; -import org.elasticsearch.index.store.SearchableSnapshotDirectory; -import org.elasticsearch.index.store.StoreFileMetaData; +import org.elasticsearch.index.store.cache.TestUtils; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService; @@ -30,12 +28,13 @@ import java.nio.charset.StandardCharsets; import java.util.List; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.LongSupplier; import static org.elasticsearch.index.store.cache.TestUtils.assertCounter; import static org.elasticsearch.index.store.cache.TestUtils.createCacheService; -import static org.elasticsearch.index.store.cache.TestUtils.randomCacheRangeSize; import static org.elasticsearch.index.store.cache.TestUtils.singleBlobContainer; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_UNCACHED_CHUNK_SIZE_SETTING; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -43,7 +42,7 @@ import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; -public class CachedBlobContainerIndexInputStatsTests extends ESIndexInputTestCase { +public class SearchableSnapshotDirectoryStatsTests extends ESIndexInputTestCase { private static final int MAX_FILE_LENGTH = 10_000; @@ -52,16 +51,16 @@ public class CachedBlobContainerIndexInputStatsTests extends ESIndexInputTestCas */ private static final long FAKE_CLOCK_ADVANCE_NANOS = TimeValue.timeValueMillis(100).nanos(); - public void testOpenCount() throws Exception { - executeTestCase(createCacheService(random()), - (fileName, fileContent, cacheDirectory) -> { + public void testOpenCount() { + executeTestCase( + (fileName, fileContent, directory) -> { try { for (long i = 0L; i < randomLongBetween(1L, 20L); i++) { - IndexInputStats inputStats = cacheDirectory.getStats(fileName); + IndexInputStats inputStats = directory.getStats(fileName); assertThat(inputStats, (i == 0L) ? nullValue() : notNullValue()); - final IndexInput input = cacheDirectory.openInput(fileName, newIOContext(random())); - inputStats = cacheDirectory.getStats(fileName); + final IndexInput input = directory.openInput(fileName, newIOContext(random())); + inputStats = directory.getStats(fileName); assertThat(inputStats.getOpened().longValue(), equalTo(i + 1L)); input.close(); } @@ -71,38 +70,14 @@ public void testOpenCount() throws Exception { }); } - public void testInnerOpenCount() throws Exception { - final ByteSizeValue rangeSize = randomCacheRangeSize(random()); - final CacheService noEvictionCacheService = new CacheService(new ByteSizeValue(1, ByteSizeUnit.GB), rangeSize); - - executeTestCase(noEvictionCacheService, - (fileName, fileContent, cacheDirectory) -> { - try { - assertThat( cacheDirectory.getStats(fileName), nullValue()); - - final IndexInput input = cacheDirectory.openInput(fileName, newIOContext(random())); - for (int j = 0; j < input.length(); j++) { - input.readByte(); - } - input.close(); - - final IndexInputStats inputStats = cacheDirectory.getStats(fileName); - assertThat("Inner IndexInput should have been opened for each cached range to write", - inputStats.getInnerOpened().longValue(), equalTo(TestUtils.numberOfRanges(input.length(), rangeSize.getBytes()))); - } catch (IOException e) { - throw new AssertionError(e); - } - }); - } - - public void testCloseCount() throws Exception { - executeTestCase(createCacheService(random()), - (fileName, fileContent, cacheDirectory) -> { + public void testCloseCount() { + executeTestCase( + (fileName, fileContent, directory) -> { try { for (long i = 0L; i < randomLongBetween(1L, 20L); i++) { - final IndexInput input = cacheDirectory.openInput(fileName, newIOContext(random())); + final IndexInput input = directory.openInput(fileName, newIOContext(random())); - IndexInputStats inputStats = cacheDirectory.getStats(fileName); + IndexInputStats inputStats = directory.getStats(fileName); assertThat(inputStats, notNullValue()); assertThat(inputStats.getClosed().longValue(), equalTo(i)); @@ -115,19 +90,21 @@ public void testCloseCount() throws Exception { }); } - public void testCachedBytesReadsAndWrites() throws Exception { + public void testCachedBytesReadsAndWrites() { // a cache service with a low range size but enough space to not evict the cache file final ByteSizeValue rangeSize = new ByteSizeValue(randomIntBetween(512, MAX_FILE_LENGTH), ByteSizeUnit.BYTES); - final CacheService cacheService = new CacheService(new ByteSizeValue(1, ByteSizeUnit.GB), rangeSize); + final ByteSizeValue cacheSize = new ByteSizeValue(1, ByteSizeUnit.GB); - executeTestCase(cacheService, (fileName, fileContent, cacheDirectory) -> { - try (IndexInput input = cacheDirectory.openInput(fileName, newIOContext(random()))) { + executeTestCaseWithCache(cacheSize, rangeSize, (fileName, fileContent, directory) -> { + try (IndexInput input = directory.openInput(fileName, newIOContext(random()))) { final long length = input.length(); - IndexInputStats inputStats = cacheDirectory.getStats(fileName); + final IndexInputStats inputStats = directory.getStats(fileName); assertThat(inputStats, notNullValue()); - randomReadAndSlice(input, Math.toIntExact(length)); + final byte[] result = randomReadAndSlice(input, Math.toIntExact(length)); + assertArrayEquals(fileContent, result); + final long cachedBytesWriteCount = TestUtils.numberOfRanges(length, rangeSize.getBytes()); assertThat(inputStats.getCachedBytesWritten(), notNullValue()); @@ -149,30 +126,62 @@ public void testCachedBytesReadsAndWrites() throws Exception { assertCounter(inputStats.getDirectBytesRead(), 0L, 0L, 0L, 0L); assertThat(inputStats.getDirectBytesRead().totalNanoseconds(), equalTo(0L)); + assertCounter(inputStats.getOptimizedBytesRead(), 0L, 0L, 0L, 0L); + assertThat(inputStats.getOptimizedBytesRead().totalNanoseconds(), equalTo(0L)); + } catch (IOException e) { throw new AssertionError(e); } }); } - public void testDirectBytesReads() throws Exception { - final CacheService noDiskSpaceLeftCacheService - = new CacheService(new ByteSizeValue(0, ByteSizeUnit.BYTES), new ByteSizeValue(0, ByteSizeUnit.BYTES)); + public void testCachedBytesReadsAndWritesNoCache() { + final ByteSizeValue uncachedChunkSize = new ByteSizeValue(randomIntBetween(512, MAX_FILE_LENGTH), ByteSizeUnit.BYTES); + executeTestCaseWithoutCache(uncachedChunkSize, (fileName, fileContent, directory) -> { + try (IndexInput input = directory.openInput(fileName, newIOContext(random()))) { + final long length = input.length(); - executeTestCase(noDiskSpaceLeftCacheService, (fileName, fileContent, cacheDirectory) -> { - assertThat(cacheDirectory.getStats(fileName), nullValue()); - final IOContext ioContext = newIOContext(random()); + final IndexInputStats inputStats = directory.getStats(fileName); + assertThat(inputStats, notNullValue()); - try (IndexInput input = cacheDirectory.openInput(fileName, ioContext)) { - final IndexInputStats inputStats = cacheDirectory.getStats(fileName); + final byte[] result = randomReadAndSlice(input, Math.toIntExact(length)); + assertArrayEquals(fileContent, result); + + assertThat(inputStats.getCachedBytesWritten(), notNullValue()); + assertCounter(inputStats.getCachedBytesWritten(), 0L, 0L, 0L, 0L); + + assertThat(inputStats.getCachedBytesRead(), notNullValue()); + assertCounter(inputStats.getCachedBytesRead(), 0L, 0L, 0L, 0L); + + } catch (IOException e) { + throw new AssertionError(e); + } + }); + } + + public void testDirectBytesReadsWithCache() { + // Cache service always evicts files + executeTestCaseWithCache(ByteSizeValue.ZERO, ByteSizeValue.ZERO, (fileName, fileContent, directory) -> { + assertThat(directory.getStats(fileName), nullValue()); + + final IOContext ioContext = newIOContext(random()); + try { + IndexInput input = directory.openInput(fileName, ioContext); + if (randomBoolean()) { + input = input.slice("test", 0L, input.length()); + } + if (randomBoolean()) { + input = input.clone(); + } + final IndexInputStats inputStats = directory.getStats(fileName); // account for internal buffered reads - final long bufferSize = (long) BufferedIndexInput.bufferSize(ioContext); + final long bufferSize = BufferedIndexInput.bufferSize(ioContext); final long remaining = input.length() % bufferSize; final long expectedTotal = input.length(); final long expectedCount = input.length() / bufferSize + (remaining > 0L ? 1L : 0L); final long minRead = remaining > 0L ? remaining : bufferSize; - final long maxRead = input.length() < bufferSize ? input.length() : bufferSize; + final long maxRead = Math.min(input.length(), bufferSize); // read all index input sequentially as it simplifies testing final byte[] readBuffer = new byte[512]; @@ -196,17 +205,88 @@ public void testDirectBytesReads() throws Exception { assertCounter(inputStats.getCachedBytesRead(), 0L, 0L, 0L, 0L); assertThat(inputStats.getCachedBytesWritten().totalNanoseconds(), equalTo(0L)); + input.close(); + } catch (IOException e) { + throw new AssertionError(e); + } + }); + } + + public void testDirectBytesReadsWithoutCache() { + final ByteSizeValue uncachedChunkSize = new ByteSizeValue(randomIntBetween(512, MAX_FILE_LENGTH), ByteSizeUnit.BYTES); + executeTestCaseWithoutCache(uncachedChunkSize, (fileName, fileContent, directory) -> { + assertThat(directory.getStats(fileName), nullValue()); + + final IOContext ioContext = newIOContext(random()); + try (IndexInput original = directory.openInput(fileName, ioContext)) { + final IndexInput input = original.clone(); // always clone to only execute direct reads + final IndexInputStats inputStats = directory.getStats(fileName); + + // account for internal buffered reads + final long bufferSize = BufferedIndexInput.bufferSize(ioContext); + final long remaining = input.length() % bufferSize; + final long expectedTotal = input.length(); + final long expectedCount = input.length() / bufferSize + (remaining > 0L ? 1L : 0L); + final long minRead = remaining > 0L ? remaining : bufferSize; + final long maxRead = Math.min(input.length(), bufferSize); + + // read all index input sequentially as it simplifies testing + for (long i = 0L; i < input.length(); i++) { + input.readByte(); + } + + assertCounter(inputStats.getDirectBytesRead(), expectedTotal, expectedCount, minRead, maxRead); + assertThat(inputStats.getDirectBytesRead().totalNanoseconds(), equalTo(expectedCount * FAKE_CLOCK_ADVANCE_NANOS)); + + // cache file has never been written nor read + assertCounter(inputStats.getCachedBytesWritten(), 0L, 0L, 0L, 0L); + assertCounter(inputStats.getCachedBytesRead(), 0L, 0L, 0L, 0L); + assertThat(inputStats.getCachedBytesWritten().totalNanoseconds(), equalTo(0L)); } catch (IOException e) { throw new AssertionError(e); } }); } - public void testReadBytesContiguously() throws Exception { - // use default cache service settings - final CacheService cacheService = new CacheService(Settings.EMPTY); + public void testOptimizedBytesReads() { + // use a large uncached chunk size that allows to read the file in a single operation + final ByteSizeValue uncachedChunkSize = new ByteSizeValue(1, ByteSizeUnit.GB); + executeTestCaseWithoutCache(uncachedChunkSize, (fileName, fileContent, directory) -> { + final IOContext context = newIOContext(random()); + try (IndexInput input = directory.openInput(fileName, context)) { + final IndexInputStats inputStats = directory.getStats(fileName); + assertThat(inputStats, notNullValue()); + + // read all index input sequentially as it simplifies testing + for (long i = 0L; i < input.length(); i++) { + input.readByte(); + } + + // account for internal buffered reads + final long bufferSize = BufferedIndexInput.bufferSize(context); + if (input.length() <= bufferSize) { + // file is read in a single non-optimized read operation + assertCounter(inputStats.getDirectBytesRead(), input.length(), 1L, input.length(), input.length()); + assertThat(inputStats.getDirectBytesRead().totalNanoseconds(), equalTo(FAKE_CLOCK_ADVANCE_NANOS)); + assertCounter(inputStats.getOptimizedBytesRead(), 0L, 0L, 0L, 0L); + } else { + final long remaining = input.length() % bufferSize; + final long expectedClockCounts = input.length() / bufferSize + (remaining > 0L ? 1L : 0L); + + // file is read in a single optimized read operation + IndexInputStats.TimedCounter optimizedBytesRead = inputStats.getOptimizedBytesRead(); + assertCounter(optimizedBytesRead, input.length(), 1L, input.length(), input.length()); + assertThat(optimizedBytesRead.totalNanoseconds(), equalTo(expectedClockCounts * FAKE_CLOCK_ADVANCE_NANOS)); + assertCounter(inputStats.getDirectBytesRead(), 0L, 0L, 0L, 0L); + } + } catch (IOException e) { + throw new AssertionError(e); + } + }); + } - executeTestCase(cacheService, (fileName, fileContent, cacheDirectory) -> { + public void testReadBytesContiguously() { + executeTestCaseWithDefaultCache((fileName, fileContent, cacheDirectory) -> { final IOContext ioContext = newIOContext(random()); try (IndexInput input = cacheDirectory.openInput(fileName, ioContext)) { @@ -254,11 +334,8 @@ public void testReadBytesContiguously() throws Exception { }); } - public void testReadBytesNonContiguously() throws Exception { - // use default cache service settings - final CacheService cacheService = new CacheService(Settings.EMPTY); - - executeTestCase(cacheService, (fileName, fileContent, cacheDirectory) -> { + public void testReadBytesNonContiguously() { + executeTestCaseWithDefaultCache((fileName, fileContent, cacheDirectory) -> { final IOContext ioContext = newIOContext(random()); try (IndexInput input = cacheDirectory.openInput(fileName, ioContext)) { @@ -302,11 +379,8 @@ public void testReadBytesNonContiguously() throws Exception { }); } - public void testForwardSeeks() throws Exception { - // use default cache service settings - final CacheService cacheService = new CacheService(Settings.EMPTY); - - executeTestCase(cacheService, (fileName, fileContent, cacheDirectory) -> { + public void testForwardSeeks() { + executeTestCaseWithDefaultCache((fileName, fileContent, cacheDirectory) -> { final IOContext ioContext = newIOContext(random()); try (IndexInput indexInput = cacheDirectory.openInput(fileName, ioContext)) { IndexInput input = indexInput; @@ -363,11 +437,8 @@ public void testForwardSeeks() throws Exception { }); } - public void testBackwardSeeks() throws Exception { - // use default cache service settings - final CacheService cacheService = new CacheService(Settings.EMPTY); - - executeTestCase(cacheService, (fileName, fileContent, cacheDirectory) -> { + public void testBackwardSeeks() { + executeTestCaseWithDefaultCache((fileName, fileContent, cacheDirectory) -> { final IOContext ioContext = newIOContext(random()); try (IndexInput indexInput = cacheDirectory.openInput(fileName, ioContext)) { IndexInput input = indexInput; @@ -428,17 +499,50 @@ public void testBackwardSeeks() throws Exception { }); } + private static void executeTestCase(final TriConsumer test) { + executeTestCase(createCacheService(random()), + Settings.builder().put(SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), randomBoolean()).build(), test); + } + + private static void executeTestCaseWithoutCache( + final ByteSizeValue uncachedChunkSize, + final TriConsumer test + ) { + executeTestCase(createCacheService(random()), + Settings.builder() + .put(SNAPSHOT_UNCACHED_CHUNK_SIZE_SETTING.getKey(), uncachedChunkSize) + .put(SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), false).build(), test); + } + + private static void executeTestCaseWithDefaultCache(final TriConsumer test) { + executeTestCaseWithCache( + CacheService.SNAPSHOT_CACHE_SIZE_SETTING.getDefault(Settings.EMPTY), + CacheService.SNAPSHOT_CACHE_RANGE_SIZE_SETTING.getDefault(Settings.EMPTY), test); + } + + private static void executeTestCaseWithCache( + final ByteSizeValue cacheSize, + final ByteSizeValue cacheRangeSize, + final TriConsumer test + ) { + executeTestCase(new CacheService(cacheSize, cacheRangeSize), + Settings.builder().put(SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), true).build(), test); + } + private static void executeTestCase( final CacheService cacheService, + final Settings indexSettings, final TriConsumer test ) { final byte[] fileContent = randomUnicodeOfLength(randomIntBetween(10, MAX_FILE_LENGTH)).getBytes(StandardCharsets.UTF_8); - final String fileName = randomAlphaOfLength(10); + final String fileExtension = randomAlphaOfLength(3); + final String fileName = randomAlphaOfLength(10) + '.' + fileExtension; final SnapshotId snapshotId = new SnapshotId("_name", "_uuid"); final IndexId indexId = new IndexId("_name", "_uuid"); final ShardId shardId = new ShardId("_name", "_uuid", 0); final AtomicLong fakeClock = new AtomicLong(); + final LongSupplier statsCurrentTimeNanos = () -> fakeClock.addAndGet(FAKE_CLOCK_ADVANCE_NANOS); final Long seekingThreshold = randomBoolean() ? randomLongBetween(1L, fileContent.length) : null; @@ -447,18 +551,17 @@ private static void executeTestCase( final StoreFileMetaData metaData = new StoreFileMetaData(fileName, fileContent.length, "_checksum", Version.CURRENT.luceneVersion); final List files = List.of(new FileInfo(blobName, metaData, new ByteSizeValue(fileContent.length))); final BlobStoreIndexShardSnapshot snapshot = new BlobStoreIndexShardSnapshot(snapshotId.getName(), 0L, files, 0L, 0L, 0, 0L); - final Settings indexSettings = Settings.builder().put(SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), true).build(); try (CacheService ignored = cacheService; SearchableSnapshotDirectory directory = new SearchableSnapshotDirectory(() -> blobContainer, () -> snapshot, snapshotId, indexId, shardId, indexSettings, - () -> fakeClock.addAndGet(FAKE_CLOCK_ADVANCE_NANOS), cacheService, createTempDir()) { + statsCurrentTimeNanos, cacheService, createTempDir()) { @Override protected IndexInputStats createIndexInputStats(long fileLength) { if (seekingThreshold == null) { return super.createIndexInputStats(fileLength); } - return new IndexInputStats(fileLength, seekingThreshold); + return new IndexInputStats(fileLength, seekingThreshold, statsCurrentTimeNanos); } } ) { diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java index c476137a8984a..6b02d0a0e0a2c 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java @@ -32,18 +32,18 @@ public final class TestUtils { private TestUtils() { } - static CacheService createCacheService(final Random random) { + public static CacheService createCacheService(final Random random) { final ByteSizeValue cacheSize = new ByteSizeValue(randomIntBetween(random, 1, 100), randomFrom(random, List.of(ByteSizeUnit.BYTES, ByteSizeUnit.KB, ByteSizeUnit.MB, ByteSizeUnit.GB))); return new CacheService(cacheSize, randomCacheRangeSize(random)); } - static ByteSizeValue randomCacheRangeSize(final Random random) { + public static ByteSizeValue randomCacheRangeSize(final Random random) { return new ByteSizeValue(randomIntBetween(random, 1, 100), randomFrom(random, List.of(ByteSizeUnit.BYTES, ByteSizeUnit.KB, ByteSizeUnit.MB))); } - static long numberOfRanges(long fileSize, long rangeSize) { + public static long numberOfRanges(long fileSize, long rangeSize) { return numberOfRanges(Math.toIntExact(fileSize), Math.toIntExact(rangeSize)); } @@ -65,11 +65,23 @@ public static void assertCounter(IndexInputStats.Counter counter, long total, lo assertThat(counter.max(), equalTo(max)); } + public static void assertCounter( + IndexInputStats.TimedCounter timedCounter, + long total, + long count, + long min, + long max, + long totalNanoseconds + ) { + assertCounter(timedCounter, total, count, min, max); + assertThat(timedCounter.totalNanoseconds(), equalTo(totalNanoseconds)); + } + /** * A {@link BlobContainer} that can read a single in-memory blob. * Any attempt to read a different blob will throw a {@link FileNotFoundException} */ - static BlobContainer singleBlobContainer(final String blobName, final byte[] blobContent) { + public static BlobContainer singleBlobContainer(final String blobName, final byte[] blobContent) { return new BlobContainer() { @Override diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInputTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInputTests.java index 7b829cf04b5f7..efe90c9bcde26 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInputTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInputTests.java @@ -13,6 +13,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; +import org.elasticsearch.index.store.IndexInputStats; import org.elasticsearch.index.store.StoreFileMetaData; import java.io.ByteArrayInputStream; @@ -46,7 +47,7 @@ private DirectBlobContainerIndexInput createIndexInput(final byte[] input) throw private DirectBlobContainerIndexInput createIndexInput(final byte[] input, long partSize, long minimumReadSize, Runnable onReadBlob) throws IOException { final FileInfo fileInfo = new FileInfo(randomAlphaOfLength(5), - new StoreFileMetaData("test", (long) input.length, "_checksum", Version.LATEST), + new StoreFileMetaData("test", input.length, "_checksum", Version.LATEST), partSize == input.length ? randomFrom( new ByteSizeValue(partSize, ByteSizeUnit.BYTES), @@ -94,7 +95,8 @@ public int read(byte[] b, int off, int len) throws IOException { }; } }); - return new DirectBlobContainerIndexInput(blobContainer, fileInfo, newIOContext(random()), minimumReadSize, + return new DirectBlobContainerIndexInput(blobContainer, fileInfo, newIOContext(random()), new IndexInputStats(0L, () -> 0L), + minimumReadSize, randomBoolean() ? BufferedIndexInput.BUFFER_SIZE : between(BufferedIndexInput.MIN_BUFFER_SIZE, BufferedIndexInput.BUFFER_SIZE)); } diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java index 2d9258505a935..0a251a31e8706 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java @@ -5,15 +5,16 @@ */ package org.elasticsearch.xpack.searchablesnapshots; +import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.search.TotalHits; import org.elasticsearch.ResourceNotFoundException; -import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Priority; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; @@ -26,9 +27,9 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.test.ESIntegTestCase; -import org.elasticsearch.xpack.core.searchablesnapshots.SearchableSnapshotShardStats; import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotAction; import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest; +import org.elasticsearch.xpack.core.searchablesnapshots.SearchableSnapshotShardStats; import org.elasticsearch.xpack.searchablesnapshots.action.SearchableSnapshotsStatsAction; import org.elasticsearch.xpack.searchablesnapshots.action.SearchableSnapshotsStatsRequest; import org.elasticsearch.xpack.searchablesnapshots.action.SearchableSnapshotsStatsResponse; @@ -112,6 +113,9 @@ public void testCreateAndRestoreSearchableSnapshot() throws Exception { .setTrackTotalHits(true).setQuery(matchQuery("foo", "bar")).get().getHits().getTotalHits(); logger.info("--> [{}] in total, of which [{}] match the query", originalAllHits, originalBarHits); + expectThrows(ResourceNotFoundException.class, "Searchable snapshot stats on a non snapshot searchable index should fail", + () -> client().execute(SearchableSnapshotsStatsAction.INSTANCE, new SearchableSnapshotsStatsRequest()).actionGet()); + CreateSnapshotResponse createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot(fsRepoName, snapshotName) .setWaitForCompletion(true).get(); final SnapshotInfo snapshotInfo = createSnapshotResponse.getSnapshotInfo(); @@ -154,29 +158,11 @@ public void testCreateAndRestoreSearchableSnapshot() throws Exception { assertTrue(SearchableSnapshots.SNAPSHOT_INDEX_ID_SETTING.exists(settings)); assertRecovered(restoredIndexName, originalAllHits, originalBarHits); - - final SearchableSnapshotsStatsRequest request = new SearchableSnapshotsStatsRequest(restoredIndexName); - final ActionFuture future = client().execute(SearchableSnapshotsStatsAction.INSTANCE, request); - if (cacheEnabled) { - final SearchableSnapshotsStatsResponse statsResponse = future.actionGet(); - assertThat(statsResponse.getStats(), hasSize(getNumShards(restoredIndexName).totalNumShards)); - for (SearchableSnapshotShardStats stats : statsResponse.getStats()) { - assertThat(stats.getShardRouting().getIndexName(), equalTo(restoredIndexName)); - assertThat(stats.getStats().size(), greaterThan(0)); - for (SearchableSnapshotShardStats.CacheIndexInputStats indexInputStats : stats.getStats()) { - for (String ext : nonCachedExtensions) { - if (indexInputStats.getFileName().endsWith(ext)) { - assertEquals(indexInputStats.getFileName(), 0, indexInputStats.getOpenCount()); - } - } - } - } - } else { - expectThrows(ResourceNotFoundException.class, future::actionGet); - } + assertSearchableSnapshotStats(restoredIndexName, cacheEnabled, nonCachedExtensions); internalCluster().fullRestart(); assertRecovered(restoredIndexName, originalAllHits, originalBarHits); + assertSearchableSnapshotStats(restoredIndexName, cacheEnabled, nonCachedExtensions); internalCluster().ensureAtLeastNumDataNodes(2); @@ -192,6 +178,7 @@ public void testCreateAndRestoreSearchableSnapshot() throws Exception { .setWaitForNoRelocatingShards(true).setWaitForEvents(Priority.LANGUID).get().isTimedOut()); assertRecovered(restoredIndexName, originalAllHits, originalBarHits); + assertSearchableSnapshotStats(restoredIndexName, cacheEnabled, nonCachedExtensions); } private void assertRecovered(String indexName, TotalHits originalAllHits, TotalHits originalBarHits) throws Exception { @@ -239,4 +226,43 @@ private void assertRecovered(String indexName, TotalHits originalAllHits, TotalH assertThat(barTotalHits, equalTo(originalBarHits)); } } + + private void assertSearchableSnapshotStats(String indexName, boolean cacheEnabled, List nonCachedExtensions) { + final SearchableSnapshotsStatsResponse statsResponse = client().execute(SearchableSnapshotsStatsAction.INSTANCE, + new SearchableSnapshotsStatsRequest(indexName)).actionGet(); + final NumShards restoredNumShards = getNumShards(indexName); + assertThat(statsResponse.getStats(), hasSize(restoredNumShards.totalNumShards)); + + for (SearchableSnapshotShardStats stats : statsResponse.getStats()) { + final ShardRouting shardRouting = stats.getShardRouting(); + assertThat(stats.getShardRouting().getIndexName(), equalTo(indexName)); + if (shardRouting.started()) { + assertThat("Expecting stats to exist for at least 1 Lucene file", stats.getStats().size(), greaterThan(0)); + for (SearchableSnapshotShardStats.CacheIndexInputStats indexInputStats : stats.getStats()) { + final String fileName = indexInputStats.getFileName(); + assertThat("Unexpected open count for " + fileName + " of shard " + shardRouting, + indexInputStats.getOpenCount(), greaterThan(0L)); + assertThat("Unexpected close count for " + fileName + " of shard " + shardRouting, + indexInputStats.getCloseCount(), lessThanOrEqualTo(indexInputStats.getOpenCount())); + assertThat("Unexpected file length for " + fileName + " of shard " + shardRouting, + indexInputStats.getFileLength(), greaterThan(0L)); + + if (cacheEnabled == false || nonCachedExtensions.contains(IndexFileNames.getExtension(fileName))) { + assertThat("Expected at least 1 optimized or direct read for " + fileName + " of shard " + shardRouting, + Math.max(indexInputStats.getOptimizedBytesRead().getCount(), indexInputStats.getDirectBytesRead().getCount()), + greaterThan(0L)); + assertThat("Expected no cache read or write for " + fileName + " of shard " + shardRouting, + Math.max(indexInputStats.getCachedBytesRead().getCount(), indexInputStats.getCachedBytesWritten().getCount()), + equalTo(0L)); + } else { + assertThat("Expected at least 1 cache read or write for " + fileName + " of shard " + shardRouting, + Math.max(indexInputStats.getCachedBytesRead().getCount(), indexInputStats.getCachedBytesWritten().getCount()), + greaterThan(0L)); + assertThat("Expected no optimized read for " + fileName + " of shard " + shardRouting, + indexInputStats.getOptimizedBytesRead().getCount(), equalTo(0L)); + } + } + } + } + } }