diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java index 13c24b2fde0d1..28d91d0eed2c9 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java @@ -100,20 +100,21 @@ public final void close() throws IOException { protected InputStream openInputStream(final long position, final long length) throws IOException { assert assertCurrentThreadMayAccessBlobStore(); - final long startPart = getPartNumberForPosition(position); - final long endPart = getPartNumberForPosition(position + length); - if ((startPart == endPart) || fileInfo.numberOfParts() == 1L) { - return blobContainer.readBlob(fileInfo.partName(startPart), getRelativePositionInPart(position), length); + if (fileInfo.numberOfParts() == 1L) { + assert position + length <= fileInfo.partBytes(0) + : "cannot read [" + position + "-" + (position + length) + "] from [" + fileInfo + "]"; + return blobContainer.readBlob(fileInfo.partName(0L), position, length); } else { + final long startPart = getPartNumberForPosition(position); + final long endPart = getPartNumberForPosition(position + length); return new SlicedInputStream(endPart - startPart + 1L) { @Override protected InputStream openSlice(long slice) throws IOException { final long currentPart = startPart + slice; - return blobContainer.readBlob( - fileInfo.partName(currentPart), - (currentPart == startPart) ? getRelativePositionInPart(position) : 0L, - (currentPart == endPart) ? getRelativePositionInPart(length) : getLengthOfPart(currentPart) - ); + final long startInPart = (currentPart == startPart) ? getRelativePositionInPart(position) : 0L; + final long endInPart + = (currentPart == endPart) ? getRelativePositionInPart(position + length) : getLengthOfPart(currentPart); + return blobContainer.readBlob(fileInfo.partName(currentPart), startInPart, endInPart - startInPart); } }; } diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInputTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInputTests.java index f09de8ca1e2bb..ed299ab4849ea 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInputTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInputTests.java @@ -33,6 +33,7 @@ import static org.elasticsearch.index.store.cache.TestUtils.createCacheService; import static org.elasticsearch.index.store.cache.TestUtils.singleBlobContainer; +import static org.elasticsearch.index.store.cache.TestUtils.singleSplitBlobContainer; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING; import static org.hamcrest.Matchers.equalTo; @@ -51,13 +52,16 @@ public void testRandomReads() throws IOException { final byte[] input = randomUnicodeOfLength(randomIntBetween(1, 100_000)).getBytes(StandardCharsets.UTF_8); final String blobName = randomUnicodeOfLength(10); - final StoreFileMetadata metadata = new StoreFileMetadata(fileName, input.length, "_na", Version.CURRENT.luceneVersion); + final StoreFileMetadata metaData = new StoreFileMetadata(fileName, input.length, "_na", Version.CURRENT.luceneVersion); + + final int partSize = randomBoolean() ? input.length : randomIntBetween(1, input.length); + final BlobStoreIndexShardSnapshot snapshot = new BlobStoreIndexShardSnapshot(snapshotId.getName(), 0L, - List.of(new BlobStoreIndexShardSnapshot.FileInfo(blobName, metadata, new ByteSizeValue(input.length))), 0L, 0L, 0, 0L); + List.of(new BlobStoreIndexShardSnapshot.FileInfo(blobName, metaData, new ByteSizeValue(partSize))), 0L, 0L, 0, 0L); - final BlobContainer singleBlobContainer = singleBlobContainer(blobName, input); + final BlobContainer singleBlobContainer = singleSplitBlobContainer(blobName, input, partSize); final BlobContainer blobContainer; - if (input.length <= cacheService.getCacheSize()) { + if (input.length == partSize && input.length <= cacheService.getCacheSize()) { blobContainer = new CountingBlobContainer(singleBlobContainer, cacheService.getRangeSize()); } else { blobContainer = singleBlobContainer; diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java index c7d916b7c61b6..393adf0ad09e5 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java @@ -82,70 +82,98 @@ public static void assertCounter( * Any attempt to read a different blob will throw a {@link FileNotFoundException} */ public static BlobContainer singleBlobContainer(final String blobName, final byte[] blobContent) { - return new BlobContainer() { - + return new MostlyUnimplementedFakeBlobContainer() { @Override public InputStream readBlob(String name, long position, long length) throws IOException { if (blobName.equals(name) == false) { throw new FileNotFoundException("Blob not found: " + name); } - return Streams.limitStream(new ByteArrayInputStream(blobContent, Math.toIntExact(position), blobContent.length), length); + return Streams.limitStream(new ByteArrayInputStream(blobContent, Math.toIntExact(position), + blobContent.length - Math.toIntExact(position)), length); } + }; + } - @Override - public long readBlobPreferredLength() { - return Long.MAX_VALUE; - } + static BlobContainer singleSplitBlobContainer(final String blobName, final byte[] blobContent, final int partSize) { + if (partSize >= blobContent.length) { + return singleBlobContainer(blobName, blobContent); + } else { + final String prefix = blobName + ".part"; + return new MostlyUnimplementedFakeBlobContainer() { + @Override + public InputStream readBlob(String name, long position, long length) throws IOException { + if (name.startsWith(prefix) == false) { + throw new FileNotFoundException("Blob not found: " + name); + } + assert position + length <= partSize + : "cannot read [" + position + "-" + (position + length) + "] from array part of length [" + partSize + "]"; + final int partNumber = Integer.parseInt(name.substring(prefix.length())); + final int positionInBlob = Math.toIntExact(position) + partSize * partNumber; + assert positionInBlob + length <= blobContent.length + : "cannot read [" + positionInBlob + "-" + (positionInBlob + length) + "] from array of length [" + + blobContent.length + "]"; + return Streams.limitStream(new ByteArrayInputStream(blobContent, + positionInBlob, blobContent.length - positionInBlob), length); + } + }; + } + } - @Override - public Map listBlobs() { - throw unsupportedException(); - } + private static class MostlyUnimplementedFakeBlobContainer implements BlobContainer { - @Override - public BlobPath path() { - throw unsupportedException(); - } + @Override + public long readBlobPreferredLength() { + return Long.MAX_VALUE; + } - @Override - public InputStream readBlob(String blobName) { - throw unsupportedException(); - } + @Override + public Map listBlobs() { + throw unsupportedException(); + } - @Override - public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) { - throw unsupportedException(); - } + @Override + public BlobPath path() { + throw unsupportedException(); + } - @Override - public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) { - throw unsupportedException(); - } + @Override + public InputStream readBlob(String blobName) { + throw unsupportedException(); + } - @Override - public DeleteResult delete() { - throw unsupportedException(); - } + @Override + public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) { + throw unsupportedException(); + } - @Override - public void deleteBlobsIgnoringIfNotExists(List blobNames) { - throw unsupportedException(); - } + @Override + public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) { + throw unsupportedException(); + } - @Override - public Map children() { - throw unsupportedException(); - } + @Override + public DeleteResult delete() { + throw unsupportedException(); + } - @Override - public Map listBlobsByPrefix(String blobNamePrefix) { - throw unsupportedException(); - } + @Override + public void deleteBlobsIgnoringIfNotExists(List blobNames) { + throw unsupportedException(); + } - private UnsupportedOperationException unsupportedException() { - assert false : "this operation is not supported and should have not be called"; - return new UnsupportedOperationException("This operation is not supported"); - } - }; + @Override + public Map children() { + throw unsupportedException(); + } + + @Override + public Map listBlobsByPrefix(String blobNamePrefix) { + throw unsupportedException(); + } + + private UnsupportedOperationException unsupportedException() { + assert false : "this operation is not supported and should have not be called"; + return new UnsupportedOperationException("This operation is not supported"); + } } }