Skip to content

Commit e28dbde

Browse files
Unify Stream Copy Buffer Usage (#56078)
We have various ways of copying between two streams and handling thread-local buffers throughout the codebase. This commit unifies a number of them and removes buffer allocations in many spots.
1 parent 44c799f commit e28dbde

File tree

10 files changed

+54
-109
lines changed

10 files changed

+54
-109
lines changed

libs/core/src/main/java/org/elasticsearch/core/internal/io/Streams.java

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,30 +30,61 @@
3030
*/
3131
public class Streams {
3232

33+
private static final ThreadLocal<byte[]> buffer = ThreadLocal.withInitial(() -> new byte[8 * 1024]);
34+
3335
private Streams() {
3436

3537
}
3638

3739
/**
38-
* Copy the contents of the given InputStream to the given OutputStream. Closes both streams when done.
40+
* Copy the contents of the given InputStream to the given OutputStream. Optionally, closes both streams when done.
3941
*
40-
* @param in the stream to copy from
41-
* @param out the stream to copy to
42+
* @param in the stream to copy from
43+
* @param out the stream to copy to
44+
* @param close whether to close both streams after copying
45+
* @param buffer buffer to use for copying
4246
* @return the number of bytes copied
4347
* @throws IOException in case of I/O errors
4448
*/
45-
public static long copy(final InputStream in, final OutputStream out) throws IOException {
49+
public static long copy(final InputStream in, final OutputStream out, byte[] buffer, boolean close) throws IOException {
4650
Exception err = null;
4751
try {
48-
final long byteCount = in.transferTo(out);
52+
long byteCount = 0;
53+
int bytesRead;
54+
while ((bytesRead = in.read(buffer)) != -1) {
55+
out.write(buffer, 0, bytesRead);
56+
byteCount += bytesRead;
57+
}
4958
out.flush();
5059
return byteCount;
5160
} catch (IOException | RuntimeException e) {
5261
err = e;
5362
throw e;
5463
} finally {
55-
IOUtils.close(err, in, out);
64+
if (close) {
65+
IOUtils.close(err, in, out);
66+
}
5667
}
5768
}
5869

70+
/**
71+
* @see #copy(InputStream, OutputStream, byte[], boolean)
72+
*/
73+
public static long copy(final InputStream in, final OutputStream out, boolean close) throws IOException {
74+
return copy(in, out, buffer.get(), close);
75+
}
76+
77+
/**
78+
* @see #copy(InputStream, OutputStream, byte[], boolean)
79+
*/
80+
public static long copy(final InputStream in, final OutputStream out, byte[] buffer) throws IOException {
81+
return copy(in, out, buffer, true);
82+
}
83+
84+
/**
85+
* @see #copy(InputStream, OutputStream, byte[], boolean)
86+
*/
87+
public static long copy(final InputStream in, final OutputStream out) throws IOException {
88+
return copy(in, out, buffer.get(), true);
89+
}
5990
}

libs/x-content/src/main/java/org/elasticsearch/common/xcontent/json/JsonXContentGenerator.java

Lines changed: 3 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
import org.elasticsearch.common.xcontent.XContentParser;
3737
import org.elasticsearch.common.xcontent.XContentType;
3838
import org.elasticsearch.common.xcontent.support.filtering.FilterPathBasedFilter;
39-
import org.elasticsearch.core.internal.io.IOUtils;
39+
import org.elasticsearch.core.internal.io.Streams;
4040

4141
import java.io.BufferedInputStream;
4242
import java.io.IOException;
@@ -349,7 +349,7 @@ public void writeRawField(String name, InputStream content, XContentType content
349349
} else {
350350
writeStartRaw(name);
351351
flush();
352-
copyStream(content, os);
352+
Streams.copy(content, os, false);
353353
writeEndRaw();
354354
}
355355
}
@@ -364,24 +364,11 @@ public void writeRawValue(InputStream stream, XContentType xContentType) throws
364364
generator.writeRaw(':');
365365
}
366366
flush();
367-
transfer(stream, os);
367+
Streams.copy(stream, os);
368368
writeEndRaw();
369369
}
370370
}
371371

372-
// A basic copy of Java 9's InputStream#transferTo
373-
private static long transfer(InputStream in, OutputStream out) throws IOException {
374-
Objects.requireNonNull(out, "out");
375-
long transferred = 0;
376-
byte[] buffer = new byte[8192];
377-
int read;
378-
while ((read = in.read(buffer, 0, 8192)) >= 0) {
379-
out.write(buffer, 0, read);
380-
transferred += read;
381-
}
382-
return transferred;
383-
}
384-
385372
private boolean mayWriteRawData(XContentType contentType) {
386373
// When the current generator is filtered (ie filter != null)
387374
// or the content is in a different format than the current generator,
@@ -480,37 +467,4 @@ public void close() throws IOException {
480467
public boolean isClosed() {
481468
return generator.isClosed();
482469
}
483-
484-
/**
485-
* Copy the contents of the given InputStream to the given OutputStream.
486-
* Closes both streams when done.
487-
*
488-
* @param in the stream to copy from
489-
* @param out the stream to copy to
490-
* @return the number of bytes copied
491-
* @throws IOException in case of I/O errors
492-
*/
493-
private static long copyStream(InputStream in, OutputStream out) throws IOException {
494-
Objects.requireNonNull(in, "No InputStream specified");
495-
Objects.requireNonNull(out, "No OutputStream specified");
496-
final byte[] buffer = new byte[8192];
497-
boolean success = false;
498-
try {
499-
long byteCount = 0;
500-
int bytesRead;
501-
while ((bytesRead = in.read(buffer)) != -1) {
502-
out.write(buffer, 0, bytesRead);
503-
byteCount += bytesRead;
504-
}
505-
out.flush();
506-
success = true;
507-
return byteCount;
508-
} finally {
509-
if (success) {
510-
IOUtils.close(in, out);
511-
} else {
512-
IOUtils.closeWhileHandlingException(in, out);
513-
}
514-
}
515-
}
516470
}

plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,7 @@ private void writeBlobResumable(BlobInfo blobInfo, InputStream inputStream, long
292292
* It is not enough to wrap the call to Streams#copy, we have to wrap the privileged calls too; this is because Streams#copy
293293
* is in the stacktrace and is not granted the permissions needed to close and write the channel.
294294
*/
295-
Streams.copy(inputStream, Channels.newOutputStream(new WritableByteChannel() {
295+
org.elasticsearch.core.internal.io.Streams.copy(inputStream, Channels.newOutputStream(new WritableByteChannel() {
296296

297297
@SuppressForbidden(reason = "channel is based on a socket")
298298
@Override
@@ -350,7 +350,7 @@ private void writeBlobMultipart(BlobInfo blobInfo, InputStream inputStream, long
350350
throws IOException {
351351
assert blobSize <= getLargeBlobThresholdInBytes() : "large blob uploads should use the resumable upload method";
352352
final byte[] buffer = new byte[Math.toIntExact(blobSize)];
353-
org.elasticsearch.common.io.Streams.readFully(inputStream, buffer);
353+
Streams.readFully(inputStream, buffer);
354354
try {
355355
final Storage.BlobTargetOption[] targetOptions = failIfAlreadyExists ?
356356
new Storage.BlobTargetOption[] { Storage.BlobTargetOption.doesNotExist() } :

server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ public InputStream readBlob(String blobName, long position, long length) throws
165165
channel.position(position);
166166
}
167167
assert channel.position() == position;
168-
return org.elasticsearch.common.io.Streams.limitStream(Channels.newInputStream(channel), length);
168+
return Streams.limitStream(Channels.newInputStream(channel), length);
169169
}
170170

171171
@Override
@@ -212,7 +212,8 @@ public void writeBlobAtomic(final String blobName, final InputStream inputStream
212212
private void writeToPath(InputStream inputStream, Path tempBlobPath, long blobSize) throws IOException {
213213
try (OutputStream outputStream = Files.newOutputStream(tempBlobPath, StandardOpenOption.CREATE_NEW)) {
214214
final int bufferSize = blobStore.bufferSizeInBytes();
215-
Streams.copy(inputStream, outputStream, new byte[blobSize < bufferSize ? Math.toIntExact(blobSize) : bufferSize]);
215+
org.elasticsearch.core.internal.io.Streams.copy(
216+
inputStream, outputStream, new byte[blobSize < bufferSize ? Math.toIntExact(blobSize) : bufferSize]);
216217
}
217218
IOUtils.fsync(tempBlobPath, false);
218219
}

server/src/main/java/org/elasticsearch/common/io/Streams.java

Lines changed: 4 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -65,45 +65,6 @@ public void write(byte[] b, int off, int len) {
6565
}
6666
};
6767

68-
//---------------------------------------------------------------------
69-
// Copy methods for java.io.InputStream / java.io.OutputStream
70-
//---------------------------------------------------------------------
71-
72-
73-
public static long copy(InputStream in, OutputStream out) throws IOException {
74-
return copy(in, out, new byte[BUFFER_SIZE]);
75-
}
76-
77-
/**
78-
* Copy the contents of the given InputStream to the given OutputStream.
79-
* Closes both streams when done.
80-
*
81-
* @param in the stream to copy from
82-
* @param out the stream to copy to
83-
* @return the number of bytes copied
84-
* @throws IOException in case of I/O errors
85-
*/
86-
public static long copy(InputStream in, OutputStream out, byte[] buffer) throws IOException {
87-
Objects.requireNonNull(in, "No InputStream specified");
88-
Objects.requireNonNull(out, "No OutputStream specified");
89-
// Leverage try-with-resources to close in and out so that exceptions in close() are either propagated or added as suppressed
90-
// exceptions to the main exception
91-
try (InputStream in2 = in; OutputStream out2 = out) {
92-
return doCopy(in2, out2, buffer);
93-
}
94-
}
95-
96-
private static long doCopy(InputStream in, OutputStream out, byte[] buffer) throws IOException {
97-
long byteCount = 0;
98-
int bytesRead;
99-
while ((bytesRead = in.read(buffer)) != -1) {
100-
out.write(buffer, 0, bytesRead);
101-
byteCount += bytesRead;
102-
}
103-
out.flush();
104-
return byteCount;
105-
}
106-
10768
/**
10869
* Copy the contents of the given byte array to the given OutputStream.
10970
* Closes the stream when done.
@@ -222,7 +183,7 @@ public static int readFully(InputStream reader, byte[] dest, int offset, int len
222183
* Fully consumes the input stream, throwing the bytes away. Returns the number of bytes consumed.
223184
*/
224185
public static long consumeFully(InputStream inputStream) throws IOException {
225-
return copy(inputStream, NULL_OUTPUT_STREAM);
186+
return org.elasticsearch.core.internal.io.Streams.copy(inputStream, NULL_OUTPUT_STREAM);
226187
}
227188

228189
public static List<String> readAllLines(InputStream input) throws IOException {
@@ -267,11 +228,9 @@ public static BytesStream flushOnCloseStream(BytesStream os) {
267228
* Reads all bytes from the given {@link InputStream} and closes it afterwards.
268229
*/
269230
public static BytesReference readFully(InputStream in) throws IOException {
270-
try (InputStream inputStream = in) {
271-
BytesStreamOutput out = new BytesStreamOutput();
272-
copy(inputStream, out);
273-
return out.bytes();
274-
}
231+
BytesStreamOutput out = new BytesStreamOutput();
232+
org.elasticsearch.core.internal.io.Streams.copy(in, out);
233+
return out.bytes();
275234
}
276235

277236
/**

server/src/test/java/org/elasticsearch/common/io/StreamsTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ public void testLimitInputStream() throws IOException {
9191
final int limit = randomIntBetween(0, bytes.length);
9292
final BytesArray stuffArray = new BytesArray(bytes);
9393
final ByteArrayOutputStream out = new ByteArrayOutputStream(bytes.length);
94-
final long count = Streams.copy(Streams.limitStream(stuffArray.streamInput(), limit), out);
94+
final long count = org.elasticsearch.core.internal.io.Streams.copy(Streams.limitStream(stuffArray.streamInput(), limit), out);
9595
assertEquals(limit, count);
9696
assertThat(Arrays.equals(out.toByteArray(), Arrays.copyOf(bytes, limit)), equalTo(true));
9797
}

server/src/test/java/org/elasticsearch/transport/InboundPipelineTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,13 @@
2828
import org.elasticsearch.common.bytes.BytesReference;
2929
import org.elasticsearch.common.bytes.ReleasableBytesReference;
3030
import org.elasticsearch.common.collect.Tuple;
31-
import org.elasticsearch.common.io.Streams;
3231
import org.elasticsearch.common.io.stream.BytesStreamOutput;
3332
import org.elasticsearch.common.lease.Releasable;
3433
import org.elasticsearch.common.settings.Settings;
3534
import org.elasticsearch.common.unit.TimeValue;
3635
import org.elasticsearch.common.util.PageCacheRecycler;
3736
import org.elasticsearch.common.util.concurrent.ThreadContext;
37+
import org.elasticsearch.core.internal.io.Streams;
3838
import org.elasticsearch.test.ESTestCase;
3939

4040
import java.io.IOException;

server/src/test/java/org/elasticsearch/transport/OutboundHandlerTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,13 @@
2929
import org.elasticsearch.common.bytes.BytesReference;
3030
import org.elasticsearch.common.bytes.ReleasableBytesReference;
3131
import org.elasticsearch.common.collect.Tuple;
32-
import org.elasticsearch.common.io.Streams;
3332
import org.elasticsearch.common.io.stream.BytesStreamOutput;
3433
import org.elasticsearch.common.transport.TransportAddress;
3534
import org.elasticsearch.common.unit.TimeValue;
3635
import org.elasticsearch.common.util.BigArrays;
3736
import org.elasticsearch.common.util.PageCacheRecycler;
3837
import org.elasticsearch.common.util.concurrent.ThreadContext;
38+
import org.elasticsearch.core.internal.io.Streams;
3939
import org.elasticsearch.test.ESTestCase;
4040
import org.elasticsearch.threadpool.TestThreadPool;
4141
import org.elasticsearch.threadpool.ThreadPool;

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/LifecyclePolicyUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,12 @@
1010
import org.elasticsearch.common.bytes.BytesArray;
1111
import org.elasticsearch.common.bytes.BytesReference;
1212
import org.elasticsearch.common.compress.NotXContentException;
13-
import org.elasticsearch.common.io.Streams;
1413
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
1514
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
1615
import org.elasticsearch.common.xcontent.XContentHelper;
1716
import org.elasticsearch.common.xcontent.XContentParser;
1817
import org.elasticsearch.common.xcontent.XContentType;
18+
import org.elasticsearch.core.internal.io.Streams;
1919

2020
import java.io.ByteArrayOutputStream;
2121
import java.io.IOException;

x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/TokenService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@
5353
import org.elasticsearch.common.cache.CacheBuilder;
5454
import org.elasticsearch.common.collect.Tuple;
5555
import org.elasticsearch.common.hash.MessageDigests;
56-
import org.elasticsearch.common.io.Streams;
5756
import org.elasticsearch.common.io.stream.BytesStreamOutput;
5857
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
5958
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
@@ -70,6 +69,7 @@
7069
import org.elasticsearch.common.xcontent.XContentBuilder;
7170
import org.elasticsearch.common.xcontent.XContentFactory;
7271
import org.elasticsearch.common.xcontent.XContentType;
72+
import org.elasticsearch.core.internal.io.Streams;
7373
import org.elasticsearch.index.IndexNotFoundException;
7474
import org.elasticsearch.index.engine.VersionConflictEngineException;
7575
import org.elasticsearch.index.query.BoolQueryBuilder;

0 commit comments

Comments
 (0)