 import org.apache.lucene.store.BufferedIndexInput;
 import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;

+import java.io.ByteArrayInputStream;
 import java.io.Closeable;
 import java.io.EOFException;
 import java.io.IOException;
@@ -119,8 +121,7 @@ private void readInternalBytes(final int part, long pos, final byte[] b, int off

         if (optimizedReadSize < length) {
             // we did not read everything in an optimized fashion, so read the remainder directly
-            try (InputStream inputStream
-                = blobContainer.readBlob(fileInfo.partName(part), pos + optimizedReadSize, length - optimizedReadSize)) {
+            try (InputStream inputStream = openBlobStream(part, pos + optimizedReadSize, length - optimizedReadSize)) {
                 final int directReadSize = inputStream.read(b, offset + optimizedReadSize, length - optimizedReadSize);
                 assert optimizedReadSize + directReadSize == length : optimizedReadSize + " and " + directReadSize + " vs " + length;
                 position += directReadSize;
@@ -192,7 +193,7 @@ private int readFromNewSequentialStream(int part, long pos, byte[] b, int offset

         // if we open a stream of length streamLength then it will not be completely consumed by this read, so it is worthwhile to open
         // it and keep it open for future reads
-        final InputStream inputStream = blobContainer.readBlob(fileInfo.partName(part), pos, streamLength);
+        final InputStream inputStream = openBlobStream(part, pos, streamLength);
         streamForSequentialReads = new StreamForSequentialReads(inputStream, part, pos, streamLength);

         final int read = streamForSequentialReads.read(b, offset, length);
@@ -257,6 +258,28 @@ public String toString() {
             '}';
     }

+    private InputStream openBlobStream(int part, long pos, long length) throws IOException {
+        final InputStream stream;
+        if (fileInfo.metadata().hashEqualsContents() == false) {
+            stream = blobContainer.readBlob(fileInfo.partName(part), pos, length);
+        } else {
+            // extract blob content from metadata hash
+            final BytesRef data = fileInfo.metadata().hash();
+            if (part > 0) {
+                assert fileInfo.numberOfParts() >= part;
+                for (int i = 0; i < part; i++) {
+                    pos += fileInfo.partBytes(i);
+                }
+            }
+            if ((pos < 0L) || (length < 0L) || (pos + length > data.bytes.length)) {
+                throw new IllegalArgumentException("Invalid arguments (pos=" + pos + ", length=" + length
+                    + ") for hash content (length=" + data.bytes.length + ')');
+            }
+            stream = new ByteArrayInputStream(data.bytes, Math.toIntExact(pos), Math.toIntExact(length));
+        }
+        return stream;
+    }
+
     private static class StreamForSequentialReads implements Closeable {
         private final InputStream inputStream;
         private final int part;
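The core of this change is the new openBlobStream method: every blob read now goes through it, and when the snapshot metadata's hash holds the complete file contents (hashEqualsContents()), the read is served straight from that in-memory BytesRef instead of opening a stream against the blob store. Below is a minimal standalone sketch of that dispatch, not the Elasticsearch implementation: RangedBlobReader, cachedContents, and fetchFromBlobStore are hypothetical names, with a plain byte[] standing in for the metadata hash and fetchFromBlobStore standing in for blobContainer.readBlob.

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

final class RangedBlobReader {
    private final byte[] cachedContents; // null when the full contents are not held in memory

    RangedBlobReader(byte[] cachedContents) {
        this.cachedContents = cachedContents;
    }

    InputStream openStream(long pos, long length) throws IOException {
        if (cachedContents == null) {
            return fetchFromBlobStore(pos, length); // fall back to a remote ranged read
        }
        // validate the requested range, as the real method does before building the stream
        if (pos < 0L || length < 0L || pos + length > cachedContents.length) {
            throw new IllegalArgumentException("Invalid range (pos=" + pos + ", length=" + length + ")");
        }
        // serve the read entirely from memory, with no blob-store round trip
        return new ByteArrayInputStream(cachedContents, Math.toIntExact(pos), Math.toIntExact(length));
    }

    // hypothetical stand-in for blobContainer.readBlob(partName, pos, length)
    private InputStream fetchFromBlobStore(long pos, long length) throws IOException {
        throw new UnsupportedOperationException("network read elided in this sketch");
    }
}

One wrinkle the sketch omits: in the real method pos is relative to one part of a possibly multi-part file, while the hash holds the whole file, so openBlobStream first adds the sizes of all preceding parts (fileInfo.partBytes(i)) to pos before the range check.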