Optimize sequential reads in SearchableSnapshotIndexInput #51230
Conversation
Today `SearchableSnapshotIndexInput` translates each `readBytesInternal` call to one or more calls to `readBlob` on the underlying repository. We make a lot of small `readBytesInternal` calls since they are used to fill a small in-memory buffer. Calls to `readBlob` are expensive: blob storage providers like AWS S3 charge money per API call. A common usage pattern is to take a brand-new `IndexInput`, seek to a particular location, and then sequentially read a substantial amount of data and stream it to disk. This commit optimizes the implementation for that specific usage pattern. Rather than calling `readBlob` each time the internal buffer needs filling we instead request a (potentially much larger) range of the blob and consume the response bit-by-bit as needed by a sequentially-reading client.
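For illustration, a minimal sketch of the idea, with a hypothetical `readBlob`-style ranged-read interface standing in for the real repository API (this is not the actual implementation):

```java
import java.io.IOException;
import java.io.InputStream;

class SequentialReadSketch {
    private static final long SEQUENTIAL_READ_SIZE = 1L << 20; // read ahead up to 1 MiB per API call (value assumed)

    // stands in for the repository's ranged-read API
    interface HypotheticalBlobContainer {
        InputStream readBlob(String name, long position, long length) throws IOException;
    }

    private InputStream streamForSequentialReads; // null when not currently reading sequentially
    private long streamPos; // next blob position the open stream will serve
    private long streamEnd; // position just past the end of the requested range

    // Serves one buffer fill. One ranged readBlob call covers many subsequent fills, so a
    // sequential reader triggers far fewer billable API calls than one call per fill.
    int readBytes(HypotheticalBlobContainer container, String name, long blobLength,
                  long pos, byte[] b, int offset, int length) throws IOException {
        if (streamForSequentialReads == null || streamPos != pos) {
            // not continuing a sequential read: discard any open range and request a larger one at pos
            if (streamForSequentialReads != null) {
                streamForSequentialReads.close();
            }
            streamEnd = Math.min(blobLength, pos + Math.max(length, SEQUENTIAL_READ_SIZE));
            streamForSequentialReads = container.readBlob(name, pos, streamEnd - pos);
            streamPos = pos;
        }
        // consume the open range bit by bit; a real implementation must also loop over short
        // reads and handle end-of-stream, which this sketch glosses over
        int read = streamForSequentialReads.read(b, offset, (int) Math.min(length, streamEnd - streamPos));
        streamPos += read;
        if (streamPos == streamEnd) {
            streamForSequentialReads.close(); // range exhausted; the next read opens a new one
            streamForSequentialReads = null;
        }
        return read;
    }
}
```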
Pinging @elastic/es-distributed (:Distributed/Snapshot/Restore)
I've left some comments, but I'm not sure I understand the concurrency model.
```java
assert streamForSequentialReads.isFullyRead() == false;
int read = streamForSequentialReads.inputStream.read(b, offset, length);
assert read <= length : read + " vs " + length;
streamForSequentialReads.pos += read;
```
should we put this logic into `StreamForSequentialReads`? Perhaps that class could enforce that only sequential reads are possible from the stream (and offer a method such as `isSequentialReadPossible`), with the logic in this class just trying to call those methods.
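For illustration, a sketch of the encapsulation being suggested, using names from the comment and the quoted hunk above (not the actual committed class):

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;

class StreamForSequentialReads implements Closeable {
    private final InputStream inputStream;
    private long pos;          // next blob position this stream will serve
    private final long maxPos; // position just past the end of the requested range

    StreamForSequentialReads(InputStream inputStream, long pos, long streamLength) {
        this.inputStream = inputStream;
        this.pos = pos;
        this.maxPos = pos + streamLength;
    }

    // only a read at exactly the current position counts as sequential
    boolean isSequentialReadPossible(long pos) {
        return this.pos == pos && isFullyRead() == false;
    }

    // the sequential-only invariant now lives here rather than in the caller
    int read(byte[] b, int offset, int length) throws IOException {
        assert isFullyRead() == false;
        int read = inputStream.read(b, offset, length);
        assert read <= length : read + " vs " + length;
        pos += read;
        return read;
    }

    boolean isFullyRead() {
        return pos >= maxPos;
    }

    @Override
    public void close() throws IOException {
        inputStream.close();
    }
}
```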
```java
// read part of a blob directly; the code above falls through to this case where there is no optimization possible
```
maybe put everything above this into a `readOptimized()` method that returns a boolean (denoting whether it read or not). This will avoid having so many explicit returns in the above code (and the deliberate fall-through logic).
Sounds good.
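For illustration, a sketch of the shape being agreed on here; `readOptimized` is the name from the comment, while the other helper names are placeholders rather than the real API:

```java
// A sketch of the proposed control flow: each optimized path returns true from
// readOptimized, leaving a single fall-through point for the direct read.
private void readInternal(byte[] b, int offset, int length) throws IOException {
    if (readOptimized(b, offset, length) == false) {
        // no optimization possible: fall through to reading part of the blob directly
        readDirectly(b, offset, length);
    }
}

/** @return whether the read was satisfied by one of the sequential-read paths */
private boolean readOptimized(byte[] b, int offset, int length) throws IOException {
    if (canReadFromExistingSequentialStream()) {
        readFromExistingSequentialStream(b, offset, length);
        return true;
    }
    if (shouldStartNewSequentialRead()) {
        readFromNewSequentialStream(b, offset, length);
        return true;
    }
    return false; // replaces the deliberate inline fall-through logic
}
```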
```java
    return true;
} else {
    // streamLength <= length so this single read will consume the entire stream, so there is no need to keep hold of it, so we can
    // tell the caller to read the data directly
```
Should we not use this existing open stream as much as possible? We might not be able to read the full bytes from this stream, but perhaps we can use it to read everything up to `streamLength`, and subsequently request a new stream for the rest? This might avoid redownloading data in cases where the buffer size is not a proper divisor of `sequentialReadSize`.
At this point we don't have an existing open stream; we're trying to create a new one. If we can satisfy part of a read from the existing stream then we do so (see the comment containing the string "the current stream didn't contain enough data for this read, so we must read more").
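For illustration, a sketch of the partial-read logic being described; the method shape, `remaining()`, and the close helper are assumptions, not the committed code:

```java
// Drain whatever the open stream can still supply, then let the caller fetch the rest
// through a new stream. Uses StreamForSequentialReads as sketched earlier, plus a
// hypothetical remaining() and the closeStreamForSequentialReads() helper sketched below.
private int readFromExistingStreamIfPossible(long pos, byte[] b, int offset, int length) throws IOException {
    if (streamForSequentialReads == null || streamForSequentialReads.isSequentialReadPossible(pos) == false) {
        return 0; // nothing to reuse; the caller must obtain the data another way
    }
    // the current stream may not contain enough data for this read, so take
    // min(length, remaining) from it and report how much was actually served
    int toRead = Math.toIntExact(Math.min(length, streamForSequentialReads.remaining()));
    int read = streamForSequentialReads.read(b, offset, toRead);
    if (streamForSequentialReads.isFullyRead()) {
        closeStreamForSequentialReads();
    }
    return read;
}
```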
Failure of elasticsearch-ci/2 looks like #51347; checkstyle fix is incoming.
LGTM - I left only minor comments. Thanks for the additional tests and the many comments that help to review this 👍
this("SearchableSnapshotIndexInput(" + fileInfo.physicalName() + ")", blobContainer, fileInfo, 0L, 0L, fileInfo.length()); | ||
// optimisation for the case where we perform a single seek, then read a large block of data sequentially, then close the input | ||
@Nullable // if not currently reading sequentially | ||
private StreamForSequentialReads streamForSequentialReads; |
Maybe update the class javadoc to explain how/why we use this?
sure, done in f18251a
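For illustration, the kind of class-level javadoc being asked for might read something like this (wording assumed; the actual text landed in f18251a):

```java
/**
 * A {@link BufferedIndexInput} that reads its data from a blob held in a snapshot repository.
 *
 * (Assumed wording, not the text committed in f18251a.) A common usage pattern is a single
 * seek followed by a long sequential read, so rather than issuing one readBlob call per
 * buffer fill this class may request a much larger range of the blob up front and satisfy
 * subsequent sequential reads from that one open stream, saving per-call charges on blob
 * stores such as AWS S3.
 */
```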
```java
assert streamForSequentialReads.isFullyRead() == false;
sequentialReadSize = NO_SEQUENTIAL_READ_OPTIMIZATION;
IOUtils.close(streamForSequentialReads);
streamForSequentialReads = null;
```
We're closing + nullifying the `streamForSequentialReads` many times, maybe it deserves its own `closeSequentialStream()` method?
Good point, this is no longer a one-liner. Done in c9cf7bc.
```java
 * @return the number of bytes read; if a new stream wasn't opened then nothing was read so the caller should perform the read directly.
 */
private int readFromNewSequentialStream(int part, long pos, byte[] b, int offset, int length)
    throws IOException {
```
The method signature can fit on a single line
It didn't always ;) Fixed in 51d2af5
```java
if (position != offset + pos) {
    position = offset + pos;
    IOUtils.close(streamForSequentialReads);
    streamForSequentialReads = null;
```
Maybe nullify in a finally block, just in case
Oh good point, done in c9cf7bc.
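For illustration, a sketch combining the two suggestions above (a `closeSequentialStream()`-style helper plus finally-block nullification); the name and body here are assumed, not the code from c9cf7bc:

```java
private void closeStreamForSequentialReads() throws IOException {
    try {
        IOUtils.close(streamForSequentialReads); // the org.elasticsearch IOUtils used in the hunks above
    } finally {
        streamForSequentialReads = null; // cleared even if close() throws
    }
}
```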
```diff
 @Override
 public BufferedIndexInput clone() {
-    return new SearchableSnapshotIndexInput("clone(" + this + ")", blobContainer, fileInfo, position, offset, length);
+    return new SearchableSnapshotIndexInput("clone(" + this + ")", blobContainer, fileInfo, position, offset, length,
+        NO_SEQUENTIAL_READ_OPTIMIZATION, getBufferSize());
 }
```
Maybe add a small word on why we can't use the optimized reads for clones?
Comments added in 2fee6b8.
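For illustration, a sketch of the kind of comment being asked for (wording assumed, not the text from 2fee6b8): clones and slices are typically used for random access rather than long sequential reads, so they opt out of the optimization.

```java
@Override
public BufferedIndexInput clone() {
    // clones are mostly used for random access, so read-ahead would usually be wasted on them
    return new SearchableSnapshotIndexInput("clone(" + this + ")", blobContainer, fileInfo, position, offset, length,
        NO_SEQUENTIAL_READ_OPTIMIZATION, getBufferSize());
}
```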
Thanks @tlrx