From 9215ccec85e9e0e849bfcf2a6603f26f11cb1b36 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Fri, 20 Mar 2020 16:35:43 +0100 Subject: [PATCH 1/3] Extract CacheBufferedIndexInput --- .../BaseSearchableSnapshotDirectory.java | 4 + .../BaseSearchableSnapshotIndexInput.java | 4 + .../cache/CacheBufferedIndexInput.java | 292 +++++++++++++++++ .../cache/CacheDirectory.java | 295 +----------------- .../cache/IndexInputStats.java | 1 - 5 files changed, 310 insertions(+), 286 deletions(-) create mode 100644 x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheBufferedIndexInput.java diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotDirectory.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotDirectory.java index 5a1950ab0b923..40f697b0876be 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotDirectory.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotDirectory.java @@ -33,6 +33,10 @@ public BaseSearchableSnapshotDirectory(BlobContainer blobContainer, BlobStoreInd this.closed = new AtomicBoolean(false); } + public BlobContainer blobContainer() { + return blobContainer; + } + protected final FileInfo fileInfo(final String name) throws FileNotFoundException { return snapshot.indexFiles() .stream() diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java index 983a97534a4a1..4d0ea1030d7e5 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java @@ -5,6 +5,8 @@ */ package org.elasticsearch.index.store; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.lucene.store.BufferedIndexInput; import org.apache.lucene.store.IOContext; import org.apache.lucene.util.BytesRef; @@ -19,6 +21,8 @@ public abstract class BaseSearchableSnapshotIndexInput extends BufferedIndexInput { + protected static final Logger logger = LogManager.getLogger(BaseSearchableSnapshotIndexInput.class); + protected final BlobContainer blobContainer; protected final FileInfo fileInfo; protected final IOContext context; diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheBufferedIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheBufferedIndexInput.java new file mode 100644 index 0000000000000..b4105269ce3d1 --- /dev/null +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheBufferedIndexInput.java @@ -0,0 +1,292 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.searchablesnapshots.cache; + +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.store.AlreadyClosedException; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.common.io.Channels; +import org.elasticsearch.common.util.concurrent.ReleasableLock; +import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; +import org.elasticsearch.index.store.BaseSearchableSnapshotIndexInput; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +public class CacheBufferedIndexInput extends BaseSearchableSnapshotIndexInput { + + private static final int COPY_BUFFER_SIZE = 8192; + + private final CacheDirectory directory; + private final long offset; + private final long end; + private final CacheFileReference cacheFileReference; + private final IndexInputStats stats; + + // the following are only mutable so they can be adjusted after cloning + private AtomicBoolean closed; + private boolean isClone; + + // last read position is kept around in order to detect (non)contiguous reads for stats + private long lastReadPosition; + // last seek position is kept around in order to detect forward/backward seeks for stats + private long lastSeekPosition; + + CacheBufferedIndexInput(CacheDirectory directory, FileInfo fileInfo, IOContext context, IndexInputStats stats) { + this("CachedBufferedIndexInput(" + fileInfo.physicalName() + ")", directory, fileInfo, context, stats, 0L, fileInfo.length(), + false, null); + stats.incrementOpenCount(); + } + + private CacheBufferedIndexInput(String resourceDesc, CacheDirectory directory, FileInfo fileInfo, IOContext context, + IndexInputStats stats, long offset, long length, boolean isClone, + @Nullable CacheFileReference cacheFileReference) { + super(resourceDesc, directory.blobContainer(), fileInfo, context); + this.directory = directory; + this.offset = offset; + this.stats = stats; + this.end = offset + length; + this.closed = new AtomicBoolean(false); + this.isClone = isClone; + this.cacheFileReference = Objects.requireNonNullElseGet(cacheFileReference, + () -> new CacheFileReference(fileInfo.physicalName(), fileInfo.length())); + this.lastReadPosition = this.offset; + this.lastSeekPosition = this.offset; + } + + @Override + public long length() { + return end - offset; + } + + @Override + public void close() { + if (closed.compareAndSet(false, true)) { + if (isClone == false) { + stats.incrementCloseCount(); + cacheFileReference.releaseOnClose(); + } + } + } + + @Override + protected void readInternal(final byte[] buffer, final int offset, final int length) throws IOException { + final long position = getFilePointer() + this.offset; + + int totalBytesRead = 0; + while (totalBytesRead < length) { + final long pos = position + totalBytesRead; + final int off = offset + totalBytesRead; + final int len = length - totalBytesRead; + + int bytesRead = 0; + try { + final CacheFile cacheFile = cacheFileReference.get(); + if (cacheFile == null) { + throw new AlreadyClosedException("Failed to acquire a non-evicted cache file"); + } + + try (ReleasableLock ignored = cacheFile.fileLock()) { + bytesRead = cacheFile.fetchRange(pos, + (start, end) -> readCacheFile(cacheFile.getChannel(), end, pos, buffer, off, len), + (start, end) -> writeCacheFile(cacheFile.getChannel(), start, end)) + .get(); + } + } catch (final Exception e) { + if (e instanceof AlreadyClosedException || (e.getCause() != null && e.getCause() instanceof AlreadyClosedException)) { + try { + // cache file was evicted during the range fetching, read bytes directly from source + bytesRead = readDirectly(pos, pos + len, buffer, off); + continue; + } catch (Exception inner) { + e.addSuppressed(inner); + } + } + throw new IOException("Fail to read data from cache", e); + + } finally { + totalBytesRead += bytesRead; + } + } + assert totalBytesRead == length : "partial read operation, read [" + totalBytesRead + "] bytes of [" + length + "]"; + stats.incrementBytesRead(lastReadPosition, position, totalBytesRead); + lastReadPosition = position + totalBytesRead; + lastSeekPosition = lastReadPosition; + } + + int readCacheFile(FileChannel fc, long end, long position, byte[] buffer, int offset, long length) throws IOException { + assert assertFileChannelOpen(fc); + int bytesRead = Channels.readFromFileChannel(fc, position, buffer, offset, Math.toIntExact(Math.min(length, end - position))); + stats.addCachedBytesRead(bytesRead); + return bytesRead; + } + + @SuppressForbidden(reason = "Use positional writes on purpose") + void writeCacheFile(FileChannel fc, long start, long end) throws IOException { + assert assertFileChannelOpen(fc); + final long length = end - start; + final byte[] copyBuffer = new byte[Math.toIntExact(Math.min(COPY_BUFFER_SIZE, length))]; + logger.trace(() -> new ParameterizedMessage("writing range [{}-{}] to cache file [{}]", start, end, cacheFileReference)); + + int bytesCopied = 0; + final long startTimeNanos = directory.statsCurrentTimeNanos(); + try (InputStream input = openInputStream(start, length)) { + stats.incrementInnerOpenCount(); + long remaining = end - start; + while (remaining > 0) { + final int len = (remaining < copyBuffer.length) ? Math.toIntExact(remaining) : copyBuffer.length; + int bytesRead = input.read(copyBuffer, 0, len); + fc.write(ByteBuffer.wrap(copyBuffer, 0, bytesRead), start + bytesCopied); + bytesCopied += bytesRead; + remaining -= bytesRead; + } + final long endTimeNanos = directory.statsCurrentTimeNanos(); + stats.addCachedBytesWritten(bytesCopied, endTimeNanos - startTimeNanos); + } + } + + @Override + protected void seekInternal(long pos) throws IOException { + if (pos > length()) { + throw new EOFException("Reading past end of file [position=" + pos + ", length=" + length() + "] for " + toString()); + } else if (pos < 0L) { + throw new IOException("Seeking to negative position [" + pos + "] for " + toString()); + } + final long position = pos + this.offset; + stats.incrementSeeks(lastSeekPosition, position); + lastSeekPosition = position; + } + + @Override + public CacheBufferedIndexInput clone() { + final CacheBufferedIndexInput clone = (CacheBufferedIndexInput) super.clone(); + clone.closed = new AtomicBoolean(false); + clone.isClone = true; + return clone; + } + + @Override + public IndexInput slice(String sliceDescription, long offset, long length) { + if (offset < 0 || length < 0 || offset + length > this.length()) { + throw new IllegalArgumentException("slice() " + sliceDescription + " out of bounds: offset=" + offset + + ",length=" + length + ",fileLength=" + this.length() + ": " + this); + } + return new CacheBufferedIndexInput(getFullSliceDescription(sliceDescription), directory, fileInfo, context, stats, + this.offset + offset, length, true, cacheFileReference); + } + + @Override + public String toString() { + return "CacheBufferedIndexInput{" + + "cacheFileReference=" + cacheFileReference + + ", offset=" + offset + + ", end=" + end + + ", length=" + length() + + ", position=" + getFilePointer() + + '}'; + } + + private int readDirectly(long start, long end, byte[] buffer, int offset) throws IOException { + final long length = end - start; + final byte[] copyBuffer = new byte[Math.toIntExact(Math.min(COPY_BUFFER_SIZE, length))]; + logger.trace(() -> + new ParameterizedMessage("direct reading of range [{}-{}] for cache file [{}]", start, end, cacheFileReference)); + + int bytesCopied = 0; + final long startTimeNanos = directory.statsCurrentTimeNanos(); + try (InputStream input = openInputStream(start, length)) { + stats.incrementInnerOpenCount(); + long remaining = end - start; + while (remaining > 0) { + final int len = (remaining < copyBuffer.length) ? (int) remaining : copyBuffer.length; + int bytesRead = input.read(copyBuffer, 0, len); + System.arraycopy(copyBuffer, 0, buffer, offset + bytesCopied, len); + bytesCopied += bytesRead; + remaining -= bytesRead; + } + final long endTimeNanos = directory.statsCurrentTimeNanos(); + stats.addDirectBytesRead(bytesCopied, endTimeNanos - startTimeNanos); + } + return bytesCopied; + } + + private class CacheFileReference implements CacheFile.EvictionListener { + + private final long fileLength; + private final CacheKey cacheKey; + private final AtomicReference cacheFile = new AtomicReference<>(); // null if evicted or not yet acquired + + private CacheFileReference(String fileName, long fileLength) { + this.cacheKey = directory.createCacheKey(fileName); + this.fileLength = fileLength; + } + + @Nullable + CacheFile get() throws Exception { + CacheFile currentCacheFile = cacheFile.get(); + if (currentCacheFile != null) { + return currentCacheFile; + } + + final CacheFile newCacheFile = directory.getCacheFile(cacheKey, fileLength); + synchronized (this) { + currentCacheFile = cacheFile.get(); + if (currentCacheFile != null) { + return currentCacheFile; + } + if (newCacheFile.acquire(this)) { + final CacheFile previousCacheFile = cacheFile.getAndSet(newCacheFile); + assert previousCacheFile == null; + return newCacheFile; + } + } + return null; + } + + @Override + public void onEviction(final CacheFile evictedCacheFile) { + synchronized (this) { + if (cacheFile.compareAndSet(evictedCacheFile, null)) { + evictedCacheFile.release(this); + } + } + } + + void releaseOnClose() { + synchronized (this) { + final CacheFile currentCacheFile = cacheFile.getAndSet(null); + if (currentCacheFile != null) { + currentCacheFile.release(this); + } + } + } + + @Override + public String toString() { + return "CacheFileReference{" + + "cacheKey='" + cacheKey + '\'' + + ", fileLength=" + fileLength + + ", acquired=" + (cacheFile.get() != null) + + '}'; + } + } + + private static boolean assertFileChannelOpen(FileChannel fileChannel) { + assert fileChannel != null; + assert fileChannel.isOpen(); + return true; + } +} diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheDirectory.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheDirectory.java index dc55b46b43593..4b4d926d0066f 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheDirectory.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheDirectory.java @@ -5,39 +5,25 @@ */ package org.elasticsearch.xpack.searchablesnapshots.cache; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; -import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.Directory; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.blobstore.BlobContainer; -import org.elasticsearch.common.io.Channels; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.common.util.concurrent.ReleasableLock; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; import org.elasticsearch.index.store.BaseSearchableSnapshotDirectory; -import org.elasticsearch.index.store.BaseSearchableSnapshotIndexInput; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.snapshots.SnapshotId; -import java.io.EOFException; import java.io.IOException; -import java.io.InputStream; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; import java.nio.file.Files; import java.nio.file.Path; import java.util.Collections; import java.util.Map; import java.util.Objects; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.LongSupplier; /** @@ -45,9 +31,6 @@ */ public class CacheDirectory extends BaseSearchableSnapshotDirectory { - private static final Logger logger = LogManager.getLogger(CacheDirectory.class); - private static final int COPY_BUFFER_SIZE = 8192; - private final Map stats; private final CacheService cacheService; private final SnapshotId snapshotId; @@ -70,10 +53,14 @@ public CacheDirectory(final BlobStoreIndexShardSnapshot snapshot, final BlobCont this.currentTimeNanosSupplier = Objects.requireNonNull(currentTimeNanosSupplier); } - private CacheKey createCacheKey(String fileName) { + CacheKey createCacheKey(String fileName) { return new CacheKey(snapshotId, indexId, shardId, fileName); } + CacheFile getCacheFile(CacheKey cacheKey, long fileLength) throws Exception { + return cacheService.get(cacheKey, fileLength, cacheDir); + } + public SnapshotId getSnapshotId() { return snapshotId; } @@ -101,6 +88,10 @@ IndexInputStats createIndexInputStats(final long fileLength) { return new IndexInputStats(fileLength); } + long statsCurrentTimeNanos() { + return currentTimeNanosSupplier.getAsLong(); + } + @Override protected void innerClose() { // Ideally we could let the cache evict/remove cached files by itself after the @@ -117,272 +108,6 @@ public IndexInput openInput(final String name, final IOContext context) throws I ensureOpen(); final FileInfo fileInfo = fileInfo(name); final IndexInputStats inputStats = stats.computeIfAbsent(name, n -> createIndexInputStats(fileInfo.length())); - return new CacheBufferedIndexInput(blobContainer, fileInfo, context, inputStats); - } - - private class CacheFileReference implements CacheFile.EvictionListener { - - private final long fileLength; - private final CacheKey cacheKey; - private final AtomicReference cacheFile = new AtomicReference<>(); // null if evicted or not yet acquired - - private CacheFileReference(String fileName, long fileLength) { - this.cacheKey = createCacheKey(fileName); - this.fileLength = fileLength; - } - - @Nullable - CacheFile get() throws Exception { - CacheFile currentCacheFile = cacheFile.get(); - if (currentCacheFile != null) { - return currentCacheFile; - } - - final CacheFile newCacheFile = cacheService.get(cacheKey, fileLength, cacheDir); - synchronized (this) { - currentCacheFile = cacheFile.get(); - if (currentCacheFile != null) { - return currentCacheFile; - } - if (newCacheFile.acquire(this)) { - final CacheFile previousCacheFile = cacheFile.getAndSet(newCacheFile); - assert previousCacheFile == null; - return newCacheFile; - } - } - return null; - } - - String getFileName() { - return cacheKey.getFileName(); - } - - @Override - public void onEviction(final CacheFile evictedCacheFile) { - synchronized (this) { - if (cacheFile.compareAndSet(evictedCacheFile, null)) { - evictedCacheFile.release(this); - } - } - } - - void releaseOnClose() { - synchronized (this) { - final CacheFile currentCacheFile = cacheFile.getAndSet(null); - if (currentCacheFile != null) { - currentCacheFile.release(this); - } - } - } - - @Override - public String toString() { - return "CacheFileReference{" + - "cacheKey='" + cacheKey + '\'' + - ", fileLength=" + fileLength + - ", cacheDir=" + cacheDir + - ", acquired=" + (cacheFile.get() != null) + - '}'; - } - } - - public class CacheBufferedIndexInput extends BaseSearchableSnapshotIndexInput { - - private final long offset; - private final long end; - private final CacheFileReference cacheFileReference; - private final IndexInputStats stats; - - // the following are only mutable so they can be adjusted after cloning - private AtomicBoolean closed; - private boolean isClone; - - // last read position is kept around in order to detect (non)contiguous reads for stats - private long lastReadPosition; - // last seek position is kept around in order to detect forward/backward seeks for stats - private long lastSeekPosition; - - CacheBufferedIndexInput(BlobContainer blobContainer, FileInfo fileInfo, IOContext context, IndexInputStats stats) { - this("CachedBufferedIndexInput(" + fileInfo.physicalName() + ")", blobContainer, fileInfo, context, - new CacheFileReference(fileInfo.physicalName(), fileInfo.length()), stats, - 0L, fileInfo.length(), false); - stats.incrementOpenCount(); - } - - private CacheBufferedIndexInput(String resourceDesc, BlobContainer blobContainer, FileInfo fileInfo, IOContext context, - CacheFileReference cacheFileReference, IndexInputStats stats, - long offset, long length, boolean isClone) { - super(resourceDesc, blobContainer, fileInfo, context); - this.offset = offset; - this.cacheFileReference = cacheFileReference; - this.stats = stats; - this.end = offset + length; - this.closed = new AtomicBoolean(false); - this.isClone = isClone; - this.lastReadPosition = this.offset; - this.lastSeekPosition = this.offset; - } - - @Override - public long length() { - return end - offset; - } - - @Override - public void close() { - if (closed.compareAndSet(false, true)) { - if (isClone == false) { - stats.incrementCloseCount(); - cacheFileReference.releaseOnClose(); - } - } - } - - @Override - protected void readInternal(final byte[] buffer, final int offset, final int length) throws IOException { - final long position = getFilePointer() + this.offset; - - int totalBytesRead = 0; - while (totalBytesRead < length) { - final long pos = position + totalBytesRead; - final int off = offset + totalBytesRead; - final int len = length - totalBytesRead; - - int bytesRead = 0; - try { - final CacheFile cacheFile = cacheFileReference.get(); - if (cacheFile == null) { - throw new AlreadyClosedException("Failed to acquire a non-evicted cache file"); - } - - try (ReleasableLock ignored = cacheFile.fileLock()) { - bytesRead = cacheFile.fetchRange(pos, - (start, end) -> readCacheFile(cacheFile.getChannel(), end, pos, buffer, off, len), - (start, end) -> writeCacheFile(cacheFile.getChannel(), start, end)) - .get(); - } - } catch (final Exception e) { - if (e instanceof AlreadyClosedException || (e.getCause() != null && e.getCause() instanceof AlreadyClosedException)) { - try { - // cache file was evicted during the range fetching, read bytes directly from source - bytesRead = readDirectly(pos, pos + len, buffer, off); - continue; - } catch (Exception inner) { - e.addSuppressed(inner); - } - } - throw new IOException("Fail to read data from cache", e); - - } finally { - totalBytesRead += bytesRead; - } - } - assert totalBytesRead == length : "partial read operation, read [" + totalBytesRead + "] bytes of [" + length + "]"; - stats.incrementBytesRead(lastReadPosition, position, totalBytesRead); - lastReadPosition = position + totalBytesRead; - lastSeekPosition = lastReadPosition; - } - - int readCacheFile(FileChannel fc, long end, long position, byte[] buffer, int offset, long length) throws IOException { - assert assertFileChannelOpen(fc); - int bytesRead = Channels.readFromFileChannel(fc, position, buffer, offset, Math.toIntExact(Math.min(length, end - position))); - stats.addCachedBytesRead(bytesRead); - return bytesRead; - } - - @SuppressForbidden(reason = "Use positional writes on purpose") - void writeCacheFile(FileChannel fc, long start, long end) throws IOException { - assert assertFileChannelOpen(fc); - final long length = end - start; - final byte[] copyBuffer = new byte[Math.toIntExact(Math.min(COPY_BUFFER_SIZE, length))]; - logger.trace(() -> new ParameterizedMessage("writing range [{}-{}] to cache file [{}]", start, end, cacheFileReference)); - - int bytesCopied = 0; - final long startTimeNanos = currentTimeNanosSupplier.getAsLong(); - try (InputStream input = openInputStream(start, length)) { - stats.incrementInnerOpenCount(); - long remaining = end - start; - while (remaining > 0) { - final int len = (remaining < copyBuffer.length) ? Math.toIntExact(remaining) : copyBuffer.length; - int bytesRead = input.read(copyBuffer, 0, len); - fc.write(ByteBuffer.wrap(copyBuffer, 0, bytesRead), start + bytesCopied); - bytesCopied += bytesRead; - remaining -= bytesRead; - } - final long endTimeNanos = currentTimeNanosSupplier.getAsLong(); - stats.addCachedBytesWritten(bytesCopied, endTimeNanos - startTimeNanos); - } - } - - @Override - protected void seekInternal(long pos) throws IOException { - if (pos > length()) { - throw new EOFException("Reading past end of file [position=" + pos + ", length=" + length() + "] for " + toString()); - } else if (pos < 0L) { - throw new IOException("Seeking to negative position [" + pos + "] for " + toString()); - } - final long position = pos + this.offset; - stats.incrementSeeks(lastSeekPosition, position); - lastSeekPosition = position; - } - - @Override - public CacheBufferedIndexInput clone() { - final CacheBufferedIndexInput clone = (CacheBufferedIndexInput) super.clone(); - clone.closed = new AtomicBoolean(false); - clone.isClone = true; - return clone; - } - - @Override - public IndexInput slice(String sliceDescription, long offset, long length) { - if (offset < 0 || length < 0 || offset + length > this.length()) { - throw new IllegalArgumentException("slice() " + sliceDescription + " out of bounds: offset=" + offset - + ",length=" + length + ",fileLength=" + this.length() + ": " + this); - } - return new CacheBufferedIndexInput(getFullSliceDescription(sliceDescription), blobContainer, fileInfo, context, - cacheFileReference, stats, this.offset + offset, length, true); - } - - @Override - public String toString() { - return "CacheBufferedIndexInput{" + - "cacheFileReference=" + cacheFileReference + - ", offset=" + offset + - ", end=" + end + - ", length=" + length() + - ", position=" + getFilePointer() + - '}'; - } - - private int readDirectly(long start, long end, byte[] buffer, int offset) throws IOException { - final long length = end - start; - final byte[] copyBuffer = new byte[Math.toIntExact(Math.min(COPY_BUFFER_SIZE, length))]; - logger.trace(() -> - new ParameterizedMessage("direct reading of range [{}-{}] for cache file [{}]", start, end, cacheFileReference)); - - int bytesCopied = 0; - final long startTimeNanos = currentTimeNanosSupplier.getAsLong(); - try (InputStream input = openInputStream(start, length)) { - stats.incrementInnerOpenCount(); - long remaining = end - start; - while (remaining > 0) { - final int len = (remaining < copyBuffer.length) ? (int) remaining : copyBuffer.length; - int bytesRead = input.read(copyBuffer, 0, len); - System.arraycopy(copyBuffer, 0, buffer, offset + bytesCopied, len); - bytesCopied += bytesRead; - remaining -= bytesRead; - } - final long endTimeNanos = currentTimeNanosSupplier.getAsLong(); - stats.addDirectBytesRead(bytesCopied, endTimeNanos - startTimeNanos); - } - return bytesCopied; - } - } - - private static boolean assertFileChannelOpen(FileChannel fileChannel) { - assert fileChannel != null; - assert fileChannel.isOpen(); - return true; + return new CacheBufferedIndexInput(this, fileInfo, context, inputStats); } } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/IndexInputStats.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/IndexInputStats.java index 8f95b67aa08cc..2f69c65117c60 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/IndexInputStats.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/IndexInputStats.java @@ -8,7 +8,6 @@ import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.xpack.searchablesnapshots.cache.CacheDirectory.CacheBufferedIndexInput; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder; From 9028f8aab9124bbb928669ad65282858f3cb5fee Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Fri, 20 Mar 2020 17:28:14 +0100 Subject: [PATCH 2/3] apply feedback --- .../store/BaseSearchableSnapshotIndexInput.java | 4 ---- .../store/SearchableSnapshotIndexInput.java | 4 ++++ .../cache/CacheBufferedIndexInput.java | 17 ++++++++++------- 3 files changed, 14 insertions(+), 11 deletions(-) diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java index 4d0ea1030d7e5..983a97534a4a1 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java @@ -5,8 +5,6 @@ */ package org.elasticsearch.index.store; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.apache.lucene.store.BufferedIndexInput; import org.apache.lucene.store.IOContext; import org.apache.lucene.util.BytesRef; @@ -21,8 +19,6 @@ public abstract class BaseSearchableSnapshotIndexInput extends BufferedIndexInput { - protected static final Logger logger = LogManager.getLogger(BaseSearchableSnapshotIndexInput.class); - protected final BlobContainer blobContainer; protected final FileInfo fileInfo; protected final IOContext context; diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotIndexInput.java index f2eb0324c88c9..7294cc4a987e9 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotIndexInput.java @@ -5,6 +5,8 @@ */ package org.elasticsearch.index.store; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.lucene.store.BufferedIndexInput; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; @@ -48,6 +50,8 @@ */ public class SearchableSnapshotIndexInput extends BaseSearchableSnapshotIndexInput { + private static final Logger logger = LogManager.getLogger(SearchableSnapshotIndexInput.class); + private final long offset; private final long length; diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheBufferedIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheBufferedIndexInput.java index b4105269ce3d1..01126d589dfea 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheBufferedIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheBufferedIndexInput.java @@ -6,6 +6,8 @@ package org.elasticsearch.xpack.searchablesnapshots.cache; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.IOContext; @@ -22,12 +24,12 @@ import java.io.InputStream; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; -import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; public class CacheBufferedIndexInput extends BaseSearchableSnapshotIndexInput { + private static final Logger logger = LogManager.getLogger(CacheBufferedIndexInput.class); private static final int COPY_BUFFER_SIZE = 8192; private final CacheDirectory directory; @@ -47,13 +49,13 @@ public class CacheBufferedIndexInput extends BaseSearchableSnapshotIndexInput { CacheBufferedIndexInput(CacheDirectory directory, FileInfo fileInfo, IOContext context, IndexInputStats stats) { this("CachedBufferedIndexInput(" + fileInfo.physicalName() + ")", directory, fileInfo, context, stats, 0L, fileInfo.length(), - false, null); + false, new CacheFileReference(directory, fileInfo.physicalName(), fileInfo.length())); stats.incrementOpenCount(); } private CacheBufferedIndexInput(String resourceDesc, CacheDirectory directory, FileInfo fileInfo, IOContext context, IndexInputStats stats, long offset, long length, boolean isClone, - @Nullable CacheFileReference cacheFileReference) { + CacheFileReference cacheFileReference) { super(resourceDesc, directory.blobContainer(), fileInfo, context); this.directory = directory; this.offset = offset; @@ -61,8 +63,7 @@ private CacheBufferedIndexInput(String resourceDesc, CacheDirectory directory, F this.end = offset + length; this.closed = new AtomicBoolean(false); this.isClone = isClone; - this.cacheFileReference = Objects.requireNonNullElseGet(cacheFileReference, - () -> new CacheFileReference(fileInfo.physicalName(), fileInfo.length())); + this.cacheFileReference = cacheFileReference; this.lastReadPosition = this.offset; this.lastSeekPosition = this.offset; } @@ -223,15 +224,17 @@ private int readDirectly(long start, long end, byte[] buffer, int offset) throws return bytesCopied; } - private class CacheFileReference implements CacheFile.EvictionListener { + private static class CacheFileReference implements CacheFile.EvictionListener { private final long fileLength; private final CacheKey cacheKey; + private final CacheDirectory directory; private final AtomicReference cacheFile = new AtomicReference<>(); // null if evicted or not yet acquired - private CacheFileReference(String fileName, long fileLength) { + private CacheFileReference(CacheDirectory directory, String fileName, long fileLength) { this.cacheKey = directory.createCacheKey(fileName); this.fileLength = fileLength; + this.directory = directory; } @Nullable From 251e4b35c937d62c7e48c81b3fb1f5064db5c1f8 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Fri, 20 Mar 2020 17:40:23 +0100 Subject: [PATCH 3/3] remove unused --- .../index/store/SearchableSnapshotIndexInput.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotIndexInput.java index 7294cc4a987e9..f2eb0324c88c9 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotIndexInput.java @@ -5,8 +5,6 @@ */ package org.elasticsearch.index.store; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.apache.lucene.store.BufferedIndexInput; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; @@ -50,8 +48,6 @@ */ public class SearchableSnapshotIndexInput extends BaseSearchableSnapshotIndexInput { - private static final Logger logger = LogManager.getLogger(SearchableSnapshotIndexInput.class); - private final long offset; private final long length;