Abort non-fully consumed S3 input streams instead of draining #62167
Conversation
Pinging @elastic/es-distributed (:Distributed/Snapshot/Restore)
Sorry for the delay here @tlrx, I'm taking a look today :)
Take your time, no hurry :)
Looks good, I wonder if we can make this change a lot smaller though by just not bothering with the object metadata in the way suggested inline?
```diff
-    private InputStream currentStream;
+    private S3ObjectInputStream currentStream;
+    private long currentStreamLastOffset;
```
See my comment below, I think this potentially isn't necessary.
```java
            return metadata.getContentLength();
        } catch (Exception e) {
            assert false : e;
            return Long.MAX_VALUE - 1L; // assume a large stream so that the underlying stream is aborted on closing, unless eof is reached
```
Same here, maybe we should just use our own `end` and `start` offsets, or use `metadata.getContentLength()` if we don't have an `end`, instead of going through the indirection of the SDK header parsing here? That seems a lot more straightforward to me and doesn't require us to be scared of random exceptions from SDK misbehavior?
Also, then we could just make our life real easy. If `eof` is set to `true` or `start + currentOffset == currentStreamLastOffset` -> close, else abort. No need to even get the length from the metadata because we'd read any open-ended stream of unknown length till EOF anyway?
> No need to even get the length from the metadata because we'd read any open-ended stream of unknown length till EOF anyway?

That's what I'm trying to avoid here; whether we know the exact range or not, the S3 endpoint should return the content length, and we can use it to know if all bytes were really consumed. The exceptional case here should never happen, and in that case we set an extra large `end`, which should force the stream to be aborted before closing anyway.
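For illustration, here is a minimal sketch of the length computation being discussed, assuming the SDK has parsed the `Content-Range` header via `ObjectMetadata.getContentRange()` for ranged GETs; this approximates the approach described above, not the PR's exact code:

```java
// Sketch only: names approximate the snippets quoted in this review.
private long getStreamLength(final S3Object object) {
    final ObjectMetadata metadata = object.getObjectMetadata();
    try {
        // For ranged GETs the SDK exposes the parsed Content-Range header
        final Long[] range = metadata.getContentRange();
        if (range != null) {
            return range[1] - range[0] + 1L; // number of bytes in the returned range
        }
        return metadata.getContentLength(); // full-object GET
    } catch (Exception e) {
        assert false : e;
        // assume a large stream so that the underlying stream is aborted on closing, unless eof is reached
        return Long.MAX_VALUE - 1L;
    }
}
```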
Fair point, I guess I still don't really like that we redundantly store the lengths and offsets here to some degree, but I also just noticed we do the same in the GCS stream as well. I suppose this approach is the safest for now :) => let's go with it then.
```java
     * suppressing all thrown exceptions.
     */
    private void maybeAbort(S3ObjectInputStream stream) {
        if (eof) {
```
`if (eof || start + currentOffset == currentStreamLastOffset) {` and drop the conditional from the `try {`, since both cases mean the same to us?
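A minimal sketch of `maybeAbort` with this suggestion applied (field names taken from the snippets above; not necessarily the exact code that was merged):

```java
/**
 * Aborts the {@link S3ObjectInputStream} if it wasn't fully consumed when this method is called,
 * suppressing all thrown exceptions.
 */
private void maybeAbort(S3ObjectInputStream stream) {
    if (eof || start + currentOffset == currentStreamLastOffset) {
        return; // all requested bytes were consumed, a plain close() is cheap
    }
    try {
        stream.abort(); // drop the connection instead of draining the remaining bytes
    } catch (Exception e) {
        // suppressed, as per the method contract quoted above
    }
}
```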
LGTM, let's do it your way :) Thanks Tanguy!
@elasticmachine update branch
Thanks Armin!
Today when an `S3RetryingInputStream` is closed, the remaining bytes that were not consumed are drained right before closing the underlying stream. In some contexts it might be more efficient not to consume the remaining bytes and to just drop the connection. This is for example the case with snapshot backed indices prewarming, where there is no point in reading potentially large blobs if we know that the cache file we want to write the content of the blob to has already been evicted. Draining all bytes here takes a slot in the prewarming thread pool for nothing.
Regular snapshot restores could also benefit from dropping the connection instead of draining bytes when the restore is aborted. As of today, the restore of a file continues even if the restore operation was aborted, taking a slot in the snapshot thread pool. By throwing an appropriate exception and aborting the S3 input stream we could quickly stop the download and free up the slot in the snapshot thread pool (this could be done in a follow-up PR).
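For a rough picture of the behaviour change, a hedged sketch of `close()` (assuming the field names used in the review snippets above; not the PR's literal code):

```java
@Override
public void close() throws IOException {
    maybeAbort(currentStream); // aborts the HTTP connection if not all bytes were consumed
    currentStream.close();     // after full consumption a plain close lets the connection be reused
}
```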