Skip to content

Commit 6e7a5e0

Browse files
authored
Fix small reads of multi-part blobs (#54573)
1 parent e8cf4f9 commit 6e7a5e0

File tree

3 files changed

+94
-61
lines changed

3 files changed

+94
-61
lines changed

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -100,20 +100,21 @@ public final void close() throws IOException {
100100

101101
protected InputStream openInputStream(final long position, final long length) throws IOException {
102102
assert assertCurrentThreadMayAccessBlobStore();
103-
final long startPart = getPartNumberForPosition(position);
104-
final long endPart = getPartNumberForPosition(position + length);
105-
if ((startPart == endPart) || fileInfo.numberOfParts() == 1L) {
106-
return blobContainer.readBlob(fileInfo.partName(startPart), getRelativePositionInPart(position), length);
103+
if (fileInfo.numberOfParts() == 1L) {
104+
assert position + length <= fileInfo.partBytes(0)
105+
: "cannot read [" + position + "-" + (position + length) + "] from [" + fileInfo + "]";
106+
return blobContainer.readBlob(fileInfo.partName(0L), position, length);
107107
} else {
108+
final long startPart = getPartNumberForPosition(position);
109+
final long endPart = getPartNumberForPosition(position + length);
108110
return new SlicedInputStream(endPart - startPart + 1L) {
109111
@Override
110112
protected InputStream openSlice(long slice) throws IOException {
111113
final long currentPart = startPart + slice;
112-
return blobContainer.readBlob(
113-
fileInfo.partName(currentPart),
114-
(currentPart == startPart) ? getRelativePositionInPart(position) : 0L,
115-
(currentPart == endPart) ? getRelativePositionInPart(length) : getLengthOfPart(currentPart)
116-
);
114+
final long startInPart = (currentPart == startPart) ? getRelativePositionInPart(position) : 0L;
115+
final long endInPart
116+
= (currentPart == endPart) ? getRelativePositionInPart(position + length) : getLengthOfPart(currentPart);
117+
return blobContainer.readBlob(fileInfo.partName(currentPart), startInPart, endInPart - startInPart);
117118
}
118119
};
119120
}

x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInputTests.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333

3434
import static org.elasticsearch.index.store.cache.TestUtils.createCacheService;
3535
import static org.elasticsearch.index.store.cache.TestUtils.singleBlobContainer;
36+
import static org.elasticsearch.index.store.cache.TestUtils.singleSplitBlobContainer;
3637
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING;
3738
import static org.hamcrest.Matchers.equalTo;
3839

@@ -51,13 +52,16 @@ public void testRandomReads() throws IOException {
5152
final byte[] input = randomUnicodeOfLength(randomIntBetween(1, 100_000)).getBytes(StandardCharsets.UTF_8);
5253

5354
final String blobName = randomUnicodeOfLength(10);
54-
final StoreFileMetadata metadata = new StoreFileMetadata(fileName, input.length, "_na", Version.CURRENT.luceneVersion);
55+
final StoreFileMetadata metaData = new StoreFileMetadata(fileName, input.length, "_na", Version.CURRENT.luceneVersion);
56+
57+
final int partSize = randomBoolean() ? input.length : randomIntBetween(1, input.length);
58+
5559
final BlobStoreIndexShardSnapshot snapshot = new BlobStoreIndexShardSnapshot(snapshotId.getName(), 0L,
56-
List.of(new BlobStoreIndexShardSnapshot.FileInfo(blobName, metadata, new ByteSizeValue(input.length))), 0L, 0L, 0, 0L);
60+
List.of(new BlobStoreIndexShardSnapshot.FileInfo(blobName, metaData, new ByteSizeValue(partSize))), 0L, 0L, 0, 0L);
5761

58-
final BlobContainer singleBlobContainer = singleBlobContainer(blobName, input);
62+
final BlobContainer singleBlobContainer = singleSplitBlobContainer(blobName, input, partSize);
5963
final BlobContainer blobContainer;
60-
if (input.length <= cacheService.getCacheSize()) {
64+
if (input.length == partSize && input.length <= cacheService.getCacheSize()) {
6165
blobContainer = new CountingBlobContainer(singleBlobContainer, cacheService.getRangeSize());
6266
} else {
6367
blobContainer = singleBlobContainer;

x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java

Lines changed: 76 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -82,70 +82,98 @@ public static void assertCounter(
8282
* Any attempt to read a different blob will throw a {@link FileNotFoundException}
8383
*/
8484
public static BlobContainer singleBlobContainer(final String blobName, final byte[] blobContent) {
85-
return new BlobContainer() {
86-
85+
return new MostlyUnimplementedFakeBlobContainer() {
8786
@Override
8887
public InputStream readBlob(String name, long position, long length) throws IOException {
8988
if (blobName.equals(name) == false) {
9089
throw new FileNotFoundException("Blob not found: " + name);
9190
}
92-
return Streams.limitStream(new ByteArrayInputStream(blobContent, Math.toIntExact(position), blobContent.length), length);
91+
return Streams.limitStream(new ByteArrayInputStream(blobContent, Math.toIntExact(position),
92+
blobContent.length - Math.toIntExact(position)), length);
9393
}
94+
};
95+
}
9496

95-
@Override
96-
public long readBlobPreferredLength() {
97-
return Long.MAX_VALUE;
98-
}
97+
static BlobContainer singleSplitBlobContainer(final String blobName, final byte[] blobContent, final int partSize) {
98+
if (partSize >= blobContent.length) {
99+
return singleBlobContainer(blobName, blobContent);
100+
} else {
101+
final String prefix = blobName + ".part";
102+
return new MostlyUnimplementedFakeBlobContainer() {
103+
@Override
104+
public InputStream readBlob(String name, long position, long length) throws IOException {
105+
if (name.startsWith(prefix) == false) {
106+
throw new FileNotFoundException("Blob not found: " + name);
107+
}
108+
assert position + length <= partSize
109+
: "cannot read [" + position + "-" + (position + length) + "] from array part of length [" + partSize + "]";
110+
final int partNumber = Integer.parseInt(name.substring(prefix.length()));
111+
final int positionInBlob = Math.toIntExact(position) + partSize * partNumber;
112+
assert positionInBlob + length <= blobContent.length
113+
: "cannot read [" + positionInBlob + "-" + (positionInBlob + length) + "] from array of length ["
114+
+ blobContent.length + "]";
115+
return Streams.limitStream(new ByteArrayInputStream(blobContent,
116+
positionInBlob, blobContent.length - positionInBlob), length);
117+
}
118+
};
119+
}
120+
}
99121

100-
@Override
101-
public Map<String, BlobMetadata> listBlobs() {
102-
throw unsupportedException();
103-
}
122+
private static class MostlyUnimplementedFakeBlobContainer implements BlobContainer {
104123

105-
@Override
106-
public BlobPath path() {
107-
throw unsupportedException();
108-
}
124+
@Override
125+
public long readBlobPreferredLength() {
126+
return Long.MAX_VALUE;
127+
}
109128

110-
@Override
111-
public InputStream readBlob(String blobName) {
112-
throw unsupportedException();
113-
}
129+
@Override
130+
public Map<String, BlobMetadata> listBlobs() {
131+
throw unsupportedException();
132+
}
114133

115-
@Override
116-
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) {
117-
throw unsupportedException();
118-
}
134+
@Override
135+
public BlobPath path() {
136+
throw unsupportedException();
137+
}
119138

120-
@Override
121-
public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) {
122-
throw unsupportedException();
123-
}
139+
@Override
140+
public InputStream readBlob(String blobName) {
141+
throw unsupportedException();
142+
}
124143

125-
@Override
126-
public DeleteResult delete() {
127-
throw unsupportedException();
128-
}
144+
@Override
145+
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) {
146+
throw unsupportedException();
147+
}
129148

130-
@Override
131-
public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) {
132-
throw unsupportedException();
133-
}
149+
@Override
150+
public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) {
151+
throw unsupportedException();
152+
}
134153

135-
@Override
136-
public Map<String, BlobContainer> children() {
137-
throw unsupportedException();
138-
}
154+
@Override
155+
public DeleteResult delete() {
156+
throw unsupportedException();
157+
}
139158

140-
@Override
141-
public Map<String, BlobMetadata> listBlobsByPrefix(String blobNamePrefix) {
142-
throw unsupportedException();
143-
}
159+
@Override
160+
public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) {
161+
throw unsupportedException();
162+
}
144163

145-
private UnsupportedOperationException unsupportedException() {
146-
assert false : "this operation is not supported and should have not be called";
147-
return new UnsupportedOperationException("This operation is not supported");
148-
}
149-
};
164+
@Override
165+
public Map<String, BlobContainer> children() {
166+
throw unsupportedException();
167+
}
168+
169+
@Override
170+
public Map<String, BlobMetadata> listBlobsByPrefix(String blobNamePrefix) {
171+
throw unsupportedException();
172+
}
173+
174+
private UnsupportedOperationException unsupportedException() {
175+
assert false : "this operation is not supported and should have not be called";
176+
return new UnsupportedOperationException("This operation is not supported");
177+
}
150178
}
151179
}

0 commit comments

Comments
 (0)