Introduce ChunkedBlobOutputStream #74620
Conversation
Extracted the chunked output stream logic from elastic#74313 and added tests for it to make it easier to review.
Pinging @elastic/es-distributed (Team:Distributed)
LGTM - I don't find the usage of this class very obvious, and I wonder if we should instead keep the parts list and buffer private (passing them as parameters to the flush-buffer method), but given the deadlines here I'm fine with moving forward as it is.
protected ChunkedBlobOutputStream(BigArrays bigArrays, long maxBytesToBuffer) {
    this.bigArrays = bigArrays;
    this.maxBytesToBuffer = maxBytesToBuffer;
we should check maxBytesToBuffer > 0
++ added a check
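The check being discussed might look something like the following. This is a hedged sketch in plain Java, not the actual Elasticsearch code; BigArrays is omitted since it plays no part in the validation, and the class name is hypothetical.

```java
import java.io.OutputStream;

// Sketch only: a stand-in for the real ChunkedBlobOutputStream constructor,
// showing the kind of argument validation suggested in the review.
abstract class ChunkedStreamSketch extends OutputStream {
    protected final long maxBytesToBuffer;

    protected ChunkedStreamSketch(long maxBytesToBuffer) {
        // Reject non-positive buffer sizes up front, as requested above.
        if (maxBytesToBuffer <= 0L) {
            throw new IllegalArgumentException(
                "maximum buffer size must be positive but was [" + maxBytesToBuffer + "]");
        }
        this.maxBytesToBuffer = maxBytesToBuffer;
    }
}
```

Failing fast in the constructor means a misconfigured buffer size surfaces at stream creation rather than as a confusing flush loop later.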
public final void close() throws IOException {
    if (closed) {
        assert false : "this output stream should only be closed once";
        throw new AlreadyClosedException("already closed");
I wonder if we should just ignore double closing? I think that's what is usually done in many streams (but we should keep the assertion)
It's kind of weird to have an assertion here but quietly skip over double closing otherwise, though? If we had a bug here that only showed up in some corner case, we'd never see it in the logs otherwise.
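The behaviour being debated, asserting on double close so tests with -ea catch it, while still failing loudly at runtime, could be sketched like this. It is plain Java, with IllegalStateException standing in for Lucene's AlreadyClosedException, and the class name is hypothetical.

```java
import java.io.IOException;
import java.io.OutputStream;

// Sketch of a close() that both asserts (visible in test runs with -ea)
// and throws (visible in production logs) on a double close.
class OnceClosedStream extends OutputStream {
    private boolean closed;

    @Override
    public void write(int b) throws IOException {
        if (closed) {
            throw new IOException("stream already closed");
        }
        // actual write logic elided in this sketch
    }

    @Override
    public void close() {
        if (closed) {
            assert false : "this output stream should only be closed once";
            throw new IllegalStateException("already closed");
        }
        closed = true;
    }
}
```

With assertions enabled the second close() trips the assertion immediately; with assertions disabled the exception still makes the bug visible instead of silently swallowing it.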
/**
 * Mark all blob bytes as properly received by {@link #write}, indicating that {@link #close} may finalize the blob.
 */
public final void markSuccess() {
Really a suggestion:
- public final void markSuccess() {
+ public final void done() {
@Override
protected void onCompletion() throws IOException {
    if (buffer.size() > 0) {
This should be handled by ChunkedBlobOutputStream itself I think
Unfortunately, I couldn't find a neat way of encapsulating this (at least not without adding even more complexity), because of the way onCompletion will do a normal write if nothing has been flushed yet, in which case it needs access to the buffer anyway.
final BytesReference bytes = buffer.bytes();
bytes.writeTo(out);
writtenBytesCounter.addAndGet(bytes.length());
finishPart(partIdSupplier.incrementAndGet());
I wonder if we could make flushBuffer() return the part identifier and have the logic that exists in finishPart() be private in ChunkedBlobOutputStream too.
There's a bit of complexity to doing this, I think, with the error handling and special casing being so different across implementations, but I'll try to see if I can do something nicer in the main PR (I don't want to break the API here now, and I'm having a bit of a hard time reasoning through all the edge cases around API changes from the main PR). But in general I'm ++ on the idea if possible.
LGTM, I agree with Tanguy, the usage of this class is a bit difficult to follow, but as we discussed it's difficult to handle logic failures not related to the upload itself.
Thanks Tanguy and Francisco!
This PR adds a new API for streaming serialization writes to a repository, enabling repository metadata of arbitrary size with bounded memory use during writing. The existing write APIs require the eventual blob size to be known up front, which forced us to materialize the serialized blob in memory before writing. That costs a lot of memory in the case of e.g. a very large RepositoryData, and limits us to a 2G maximum blob size. With this PR, the requirement to fully materialize the serialized metadata goes away, and the memory overhead becomes bounded entirely by the outbound buffer size of the repository implementation. As we move to larger repositories this makes master node stability much more predictable, since writing out RepositoryData no longer takes as much memory (the same applies to shard-level metadata). It also enables aggregating multiple metadata blobs into a single larger blob without massive overhead, and removes the 2G size limit on RepositoryData. Backport of #74313 and #74620.
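The bounded-memory idea described above can be illustrated with a small self-contained sketch. This is not the real ChunkedBlobOutputStream API; it merely shows bytes being accumulated in a fixed-size buffer and flushed as numbered parts, so memory use stays bounded by the buffer size however large the blob is. All names here are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;

// Illustrative only: accumulate writes in a small buffer and emit a
// "part" whenever it fills, mimicking a chunked multipart upload.
class PartBufferingStream extends OutputStream {
    private final int maxBytesToBuffer;
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    final List<byte[]> parts = new ArrayList<>(); // stands in for uploaded chunks

    PartBufferingStream(int maxBytesToBuffer) {
        this.maxBytesToBuffer = maxBytesToBuffer;
    }

    @Override
    public void write(int b) {
        buffer.write(b);
        if (buffer.size() >= maxBytesToBuffer) {
            flushPart();
        }
    }

    private void flushPart() {
        parts.add(buffer.toByteArray());
        buffer.reset(); // memory used never exceeds maxBytesToBuffer
    }

    @Override
    public void close() {
        if (buffer.size() > 0) {
            flushPart(); // final, possibly short, part
        }
    }
}
```

For instance, writing ten bytes through a four-byte buffer produces three parts of sizes 4, 4 and 2, regardless of how large the overall payload would be.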
See elastic#53119 for more context about why those tests are muted on JDK8. They have started failing more often recently, now that elastic#74313 and elastic#74620 have been merged, as reported in elastic#74739.