Skip to content

Commit fe9904f

Browse files
More Efficient Blobstore Metdata IO (#55777) (#55788)
No need to copy all these bytes multiple times, especially not when writing a multiple MB global cluster state snapshot through this method.
1 parent d56f25a commit fe9904f

File tree

3 files changed

+25
-23
lines changed

3 files changed

+25
-23
lines changed

server/src/main/java/org/elasticsearch/common/compress/Compressor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.elasticsearch.common.io.stream.StreamOutput;
2525

2626
import java.io.IOException;
27+
import java.io.OutputStream;
2728

2829
public interface Compressor {
2930

@@ -37,5 +38,5 @@ public interface Compressor {
3738
* Creates a new stream output that compresses the contents and writes to the provided stream
3839
* output. Closing the returned {@link StreamOutput} will close the provided stream output.
3940
*/
40-
StreamOutput streamOutput(StreamOutput out) throws IOException;
41+
StreamOutput streamOutput(OutputStream out) throws IOException;
4142
}

server/src/main/java/org/elasticsearch/common/compress/DeflateCompressor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,8 @@ public void close() throws IOException {
107107
}
108108

109109
@Override
110-
public StreamOutput streamOutput(StreamOutput out) throws IOException {
111-
out.writeBytes(HEADER);
110+
public StreamOutput streamOutput(OutputStream out) throws IOException {
111+
out.write(HEADER);
112112
final boolean nowrap = true;
113113
final Deflater deflater = new Deflater(LEVEL, nowrap);
114114
final boolean syncFlush = true;

server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,15 @@
2222
import org.apache.lucene.index.CorruptIndexException;
2323
import org.apache.lucene.index.IndexFormatTooNewException;
2424
import org.apache.lucene.index.IndexFormatTooOldException;
25+
import org.apache.lucene.store.ByteBuffersDataInput;
26+
import org.apache.lucene.store.ByteBuffersIndexInput;
27+
import org.apache.lucene.store.IndexInput;
2528
import org.apache.lucene.store.OutputStreamIndexOutput;
29+
import org.apache.lucene.util.BytesRef;
2630
import org.elasticsearch.cluster.metadata.Metadata;
2731
import org.elasticsearch.common.CheckedConsumer;
2832
import org.elasticsearch.common.CheckedFunction;
2933
import org.elasticsearch.common.blobstore.BlobContainer;
30-
import org.elasticsearch.common.bytes.BytesArray;
3134
import org.elasticsearch.common.bytes.BytesReference;
3235
import org.elasticsearch.common.compress.CompressorFactory;
3336
import org.elasticsearch.common.io.Streams;
@@ -46,10 +49,10 @@
4649
import org.elasticsearch.gateway.CorruptStateException;
4750
import org.elasticsearch.snapshots.SnapshotInfo;
4851

49-
import java.io.ByteArrayOutputStream;
5052
import java.io.IOException;
5153
import java.io.InputStream;
5254
import java.io.OutputStream;
55+
import java.util.Arrays;
5356
import java.util.HashMap;
5457
import java.util.Locale;
5558
import java.util.Map;
@@ -127,8 +130,10 @@ public String blobName(String name) {
127130
public T readBlob(BlobContainer blobContainer, String blobName) throws IOException {
128131
final BytesReference bytes = Streams.readFully(blobContainer.readBlob(blobName));
129132
final String resourceDesc = "ChecksumBlobStoreFormat.readBlob(blob=\"" + blobName + "\")";
130-
try (ByteArrayIndexInput indexInput =
131-
new ByteArrayIndexInput(resourceDesc, BytesReference.toBytes(bytes))) {
133+
try {
134+
final IndexInput indexInput = bytes.length() > 0 ? new ByteBuffersIndexInput(
135+
new ByteBuffersDataInput(Arrays.asList(BytesReference.toByteBuffers(bytes))), resourceDesc)
136+
: new ByteArrayIndexInput(resourceDesc, BytesRef.EMPTY_BYTES);
132137
CodecUtil.checksumEntireFile(indexInput);
133138
CodecUtil.checkHeader(indexInput, codec, VERSION, VERSION);
134139
long filePointer = indexInput.getFilePointer();
@@ -182,19 +187,9 @@ public void write(T obj, BlobContainer blobContainer, String name, boolean failI
182187
});
183188
}
184189

185-
private void writeTo(final T obj, final String blobName, final CheckedConsumer<BytesArray, IOException> consumer) throws IOException {
186-
final BytesReference bytes;
187-
try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) {
188-
if (compress) {
189-
try (StreamOutput compressedStreamOutput = CompressorFactory.COMPRESSOR.streamOutput(bytesStreamOutput)) {
190-
write(obj, compressedStreamOutput);
191-
}
192-
} else {
193-
write(obj, bytesStreamOutput);
194-
}
195-
bytes = bytesStreamOutput.bytes();
196-
}
197-
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
190+
private void writeTo(final T obj, final String blobName,
191+
final CheckedConsumer<BytesReference, IOException> consumer) throws IOException {
192+
try (BytesStreamOutput outputStream = new BytesStreamOutput()) {
198193
final String resourceDesc = "ChecksumBlobStoreFormat.writeBlob(blob=\"" + blobName + "\")";
199194
try (OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput(resourceDesc, blobName, outputStream, BUFFER_SIZE)) {
200195
CodecUtil.writeHeader(indexOutput, codec, VERSION);
@@ -205,15 +200,21 @@ public void close() throws IOException {
205200
// in order to write the footer we need to prevent closing the actual index input.
206201
}
207202
}) {
208-
bytes.writeTo(indexOutputOutputStream);
203+
if (compress) {
204+
try (StreamOutput compressedStreamOutput = CompressorFactory.COMPRESSOR.streamOutput(indexOutputOutputStream)) {
205+
write(obj, compressedStreamOutput);
206+
}
207+
} else {
208+
write(obj, indexOutputOutputStream);
209+
}
209210
}
210211
CodecUtil.writeFooter(indexOutput);
211212
}
212-
consumer.accept(new BytesArray(outputStream.toByteArray()));
213+
consumer.accept(outputStream.bytes());
213214
}
214215
}
215216

216-
private void write(T obj, StreamOutput streamOutput) throws IOException {
217+
private void write(T obj, OutputStream streamOutput) throws IOException {
217218
try (XContentBuilder builder = XContentFactory.contentBuilder(XContentType.SMILE, streamOutput)) {
218219
builder.startObject();
219220
obj.toXContent(builder, SNAPSHOT_ONLY_FORMAT_PARAMS);

0 commit comments

Comments
 (0)