Skip to content

Commit 47ec08d

Browse files
authored
CacheBufferedIndexInput should throw EOFException (#53975)
InputStream#read may return -1 if it reaches EOF. This is unexpected in a `CacheBufferedIndexInput` but may happen if, for instance, the underlying file is truncated. Today we don't handle this specially and an EOF can cause strange behaviour, often throwing an `ArrayIndexOutOfBoundsException`. This commit adds better handling for EOF by throwing an `EOFException`.
1 parent 4e4a705 commit 47ec08d

File tree

2 files changed

+60
-3
lines changed

2 files changed

+60
-3
lines changed

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheBufferedIndexInput.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.io.InputStream;
2626
import java.nio.ByteBuffer;
2727
import java.nio.channels.FileChannel;
28+
import java.util.Locale;
2829
import java.util.concurrent.atomic.AtomicBoolean;
2930
import java.util.concurrent.atomic.AtomicReference;
3031

@@ -129,15 +130,15 @@ protected void readInternal(final byte[] buffer, final int offset, final int len
129130
lastSeekPosition = lastReadPosition;
130131
}
131132

132-
int readCacheFile(FileChannel fc, long end, long position, byte[] buffer, int offset, long length) throws IOException {
133+
private int readCacheFile(FileChannel fc, long end, long position, byte[] buffer, int offset, long length) throws IOException {
133134
assert assertFileChannelOpen(fc);
134135
int bytesRead = Channels.readFromFileChannel(fc, position, buffer, offset, Math.toIntExact(Math.min(length, end - position)));
135136
stats.addCachedBytesRead(bytesRead);
136137
return bytesRead;
137138
}
138139

139140
@SuppressForbidden(reason = "Use positional writes on purpose")
140-
void writeCacheFile(FileChannel fc, long start, long end) throws IOException {
141+
private void writeCacheFile(FileChannel fc, long start, long end) throws IOException {
141142
assert assertFileChannelOpen(fc);
142143
final long length = end - start;
143144
final byte[] copyBuffer = new byte[Math.toIntExact(Math.min(COPY_BUFFER_SIZE, length))];
@@ -151,6 +152,10 @@ void writeCacheFile(FileChannel fc, long start, long end) throws IOException {
151152
while (remaining > 0) {
152153
final int len = (remaining < copyBuffer.length) ? Math.toIntExact(remaining) : copyBuffer.length;
153154
int bytesRead = input.read(copyBuffer, 0, len);
155+
if (bytesRead == -1) {
156+
throw new EOFException(String.format(Locale.ROOT,
157+
"unexpected EOF reading [%d-%d] ([%d] bytes remaining) from %s", start, end, remaining, cacheFileReference));
158+
}
154159
fc.write(ByteBuffer.wrap(copyBuffer, 0, bytesRead), start + bytesCopied);
155160
bytesCopied += bytesRead;
156161
remaining -= bytesRead;
@@ -215,7 +220,11 @@ private int readDirectly(long start, long end, byte[] buffer, int offset) throws
215220
while (remaining > 0) {
216221
final int len = (remaining < copyBuffer.length) ? (int) remaining : copyBuffer.length;
217222
int bytesRead = input.read(copyBuffer, 0, len);
218-
System.arraycopy(copyBuffer, 0, buffer, offset + bytesCopied, len);
223+
if (bytesRead == -1) {
224+
throw new EOFException(String.format(Locale.ROOT,
225+
"unexpected EOF reading [%d-%d] ([%d] bytes remaining) from %s", start, end, remaining, cacheFileReference));
226+
}
227+
System.arraycopy(copyBuffer, 0, buffer, offset + bytesCopied, bytesRead);
219228
bytesCopied += bytesRead;
220229
remaining -= bytesRead;
221230
}

x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheBufferedIndexInputTests.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@
1919
import org.elasticsearch.snapshots.SnapshotId;
2020
import org.elasticsearch.snapshots.mockstore.BlobContainerWrapper;
2121

22+
import java.io.EOFException;
2223
import java.io.FilterInputStream;
2324
import java.io.IOException;
2425
import java.io.InputStream;
2526
import java.nio.charset.StandardCharsets;
2627
import java.nio.file.Path;
28+
import java.util.HashSet;
2729
import java.util.List;
2830
import java.util.Objects;
2931
import java.util.concurrent.atomic.LongAdder;
@@ -81,6 +83,52 @@ public void testRandomReads() throws IOException {
8183
}
8284
}
8385

86+
public void testThrowsEOFException() throws IOException {
87+
try (CacheService cacheService = createCacheService(random())) {
88+
cacheService.start();
89+
90+
SnapshotId snapshotId = new SnapshotId("_name", "_uuid");
91+
IndexId indexId = new IndexId("_name", "_uuid");
92+
ShardId shardId = new ShardId("_name", "_uuid", 0);
93+
94+
final String fileName = randomAlphaOfLength(10);
95+
final byte[] input = randomUnicodeOfLength(randomIntBetween(1, 100_000)).getBytes(StandardCharsets.UTF_8);
96+
97+
final String blobName = randomUnicodeOfLength(10);
98+
final StoreFileMetaData metaData = new StoreFileMetaData(fileName, input.length + 1, "_na", Version.CURRENT.luceneVersion);
99+
final BlobStoreIndexShardSnapshot snapshot = new BlobStoreIndexShardSnapshot(snapshotId.getName(), 0L,
100+
List.of(new BlobStoreIndexShardSnapshot.FileInfo(blobName, metaData, new ByteSizeValue(input.length + 1))), 0L, 0L, 0, 0L);
101+
102+
final BlobContainer blobContainer = singleBlobContainer(blobName, input);
103+
104+
final Path cacheDir = createTempDir();
105+
try (CacheDirectory cacheDirectory
106+
= new CacheDirectory(snapshot, blobContainer, cacheService, cacheDir, snapshotId, indexId, shardId, () -> 0L)) {
107+
try (IndexInput indexInput = cacheDirectory.openInput(fileName, newIOContext(random()))) {
108+
final byte[] buffer = new byte[input.length + 1];
109+
final IOException exception = expectThrows(IOException.class, () -> indexInput.readBytes(buffer, 0, buffer.length));
110+
if (containsEOFException(exception, new HashSet<>()) == false) {
111+
throw new AssertionError("inner EOFException not thrown", exception);
112+
}
113+
}
114+
}
115+
}
116+
}
117+
118+
private boolean containsEOFException(Throwable throwable, HashSet<Throwable> seenThrowables) {
119+
if (throwable == null || seenThrowables.add(throwable) == false) {
120+
return false;
121+
}
122+
if (throwable instanceof EOFException) {
123+
return true;
124+
}
125+
for (Throwable suppressed : throwable.getSuppressed()) {
126+
if (containsEOFException(suppressed, seenThrowables)) {
127+
return true;
128+
}
129+
}
130+
return containsEOFException(throwable.getCause(), seenThrowables);
131+
}
84132

85133
/**
86134
* BlobContainer that counts the number of {@link java.io.InputStream} it opens, as well as the

0 commit comments

Comments
 (0)