
Commit ffeafae

Save Memory on Large Repository Metadata Blob Writes (#74693)
This PR adds a new API for streaming serialization writes to a repository, enabling repository metadata of arbitrary size to be written with bounded memory. The existing write APIs require the eventual blob size to be known up front, which forced us to materialize the serialized blob in memory before writing — costing a lot of memory for e.g. a very large RepositoryData, and limiting us to a 2G maximum blob size. With this PR the requirement to fully materialize the serialized metadata goes away, and the memory overhead becomes bounded by the outbound buffer size of the repository implementation. As we move to larger repositories this makes master-node stability far more predictable, since writing out RepositoryData (and likewise shard-level metadata) no longer takes as much memory; it also enables aggregating multiple metadata blobs into a single larger blob without massive overhead, and removes the 2G size limit on RepositoryData.

Backport of #74313 and #74620
1 parent f21fad6 commit ffeafae
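For context, the new entry point is a `writeBlob` overload on `BlobContainer` that hands the caller an `OutputStream` rather than requiring a pre-sized `BytesReference` or `InputStream`. A minimal caller sketch, assuming a `BlobContainer` is already in hand (the blob name and payload below are illustrative, not taken from this commit):

```java
import org.elasticsearch.common.blobstore.BlobContainer;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class StreamingWriteSketch {

    /**
     * Streams a blob of unknown final size. Memory stays bounded by the
     * repository's outbound buffer: chunks are flushed as they fill rather
     * than materializing the whole serialized blob up front.
     */
    static void writeLargeMetadata(BlobContainer container) throws IOException {
        container.writeBlob("example-metadata-blob", true /* failIfAlreadyExists */, true /* atomic */, out -> {
            for (int i = 0; i < 1_000_000; i++) {
                out.write(("entry-" + i + "\n").getBytes(StandardCharsets.UTF_8));
            }
        });
    }
}
```

The writer callback never learns the total size; each repository implementation decides how to chunk the stream (the Azure implementation below stages block-blob blocks as its buffer fills).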

29 files changed (+1069 −190 lines)


modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java (+10)

```diff
@@ -8,6 +8,7 @@

 package org.elasticsearch.common.blobstore.url;

+import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.core.SuppressForbidden;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobMetadata;
@@ -20,6 +21,7 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.net.URL;
 import java.nio.file.NoSuchFileException;
 import java.security.AccessController;
@@ -121,6 +123,14 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b
         throw new UnsupportedOperationException("URL repository doesn't support this operation");
     }

+    @Override
+    public void writeBlob(String blobName,
+                          boolean failIfAlreadyExists,
+                          boolean atomic,
+                          CheckedConsumer<OutputStream, IOException> writer) throws IOException {
+        throw new UnsupportedOperationException("URL repository doesn't support this operation");
+    }
+
     @Override
     public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
         throw new UnsupportedOperationException("URL repository doesn't support this operation");
```

plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java (+10)

```diff
@@ -12,6 +12,7 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.core.util.Throwables;
+import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobMetadata;
@@ -22,6 +23,7 @@

 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.nio.file.NoSuchFileException;
 import java.util.Iterator;
 import java.util.Map;
@@ -99,6 +101,14 @@ public void writeBlob(String blobName, BytesReference bytes, boolean failIfAlrea
         blobStore.writeBlob(buildKey(blobName), bytes, failIfAlreadyExists);
     }

+    @Override
+    public void writeBlob(String blobName,
+                          boolean failIfAlreadyExists,
+                          boolean atomic,
+                          CheckedConsumer<OutputStream, IOException> writer) throws IOException {
+        blobStore.writeBlob(buildKey(blobName), failIfAlreadyExists, writer);
+    }
+
     @Override
     public DeleteResult delete() throws IOException {
         return blobStore.deleteBlobDirectory(keyPath);
```

plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java (+59 −4)

```diff
@@ -33,6 +33,8 @@
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.cluster.metadata.RepositoryMetadata;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.blobstore.BlobContainer;
@@ -46,13 +48,15 @@
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.repositories.azure.AzureRepository.Repository;
+import org.elasticsearch.repositories.blobstore.ChunkedBlobOutputStream;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;

 import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.net.HttpURLConnection;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -83,6 +87,8 @@ public class AzureBlobStore implements BlobStore {

     private final AzureStorageService service;

+    private final BigArrays bigArrays;
+
     private final String clientName;
     private final String container;
     private final LocationMode locationMode;
@@ -91,10 +97,11 @@ public class AzureBlobStore implements BlobStore {
     private final Stats stats = new Stats();
     private final BiConsumer<String, URL> statsConsumer;

-    public AzureBlobStore(RepositoryMetadata metadata, AzureStorageService service) {
+    public AzureBlobStore(RepositoryMetadata metadata, AzureStorageService service, BigArrays bigArrays) {
         this.container = Repository.CONTAINER_SETTING.get(metadata.settings());
         this.clientName = Repository.CLIENT_NAME.get(metadata.settings());
         this.service = service;
+        this.bigArrays = bigArrays;
         // locationMode is set per repository, not per client
         this.locationMode = Repository.LOCATION_MODE_SETTING.get(metadata.settings());
         this.maxSinglePartUploadSize = Repository.MAX_SINGLE_PART_UPLOAD_SIZE_SETTING.get(metadata.settings());
@@ -384,6 +391,49 @@ public void writeBlob(String blobName, BytesReference bytes, boolean failIfAlrea
         executeSingleUpload(blobName, byteBufferFlux, bytes.length(), failIfAlreadyExists);
     }

+    public void writeBlob(String blobName,
+                          boolean failIfAlreadyExists,
+                          CheckedConsumer<OutputStream, IOException> writer) throws IOException {
+        final BlockBlobAsyncClient blockBlobAsyncClient = asyncClient().getBlobContainerAsyncClient(container)
+            .getBlobAsyncClient(blobName).getBlockBlobAsyncClient();
+        try (ChunkedBlobOutputStream<String> out = new ChunkedBlobOutputStream<String>(bigArrays, getUploadBlockSize()) {
+
+            @Override
+            protected void flushBuffer() {
+                if (buffer.size() == 0) {
+                    return;
+                }
+                final String blockId = makeMultipartBlockId();
+                SocketAccess.doPrivilegedVoidException(() -> blockBlobAsyncClient.stageBlock(
+                    blockId,
+                    Flux.fromArray(BytesReference.toByteBuffers(buffer.bytes())),
+                    buffer.size()
+                ).block());
+                finishPart(blockId);
+            }
+
+            @Override
+            protected void onCompletion() {
+                if (flushedBytes == 0L) {
+                    writeBlob(blobName, buffer.bytes(), failIfAlreadyExists);
+                } else {
+                    flushBuffer();
+                    SocketAccess.doPrivilegedVoidException(
+                        () -> blockBlobAsyncClient.commitBlockList(parts, failIfAlreadyExists == false).block());
+                }
+            }
+
+            @Override
+            protected void onFailure() {
+                // Nothing to do here, already uploaded blocks will be GCed by Azure after a week.
+                // see https://docs.microsoft.com/en-us/rest/api/storageservices/put-block#remarks
+            }
+        }) {
+            writer.accept(out);
+            out.markSuccess();
+        }
+    }
+
     public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
         assert inputStream.markSupported()
             : "Should not be used with non-mark supporting streams as their retry handling in the SDK is broken";
@@ -440,13 +490,11 @@ private void executeMultipartUpload(String blobName, InputStream inputStream, lo
         assert blobSize == (((nbParts - 1) * partSize) + lastPartSize) : "blobSize does not match multipart sizes";

         final List<String> blockIds = new ArrayList<>(nbParts);
-        final Base64.Encoder base64Encoder = Base64.getEncoder().withoutPadding();
-        final Base64.Decoder base64UrlDecoder = Base64.getUrlDecoder();
         for (int i = 0; i < nbParts; i++) {
             final long length = i < nbParts - 1 ? partSize : lastPartSize;
             Flux<ByteBuffer> byteBufferFlux = convertStreamToByteBuffer(inputStream, length, DEFAULT_UPLOAD_BUFFERS_SIZE);

-            final String blockId = base64Encoder.encodeToString(base64UrlDecoder.decode(UUIDs.base64UUID()));
+            final String blockId = makeMultipartBlockId();
             blockBlobAsyncClient.stageBlock(blockId, byteBufferFlux, length).block();
             blockIds.add(blockId);
         }
@@ -455,6 +503,13 @@ private void executeMultipartUpload(String blobName, InputStream inputStream, lo
         });
     }

+    private static final Base64.Encoder base64Encoder = Base64.getEncoder().withoutPadding();
+    private static final Base64.Decoder base64UrlDecoder = Base64.getUrlDecoder();
+
+    private String makeMultipartBlockId() {
+        return base64Encoder.encodeToString(base64UrlDecoder.decode(UUIDs.base64UUID()));
+    }
+
     /**
      * Converts the provided input stream into a Flux of ByteBuffer. To avoid having large amounts of outstanding
      * memory this Flux reads the InputStream into ByteBuffers of {@code chunkSize} size.
```
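The anonymous subclass above captures the `ChunkedBlobOutputStream` lifecycle: writes accumulate in a `BigArrays`-backed buffer, `flushBuffer()` runs whenever the buffer reaches the upload block size (each flushed chunk becomes a staged Azure block recorded via `finishPart`), and closing the stream fires `onCompletion()` after a `markSuccess()`, or `onFailure()` otherwise. The following is a self-contained analogue of that contract (plain Java, not the actual `org.elasticsearch.repositories.blobstore.ChunkedBlobOutputStream`, whose buffering and bookkeeping differ):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;

// Illustrative analogue only: same callback shape as the class used above.
abstract class ChunkedOutputStreamSketch<T> extends OutputStream {

    protected final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    protected final List<T> parts = new ArrayList<>(); // ids of finished parts
    protected long flushedBytes = 0L;                  // bytes already uploaded

    private final int chunkSize;
    private boolean success = false;

    ChunkedOutputStreamSketch(int chunkSize) {
        this.chunkSize = chunkSize;
    }

    @Override
    public void write(int b) throws IOException {
        buffer.write(b);
        if (buffer.size() >= chunkSize) { // chunk is full: upload it as one part
            final int flushed = buffer.size();
            flushBuffer();                // subclass uploads and calls finishPart
            flushedBytes += flushed;
            buffer.reset();
        }
    }

    /** Upload the current buffer contents as one part. */
    protected abstract void flushBuffer() throws IOException;

    /** Commit all parts, or upload directly if nothing was ever flushed. */
    protected abstract void onCompletion() throws IOException;

    /** Clean up partial uploads after a failed write. */
    protected abstract void onFailure();

    protected final void finishPart(T partId) {
        parts.add(partId);
    }

    /** The writer calls this once all bytes were written without error. */
    public final void markSuccess() {
        success = true;
    }

    @Override
    public void close() throws IOException {
        if (success) {
            onCompletion();
        } else {
            onFailure();
        }
    }
}
```

This lifecycle is also why `onFailure()` can be a no-op in the Azure case: blocks staged by `flushBuffer()` but never committed are garbage-collected by Azure after a week, per the linked Put Block documentation.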

plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java (+1 −1)

```diff
@@ -119,7 +119,7 @@ protected BlobStore getBlobStore() {

     @Override
     protected AzureBlobStore createBlobStore() {
-        final AzureBlobStore blobStore = new AzureBlobStore(metadata, storageService);
+        final AzureBlobStore blobStore = new AzureBlobStore(metadata, storageService, bigArrays);

         logger.debug(() -> new ParameterizedMessage(
             "using container [{}], chunk_size [{}], compress [{}], base_path [{}]",
```

plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java (+79 −1)

```diff
@@ -15,6 +15,7 @@
 import fixture.azure.AzureHttpHandler;
 import org.elasticsearch.cluster.metadata.RepositoryMetadata;
 import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.core.SuppressForbidden;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.blobstore.BlobContainer;
@@ -58,6 +59,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -172,7 +174,7 @@ int getMaxReadRetries(String clientName) {
             .put(MAX_SINGLE_PART_UPLOAD_SIZE_SETTING.getKey(), new ByteSizeValue(1, ByteSizeUnit.MB))
             .build());

-        return new AzureBlobContainer(BlobPath.EMPTY, new AzureBlobStore(repositoryMetadata, service));
+        return new AzureBlobContainer(BlobPath.EMPTY, new AzureBlobStore(repositoryMetadata, service, BigArrays.NON_RECYCLING_INSTANCE));
     }

     public void testReadNonexistentBlobThrowsNoSuchFileException() {
@@ -391,6 +393,82 @@ public void testWriteLargeBlob() throws Exception {
         assertThat(blocks.isEmpty(), is(true));
     }

+    public void testWriteLargeBlobStreaming() throws Exception {
+        final int maxRetries = randomIntBetween(2, 5);
+
+        final int blobSize = (int) ByteSizeUnit.MB.toBytes(10);
+        final byte[] data = randomBytes(blobSize);
+
+        final int nbErrors = 2; // we want all requests to fail at least once
+        final AtomicInteger counterUploads = new AtomicInteger(0);
+        final AtomicLong bytesReceived = new AtomicLong(0L);
+        final CountDown countDownComplete = new CountDown(nbErrors);
+
+        final Map<String, BytesReference> blocks = new ConcurrentHashMap<>();
+        httpServer.createContext("/account/container/write_large_blob_streaming", exchange -> {
+
+            if ("PUT".equals(exchange.getRequestMethod())) {
+                final Map<String, String> params = new HashMap<>();
+                RestUtils.decodeQueryString(exchange.getRequestURI().getRawQuery(), 0, params);
+
+                final String blockId = params.get("blockid");
+                assert Strings.hasText(blockId) == false || AzureFixtureHelper.assertValidBlockId(blockId);
+
+                if (Strings.hasText(blockId) && (counterUploads.incrementAndGet() % 2 == 0)) {
+                    final BytesReference blockData = Streams.readFully(exchange.getRequestBody());
+                    blocks.put(blockId, blockData);
+                    bytesReceived.addAndGet(blockData.length());
+                    exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1);
+                    exchange.close();
+                    return;
+                }

+                final String complete = params.get("comp");
+                if ("blocklist".equals(complete) && (countDownComplete.countDown())) {
+                    final String blockList = Streams.copyToString(new InputStreamReader(exchange.getRequestBody(), UTF_8));
+                    final List<String> blockUids = Arrays.stream(blockList.split("<Latest>"))
+                        .filter(line -> line.contains("</Latest>"))
+                        .map(line -> line.substring(0, line.indexOf("</Latest>")))
+                        .collect(Collectors.toList());
+
+                    final ByteArrayOutputStream blob = new ByteArrayOutputStream();
+                    for (String blockUid : blockUids) {
+                        BytesReference block = blocks.remove(blockUid);
+                        assert block != null;
+                        block.writeTo(blob);
+                    }
+                    assertArrayEquals(data, blob.toByteArray());
+                    exchange.getResponseHeaders().add("x-ms-request-server-encrypted", "false");
+                    exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1);
+                    exchange.close();
+                    return;
+                }
+            }
+
+            if (randomBoolean()) {
+                Streams.readFully(exchange.getRequestBody());
+                AzureHttpHandler.sendError(exchange, randomFrom(RestStatus.INTERNAL_SERVER_ERROR, RestStatus.SERVICE_UNAVAILABLE));
+            }
+            exchange.close();
+        });
+
+        final BlobContainer blobContainer = createBlobContainer(maxRetries);
+        blobContainer.writeBlob("write_large_blob_streaming", false, randomBoolean(), out -> {
+            int outstanding = data.length;
+            while (outstanding > 0) {
+                if (randomBoolean()) {
+                    int toWrite = Math.toIntExact(Math.min(randomIntBetween(64, data.length), outstanding));
+                    out.write(data, data.length - outstanding, toWrite);
+                    outstanding -= toWrite;
+                } else {
+                    out.write(data[data.length - outstanding]);
+                    outstanding--;
+                }
+            }
+        });
+        assertEquals(blobSize, bytesReceived.get());
+    }
+
     public void testRetryUntilFail() throws Exception {
         final int maxRetries = randomIntBetween(2, 5);
         final AtomicInteger requestsReceived = new AtomicInteger(0);
```

plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java (+10)

```diff
@@ -15,9 +15,11 @@
 import org.elasticsearch.common.blobstore.DeleteResult;
 import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.core.CheckedConsumer;

 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.Iterator;
 import java.util.Map;

@@ -76,6 +78,14 @@ public void writeBlob(String blobName, BytesReference bytes, boolean failIfAlrea
         blobStore.writeBlob(buildKey(blobName), bytes, failIfAlreadyExists);
     }

+    @Override
+    public void writeBlob(String blobName,
+                          boolean failIfAlreadyExists,
+                          boolean atomic,
+                          CheckedConsumer<OutputStream, IOException> writer) throws IOException {
+        blobStore.writeBlob(buildKey(blobName), failIfAlreadyExists, writer);
+    }
+
     @Override
     public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
         writeBlob(blobName, bytes, failIfAlreadyExists);
```
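The `GoogleCloudStorageBlobStore.writeBlob(String, boolean, CheckedConsumer)` method this delegates to is not part of the excerpt above. As a hedged sketch of how such a streaming write can be built on the GCS SDK — `Storage#writer` returns a resumable-upload `WriteChannel`, and `Channels.newOutputStream` adapts it to an `OutputStream` — one possible shape (class name, option choices, and the `CheckedConsumer` stand-in are assumptions, not the actual Elasticsearch code):

```java
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.Channels;

final class GcsStreamingWriteSketch {

    /** Minimal stand-in for org.elasticsearch.core.CheckedConsumer. */
    interface CheckedConsumer<T, E extends Exception> {
        void accept(T t) throws E;
    }

    // Sketch only: stream bytes of unknown final size into a GCS object via
    // the SDK's resumable-upload channel, keeping buffering inside the SDK.
    static void writeBlob(Storage storage, String bucket, String name,
                          boolean failIfAlreadyExists,
                          CheckedConsumer<OutputStream, IOException> writer) throws IOException {
        final BlobInfo blobInfo = BlobInfo.newBuilder(BlobId.of(bucket, name)).build();
        final Storage.BlobWriteOption[] options = failIfAlreadyExists
            ? new Storage.BlobWriteOption[] { Storage.BlobWriteOption.doesNotExist() }
            : new Storage.BlobWriteOption[0];
        final WriteChannel channel = storage.writer(blobInfo, options);
        try (OutputStream out = Channels.newOutputStream(channel)) {
            writer.accept(out); // caller serializes directly into the channel
        }
    }
}
```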
