Unify Stream Copy Buffer Usage #56078
We have various ways of copying between two streams and handling thread-local buffers throughout the codebase. This commit unifies a number of them and removes buffer allocations in many spots. I do not think that increasing the thread-local buffer size from 1k to 8k is a problem in the affected spots: First off, we will almost always do string reading and writing on the same thread if we do one of the two operations on it, so in most cases it's a 2k to 8k increase to begin with. Second, with an Xss of 1M we are increasing the per-thread allocation by only .6% to .8% while saving a lot of temporary byte array allocations in spots where we are often just copying from memory to memory anyway (XContent raw fields, (de-)compressing, ...). Plus, the larger thread-local buffers for string operations will cause fewer allocations when reading large strings and make writing large strings a little faster as well.
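As a rough illustration of the pattern described above, here is a minimal sketch of a stream copy that reuses one 8k buffer per thread instead of allocating a fresh array per call. The class name `ThreadLocalCopySketch`, the constant `BUFFER_SIZE`, and the helper `roundTrip` are hypothetical and not taken from the PR; only JDK classes are used.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;

// Hypothetical sketch, not the actual Elasticsearch code.
final class ThreadLocalCopySketch {
    // Assumed size: the 8k discussed in the description.
    static final int BUFFER_SIZE = 8 * 1024;

    // One buffer per thread, created lazily on first use by that thread.
    private static final ThreadLocal<byte[]> BUFFER =
        ThreadLocal.withInitial(() -> new byte[BUFFER_SIZE]);

    // Copies everything from in to out without allocating a new buffer per call.
    static long copy(InputStream in, OutputStream out) throws IOException {
        final byte[] buffer = BUFFER.get();
        long total = 0;
        int read;
        while ((read = in.read(buffer)) != -1) {
            out.write(buffer, 0, read);
            total += read;
        }
        return total;
    }

    // Memory-to-memory copy, the case the description calls out as common.
    static byte[] roundTrip(byte[] data) {
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream(data.length);
            copy(new ByteArrayInputStream(data), out);
            return out.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The buffer must not outlive the call that fetched it, which is the constraint the review discussion below centers on.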
Pinging @elastic/es-core-infra (:Core/Infra/Core)
While I agree the size is not a concern, exposing a shared static like this worries me. This is a truly global buffer, with zero protections of re-use/overwriting within the same call stack.
Fair point, but I'd argue that the Java docs on this are pretty clear and it's not too non-standard of a pattern (e.g. Netty and Cassandra use the in many spots). Also, we shouldn't really write any code ever (and didn't so far) that escapes a
Jenkins run elasticsearch-ci/1 (known unrelated failure)
@rjernst ping :) I think I still like this one, wdyt?
I'm still hesitant to have a truly global buffer (accessed via a public static method). It doesn't seem like we have that many uses. Could we instead keep a couple static/private in their current locations (and increase the size if necessary)? I would rather have a handful of these buffers in key places reused rather than assume all code is correctly using the buffer across class boundaries (e.g. not calling into another method that uses the buffer while still decoding the current buffer). Perhaps others have opinions on this, though.
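The hazard raised here can be shown with a small sketch. All names (`SharedBufferHazardSketch`, `buffer()`, `helper()`) are hypothetical, and the tiny 8-byte buffer is only for illustration: a method fills the shared thread-local buffer, calls a helper that also grabs it, and then reads back data the helper has overwritten.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical sketch of the re-use problem with a globally accessible buffer.
final class SharedBufferHazardSketch {
    private static final ThreadLocal<byte[]> SHARED =
        ThreadLocal.withInitial(() -> new byte[8]);

    // Publicly reachable accessor: any code on this thread gets the same array.
    static byte[] buffer() {
        return SHARED.get();
    }

    // Some unrelated method that happens to use the shared buffer as scratch space.
    static void helper() {
        Arrays.fill(buffer(), (byte) 'x');
    }

    // Fills the buffer, calls helper() before consuming it, then decodes.
    static String decodeWithNestedCall() {
        byte[] b = buffer();
        byte[] src = "abcd".getBytes(StandardCharsets.US_ASCII);
        System.arraycopy(src, 0, b, 0, src.length);
        helper(); // same thread, same array: the four bytes are overwritten
        return new String(b, 0, src.length, StandardCharsets.US_ASCII);
    }
}
```

In this sketch `decodeWithNestedCall()` yields `"xxxx"` rather than `"abcd"`. Keeping buffers private to one class narrows the set of methods that have to be audited for this kind of nesting.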
Sounds good, I reverted the public buffer for now and went back to using the in-class buffers we had before where we had them. I left the 2 or 3 spots where we could also use thread-local buffers, and that I initially changed, unchanged for now because they're not really on the hot path (I think at least) and it probably doesn't make much sense to use a thread-local there for now.
        return tempBuffer.get();
    }

    public static long doCopy(InputStream in, OutputStream out, byte[] buffer) throws IOException {
Can we name this method copy and also move the try/catch/finally logic here? I guess we can add a flag about whether to close the streams, and you can have various methods like:
public static long copy(final InputStream in, final OutputStream out) throws IOException {
    return copy(in, out, buffer.get(), true);
}
public static long copy(final InputStream in, final OutputStream out, boolean close) throws IOException {
    return copy(in, out, buffer.get(), close);
}
public static long copy(final InputStream in, final OutputStream out, byte[] buffer) throws IOException {
    return copy(in, out, buffer, true);
}
public static long copy(final InputStream in, final OutputStream out, byte[] buffer, boolean close) throws IOException {
    Exception err = null;
    try {
        long byteCount = 0;
        int bytesRead;
        while ((bytesRead = in.read(buffer)) != -1) {
            out.write(buffer, 0, bytesRead);
            byteCount += bytesRead;
        }
        out.flush();
        return byteCount;
    } catch (Exception e) {
        err = e;
        throw e;
    } finally {
        if (close) {
            IOUtils.close(err, in, out);
        }
    }
}
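For readers without the Elasticsearch helpers at hand, the following is a JDK-only sketch of the same shape. `ClosingCopySketch`, `closeAll`, and `copyAndReport` are hypothetical names; `closeAll` is only a stand-in for `IOUtils.close` and may differ from it in detail. It attaches close failures to the primary exception as suppressed exceptions, or throws the first close failure if the copy itself succeeded.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;

// Hypothetical JDK-only sketch of a copy with an optional close step.
final class ClosingCopySketch {
    static long copy(InputStream in, OutputStream out, byte[] buffer, boolean close) throws IOException {
        Exception err = null;
        try {
            long byteCount = 0;
            int bytesRead;
            while ((bytesRead = in.read(buffer)) != -1) {
                out.write(buffer, 0, bytesRead);
                byteCount += bytesRead;
            }
            out.flush();
            return byteCount;
        } catch (Exception e) {
            err = e; // remember the primary failure so close errors can be attached to it
            throw e;
        } finally {
            if (close) {
                closeAll(err, in, out);
            }
        }
    }

    // Stand-in for IOUtils.close: closes everything, never hides the primary exception.
    static void closeAll(Exception primary, Closeable... resources) throws IOException {
        IOException firstCloseFailure = null;
        for (Closeable c : resources) {
            try {
                if (c != null) {
                    c.close();
                }
            } catch (IOException e) {
                if (primary != null) {
                    primary.addSuppressed(e);
                } else if (firstCloseFailure == null) {
                    firstCloseFailure = e;
                } else {
                    firstCloseFailure.addSuppressed(e);
                }
            }
        }
        if (firstCloseFailure != null) {
            throw firstCloseFailure;
        }
    }

    // Copies `size` zero bytes through a 16-byte buffer and reports the byte count.
    static long copyAndReport(int size, boolean close) {
        try {
            return copy(new ByteArrayInputStream(new byte[size]), new ByteArrayOutputStream(), new byte[16], close);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With this layout, callers that previously wrapped the copy in their own try/finally can pass `close = true` and drop that boilerplate, while callers that keep using the target stream pass `false`.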
libs/core/src/main/java/org/elasticsearch/core/internal/io/Streams.java
      * @throws IOException in case of I/O errors
      */
-    private static long copyStream(InputStream in, OutputStream out) throws IOException {
+    private static void copyStream(InputStream in, OutputStream out) throws IOException {
I think we should just be able to remove this method and use Streams#copy?
@@ -309,7 +309,9 @@ public boolean isOpen() {
                 public void close() throws IOException {
                     SocketAccess.doPrivilegedVoidIOException(writeChannel::close);
                 }
-            }), buffer);
+            })) {
+                org.elasticsearch.core.internal.io.Streams.doCopy(inputStream, out, buffer);
With my suggestion, we can just let Streams.copy handle the closing here?
@@ -212,7 +212,8 @@ public void writeBlobAtomic(final String blobName, final InputStream inputStream
     private void writeToPath(InputStream inputStream, Path tempBlobPath, long blobSize) throws IOException {
         try (OutputStream outputStream = Files.newOutputStream(tempBlobPath, StandardOpenOption.CREATE_NEW)) {
             final int bufferSize = blobStore.bufferSizeInBytes();
-            Streams.copy(inputStream, outputStream, new byte[blobSize < bufferSize ? Math.toIntExact(blobSize) : bufferSize]);
+            org.elasticsearch.core.internal.io.Streams.doCopy(
I think we should consolidate the two Streams classes, since the internal class was added for splitting the method out for an MR jar to take advantage of JDK9's transferTo method in #29322, but we don't use transferTo anymore for copying and on master we are already at JDK11 as a minimum.
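For context, `InputStream.transferTo(OutputStream)` is the JDK 9+ method referred to here. A minimal sketch of a copy built on it follows; the class and method names are hypothetical. Note that `transferTo` does not close either stream and gives the caller no say over the buffer it uses internally, which is one way it differs from a copy that takes an explicit or thread-local buffer.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

// Hypothetical sketch: stream copy via the JDK 9+ InputStream#transferTo.
final class TransferToSketch {
    // Returns the number of bytes transferred from an in-memory source to an in-memory sink.
    static long viaTransferTo(byte[] data) {
        try (InputStream in = new ByteArrayInputStream(data);
             ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            // transferTo reads until end of stream; closing is handled by try-with-resources.
            return in.transferTo(out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```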
++ to that idea, but maybe do it in another PR since that involves quite a few unrelated changes I think (since the one in :server uses BytesReference in a bunch of places and such).
Jenkins run elasticsearch-ci/2 (unrelated CCR failure)
Thanks @jaymode, I applied your suggestions, looks much cleaner now I think :)
LGTM
Thanks Jay & Ryan!
Small oversight in #56078 that only showed up during backporting where a stream copy was turned from a non-closing to a closing one. Enhanced part of a test in this PR to make it show up in master also even though we practically never use this method with stream targets that actually close.
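A test along these lines could catch that kind of regression. This is a hedged sketch, not the actual test from the follow-up PR: `NonClosingCopyCheck`, `CloseTrackingOutputStream`, and the simplified `copy` are all hypothetical. The target stream rejects writes after it has been closed, so copying into it twice only works if the copy leaves it open.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;

// Hypothetical sketch of a check that distinguishes closing from non-closing copies.
final class NonClosingCopyCheck {
    // Target that actually reacts to close(), unlike a plain ByteArrayOutputStream.
    static final class CloseTrackingOutputStream extends ByteArrayOutputStream {
        boolean closed;

        @Override
        public synchronized void write(byte[] b, int off, int len) {
            if (closed) {
                throw new IllegalStateException("write after close");
            }
            super.write(b, off, len);
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    // Simplified copy with a close flag; error handling is kept minimal for the sketch.
    static long copy(InputStream in, OutputStream out, boolean close) throws IOException {
        byte[] buffer = new byte[8192];
        long total = 0;
        try {
            int read;
            while ((read = in.read(buffer)) != -1) {
                out.write(buffer, 0, read);
                total += read;
            }
            out.flush();
            return total;
        } finally {
            if (close) {
                in.close();
                out.close();
            }
        }
    }

    // Copies two chunks into the same target; true only if both writes went through.
    static boolean secondCopySucceeds(boolean close) {
        CloseTrackingOutputStream target = new CloseTrackingOutputStream();
        try {
            copy(new ByteArrayInputStream(new byte[] {1, 2}), target, close);
            copy(new ByteArrayInputStream(new byte[] {3, 4}), target, close);
            return target.size() == 4;
        } catch (IllegalStateException e) {
            return false; // the first copy closed the target
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With a plain `ByteArrayOutputStream` as the target, both variants would pass, because its `close()` has no effect. That matches the remark that the problem rarely shows up with targets that do not actually close.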
We have various ways of copying between two streams and of handling thread-local
buffers throughout the codebase. This commit unifies a number of them and
removes buffer allocations in many spots.
I do not think that increasing the thread-local buffer size from 1k to 8k is a problem in the affected spots:
First off, we will almost always do string reading and writing on the same thread
if we do one of the two operations on it, so in most cases it's only a 6k increase to begin with.
Second, with an Xss of 1M we are increasing the per-thread allocation by less than a percent at worst, while saving a lot of temporary byte array allocations in all kinds of common spots (raw xcontent fields, (de-)compressing and so on). Plus, having these thread-local buffers occupy a little space instead of having lots of uncollected 8k buffers on heap makes the circuit breaker a little more accurate as well.