Skip to content

Commit 9eac5ce

Browse files
Dry up inputstream to bytesreference (#43675) (#44094)
* Dry up Reading InputStream to BytesReference * Dry up spots where we use the same pattern to get from an InputStream to a BytesReferences
1 parent f1ebb82 commit 9eac5ce

File tree

8 files changed

+52
-90
lines changed

8 files changed

+52
-90
lines changed

server/src/main/java/org/elasticsearch/common/compress/CompressorFactory.java

+2-8
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,9 @@
2121

2222
import org.elasticsearch.common.Nullable;
2323
import org.elasticsearch.common.bytes.BytesReference;
24-
import org.elasticsearch.common.io.stream.BytesStreamOutput;
25-
import org.elasticsearch.common.io.stream.StreamInput;
24+
import org.elasticsearch.common.io.Streams;
2625
import org.elasticsearch.common.xcontent.XContentHelper;
2726
import org.elasticsearch.common.xcontent.XContentType;
28-
import org.elasticsearch.core.internal.io.Streams;
2927

3028
import java.io.IOException;
3129
import java.util.Objects;
@@ -93,10 +91,6 @@ public static BytesReference uncompress(BytesReference bytes) throws IOException
9391
}
9492

9593
private static BytesReference uncompress(BytesReference bytes, Compressor compressor) throws IOException {
96-
StreamInput compressed = compressor.streamInput(bytes.streamInput());
97-
BytesStreamOutput bStream = new BytesStreamOutput();
98-
Streams.copy(compressed, bStream);
99-
compressed.close();
100-
return bStream.bytes();
94+
return Streams.readFully(compressor.streamInput(bytes.streamInput()));
10195
}
10296
}

server/src/main/java/org/elasticsearch/common/io/Streams.java

+12
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.elasticsearch.common.bytes.BytesReference;
2323
import org.elasticsearch.common.io.stream.BytesStream;
24+
import org.elasticsearch.common.io.stream.BytesStreamOutput;
2425
import org.elasticsearch.common.io.stream.StreamOutput;
2526

2627
import java.io.BufferedReader;
@@ -226,6 +227,17 @@ public static BytesStream flushOnCloseStream(BytesStream os) {
226227
return new FlushOnCloseOutputStream(os);
227228
}
228229

230+
/**
231+
* Reads all bytes from the given {@link InputStream} and closes it afterwards.
232+
*/
233+
public static BytesReference readFully(InputStream in) throws IOException {
234+
try (InputStream inputStream = in) {
235+
BytesStreamOutput out = new BytesStreamOutput();
236+
copy(inputStream, out);
237+
return out.bytes();
238+
}
239+
}
240+
229241
/**
230242
* A wrapper around a {@link BytesStream} that makes the close operation a flush. This is
231243
* needed as sometimes a stream will be closed but the bytes that the stream holds still need

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

+13-27
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.elasticsearch.common.collect.Tuple;
5151
import org.elasticsearch.common.component.AbstractLifecycleComponent;
5252
import org.elasticsearch.common.compress.NotXContentException;
53+
import org.elasticsearch.common.io.Streams;
5354
import org.elasticsearch.common.io.stream.BytesStreamOutput;
5455
import org.elasticsearch.common.lucene.Lucene;
5556
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
@@ -60,10 +61,8 @@
6061
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
6162
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
6263
import org.elasticsearch.common.xcontent.XContentFactory;
63-
import org.elasticsearch.common.xcontent.XContentHelper;
6464
import org.elasticsearch.common.xcontent.XContentParser;
6565
import org.elasticsearch.common.xcontent.XContentType;
66-
import org.elasticsearch.core.internal.io.Streams;
6766
import org.elasticsearch.index.mapper.MapperService;
6867
import org.elasticsearch.index.shard.ShardId;
6968
import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException;
@@ -671,32 +670,23 @@ public RepositoryData getRepositoryData() {
671670
final String snapshotsIndexBlobName = INDEX_FILE_PREFIX + Long.toString(indexGen);
672671

673672
RepositoryData repositoryData;
674-
try (InputStream blob = blobContainer().readBlob(snapshotsIndexBlobName)) {
675-
BytesStreamOutput out = new BytesStreamOutput();
676-
Streams.copy(blob, out);
677-
// EMPTY is safe here because RepositoryData#fromXContent calls namedObject
678-
try (XContentParser parser = XContentHelper.createParser(NamedXContentRegistry.EMPTY,
679-
LoggingDeprecationHandler.INSTANCE, out.bytes(), XContentType.JSON)) {
680-
repositoryData = RepositoryData.snapshotsFromXContent(parser, indexGen);
681-
} catch (NotXContentException e) {
682-
logger.warn("[{}] index blob is not valid x-content [{} bytes]", snapshotsIndexBlobName, out.bytes().length());
683-
throw e;
684-
}
673+
// EMPTY is safe here because RepositoryData#fromXContent calls namedObject
674+
try (InputStream blob = blobContainer().readBlob(snapshotsIndexBlobName);
675+
XContentParser parser = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY,
676+
LoggingDeprecationHandler.INSTANCE, blob)) {
677+
repositoryData = RepositoryData.snapshotsFromXContent(parser, indexGen);
685678
}
686679

687680
// now load the incompatible snapshot ids, if they exist
688-
try (InputStream blob = blobContainer().readBlob(INCOMPATIBLE_SNAPSHOTS_BLOB)) {
689-
BytesStreamOutput out = new BytesStreamOutput();
690-
Streams.copy(blob, out);
691-
try (XContentParser parser = XContentHelper.createParser(NamedXContentRegistry.EMPTY,
692-
LoggingDeprecationHandler.INSTANCE, out.bytes(), XContentType.JSON)) {
693-
repositoryData = repositoryData.incompatibleSnapshotsFromXContent(parser);
694-
}
681+
try (InputStream blob = blobContainer().readBlob(INCOMPATIBLE_SNAPSHOTS_BLOB);
682+
XContentParser parser = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY,
683+
LoggingDeprecationHandler.INSTANCE, blob)) {
684+
repositoryData = repositoryData.incompatibleSnapshotsFromXContent(parser);
695685
} catch (NoSuchFileException e) {
696686
if (isReadOnly()) {
697687
logger.debug("[{}] Incompatible snapshots blob [{}] does not exist, the likely " +
698-
"reason is that there are no incompatible snapshots in the repository",
699-
metadata.name(), INCOMPATIBLE_SNAPSHOTS_BLOB);
688+
"reason is that there are no incompatible snapshots in the repository",
689+
metadata.name(), INCOMPATIBLE_SNAPSHOTS_BLOB);
700690
} else {
701691
// write an empty incompatible-snapshots blob - we do this so that there
702692
// is a blob present, which helps speed up some cloud-based repositories
@@ -804,11 +794,7 @@ long latestIndexBlobId() throws IOException {
804794

805795
// package private for testing
806796
long readSnapshotIndexLatestBlob() throws IOException {
807-
try (InputStream blob = blobContainer().readBlob(INDEX_LATEST_BLOB)) {
808-
BytesStreamOutput out = new BytesStreamOutput();
809-
Streams.copy(blob, out);
810-
return Numbers.bytesToLong(out.bytes().toBytesRef());
811-
}
797+
return Numbers.bytesToLong(Streams.readFully(blobContainer().readBlob(INDEX_LATEST_BLOB)).toBytesRef());
812798
}
813799

814800
private long listBlobsToGetLatestIndexId() throws IOException {

server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java

+15-18
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.common.bytes.BytesArray;
3131
import org.elasticsearch.common.bytes.BytesReference;
3232
import org.elasticsearch.common.compress.CompressorFactory;
33+
import org.elasticsearch.common.io.Streams;
3334
import org.elasticsearch.common.io.stream.BytesStreamOutput;
3435
import org.elasticsearch.common.io.stream.StreamOutput;
3536
import org.elasticsearch.common.lucene.store.ByteArrayIndexInput;
@@ -42,7 +43,6 @@
4243
import org.elasticsearch.common.xcontent.XContentHelper;
4344
import org.elasticsearch.common.xcontent.XContentParser;
4445
import org.elasticsearch.common.xcontent.XContentType;
45-
import org.elasticsearch.core.internal.io.Streams;
4646
import org.elasticsearch.gateway.CorruptStateException;
4747
import org.elasticsearch.snapshots.SnapshotInfo;
4848

@@ -149,24 +149,21 @@ public String blobName(String name) {
149149
* @param blobName blob name
150150
*/
151151
public T readBlob(BlobContainer blobContainer, String blobName) throws IOException {
152-
try (InputStream inputStream = blobContainer.readBlob(blobName)) {
153-
ByteArrayOutputStream out = new ByteArrayOutputStream();
154-
Streams.copy(inputStream, out);
155-
final byte[] bytes = out.toByteArray();
156-
final String resourceDesc = "ChecksumBlobStoreFormat.readBlob(blob=\"" + blobName + "\")";
157-
try (ByteArrayIndexInput indexInput = new ByteArrayIndexInput(resourceDesc, bytes)) {
158-
CodecUtil.checksumEntireFile(indexInput);
159-
CodecUtil.checkHeader(indexInput, codec, VERSION, VERSION);
160-
long filePointer = indexInput.getFilePointer();
161-
long contentSize = indexInput.length() - CodecUtil.footerLength() - filePointer;
162-
try (XContentParser parser = XContentHelper.createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE,
163-
new BytesArray(bytes, (int) filePointer, (int) contentSize))) {
164-
return reader.apply(parser);
165-
}
166-
} catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
167-
// we trick this into a dedicated exception with the original stacktrace
168-
throw new CorruptStateException(ex);
152+
final BytesReference bytes = Streams.readFully(blobContainer.readBlob(blobName));
153+
final String resourceDesc = "ChecksumBlobStoreFormat.readBlob(blob=\"" + blobName + "\")";
154+
try (ByteArrayIndexInput indexInput =
155+
new ByteArrayIndexInput(resourceDesc, BytesReference.toBytes(bytes))) {
156+
CodecUtil.checksumEntireFile(indexInput);
157+
CodecUtil.checkHeader(indexInput, codec, VERSION, VERSION);
158+
long filePointer = indexInput.getFilePointer();
159+
long contentSize = indexInput.length() - CodecUtil.footerLength() - filePointer;
160+
try (XContentParser parser = XContentHelper.createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE,
161+
bytes.slice((int) filePointer, (int) contentSize))) {
162+
return reader.apply(parser);
169163
}
164+
} catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
165+
// we trick this into a dedicated exception with the original stacktrace
166+
throw new CorruptStateException(ex);
170167
}
171168
}
172169

test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java

+4-11
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,11 @@
2222
import org.elasticsearch.action.support.PlainActionFuture;
2323
import org.elasticsearch.common.Strings;
2424
import org.elasticsearch.common.blobstore.BlobContainer;
25-
import org.elasticsearch.common.io.stream.BytesStreamOutput;
2625
import org.elasticsearch.common.unit.TimeValue;
2726
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
2827
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
29-
import org.elasticsearch.common.xcontent.XContentHelper;
3028
import org.elasticsearch.common.xcontent.XContentParser;
3129
import org.elasticsearch.common.xcontent.XContentType;
32-
import org.elasticsearch.core.internal.io.Streams;
3330
import org.elasticsearch.repositories.IndexId;
3431
import org.elasticsearch.repositories.RepositoriesService;
3532
import org.elasticsearch.repositories.RepositoryData;
@@ -81,14 +78,10 @@ protected void doRun() throws Exception {
8178
}
8279
assertIndexGenerations(blobContainer, latestGen);
8380
final RepositoryData repositoryData;
84-
try (InputStream inputStream = blobContainer.readBlob("index-" + latestGen);
85-
BytesStreamOutput out = new BytesStreamOutput()) {
86-
Streams.copy(inputStream, out);
87-
try (XContentParser parser =
88-
XContentHelper.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE,
89-
out.bytes(), XContentType.JSON)) {
90-
repositoryData = RepositoryData.snapshotsFromXContent(parser, latestGen);
91-
}
81+
try (InputStream blob = blobContainer.readBlob("index-" + latestGen);
82+
XContentParser parser = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY,
83+
LoggingDeprecationHandler.INSTANCE, blob)) {
84+
repositoryData = RepositoryData.snapshotsFromXContent(parser, latestGen);
9285
}
9386
assertIndexUUIDs(blobContainer, repositoryData);
9487
assertSnapshotUUIDs(blobContainer, repositoryData);

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/template/TemplateUtils.java

+2-9
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,15 @@
1616
import org.elasticsearch.common.collect.ImmutableOpenMap;
1717
import org.elasticsearch.common.compress.CompressedXContent;
1818
import org.elasticsearch.common.compress.NotXContentException;
19+
import org.elasticsearch.common.io.Streams;
1920
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
2021
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
2122
import org.elasticsearch.common.xcontent.XContentFactory;
2223
import org.elasticsearch.common.xcontent.XContentHelper;
2324
import org.elasticsearch.common.xcontent.XContentParser;
2425
import org.elasticsearch.common.xcontent.XContentType;
25-
import org.elasticsearch.core.internal.io.Streams;
2626

27-
import java.io.ByteArrayOutputStream;
2827
import java.io.IOException;
29-
import java.io.InputStream;
3028
import java.util.Map;
3129
import java.util.function.Predicate;
3230
import java.util.regex.Pattern;
@@ -73,12 +71,7 @@ public static String loadTemplate(String resource, String version, String versio
7371
* Loads a resource from the classpath and returns it as a {@link BytesReference}
7472
*/
7573
public static BytesReference load(String name) throws IOException {
76-
try (InputStream is = TemplateUtils.class.getResourceAsStream(name)) {
77-
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
78-
Streams.copy(is, out);
79-
return new BytesArray(out.toByteArray());
80-
}
81-
}
74+
return Streams.readFully(TemplateUtils.class.getResourceAsStream(name));
8275
}
8376

8477
/**

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java

+2-6
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,14 @@
1111
import org.elasticsearch.action.index.IndexRequest;
1212
import org.elasticsearch.action.index.IndexResponse;
1313
import org.elasticsearch.client.Client;
14-
import org.elasticsearch.common.bytes.BytesArray;
14+
import org.elasticsearch.common.io.Streams;
1515
import org.elasticsearch.common.unit.TimeValue;
1616
import org.elasticsearch.common.util.concurrent.ThreadContext;
1717
import org.elasticsearch.common.xcontent.ToXContent;
1818
import org.elasticsearch.common.xcontent.XContentBuilder;
1919
import org.elasticsearch.common.xcontent.XContentElasticsearchExtension;
2020
import org.elasticsearch.common.xcontent.XContentFactory;
2121
import org.elasticsearch.common.xcontent.XContentType;
22-
import org.elasticsearch.core.internal.io.Streams;
2322
import org.elasticsearch.index.mapper.DateFieldMapper;
2423
import org.elasticsearch.rest.RestStatus;
2524
import org.elasticsearch.xpack.core.ml.action.FlushJobAction;
@@ -38,7 +37,6 @@
3837
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
3938
import org.elasticsearch.xpack.ml.notifications.Auditor;
4039

41-
import java.io.ByteArrayOutputStream;
4240
import java.io.IOException;
4341
import java.io.InputStream;
4442
import java.util.Date;
@@ -407,9 +405,7 @@ private DataCounts postData(InputStream inputStream, XContentType xContentType)
407405
throws IOException {
408406
PostDataAction.Request request = new PostDataAction.Request(jobId);
409407
request.setDataDescription(dataDescription);
410-
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
411-
Streams.copy(inputStream, outputStream);
412-
request.setContent(new BytesArray(outputStream.toByteArray()), xContentType);
408+
request.setContent(Streams.readFully(inputStream), xContentType);
413409
try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(ML_ORIGIN)) {
414410
PostDataAction.Response response = client.execute(PostDataAction.INSTANCE, request).actionGet();
415411
return response.getDataCounts();

x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/ClusterAlertsUtil.java

+2-11
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,11 @@
77

88
import org.elasticsearch.Version;
99
import org.elasticsearch.cluster.service.ClusterService;
10-
import org.elasticsearch.common.bytes.BytesArray;
1110
import org.elasticsearch.common.bytes.BytesReference;
11+
import org.elasticsearch.common.io.Streams;
1212
import org.elasticsearch.common.settings.SettingsException;
13-
import org.elasticsearch.core.internal.io.Streams;
1413

15-
import java.io.ByteArrayOutputStream;
1614
import java.io.IOException;
17-
import java.io.InputStream;
1815
import java.util.Arrays;
1916
import java.util.List;
2017
import java.util.Locale;
@@ -124,13 +121,7 @@ public static String loadWatch(final ClusterService clusterService, final String
124121
}
125122

126123
private static BytesReference loadResource(final String resource) throws IOException {
127-
try (InputStream is = ClusterAlertsUtil.class.getResourceAsStream(resource)) {
128-
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
129-
Streams.copy(is, out);
130-
131-
return new BytesArray(out.toByteArray());
132-
}
133-
}
124+
return Streams.readFully(ClusterAlertsUtil.class.getResourceAsStream(resource));
134125
}
135126

136127
/**

0 commit comments

Comments
 (0)