Skip to content

Commit 957a7ae

Browse files
authored
SearchableSnapshotIndexInput should read all bytes (#52199)
This commit fixes a bug in SearchableSnapshotIndexInput, which did not use the InputStream.read(byte[], int, int) method correctly. The Javadoc of that method states: "An attempt is made to read as many as len bytes, but a smaller number may be read." Consequently, whenever a single read call returns fewer bytes than requested, SearchableSnapshotIndexInput must keep reading until all the requested bytes have been read or the end of the stream is reached.
1 parent c97c97c commit 957a7ae

File tree

2 files changed

+39
-4
lines changed

2 files changed

+39
-4
lines changed

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotIndexInput.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import org.apache.lucene.store.BufferedIndexInput;
99
import org.apache.lucene.store.IndexInput;
1010
import org.apache.lucene.util.BytesRef;
11+
import org.elasticsearch.common.CheckedRunnable;
1112
import org.elasticsearch.common.Nullable;
1213
import org.elasticsearch.common.blobstore.BlobContainer;
1314
import org.elasticsearch.core.internal.io.IOUtils;
@@ -122,7 +123,9 @@ private void readInternalBytes(final int part, long pos, final byte[] b, int off
122123
if (optimizedReadSize < length) {
123124
// we did not read everything in an optimized fashion, so read the remainder directly
124125
try (InputStream inputStream = openBlobStream(part, pos + optimizedReadSize, length - optimizedReadSize)) {
125-
final int directReadSize = inputStream.read(b, offset + optimizedReadSize, length - optimizedReadSize);
126+
final int directReadSize = readFully(inputStream, b, offset + optimizedReadSize, length - optimizedReadSize, () -> {
127+
throw new EOFException("Read past EOF at [" + position + "] with length [" + fileInfo.partBytes(part) + "]");
128+
});
126129
assert optimizedReadSize + directReadSize == length : optimizedReadSize + " and " + directReadSize + " vs " + length;
127130
position += directReadSize;
128131
}
@@ -280,6 +283,23 @@ private InputStream openBlobStream(int part, long pos, long length) throws IOExc
280283
return stream;
281284
}
282285

286+
/**
287+
* Fully read up to {@code length} bytes from the given {@link InputStream}
288+
*/
289+
private static int readFully(InputStream inputStream, byte[] b, int offset, int length, CheckedRunnable<IOException> onEOF)
290+
throws IOException {
291+
int totalRead = 0;
292+
while (totalRead < length) {
293+
final int read = inputStream.read(b, offset + totalRead, length - totalRead);
294+
if (read == -1) {
295+
onEOF.run();
296+
break;
297+
}
298+
totalRead += read;
299+
}
300+
return totalRead > 0 ? totalRead : -1;
301+
}
302+
283303
private static class StreamForSequentialReads implements Closeable {
284304
private final InputStream inputStream;
285305
private final int part;
@@ -299,7 +319,7 @@ boolean canContinueSequentialRead(int part, long pos) {
299319

300320
int read(byte[] b, int offset, int length) throws IOException {
301321
assert this.pos < maxPos : "should not try and read from a fully-read stream";
302-
int read = inputStream.read(b, offset, length);
322+
final int read = readFully(inputStream, b, offset, length, () -> {});
303323
assert read <= length : read + " vs " + length;
304324
pos += read;
305325
return read;

x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotIndexInputTests.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616

1717
import java.io.ByteArrayInputStream;
1818
import java.io.EOFException;
19+
import java.io.FilterInputStream;
1920
import java.io.IOException;
21+
import java.io.InputStream;
2022
import java.nio.charset.StandardCharsets;
2123
import java.util.concurrent.atomic.AtomicInteger;
2224

@@ -64,9 +66,10 @@ private SearchableSnapshotIndexInput createIndexInput(final byte[] input, long p
6466

6567
onReadBlob.run();
6668

69+
final InputStream stream;
6770
if (fileInfo.numberOfParts() == 1L) {
6871
assertThat("Unexpected blob name [" + name + "]", name, equalTo(fileInfo.name()));
69-
return new ByteArrayInputStream(input, Math.toIntExact(position), Math.toIntExact(length));
72+
stream = new ByteArrayInputStream(input, Math.toIntExact(position), Math.toIntExact(length));
7073

7174
} else {
7275
assertThat("Unexpected blob name [" + name + "]", name, allOf(startsWith(fileInfo.name()), containsString(".part")));
@@ -75,7 +78,19 @@ private SearchableSnapshotIndexInput createIndexInput(final byte[] input, long p
7578
assertThat("Unexpected part number [" + partNumber + "] for [" + name + "]", partNumber,
7679
allOf(greaterThanOrEqualTo(0L), lessThan(fileInfo.numberOfParts())));
7780

78-
return new ByteArrayInputStream(input, Math.toIntExact(partNumber * partSize + position), Math.toIntExact(length));
81+
stream = new ByteArrayInputStream(input, Math.toIntExact(partNumber * partSize + position), Math.toIntExact(length));
82+
}
83+
84+
if (randomBoolean()) {
85+
return stream;
86+
} else {
87+
// sometimes serve fewer bytes than requested, as permitted by the InputStream#read(byte[], int, int) javadoc
88+
return new FilterInputStream(stream) {
89+
@Override
90+
public int read(byte[] b, int off, int len) throws IOException {
91+
return super.read(b, off, randomIntBetween(1, len));
92+
}
93+
};
7994
}
8095
});
8196
return new SearchableSnapshotIndexInput(blobContainer, fileInfo, minimumReadSize,

0 commit comments

Comments
 (0)