|
8 | 8 | import org.apache.lucene.store.IOContext;
|
9 | 9 | import org.apache.lucene.store.IndexInput;
|
10 | 10 | import org.elasticsearch.common.CheckedRunnable;
|
| 11 | +import org.elasticsearch.common.CheckedSupplier; |
11 | 12 | import org.elasticsearch.common.Nullable;
|
12 | 13 | import org.elasticsearch.common.blobstore.BlobContainer;
|
13 | 14 | import org.elasticsearch.core.internal.io.IOUtils;
|
|
22 | 23 | import java.io.IOException;
|
23 | 24 | import java.io.InputStream;
|
24 | 25 | import java.util.Objects;
|
| 26 | +import java.util.concurrent.atomic.LongAdder; |
25 | 27 |
|
26 | 28 | /**
|
27 | 29 | * A {@link DirectBlobContainerIndexInput} instance corresponds to a single file from a Lucene directory that has been snapshotted. Because
|
@@ -194,36 +196,38 @@ private int readFromNewSequentialStream(int part, long pos, byte[] b, int offset
|
194 | 196 | return 0;
|
195 | 197 | }
|
196 | 198 |
|
197 |
| - final long startTimeNanos = directory.statsCurrentTimeNanos(); |
198 |
| - |
199 | 199 | // if we open a stream of length streamLength then it will not be completely consumed by this read, so it is worthwhile to open
|
200 | 200 | // it and keep it open for future reads
|
201 | 201 | final InputStream inputStream = openBlobStream(part, pos, streamLength);
|
202 | 202 | streamForSequentialReads = new StreamForSequentialReads(new FilterInputStream(inputStream) {
|
203 |
| - private int bytesRead = 0; |
| 203 | + private LongAdder bytesRead = new LongAdder(); |
| 204 | + private LongAdder timeNanos = new LongAdder(); |
204 | 205 |
|
205 |
| - @Override |
206 |
| - public int read() throws IOException { |
207 |
| - final int result = super.read(); |
| 206 | + private int onOptimizedRead(CheckedSupplier<Integer, IOException> read) throws IOException { |
| 207 | + final long startTimeNanos = directory.statsCurrentTimeNanos(); |
| 208 | + final int result = read.get(); |
| 209 | + final long endTimeNanos = directory.statsCurrentTimeNanos(); |
208 | 210 | if (result != -1) {
|
209 |
| - bytesRead += result; |
| 211 | + bytesRead.add(result); |
| 212 | + timeNanos.add(endTimeNanos - startTimeNanos); |
210 | 213 | }
|
211 | 214 | return result;
|
212 | 215 | }
|
213 | 216 |
|
| 217 | + @Override |
| 218 | + public int read() throws IOException { |
| 219 | + return onOptimizedRead(super::read); |
| 220 | + } |
| 221 | + |
214 | 222 | @Override
|
215 | 223 | public int read(byte[] b, int off, int len) throws IOException {
|
216 |
| - final int result = super.read(b, off, len); |
217 |
| - if (result != -1) { |
218 |
| - bytesRead += result; |
219 |
| - } |
220 |
| - return result; |
| 224 | + return onOptimizedRead(() -> super.read(b, off, len)); |
221 | 225 | }
|
222 | 226 |
|
223 | 227 | @Override
|
224 | 228 | public void close() throws IOException {
|
225 | 229 | super.close();
|
226 |
| - stats.addOptimizedBytesRead(bytesRead, directory.statsCurrentTimeNanos() - startTimeNanos); |
| 230 | + stats.addOptimizedBytesRead(Math.toIntExact(bytesRead.sumThenReset()), timeNanos.sumThenReset()); |
227 | 231 | }
|
228 | 232 | }, part, pos, streamLength);
|
229 | 233 |
|
|
0 commit comments