
Commit 3bb0e5a

Introduce ChunkedBlobOutputStream
Extracted the chunked output stream logic from elastic#74313 and added tests for it to make it easier to review.
1 parent 90f2271 · commit 3bb0e5a

File tree

2 files changed, +298 -0 lines changed

ChunkedBlobOutputStream.java

@@ -0,0 +1,149 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0 and the Server Side Public License, v 1; you may not use this file except
 * in compliance with, at your election, the Elastic License 2.0 or the Server
 * Side Public License, v 1.
 */
package org.elasticsearch.repositories.blobstore;

import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.core.Releasables;

import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;

/**
 * Base class for doing chunked writes to a blob store. Some blob stores require either up-front knowledge of the size of the blob that
 * will be written or writing it in chunks that are then joined into the final blob at the end of the write. This class provides a basis
 * on which to implement an output stream that encapsulates such a chunked write.
 *
 * @param <T> type of chunk identifier
 */
public abstract class ChunkedBlobOutputStream<T> extends OutputStream {

    /**
     * List of identifiers of already written chunks.
     */
    protected final List<T> parts = new ArrayList<>();

    /**
     * Size of the write buffer above which it must be flushed to storage.
     */
    private final long maxBytesToBuffer;

    /**
     * Big arrays to be able to allocate buffers from pooled bytes.
     */
    private final BigArrays bigArrays;

    /**
     * Current write buffer.
     */
    protected ReleasableBytesStreamOutput buffer;

    /**
     * Set to true once no more calls to {@link #write} are expected and the blob has been received by {@link #write} in full so that
     * {@link #close()} knows whether to clean up existing chunks or finish a chunked write.
     */
    protected boolean successful = false;

    /**
     * Is set to {@code true} once this stream has been closed.
     */
    private boolean closed = false;

    /**
     * Number of bytes flushed to blob storage so far.
     */
    protected long flushedBytes = 0L;

    protected ChunkedBlobOutputStream(BigArrays bigArrays, long maxBytesToBuffer) {
        this.bigArrays = bigArrays;
        this.maxBytesToBuffer = maxBytesToBuffer;
        buffer = new ReleasableBytesStreamOutput(bigArrays);
    }

    @Override
    public final void write(int b) throws IOException {
        buffer.write(b);
        maybeFlushBuffer();
    }

    @Override
    public final void write(byte[] b, int off, int len) throws IOException {
        buffer.write(b, off, len);
        maybeFlushBuffer();
    }

    @Override
    public final void close() throws IOException {
        if (closed) {
            assert false : "this output stream should only be closed once";
            throw new AlreadyClosedException("already closed");
        }
        closed = true;
        try {
            if (successful) {
                onCompletion();
            } else {
                onFailure();
            }
        } finally {
            Releasables.close(buffer);
        }
    }

    /**
     * Mark all blob bytes as properly received by {@link #write}, indicating that {@link #close} may finalize the blob.
     */
    public final void markSuccess() {
        this.successful = true;
    }

    /**
     * Finish writing the current buffer contents to storage and track them by the given {@code partId}. Depending on whether all contents
     * have already been written either prepare the write buffer for additional writes or release the buffer.
     *
     * @param partId part identifier to track for use when closing
     */
    protected final void finishPart(T partId) {
        flushedBytes += buffer.size();
        parts.add(partId);
        buffer.close();
        // only need a new buffer if we're not done yet
        if (successful) {
            buffer = null;
        } else {
            buffer = new ReleasableBytesStreamOutput(bigArrays);
        }
    }

    /**
     * Write the contents of {@link #buffer} to storage. Implementations should call {@link #finishPart} at the end to track the chunk
     * of data just written and ready {@link #buffer} for the next write.
     */
    protected abstract void flushBuffer() throws IOException;

    /**
     * Invoked once all write chunks/parts are ready to be combined into the final blob. Implementations must invoke the necessary logic
     * for combining the uploaded chunks into the final blob in this method.
     */
    protected abstract void onCompletion() throws IOException;

    /**
     * Invoked in case writing all chunks of data to storage failed. Implementations should run any cleanup required for the already
     * written data in this method.
     */
    protected abstract void onFailure();

    private void maybeFlushBuffer() throws IOException {
        if (buffer.size() >= maxBytesToBuffer) {
            flushBuffer();
        }
    }
}
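For review purposes, here is a minimal sketch of how a concrete implementation might extend this class. The subclass name InMemoryChunkedOutputStream and its in-memory uploaded list are hypothetical and not part of this commit; a real blob store implementation would instead flush each chunk to remote storage, for example as the parts of a multipart upload.

import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.util.BigArrays;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only -- not part of this commit. "Uploads" each
// chunk into an in-memory list and treats the list as the assembled blob on
// completion, mirroring how an implementation would drive a chunked upload.
class InMemoryChunkedOutputStream extends ChunkedBlobOutputStream<Integer> {

    private final List<byte[]> uploaded = new ArrayList<>(); // stands in for remote storage

    InMemoryChunkedOutputStream(BigArrays bigArrays, long chunkSize) {
        super(bigArrays, chunkSize);
    }

    @Override
    protected void flushBuffer() throws IOException {
        // persist the current buffer contents as one chunk, then let finishPart
        // track its id and ready the buffer for further writes
        uploaded.add(BytesReference.toBytes(buffer.bytes()));
        finishPart(uploaded.size());
    }

    @Override
    protected void onCompletion() throws IOException {
        if (buffer.size() > 0) {
            flushBuffer(); // flush the final, possibly partial chunk
        }
        // a real implementation would combine the uploaded chunks into the
        // final blob here, e.g. by completing a multipart upload with the
        // part ids tracked in #parts
    }

    @Override
    protected void onFailure() {
        uploaded.clear(); // clean up chunks already written to storage
    }
}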
ChunkedBlobOutputStreamTests.java

@@ -0,0 +1,149 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0 and the Server Side Public License, v 1; you may not use this file except
 * in compliance with, at your election, the Elastic License 2.0 or the Server
 * Side Public License, v 1.
 */

package org.elasticsearch.repositories.blobstore;

import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.common.util.MockPageCacheRecycler;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.ESTestCase;

import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.CRC32;
import java.util.zip.CheckedOutputStream;

public class ChunkedBlobOutputStreamTests extends ESTestCase {

    private BigArrays bigArrays;

    @Override
    public void setUp() throws Exception {
        bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
        super.setUp();
    }

    @Override
    public void tearDown() throws Exception {
        super.tearDown();
    }

    public void testSuccessfulChunkedWrite() throws IOException {
        final long chunkSize = randomLongBetween(10, 1024);
        final CRC32 checksumIn = new CRC32();
        final CRC32 checksumOut = new CRC32();
        final CheckedOutputStream out = new CheckedOutputStream(OutputStream.nullOutputStream(), checksumOut);
        final AtomicLong writtenBytesCounter = new AtomicLong(0L);
        final long bytesToWrite = randomLongBetween(chunkSize - 5, 1000 * chunkSize);
        long written = 0;
        try (ChunkedBlobOutputStream<Integer> stream = new ChunkedBlobOutputStream<>(bigArrays, chunkSize) {

            private final AtomicInteger partIdSupplier = new AtomicInteger();

            @Override
            protected void flushBuffer() throws IOException {
                final BytesReference bytes = buffer.bytes();
                bytes.writeTo(out);
                writtenBytesCounter.addAndGet(bytes.length());
                finishPart(partIdSupplier.incrementAndGet());
            }

            @Override
            protected void onCompletion() throws IOException {
                if (buffer.size() > 0) {
                    flushBuffer();
                }
                out.flush();
                for (int i = 0; i < partIdSupplier.get(); i++) {
                    assertEquals((long) i + 1, (long) parts.get(i));
                }
            }

            @Override
            protected void onFailure() {
                fail("not supposed to fail");
            }
        }) {
            final byte[] buffer = new byte[randomInt(Math.toIntExact(2 * chunkSize)) + 1];
            while (written < bytesToWrite) {
                if (randomBoolean()) {
                    random().nextBytes(buffer);
                    final int offset = randomInt(buffer.length - 2) + 1;
                    final int length = Math.toIntExact(Math.min(bytesToWrite - written, buffer.length - offset));
                    stream.write(buffer, offset, length);
                    checksumIn.update(buffer, offset, length);
                    written += length;
                } else {
                    int oneByte = randomByte();
                    stream.write(oneByte);
                    checksumIn.update(oneByte);
                    written++;
                }
            }
            stream.markSuccess();
        }
        assertEquals(bytesToWrite, written);
        assertEquals(bytesToWrite, writtenBytesCounter.get());
        assertEquals(checksumIn.getValue(), checksumOut.getValue());
    }

    public void testExceptionDuringChunkedWrite() throws IOException {
        final long chunkSize = randomLongBetween(10, 1024);
        final AtomicLong writtenBytesCounter = new AtomicLong(0L);
        final long bytesToWrite = randomLongBetween(chunkSize - 5, 1000 * chunkSize);
        long written = 0;
        final AtomicBoolean onFailureCalled = new AtomicBoolean(false);
        try (ChunkedBlobOutputStream<Integer> stream = new ChunkedBlobOutputStream<>(bigArrays, chunkSize) {

            private final AtomicInteger partIdSupplier = new AtomicInteger();

            @Override
            protected void flushBuffer() {
                writtenBytesCounter.addAndGet(buffer.size());
                finishPart(partIdSupplier.incrementAndGet());
            }

            @Override
            protected void onCompletion() {
                fail("supposed to fail");
            }

            @Override
            protected void onFailure() {
                for (int i = 0; i < partIdSupplier.get(); i++) {
                    assertEquals((long) i + 1, (long) parts.get(i));
                }
                assertTrue(onFailureCalled.compareAndSet(false, true));
            }
        }) {
            final byte[] buffer = new byte[randomInt(Math.toIntExact(2 * chunkSize)) + 1];
            while (written < bytesToWrite) {
                if (rarely()) {
                    break;
                } else if (randomBoolean()) {
                    random().nextBytes(buffer);
                    final int offset = randomInt(buffer.length - 2) + 1;
                    final int length = Math.toIntExact(Math.min(bytesToWrite - written, buffer.length - offset));
                    stream.write(buffer, offset, length);
                    written += length;
                } else {
                    int oneByte = randomByte();
                    stream.write(oneByte);
                    written++;
                }
            }
        }
        assertTrue(onFailureCalled.get());
    }
}
