@@ -7,9 +7,12 @@

 import org.apache.lucene.store.BufferedIndexInput;
 import org.apache.lucene.store.IndexInput;
+import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.blobstore.BlobContainer;
+import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;

+import java.io.Closeable;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
@@ -32,6 +35,12 @@
  * next read will occur. In the case of a Lucene file snapshotted into multiple parts, this position is used to identify which part must
  * be read at which position (see {@link #readInternal(byte[], int, int)}). This position is also passed over to cloned and sliced input
  * along with the {@link FileInfo} so that they can also track their reading position.
+ *
+ * The {@code sequentialReadSize} constructor parameter configures the {@link SearchableSnapshotIndexInput} to perform a larger read on the
+ * underlying {@link BlobContainer} than it needs in order to fill its internal buffer, on the assumption that the client is reading
+ * sequentially from this file and will consume the rest of this stream in due course. It keeps hold of the partially-consumed
+ * {@link InputStream} in {@code streamForSequentialReads}. Clones and slices, however, do not expect to be read sequentially and so make
+ * a new request to the {@link BlobContainer} each time their internal buffer needs refilling.
  */
 public class SearchableSnapshotIndexInput extends BufferedIndexInput {

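To make the trade-off concrete, here is a minimal standalone sketch of the read-ahead idea. Every name in it is hypothetical, and `readBlob` merely stands in for `BlobContainer#readBlob(name, position, length)`: refilling a small buffer with one ranged request per refill costs many round trips, while one larger request amortised across refills costs far fewer.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical model of the read-ahead optimization described in the javadoc above.
class ReadAheadSketch {
    static int rangedReads = 0;

    // stands in for BlobContainer#readBlob: each call models one ranged request
    static InputStream readBlob(byte[] blob, long pos, long len) {
        rangedReads++;
        return new ByteArrayInputStream(blob, Math.toIntExact(pos), Math.toIntExact(len));
    }

    public static void main(String[] args) throws IOException {
        byte[] blob = new byte[1 << 20];  // a 1 MiB blob
        byte[] buffer = new byte[1024];   // a BufferedIndexInput-sized refill buffer

        // Without read-ahead: one ranged request per buffer refill.
        for (long pos = 0; pos < blob.length; pos += buffer.length) {
            try (InputStream in = readBlob(blob, pos, buffer.length)) {
                in.read(buffer);
            }
        }
        System.out.println("per-refill requests: " + rangedReads); // 1024

        // With read-ahead (the sequentialReadSize idea): open one larger stream
        // and keep draining it across refills until it is exhausted.
        rangedReads = 0;
        final long sequentialReadSize = 64 * 1024;
        for (long pos = 0; pos < blob.length; pos += sequentialReadSize) {
            try (InputStream in = readBlob(blob, pos, sequentialReadSize)) {
                for (long i = 0; i < sequentialReadSize / buffer.length; i++) {
                    in.read(buffer);
                }
            }
        }
        System.out.println("read-ahead requests: " + rangedReads); // 16
    }
}
```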
@@ -41,20 +50,30 @@ public class SearchableSnapshotIndexInput extends BufferedIndexInput {
     private final long length;

     private long position;
-    private boolean closed;
+    private volatile boolean closed;
+
+    @Nullable // if not currently reading sequentially
+    private StreamForSequentialReads streamForSequentialReads;
+    private long sequentialReadSize;
+    private static final long NO_SEQUENTIAL_READ_OPTIMIZATION = 0L;
+

-    public SearchableSnapshotIndexInput(final BlobContainer blobContainer, final FileInfo fileInfo) {
-        this("SearchableSnapshotIndexInput(" + fileInfo.physicalName() + ")", blobContainer, fileInfo, 0L, 0L, fileInfo.length());
+    SearchableSnapshotIndexInput(final BlobContainer blobContainer, final FileInfo fileInfo, long sequentialReadSize, int bufferSize) {
+        this("SearchableSnapshotIndexInput(" + fileInfo.physicalName() + ")", blobContainer, fileInfo, 0L, 0L, fileInfo.length(),
+            sequentialReadSize, bufferSize);
     }

-    private SearchableSnapshotIndexInput(final String resourceDesc, final BlobContainer blobContainer,
-                                         final FileInfo fileInfo, final long position, final long offset, final long length) {
-        super(resourceDesc);
+    private SearchableSnapshotIndexInput(final String resourceDesc, final BlobContainer blobContainer, final FileInfo fileInfo,
+                                         final long position, final long offset, final long length, final long sequentialReadSize,
+                                         final int bufferSize) {
+        super(resourceDesc, bufferSize);
         this.blobContainer = Objects.requireNonNull(blobContainer);
         this.fileInfo = Objects.requireNonNull(fileInfo);
         this.offset = offset;
         this.length = length;
         this.position = position;
+        assert sequentialReadSize >= 0;
+        this.sequentialReadSize = sequentialReadSize;
         this.closed = false;
     }

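A same-package caller now chooses both the read-ahead size and the refill buffer size up front. The following usage sketch is illustrative only, the 8 MiB figure is an assumed value and not part of this diff:

```java
// Illustrative wiring: the blobContainer and fileInfo arguments come from the
// surrounding searchable-snapshot directory code, which is not shown here.
static IndexInput openInput(BlobContainer blobContainer, FileInfo fileInfo) throws IOException {
    long sequentialReadSize = 8 * 1024 * 1024;        // read ahead up to 8 MiB (assumed value)
    int bufferSize = BufferedIndexInput.BUFFER_SIZE;  // Lucene's default refill buffer size
    return new SearchableSnapshotIndexInput(blobContainer, fileInfo, sequentialReadSize, bufferSize);
}
```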
@@ -73,12 +92,12 @@ private void ensureOpen() throws IOException {
     protected void readInternal(byte[] b, int offset, int length) throws IOException {
         ensureOpen();
         if (fileInfo.numberOfParts() == 1L) {
-            readInternalBytes(0L, position, b, offset, length);
+            readInternalBytes(0, position, b, offset, length);
         } else {
             int len = length;
             int off = offset;
             while (len > 0) {
-                long currentPart = position / fileInfo.partSize().getBytes();
+                int currentPart = Math.toIntExact(position / fileInfo.partSize().getBytes());
                 int remainingBytesInPart;
                 if (currentPart < (fileInfo.numberOfParts() - 1)) {
                     remainingBytesInPart = Math.toIntExact(((currentPart + 1L) * fileInfo.partSize().getBytes()) - position);
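A worked example of the part arithmetic, using made-up sizes. Note that the `else` branch below is inferred from the pattern, since the hunk cuts off just after the `if`:

```java
public class PartArithmeticExample {
    public static void main(String[] args) {
        // Made-up numbers: a file stored in 3 parts of up to 100 bytes, 280 bytes total.
        long partSizeBytes = 100;
        long numberOfParts = 3;
        long fileLength = 280;
        long position = 250;                                             // absolute position in the file

        int currentPart = Math.toIntExact(position / partSizeBytes);     // = 2, i.e. the last part
        int remainingBytesInPart;
        if (currentPart < numberOfParts - 1) {
            remainingBytesInPart = Math.toIntExact((currentPart + 1L) * partSizeBytes - position);
        } else {
            // the last part may be shorter than partSizeBytes (inferred branch)
            remainingBytesInPart = Math.toIntExact(fileLength - position);
        }
        System.out.println(currentPart + " / " + remainingBytesInPart);  // prints "2 / 30"
    }
}
```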
@@ -93,12 +112,93 @@ protected void readInternal(byte[] b, int offset, int length) throws IOException
             }
         }
     }

-    private void readInternalBytes(final long part, final long pos, byte[] b, int offset, int length) throws IOException {
-        try (InputStream inputStream = blobContainer.readBlob(fileInfo.partName(part), pos, length)) {
-            int read = inputStream.read(b, offset, length);
-            assert read == length;
-            position += read;
+    private void readInternalBytes(final int part, long pos, final byte[] b, int offset, int length) throws IOException {
+        int optimizedReadSize = readOptimized(part, pos, b, offset, length);
+        assert optimizedReadSize <= length;
+        position += optimizedReadSize;
+
+        if (optimizedReadSize < length) {
+            // we did not read everything in an optimized fashion, so read the remainder directly
+            try (InputStream inputStream
+                     = blobContainer.readBlob(fileInfo.partName(part), pos + optimizedReadSize, length - optimizedReadSize)) {
+                final int directReadSize = inputStream.read(b, offset + optimizedReadSize, length - optimizedReadSize);
+                assert optimizedReadSize + directReadSize == length : optimizedReadSize + " and " + directReadSize + " vs " + length;
+                position += directReadSize;
+            }
+        }
+    }
+
+    /**
+     * Attempt to satisfy this read in an optimized fashion using {@code streamForSequentialReads}.
+     * @return the number of bytes read
+     */
+    private int readOptimized(int part, long pos, byte[] b, int offset, int length) throws IOException {
+        if (sequentialReadSize == NO_SEQUENTIAL_READ_OPTIMIZATION) {
+            return 0;
+        }
+
+        int read = 0;
+        if (streamForSequentialReads == null) {
+            // starting a new sequential read
+            read = readFromNewSequentialStream(part, pos, b, offset, length);
+        } else if (streamForSequentialReads.canContinueSequentialRead(part, pos)) {
+            // continuing a sequential read that we started previously
+            read = streamForSequentialReads.read(b, offset, length);
+            if (streamForSequentialReads.isFullyRead()) {
+                // the current stream was exhausted by this read, so it should be closed
+                streamForSequentialReads.close();
+                streamForSequentialReads = null;
+            } else {
+                // the current stream contained enough data for this read and more besides, so we leave it in place
+                assert read == length : length + " remaining";
+            }
+
+            if (read < length) {
+                // the current stream didn't contain enough data for this read, so we must read more
+                read += readFromNewSequentialStream(part, pos + read, b, offset + read, length - read);
+            }
+        } else {
+            // not a sequential read, so stop optimizing for this usage pattern and fall through to the unoptimized behaviour
+            assert streamForSequentialReads.isFullyRead() == false;
+            sequentialReadSize = NO_SEQUENTIAL_READ_OPTIMIZATION;
+            closeStreamForSequentialReads();
         }
+        return read;
+    }
+
+    private void closeStreamForSequentialReads() throws IOException {
+        try {
+            IOUtils.close(streamForSequentialReads);
+        } finally {
+            streamForSequentialReads = null;
+        }
+    }
+
+    /**
+     * If appropriate, open a new stream for sequential reading and satisfy the given read using it.
+     * @return the number of bytes read; if a new stream wasn't opened then nothing was read so the caller should perform the read directly.
+     */
+    private int readFromNewSequentialStream(int part, long pos, byte[] b, int offset, int length) throws IOException {
+
+        assert streamForSequentialReads == null : "should only be called when a new stream is needed";
+        assert sequentialReadSize > 0L : "should only be called if optimizing sequential reads";
+
+        final long streamLength = Math.min(sequentialReadSize, fileInfo.partBytes(part) - pos);
+        if (streamLength <= length) {
+            // streamLength <= length so this single read will consume the entire stream, so there is no need to keep hold of it, so we can
+            // tell the caller to read the data directly
+            return 0;
+        }
+
+        // if we open a stream of length streamLength then it will not be completely consumed by this read, so it is worthwhile to open
+        // it and keep it open for future reads
+        final InputStream inputStream = blobContainer.readBlob(fileInfo.partName(part), pos, streamLength);
+        streamForSequentialReads = new StreamForSequentialReads(inputStream, part, pos, streamLength);
+
+        final int read = streamForSequentialReads.read(b, offset, length);
+        assert read == length : read + " vs " + length;
+        assert streamForSequentialReads.isFullyRead() == false;
+        return read;
     }

     @Override
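The `streamLength <= length` test in `readFromNewSequentialStream` is the cost/benefit decision: a stream is only opened and retained when it will outlive the current read. A worked example with made-up numbers:

```java
public class StreamWorthwhileExample {
    public static void main(String[] args) {
        long sequentialReadSize = 64 * 1024;  // configured read-ahead (made-up value)
        long partBytes = 100_000;             // size of the current part
        long pos = 95_000;                    // read position within that part
        int length = 8_192;                   // bytes the caller wants right now

        // Never read past the end of the part, and never more than the configured limit.
        long streamLength = Math.min(sequentialReadSize, partBytes - pos); // = 5_000
        if (streamLength <= length) {
            // 5_000 <= 8_192: this one read would drain the stream anyway, so there is
            // nothing to keep for later; the caller should read those bytes directly.
            System.out.println("read directly");
        } else {
            System.out.println("open and retain a stream of " + streamLength + " bytes");
        }
    }
}
```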
@@ -108,19 +208,30 @@ protected void seekInternal(long pos) throws IOException {
         } else if (pos < 0L) {
             throw new IOException("Seeking to negative position [" + pos + "] for " + toString());
         }
-        this.position = offset + pos;
+        if (position != offset + pos) {
+            position = offset + pos;
+            closeStreamForSequentialReads();
+        }
     }

     @Override
     public BufferedIndexInput clone() {
-        return new SearchableSnapshotIndexInput("clone(" + this + ")", blobContainer, fileInfo, position, offset, length);
+        return new SearchableSnapshotIndexInput("clone(" + this + ")", blobContainer, fileInfo, position, offset, length,
+            // Clones might not be closed when they are no longer needed, but we must always close streamForSequentialReads. The simple
+            // solution: do not optimize sequential reads on clones.
+            NO_SEQUENTIAL_READ_OPTIMIZATION,
+            getBufferSize());
     }

     @Override
     public IndexInput slice(String sliceDescription, long offset, long length) throws IOException {
         if ((offset >= 0L) && (length >= 0L) && (offset + length <= length())) {
-            final SearchableSnapshotIndexInput slice =
-                new SearchableSnapshotIndexInput(sliceDescription, blobContainer, fileInfo, position, this.offset + offset, length);
+            final SearchableSnapshotIndexInput slice = new SearchableSnapshotIndexInput(sliceDescription, blobContainer, fileInfo, position,
+                this.offset + offset, length,
+                // Slices might not be closed when they are no longer needed, but we must always close streamForSequentialReads. The simple
+                // solution: do not optimize sequential reads on slices.
+                NO_SEQUENTIAL_READ_OPTIMIZATION,
+                getBufferSize());
             slice.seek(0L);
             return slice;
         } else {
@@ -132,6 +243,7 @@ public IndexInput slice(String sliceDescription, long offset, long length) throws IOException {
     @Override
     public void close() throws IOException {
         closed = true;
+        closeStreamForSequentialReads();
     }

     @Override
@@ -144,4 +256,40 @@ public String toString() {
             ", position=" + position +
             '}';
     }
+
+    private static class StreamForSequentialReads implements Closeable {
+        private final InputStream inputStream;
+        private final int part;
+        private long pos; // position within this part
+        private final long maxPos;
+
+        StreamForSequentialReads(InputStream inputStream, int part, long pos, long streamLength) {
+            this.inputStream = Objects.requireNonNull(inputStream);
+            this.part = part;
+            this.pos = pos;
+            this.maxPos = pos + streamLength;
+        }
+
+        boolean canContinueSequentialRead(int part, long pos) {
+            return this.part == part && this.pos == pos;
+        }
+
+        int read(byte[] b, int offset, int length) throws IOException {
+            assert this.pos < maxPos : "should not try and read from a fully-read stream";
+            int read = inputStream.read(b, offset, length);
+            assert read <= length : read + " vs " + length;
+            pos += read;
+            return read;
+        }
+
+        boolean isFullyRead() {
+            assert this.pos <= maxPos;
+            return this.pos >= maxPos;
+        }
+
+        @Override
+        public void close() throws IOException {
+            inputStream.close();
+        }
+    }
 }
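Since `StreamForSequentialReads` is a private helper, here is a standalone re-declaration of its bookkeeping (illustration only; the real class above also closes the wrapped stream). It shows that `canContinueSequentialRead` accepts only the exact next position in the same part, and that `isFullyRead` flips once `maxPos` is reached:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class SequentialStreamDemo {
    static class Stream {
        final InputStream in;
        final int part;
        long pos;            // position within the part
        final long maxPos;   // position at which the stream is exhausted

        Stream(InputStream in, int part, long pos, long streamLength) {
            this.in = in;
            this.part = part;
            this.pos = pos;
            this.maxPos = pos + streamLength;
        }

        boolean canContinueSequentialRead(int part, long pos) {
            return this.part == part && this.pos == pos; // same part, exact next byte
        }

        int read(byte[] b, int off, int len) throws IOException {
            int n = in.read(b, off, len);
            pos += n;
            return n;
        }

        boolean isFullyRead() {
            return pos >= maxPos;
        }
    }

    public static void main(String[] args) throws IOException {
        // A 20-byte stream opened at position 100 of part 0.
        Stream s = new Stream(new ByteArrayInputStream(new byte[20]), 0, 100, 20);
        byte[] b = new byte[8];

        s.read(b, 0, 8);                                          // consumes positions 100..107
        System.out.println(s.canContinueSequentialRead(0, 108));  // true: the exact next position
        System.out.println(s.canContinueSequentialRead(0, 116));  // false: the reader seeked away

        s.read(b, 0, 8);
        s.read(b, 0, 4);
        System.out.println(s.isFullyRead());                      // true: all 20 bytes consumed
    }
}
```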