Skip to content

Fix small reads of multi-part blobs #54573

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -100,20 +100,21 @@ public final void close() throws IOException {

protected InputStream openInputStream(final long position, final long length) throws IOException {
assert assertCurrentThreadMayAccessBlobStore();
final long startPart = getPartNumberForPosition(position);
final long endPart = getPartNumberForPosition(position + length);
if ((startPart == endPart) || fileInfo.numberOfParts() == 1L) {
return blobContainer.readBlob(fileInfo.partName(startPart), getRelativePositionInPart(position), length);
if (fileInfo.numberOfParts() == 1L) {
assert position + length <= fileInfo.partBytes(0)
: "cannot read [" + position + "-" + (position + length) + "] from [" + fileInfo + "]";
return blobContainer.readBlob(fileInfo.partName(0L), position, length);
} else {
final long startPart = getPartNumberForPosition(position);
final long endPart = getPartNumberForPosition(position + length);
return new SlicedInputStream(endPart - startPart + 1L) {
@Override
protected InputStream openSlice(long slice) throws IOException {
final long currentPart = startPart + slice;
return blobContainer.readBlob(
fileInfo.partName(currentPart),
(currentPart == startPart) ? getRelativePositionInPart(position) : 0L,
(currentPart == endPart) ? getRelativePositionInPart(length) : getLengthOfPart(currentPart)
);
final long startInPart = (currentPart == startPart) ? getRelativePositionInPart(position) : 0L;
final long endInPart
= (currentPart == endPart) ? getRelativePositionInPart(position + length) : getLengthOfPart(currentPart);
return blobContainer.readBlob(fileInfo.partName(currentPart), startInPart, endInPart - startInPart);
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

import static org.elasticsearch.index.store.cache.TestUtils.createCacheService;
import static org.elasticsearch.index.store.cache.TestUtils.singleBlobContainer;
import static org.elasticsearch.index.store.cache.TestUtils.singleSplitBlobContainer;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING;
import static org.hamcrest.Matchers.equalTo;

Expand All @@ -51,13 +52,16 @@ public void testRandomReads() throws IOException {
final byte[] input = randomUnicodeOfLength(randomIntBetween(1, 100_000)).getBytes(StandardCharsets.UTF_8);

final String blobName = randomUnicodeOfLength(10);
final StoreFileMetadata metadata = new StoreFileMetadata(fileName, input.length, "_na", Version.CURRENT.luceneVersion);
final StoreFileMetadata metaData = new StoreFileMetadata(fileName, input.length, "_na", Version.CURRENT.luceneVersion);

final int partSize = randomBoolean() ? input.length : randomIntBetween(1, input.length);

final BlobStoreIndexShardSnapshot snapshot = new BlobStoreIndexShardSnapshot(snapshotId.getName(), 0L,
List.of(new BlobStoreIndexShardSnapshot.FileInfo(blobName, metadata, new ByteSizeValue(input.length))), 0L, 0L, 0, 0L);
List.of(new BlobStoreIndexShardSnapshot.FileInfo(blobName, metaData, new ByteSizeValue(partSize))), 0L, 0L, 0, 0L);

final BlobContainer singleBlobContainer = singleBlobContainer(blobName, input);
final BlobContainer singleBlobContainer = singleSplitBlobContainer(blobName, input, partSize);
final BlobContainer blobContainer;
if (input.length <= cacheService.getCacheSize()) {
if (input.length == partSize && input.length <= cacheService.getCacheSize()) {
blobContainer = new CountingBlobContainer(singleBlobContainer, cacheService.getRangeSize());
} else {
blobContainer = singleBlobContainer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,70 +82,98 @@ public static void assertCounter(
* Any attempt to read a different blob will throw a {@link FileNotFoundException}
*/
public static BlobContainer singleBlobContainer(final String blobName, final byte[] blobContent) {
return new BlobContainer() {

return new MostlyUnimplementedFakeBlobContainer() {
@Override
public InputStream readBlob(String name, long position, long length) throws IOException {
if (blobName.equals(name) == false) {
throw new FileNotFoundException("Blob not found: " + name);
}
return Streams.limitStream(new ByteArrayInputStream(blobContent, Math.toIntExact(position), blobContent.length), length);
return Streams.limitStream(new ByteArrayInputStream(blobContent, Math.toIntExact(position),
blobContent.length - Math.toIntExact(position)), length);
}
};
}

@Override
public long readBlobPreferredLength() {
return Long.MAX_VALUE;
}
static BlobContainer singleSplitBlobContainer(final String blobName, final byte[] blobContent, final int partSize) {
if (partSize >= blobContent.length) {
return singleBlobContainer(blobName, blobContent);
} else {
final String prefix = blobName + ".part";
return new MostlyUnimplementedFakeBlobContainer() {
@Override
public InputStream readBlob(String name, long position, long length) throws IOException {
if (name.startsWith(prefix) == false) {
throw new FileNotFoundException("Blob not found: " + name);
}
assert position + length <= partSize
: "cannot read [" + position + "-" + (position + length) + "] from array part of length [" + partSize + "]";
final int partNumber = Integer.parseInt(name.substring(prefix.length()));
final int positionInBlob = Math.toIntExact(position) + partSize * partNumber;
assert positionInBlob + length <= blobContent.length
: "cannot read [" + positionInBlob + "-" + (positionInBlob + length) + "] from array of length ["
+ blobContent.length + "]";
return Streams.limitStream(new ByteArrayInputStream(blobContent,
positionInBlob, blobContent.length - positionInBlob), length);
}
};
}
}

@Override
public Map<String, BlobMetadata> listBlobs() {
throw unsupportedException();
}
private static class MostlyUnimplementedFakeBlobContainer implements BlobContainer {

@Override
public BlobPath path() {
throw unsupportedException();
}
@Override
public long readBlobPreferredLength() {
return Long.MAX_VALUE;
}

@Override
public InputStream readBlob(String blobName) {
throw unsupportedException();
}
@Override
public Map<String, BlobMetadata> listBlobs() {
throw unsupportedException();
}

@Override
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) {
throw unsupportedException();
}
@Override
public BlobPath path() {
throw unsupportedException();
}

@Override
public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) {
throw unsupportedException();
}
@Override
public InputStream readBlob(String blobName) {
throw unsupportedException();
}

@Override
public DeleteResult delete() {
throw unsupportedException();
}
@Override
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) {
throw unsupportedException();
}

@Override
public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) {
throw unsupportedException();
}
@Override
public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) {
throw unsupportedException();
}

@Override
public Map<String, BlobContainer> children() {
throw unsupportedException();
}
@Override
public DeleteResult delete() {
throw unsupportedException();
}

@Override
public Map<String, BlobMetadata> listBlobsByPrefix(String blobNamePrefix) {
throw unsupportedException();
}
@Override
public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) {
throw unsupportedException();
}

private UnsupportedOperationException unsupportedException() {
assert false : "this operation is not supported and should have not be called";
return new UnsupportedOperationException("This operation is not supported");
}
};
@Override
public Map<String, BlobContainer> children() {
throw unsupportedException();
}

@Override
public Map<String, BlobMetadata> listBlobsByPrefix(String blobNamePrefix) {
throw unsupportedException();
}

private UnsupportedOperationException unsupportedException() {
assert false : "this operation is not supported and should have not be called";
return new UnsupportedOperationException("This operation is not supported");
}
}
}