Save Memory on Large Repository Metadata Blob Writes #74313

Merged

Commits (35 total; the diff below shows changes from 25 commits)
318bacc  works in core (original-brownbear, Jun 16, 2021)
998c369  gcs support (original-brownbear, Jun 16, 2021)
67ad357  hdfs support (original-brownbear, Jun 16, 2021)
d7c0223  s3 (original-brownbear, Jun 16, 2021)
75fa696  mostly works (original-brownbear, Jun 16, 2021)
cf4fa9e  works (original-brownbear, Jun 16, 2021)
206ee12  bck (original-brownbear, Jun 16, 2021)
097f2b3  Merge remote-tracking branch 'elastic/master' into efficient-storage-… (original-brownbear, Jun 16, 2021)
8f70df4  Merge remote-tracking branch 'elastic/master' into efficient-storage-… (original-brownbear, Jun 18, 2021)
1f46a8b  fix cs (original-brownbear, Jun 18, 2021)
6d7a2df  drier (original-brownbear, Jun 18, 2021)
0d67021  nicer (original-brownbear, Jun 18, 2021)
3a1cc7d  cleanup (original-brownbear, Jun 18, 2021)
1c8084d  stop materializing full messages (original-brownbear, Jun 18, 2021)
613cdc5  Merge remote-tracking branch 'elastic/master' into save-memory-large-… (original-brownbear, Jun 18, 2021)
682c8ca  docs (original-brownbear, Jun 18, 2021)
1d6ec23  fix gcs (original-brownbear, Jun 18, 2021)
6c06d96  Merge remote-tracking branch 'elastic/master' into save-memory-large-… (original-brownbear, Jun 19, 2021)
f98d10d  more docs (original-brownbear, Jun 19, 2021)
98b6119  nicer (original-brownbear, Jun 19, 2021)
6de3bc5  docs and drier (original-brownbear, Jun 19, 2021)
ced642a  tests and fixes Azure (original-brownbear, Jun 19, 2021)
3d06059  drier (original-brownbear, Jun 19, 2021)
ad68ed8  Merge remote-tracking branch 'elastic/master' into save-memory-large-… (original-brownbear, Jun 20, 2021)
3af17d7  docs (original-brownbear, Jun 20, 2021)
7ebffa9  readability (original-brownbear, Jun 20, 2021)
3cdef23  Merge remote-tracking branch 'elastic/master' into save-memory-large-… (original-brownbear, Jun 24, 2021)
039db6a  PR comments (original-brownbear, Jun 24, 2021)
0541320  Merge remote-tracking branch 'elastic/master' into save-memory-large-… (original-brownbear, Jun 24, 2021)
17ac9fa  Merge remote-tracking branch 'elastic/master' into save-memory-large-… (original-brownbear, Jun 24, 2021)
e37d207  CR: comments (original-brownbear, Jun 24, 2021)
25eb3b7  fixes (original-brownbear, Jun 28, 2021)
ca32351  Merge remote-tracking branch 'elastic/master' into save-memory-large-… (original-brownbear, Jun 28, 2021)
f50fb5f  Merge remote-tracking branch 'elastic/master' into save-memory-large-… (original-brownbear, Jun 29, 2021)
4340b39  CR: comments (original-brownbear, Jun 29, 2021)

Files changed
@@ -8,6 +8,7 @@

package org.elasticsearch.common.blobstore.url;

import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetadata;
@@ -20,6 +21,7 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;
import java.nio.file.NoSuchFileException;
import java.security.AccessController;
@@ -121,6 +123,14 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b
throw new UnsupportedOperationException("URL repository doesn't support this operation");
}

@Override
public void writeBlob(String blobName,
boolean failIfAlreadyExists,
boolean atomic,
CheckedConsumer<OutputStream, IOException> writer) throws IOException {
throw new UnsupportedOperationException("URL repository doesn't support this operation");
}

@Override
public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
throw new UnsupportedOperationException("URL repository doesn't support this operation");
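The override added above, writeBlob(String blobName, boolean failIfAlreadyExists, boolean atomic, CheckedConsumer<OutputStream, IOException> writer), is the new streaming entry point this change introduces on blob containers; the read-only URL repository simply rejects it. As a rough, hypothetical usage sketch (the blobContainer variable, blob name, and serializedChunks source are illustrative assumptions, not taken from this diff), a caller could stream a large repository metadata blob without first materializing it as one big BytesReference:

    blobContainer.writeBlob("large-metadata-blob", true, true, out -> {
        // Stream the serialized metadata chunk by chunk into the repository's OutputStream,
        // so only a single chunk has to be held in memory at any point in time.
        for (BytesReference chunk : serializedChunks) {
            chunk.writeTo(out);
        }
    });

Implementations that support multipart uploads can then flush each buffered chunk as a separate part, as the AzureBlobStore change further down illustrates.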
@@ -12,6 +12,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.util.Throwables;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetadata;
@@ -22,6 +23,7 @@

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.NoSuchFileException;
import java.util.Iterator;
import java.util.Map;
@@ -99,6 +101,14 @@ public void writeBlob(String blobName, BytesReference bytes, boolean failIfAlrea
blobStore.writeBlob(buildKey(blobName), bytes, failIfAlreadyExists);
}

@Override
public void writeBlob(String blobName,
boolean failIfAlreadyExists,
boolean atomic,
CheckedConsumer<OutputStream, IOException> writer) throws IOException {
blobStore.writeBlob(buildKey(blobName), failIfAlreadyExists, writer);
}

@Override
public DeleteResult delete() throws IOException {
return blobStore.deleteBlobDirectory(keyPath);
@@ -33,6 +33,8 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.blobstore.BlobContainer;
@@ -46,13 +48,15 @@
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.repositories.azure.AzureRepository.Repository;
import org.elasticsearch.repositories.blobstore.ChunkedBlobOutputStream;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URISyntaxException;
@@ -82,6 +86,8 @@ public class AzureBlobStore implements BlobStore {

private final AzureStorageService service;

private final BigArrays bigArrays;

private final String clientName;
private final String container;
private final LocationMode locationMode;
@@ -90,10 +96,11 @@ public class AzureBlobStore implements BlobStore {
private final Stats stats = new Stats();
private final BiConsumer<String, URL> statsConsumer;

public AzureBlobStore(RepositoryMetadata metadata, AzureStorageService service) {
public AzureBlobStore(RepositoryMetadata metadata, AzureStorageService service, BigArrays bigArrays) {
this.container = Repository.CONTAINER_SETTING.get(metadata.settings());
this.clientName = Repository.CLIENT_NAME.get(metadata.settings());
this.service = service;
this.bigArrays = bigArrays;
// locationMode is set per repository, not per client
this.locationMode = Repository.LOCATION_MODE_SETTING.get(metadata.settings());
this.maxSinglePartUploadSize = Repository.MAX_SINGLE_PART_UPLOAD_SIZE_SETTING.get(metadata.settings());
@@ -383,6 +390,48 @@ public void writeBlob(String blobName, BytesReference bytes, boolean failIfAlrea
executeSingleUpload(blobName, byteBufferFlux, bytes.length(), failIfAlreadyExists);
}

public void writeBlob(String blobName,
boolean failIfAlreadyExists,
CheckedConsumer<OutputStream, IOException> writer) throws IOException {
final BlockBlobAsyncClient blockBlobAsyncClient = asyncClient().getBlobContainerAsyncClient(container)
.getBlobAsyncClient(blobName).getBlockBlobAsyncClient();
try (ChunkedBlobOutputStream<String> out = new ChunkedBlobOutputStream<>(bigArrays, getUploadBlockSize()) {

@Override
protected void flushBuffer() {
if (buffer.size() == 0) {
return;
}
final String blockId = makeMultipartBlockId();
SocketAccess.doPrivilegedVoidException(() -> blockBlobAsyncClient.stageBlock(
blockId,
Flux.fromArray(BytesReference.toByteBuffers(buffer.bytes())),
buffer.size()
).block());
finishPart(blockId);
}

@Override
protected void onAllPartsReady() {
if (written == 0L) {
writeBlob(blobName, buffer.bytes(), failIfAlreadyExists);
} else {
flushBuffer();
SocketAccess.doPrivilegedVoidException(
() -> blockBlobAsyncClient.commitBlockList(parts, failIfAlreadyExists == false).block());
}
}

@Override
protected void onFailure() {
// TODO: here and in multi-part upload, should we clean up uploaded blobs?
Contributor:
Those are garbage collected after a week according to the API docs (https://docs.microsoft.com/en-us/rest/api/storageservices/put-block#remarks)

Member Author:
Ah thanks I didn't know that, I'll add a comment to that effect :) Seems like you can't even delete those dangling blocks if you wanted to

}
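// Illustrative only, not part of this change: the comment the author says he will add in
// onFailure() above might note that uncommitted blocks are never visible to readers and are
// garbage collected by Azure after about a week (see the Put Block API remarks linked in the
// review thread), and that dangling blocks cannot be deleted explicitly anyway.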
}) {
writer.accept(out);
out.markSuccess();
}
}

public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
assert inputStream.markSupported()
: "Should not be used with non-mark supporting streams as their retry handling in the SDK is broken";
@@ -439,13 +488,11 @@ private void executeMultipartUpload(String blobName, InputStream inputStream, lo
assert blobSize == (((nbParts - 1) * partSize) + lastPartSize) : "blobSize does not match multipart sizes";

final List<String> blockIds = new ArrayList<>(nbParts);
final Base64.Encoder base64Encoder = Base64.getEncoder().withoutPadding();
final Base64.Decoder base64UrlDecoder = Base64.getUrlDecoder();
for (int i = 0; i < nbParts; i++) {
final long length = i < nbParts - 1 ? partSize : lastPartSize;
Flux<ByteBuffer> byteBufferFlux = convertStreamToByteBuffer(inputStream, length, DEFAULT_UPLOAD_BUFFERS_SIZE);

final String blockId = base64Encoder.encodeToString(base64UrlDecoder.decode(UUIDs.base64UUID()));
final String blockId = makeMultipartBlockId();
blockBlobAsyncClient.stageBlock(blockId, byteBufferFlux, length).block();
blockIds.add(blockId);
}
@@ -454,6 +501,13 @@ private void executeMultipartUpload(String blobName, InputStream inputStream, lo
});
}

private static final Base64.Encoder base64Encoder = Base64.getEncoder().withoutPadding();
private static final Base64.Decoder base64UrlDecoder = Base64.getUrlDecoder();

private String makeMultipartBlockId() {
return base64Encoder.encodeToString(base64UrlDecoder.decode(UUIDs.base64UUID()));
}

/**
* Converts the provided input stream into a Flux of ByteBuffer. To avoid having large amounts of outstanding
* memory this Flux reads the InputStream into ByteBuffers of {@code chunkSize} size.
@@ -124,7 +124,7 @@ protected BlobStore getBlobStore() {

@Override
protected AzureBlobStore createBlobStore() {
final AzureBlobStore blobStore = new AzureBlobStore(metadata, storageService);
final AzureBlobStore blobStore = new AzureBlobStore(metadata, storageService, bigArrays);

logger.debug(() -> new ParameterizedMessage(
"using container [{}], chunk_size [{}], compress [{}], base_path [{}]",
@@ -15,6 +15,7 @@
import fixture.azure.AzureHttpHandler;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.blobstore.BlobContainer;
@@ -58,6 +59,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -172,7 +174,7 @@ int getMaxReadRetries(String clientName) {
.put(MAX_SINGLE_PART_UPLOAD_SIZE_SETTING.getKey(), new ByteSizeValue(1, ByteSizeUnit.MB))
.build());

return new AzureBlobContainer(BlobPath.EMPTY, new AzureBlobStore(repositoryMetadata, service));
return new AzureBlobContainer(BlobPath.EMPTY, new AzureBlobStore(repositoryMetadata, service, BigArrays.NON_RECYCLING_INSTANCE));
}

public void testReadNonexistentBlobThrowsNoSuchFileException() {
@@ -391,6 +393,83 @@ public void testWriteLargeBlob() throws Exception {
assertThat(blocks.isEmpty(), is(true));
}

public void testWriteLargeBlobStreaming() throws Exception {
Member Author:
These tests could be dried up more against the existing large blob write tests, but this is already quite the complicated PR since it touches all the plugins so I figured it best to clean up more in a follow-up and not touch too much of the existing code (I did dry things up in the production code a little more to make it easier to follow that this doesn't introduce much new interaction with the SDKs though).

final int maxRetries = randomIntBetween(2, 5);

final int blobSize = (int) ByteSizeUnit.MB.toBytes(10);
final byte[] data = randomBytes(blobSize);
Member:
Maybe we could generate fewer random bytes and repeat them instead?

Member Author:
++ certainly, let's do that in a follow-up though, then we can dry this up with the other test that already uses a large array as well in one go?

Member:
Sure
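// Illustrative sketch of the suggestion above, not part of this change: generate a small
// random block once and repeat it to fill the large test payload instead of 10 MB of
// fresh random bytes, e.g.:
// final byte[] pattern = randomBytes(1024);
// final byte[] data = new byte[blobSize];
// for (int i = 0; i < data.length; i++) {
//     data[i] = pattern[i % pattern.length];
// }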

int nbBlocks = (int) Math.ceil((double) data.length / (double) ByteSizeUnit.MB.toBytes(1));

final int nbErrors = 2; // we want all requests to fail at least once
final AtomicInteger countDownUploads = new AtomicInteger(nbErrors * nbBlocks);
final AtomicLong bytesReceived = new AtomicLong(0L);
final CountDown countDownComplete = new CountDown(nbErrors);

final Map<String, BytesReference> blocks = new ConcurrentHashMap<>();
httpServer.createContext("/account/container/write_large_blob", exchange -> {

if ("PUT".equals(exchange.getRequestMethod())) {
final Map<String, String> params = new HashMap<>();
RestUtils.decodeQueryString(exchange.getRequestURI().getRawQuery(), 0, params);

final String blockId = params.get("blockid");
assert Strings.hasText(blockId) == false || AzureFixtureHelper.assertValidBlockId(blockId);

if (Strings.hasText(blockId) && (countDownUploads.decrementAndGet() % 2 == 0)) {
final BytesReference blockData = Streams.readFully(exchange.getRequestBody());
blocks.put(blockId, blockData);
bytesReceived.addAndGet(blockData.length());
exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1);
exchange.close();
return;
}

final String complete = params.get("comp");
if ("blocklist".equals(complete) && (countDownComplete.countDown())) {
final String blockList = Streams.copyToString(new InputStreamReader(exchange.getRequestBody(), UTF_8));
final List<String> blockUids = Arrays.stream(blockList.split("<Latest>"))
.filter(line -> line.contains("</Latest>"))
.map(line -> line.substring(0, line.indexOf("</Latest>")))
.collect(Collectors.toList());

final ByteArrayOutputStream blob = new ByteArrayOutputStream();
for (String blockUid : blockUids) {
BytesReference block = blocks.remove(blockUid);
assert block != null;
block.writeTo(blob);
}
assertArrayEquals(data, blob.toByteArray());
exchange.getResponseHeaders().add("x-ms-request-server-encrypted", "false");
exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1);
exchange.close();
return;
}
}

if (randomBoolean()) {
Streams.readFully(exchange.getRequestBody());
AzureHttpHandler.sendError(exchange, randomFrom(RestStatus.INTERNAL_SERVER_ERROR, RestStatus.SERVICE_UNAVAILABLE));
}
exchange.close();
});

final BlobContainer blobContainer = createBlobContainer(maxRetries);
blobContainer.writeBlob("write_large_blob", false, randomBoolean(), out -> {
int outstanding = data.length;
while (outstanding > 0) {
if (randomBoolean()) {
int toWrite = Math.toIntExact(Math.min(randomIntBetween(64, data.length), outstanding));
out.write(data, data.length - outstanding, toWrite);
outstanding -= toWrite;
} else {
out.write(data[data.length - outstanding]);
outstanding--;
}
}
});
assertEquals(blobSize, bytesReceived.get());
}

public void testRetryUntilFail() throws Exception {
final int maxRetries = randomIntBetween(2, 5);
final AtomicInteger requestsReceived = new AtomicInteger(0);
@@ -15,9 +15,11 @@
import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.core.CheckedConsumer;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Iterator;
import java.util.Map;

@@ -76,6 +78,14 @@ public void writeBlob(String blobName, BytesReference bytes, boolean failIfAlrea
blobStore.writeBlob(buildKey(blobName), bytes, failIfAlreadyExists);
}

@Override
public void writeBlob(String blobName,
boolean failIfAlreadyExists,
boolean atomic,
CheckedConsumer<OutputStream, IOException> writer) throws IOException {
blobStore.writeBlob(buildKey(blobName), failIfAlreadyExists, writer);
}

@Override
public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
writeBlob(blobName, bytes, failIfAlreadyExists);