Introduce ChunkedBlobOutputStream #74620

Merged

ChunkedBlobOutputStream.java
@@ -0,0 +1,152 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0 and the Server Side Public License, v 1; you may not use this file except
 * in compliance with, at your election, the Elastic License 2.0 or the Server
 * Side Public License, v 1.
 */
package org.elasticsearch.repositories.blobstore;

import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.core.Releasables;

import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;

/**
 * Base class for doing chunked writes to a blob store. Some blob stores require either up-front knowledge of the size of the blob that
 * will be written or writing it in chunks that are then joined into the final blob at the end of the write. This class provides a basis
 * on which to implement an output stream that encapsulates such a chunked write.
 *
 * @param <T> type of chunk identifier
 */
public abstract class ChunkedBlobOutputStream<T> extends OutputStream {

    /**
     * List of identifiers of already written chunks.
     */
    protected final List<T> parts = new ArrayList<>();

    /**
     * Size of the write buffer above which it must be flushed to storage.
     */
    private final long maxBytesToBuffer;

    /**
     * Big arrays to be able to allocate buffers from pooled bytes.
     */
    private final BigArrays bigArrays;

    /**
     * Current write buffer.
     */
    protected ReleasableBytesStreamOutput buffer;

    /**
     * Set to true once no more calls to {@link #write} are expected and the blob has been received by {@link #write} in full so that
     * {@link #close()} knows whether to clean up existing chunks or finish a chunked write.
     */
    protected boolean successful = false;

    /**
     * Is set to {@code true} once this stream has been closed.
     */
    private boolean closed = false;

    /**
     * Number of bytes flushed to blob storage so far.
     */
    protected long flushedBytes = 0L;

    protected ChunkedBlobOutputStream(BigArrays bigArrays, long maxBytesToBuffer) {
        this.bigArrays = bigArrays;
        if (maxBytesToBuffer <= 0) {
            throw new IllegalArgumentException("maximum buffer size must be positive");
        }
        this.maxBytesToBuffer = maxBytesToBuffer;
Member
we should check maxBytesToBuffer > 0

Member Author
++ added a check

        buffer = new ReleasableBytesStreamOutput(bigArrays);
    }

    @Override
    public final void write(int b) throws IOException {
        buffer.write(b);
        maybeFlushBuffer();
    }

    @Override
    public final void write(byte[] b, int off, int len) throws IOException {
        buffer.write(b, off, len);
        maybeFlushBuffer();
    }

    @Override
    public final void close() throws IOException {
        if (closed) {
            assert false : "this output stream should only be closed once";
            throw new AlreadyClosedException("already closed");
Member
I wonder if we should just ignore double closing? I think that's what is usually done in many streams (but we should keep the assertion)

Member Author (original-brownbear), Jun 28, 2021
It's kind of weird to have an assertion here but quietly skip over double closing otherwise? If we had a bug here that would only show in some corner case we'd never see it in logs otherwise?
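
For reference, the tolerant variant under discussion would look roughly like this inside close() (illustrative only; the merged change keeps the throw):

        if (closed) {
            // keep the assertion so tests still catch double closing, but otherwise ignore it
            assert false : "this output stream should only be closed once";
            return;
        }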

        }
        closed = true;
        try {
            if (successful) {
                onCompletion();
            } else {
                onFailure();
            }
        } finally {
            Releasables.close(buffer);
        }
    }

    /**
     * Mark all blob bytes as properly received by {@link #write}, indicating that {@link #close} may finalize the blob.
     */
    public final void markSuccess() {
Member
Really a suggestion:

Suggested change
public final void markSuccess() {
public final void done() {

        this.successful = true;
    }

    /**
     * Finish writing the current buffer contents to storage and track them by the given {@code partId}. Depending on whether all contents
     * have already been written, either prepare the write buffer for additional writes or release the buffer.
     *
     * @param partId part identifier to track for use when closing
     */
    protected final void finishPart(T partId) {
        flushedBytes += buffer.size();
        parts.add(partId);
        buffer.close();
        // only need a new buffer if we're not done yet
        if (successful) {
            buffer = null;
        } else {
            buffer = new ReleasableBytesStreamOutput(bigArrays);
        }
    }

    /**
     * Write the contents of {@link #buffer} to storage. Implementations should call {@link #finishPart} at the end to track the chunk
     * of data just written and ready {@link #buffer} for the next write.
     */
    protected abstract void flushBuffer() throws IOException;

    /**
     * Invoked once all write chunks/parts are ready to be combined into the final blob. Implementations must invoke the necessary logic
     * for combining the uploaded chunks into the final blob in this method.
     */
    protected abstract void onCompletion() throws IOException;

    /**
     * Invoked in case writing all chunks of data to storage failed. Implementations should run any cleanup required for the already
     * written data in this method.
     */
    protected abstract void onFailure();

    private void maybeFlushBuffer() throws IOException {
        if (buffer.size() >= maxBytesToBuffer) {
            flushBuffer();
        }
    }
}
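
To illustrate how the abstract hooks fit together, here is a minimal, hypothetical usage sketch modeled on the tests below; the copyInChunks helper and the ByteArrayOutputStream target are illustrative stand-ins (not part of this PR) for a real repository's part-upload and combine/clean-up calls, and imports are omitted:

    // Hypothetical sketch: "uploads" each chunk to a local ByteArrayOutputStream.
    static void copyInChunks(BigArrays bigArrays, byte[] data) throws IOException {
        final ByteArrayOutputStream target = new ByteArrayOutputStream(); // stand-in for the blob store
        try (ChunkedBlobOutputStream<Integer> out = new ChunkedBlobOutputStream<>(bigArrays, 5 * 1024 * 1024L) {

            private int nextPartId = 0;

            @Override
            protected void flushBuffer() throws IOException {
                // upload the currently buffered bytes as one chunk and record its identifier
                buffer.bytes().writeTo(target);
                finishPart(++nextPartId);
            }

            @Override
            protected void onCompletion() throws IOException {
                // flush whatever is still buffered; a real implementation would then combine
                // the uploaded chunks tracked in `parts` into the final blob
                if (buffer.size() > 0) {
                    flushBuffer();
                }
            }

            @Override
            protected void onFailure() {
                // a real implementation would delete the already uploaded chunks tracked in `parts`
            }
        }) {
            out.write(data);
            out.markSuccess(); // so that close() runs onCompletion() rather than onFailure()
        }
    }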

ChunkedBlobOutputStreamTests.java
@@ -0,0 +1,149 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0 and the Server Side Public License, v 1; you may not use this file except
 * in compliance with, at your election, the Elastic License 2.0 or the Server
 * Side Public License, v 1.
 */

package org.elasticsearch.repositories.blobstore;

import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.common.util.MockPageCacheRecycler;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.ESTestCase;

import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.CRC32;
import java.util.zip.CheckedOutputStream;

public class ChunkedBlobOutputStreamTests extends ESTestCase {

    private BigArrays bigArrays;

    @Override
    public void setUp() throws Exception {
        bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
        super.setUp();
    }

    @Override
    public void tearDown() throws Exception {
        super.tearDown();
    }

    public void testSuccessfulChunkedWrite() throws IOException {
        final long chunkSize = randomLongBetween(10, 1024);
        final CRC32 checksumIn = new CRC32();
        final CRC32 checksumOut = new CRC32();
        final CheckedOutputStream out = new CheckedOutputStream(OutputStream.nullOutputStream(), checksumOut);
        final AtomicLong writtenBytesCounter = new AtomicLong(0L);
        final long bytesToWrite = randomLongBetween(chunkSize - 5, 1000 * chunkSize);
        long written = 0;
        try (ChunkedBlobOutputStream<Integer> stream = new ChunkedBlobOutputStream<>(bigArrays, chunkSize) {

            private final AtomicInteger partIdSupplier = new AtomicInteger();

            @Override
            protected void flushBuffer() throws IOException {
                final BytesReference bytes = buffer.bytes();
                bytes.writeTo(out);
                writtenBytesCounter.addAndGet(bytes.length());
                finishPart(partIdSupplier.incrementAndGet());
Member
I wonder if we could make flushBuffer() return the part identifier and have the logic that exists in finishPart() be private in ChunkedBlobOutputStream too.

Member Author
There's a bit of complexity to doing this I think, with all the error handling and special casing being so different across implementations, but I'll try and see if I can do something nicer in the main PR (I don't want to break the API here now and I'm having a bit of a hard time reasoning through all the edge cases around API changes from the main PR). But in general I'm ++ to the idea if possible.
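
Purely as an illustration of that suggestion (not something this PR implements, and glossing over the final-flush and error-handling cases mentioned above), the base class could look roughly like this:

    // Hypothetical reshaping along the reviewer's suggestion: implementations upload the
    // buffer and return the chunk identifier; the base class keeps the bookkeeping private.
    private void maybeFlushBuffer() throws IOException {
        if (buffer.size() >= maxBytesToBuffer) {
            final T partId = flushBuffer();
            flushedBytes += buffer.size();
            parts.add(partId);
            buffer.close();
            buffer = new ReleasableBytesStreamOutput(bigArrays);
        }
    }

    /**
     * Upload the contents of {@link #buffer} as one chunk and return its identifier.
     */
    protected abstract T flushBuffer() throws IOException;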

            }

            @Override
            protected void onCompletion() throws IOException {
                if (buffer.size() > 0) {
Member
This should be handled by ChunkedBlobOutputStream itself I think

Member Author
Unfortunately, I couldn't find a neat way of encapsulating this (without adding even more complexity, at least) because of the way onCompletion will do a normal write if nothing has been buffered, where it needs access to the buffer anyway.

                    flushBuffer();
                }
                out.flush();
                for (int i = 0; i < partIdSupplier.get(); i++) {
                    assertEquals((long) i + 1, (long) parts.get(i));
                }
            }

            @Override
            protected void onFailure() {
                fail("not supposed to fail");
            }
        }) {
            final byte[] buffer = new byte[randomInt(Math.toIntExact(2 * chunkSize)) + 1];
            while (written < bytesToWrite) {
                if (randomBoolean()) {
                    random().nextBytes(buffer);
                    final int offset = randomInt(buffer.length - 2) + 1;
                    final int length = Math.toIntExact(Math.min(bytesToWrite - written, buffer.length - offset));
                    stream.write(buffer, offset, length);
                    checksumIn.update(buffer, offset, length);
                    written += length;
                } else {
                    int oneByte = randomByte();
                    stream.write(oneByte);
                    checksumIn.update(oneByte);
                    written++;
                }
            }
            stream.markSuccess();
        }
        assertEquals(bytesToWrite, written);
        assertEquals(bytesToWrite, writtenBytesCounter.get());
        assertEquals(checksumIn.getValue(), checksumOut.getValue());
    }

    public void testExceptionDuringChunkedWrite() throws IOException {
        final long chunkSize = randomLongBetween(10, 1024);
        final AtomicLong writtenBytesCounter = new AtomicLong(0L);
        final long bytesToWrite = randomLongBetween(chunkSize - 5, 1000 * chunkSize);
        long written = 0;
        final AtomicBoolean onFailureCalled = new AtomicBoolean(false);
        try (ChunkedBlobOutputStream<Integer> stream = new ChunkedBlobOutputStream<>(bigArrays, chunkSize) {

            private final AtomicInteger partIdSupplier = new AtomicInteger();

            @Override
            protected void flushBuffer() {
                writtenBytesCounter.addAndGet(buffer.size());
                finishPart(partIdSupplier.incrementAndGet());
            }

            @Override
            protected void onCompletion() {
                fail("supposed to fail");
            }

            @Override
            protected void onFailure() {
                for (int i = 0; i < partIdSupplier.get(); i++) {
                    assertEquals((long) i + 1, (long) parts.get(i));
                }
                assertTrue(onFailureCalled.compareAndSet(false, true));
            }
        }) {
            final byte[] buffer = new byte[randomInt(Math.toIntExact(2 * chunkSize)) + 1];
            while (written < bytesToWrite) {
                if (rarely()) {
                    break;
                } else if (randomBoolean()) {
                    random().nextBytes(buffer);
                    final int offset = randomInt(buffer.length - 2) + 1;
                    final int length = Math.toIntExact(Math.min(bytesToWrite - written, buffer.length - offset));
                    stream.write(buffer, offset, length);
                    written += length;
                } else {
                    int oneByte = randomByte();
                    stream.write(oneByte);
                    written++;
                }
            }
        }
        assertTrue(onFailureCalled.get());
    }
}