Skip to content

Commit 3819fcb

Browse files
Add Ability to Write a BytesReference to BlobContainer (elastic#66501)
Except when writing actual segment files to the blob store we always write `BytesReference` instead of a stream. Only having the stream API available forces needless copies on us. I fixed the straightforward needless copying for HDFS and FS repos in this PR; we could do similar fixes for GCS and Azure as well and thus significantly reduce the peak memory use of these writes, on master nodes in particular.
1 parent a5dc382 commit 3819fcb

File tree

18 files changed

+117
-75
lines changed

18 files changed

+117
-75
lines changed

modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.common.blobstore.BlobPath;
2626
import org.elasticsearch.common.blobstore.DeleteResult;
2727
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
28+
import org.elasticsearch.common.bytes.BytesReference;
2829

2930
import java.io.BufferedInputStream;
3031
import java.io.FileNotFoundException;
@@ -132,7 +133,7 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b
132133
}
133134

134135
@Override
135-
public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
136+
public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
136137
throw new UnsupportedOperationException("URL repository doesn't support this operation");
137138
}
138139

plugins/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.elasticsearch.common.blobstore.BlobContainer;
2929
import org.elasticsearch.common.blobstore.BlobPath;
3030
import org.elasticsearch.common.blobstore.BlobStore;
31+
import org.elasticsearch.common.bytes.BytesArray;
3132
import org.elasticsearch.common.regex.Regex;
3233
import org.elasticsearch.common.settings.MockSecureSettings;
3334
import org.elasticsearch.common.settings.Settings;
@@ -36,9 +37,7 @@
3637
import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase;
3738
import org.elasticsearch.rest.RestStatus;
3839

39-
import java.io.ByteArrayInputStream;
4040
import java.io.IOException;
41-
import java.io.InputStream;
4241
import java.nio.charset.StandardCharsets;
4342
import java.util.Base64;
4443
import java.util.Collection;
@@ -221,10 +220,8 @@ public void testLargeBlobCountDeletion() throws Exception {
221220
final BlobContainer container = store.blobContainer(new BlobPath());
222221
for (int i = 0; i < numberOfBlobs; i++) {
223222
byte[] bytes = randomBytes(randomInt(100));
224-
try (InputStream inputStream = new ByteArrayInputStream(bytes)) {
225-
String blobName = randomAlphaOfLength(10);
226-
container.writeBlob(blobName, inputStream, bytes.length, false);
227-
}
223+
String blobName = randomAlphaOfLength(10);
224+
container.writeBlob(blobName, new BytesArray(bytes), false);
228225
}
229226

230227
container.delete();

plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.elasticsearch.common.blobstore.BlobPath;
3030
import org.elasticsearch.common.blobstore.DeleteResult;
3131
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
32+
import org.elasticsearch.common.bytes.BytesReference;
3233

3334
import java.io.IOException;
3435
import java.io.InputStream;
@@ -101,8 +102,8 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b
101102
}
102103

103104
@Override
104-
public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
105-
writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists);
105+
public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
106+
writeBlob(blobName, bytes, failIfAlreadyExists);
106107
}
107108

108109
@Override

plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.common.blobstore.BlobStoreException;
2626
import org.elasticsearch.common.blobstore.DeleteResult;
2727
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
28+
import org.elasticsearch.common.bytes.BytesReference;
2829

2930
import java.io.IOException;
3031
import java.io.InputStream;
@@ -83,8 +84,8 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b
8384
}
8485

8586
@Override
86-
public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
87-
writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists);
87+
public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
88+
writeBlob(blobName, bytes, failIfAlreadyExists);
8889
}
8990

9091
@Override

plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.elasticsearch.common.blobstore.fs.FsBlobContainer;
3535
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
3636
import org.elasticsearch.common.blobstore.support.PlainBlobMetadata;
37+
import org.elasticsearch.common.bytes.BytesReference;
3738
import org.elasticsearch.common.io.Streams;
3839
import org.elasticsearch.repositories.hdfs.HdfsBlobStore.Operation;
3940

@@ -150,12 +151,28 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b
150151
}
151152

152153
@Override
153-
public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
154+
public void writeBlob(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
155+
Path blob = new Path(path, blobName);
156+
// we pass CREATE, which means it fails if a blob already exists.
157+
final EnumSet<CreateFlag> flags = failIfAlreadyExists ? EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK)
158+
: EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE, CreateFlag.SYNC_BLOCK);
159+
store.execute((Operation<Void>) fileContext -> {
160+
try {
161+
writeToPath(bytes, blob, fileContext, flags);
162+
} catch (org.apache.hadoop.fs.FileAlreadyExistsException faee) {
163+
throw new FileAlreadyExistsException(blob.toString(), null, faee.getMessage());
164+
}
165+
return null;
166+
});
167+
}
168+
169+
@Override
170+
public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
154171
final String tempBlob = FsBlobContainer.tempBlobName(blobName);
155172
final Path tempBlobPath = new Path(path, tempBlob);
156173
final Path blob = new Path(path, blobName);
157174
store.execute((Operation<Void>) fileContext -> {
158-
writeToPath(inputStream, blobSize, fileContext, tempBlobPath, EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK));
175+
writeToPath(bytes, tempBlobPath, fileContext, EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK));
159176
try {
160177
fileContext.rename(tempBlobPath, blob, failIfAlreadyExists ? Options.Rename.NONE : Options.Rename.OVERWRITE);
161178
} catch (org.apache.hadoop.fs.FileAlreadyExistsException faee) {
@@ -165,6 +182,13 @@ public void writeBlobAtomic(String blobName, InputStream inputStream, long blobS
165182
});
166183
}
167184

185+
private void writeToPath(BytesReference bytes, Path blobPath, FileContext fileContext,
186+
EnumSet<CreateFlag> createFlags) throws IOException {
187+
try (FSDataOutputStream stream = fileContext.create(blobPath, createFlags)) {
188+
bytes.writeTo(stream);
189+
}
190+
}
191+
168192
private void writeToPath(InputStream inputStream, long blobSize, FileContext fileContext, Path blobPath,
169193
EnumSet<CreateFlag> createFlags) throws IOException {
170194
final byte[] buffer = new byte[blobSize < bufferSize ? Math.toIntExact(blobSize) : bufferSize];

plugins/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@
5454
import org.elasticsearch.threadpool.ThreadPool;
5555

5656
import java.io.IOException;
57-
import java.io.InputStream;
5857
import java.util.ArrayList;
5958
import java.util.Collection;
6059
import java.util.Collections;
@@ -158,12 +157,9 @@ public void testEnforcedCooldownPeriod() throws IOException {
158157
SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION.minimumCompatibilityVersion()));
159158
final BytesReference serialized = BytesReference.bytes(modifiedRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(),
160159
SnapshotsService.OLD_SNAPSHOT_FORMAT));
161-
PlainActionFuture.get(f -> repository.threadPool().generic().execute(ActionRunnable.run(f, () -> {
162-
try (InputStream stream = serialized.streamInput()) {
160+
PlainActionFuture.get(f -> repository.threadPool().generic().execute(ActionRunnable.run(f, () ->
163161
repository.blobStore().blobContainer(repository.basePath()).writeBlobAtomic(
164-
BlobStoreRepository.INDEX_FILE_PREFIX + modifiedRepositoryData.getGenId(), stream, serialized.length(), true);
165-
}
166-
})));
162+
BlobStoreRepository.INDEX_FILE_PREFIX + modifiedRepositoryData.getGenId(), serialized, true))));
167163

168164
final String newSnapshotName = "snapshot-new";
169165
final long beforeThrottledSnapshot = repository.threadPool().relativeTimeInNanos();

plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.elasticsearch.common.blobstore.DeleteResult;
4747
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
4848
import org.elasticsearch.common.blobstore.support.PlainBlobMetadata;
49+
import org.elasticsearch.common.bytes.BytesReference;
4950
import org.elasticsearch.common.collect.Tuple;
5051
import org.elasticsearch.common.unit.ByteSizeUnit;
5152
import org.elasticsearch.common.unit.ByteSizeValue;
@@ -143,8 +144,8 @@ long getLargeBlobThresholdInBytes() {
143144
}
144145

145146
@Override
146-
public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
147-
writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists);
147+
public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
148+
writeBlob(blobName, bytes, failIfAlreadyExists);
148149
}
149150

150151
@Override

server/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryCleanupIT.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,12 @@
2222
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
2323
import org.elasticsearch.action.support.PlainActionFuture;
2424
import org.elasticsearch.cluster.RepositoryCleanupInProgress;
25+
import org.elasticsearch.common.bytes.BytesArray;
2526
import org.elasticsearch.common.settings.Settings;
2627
import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase;
2728
import org.elasticsearch.snapshots.SnapshotState;
2829
import org.elasticsearch.test.ESIntegTestCase;
2930

30-
import java.io.ByteArrayInputStream;
3131
import java.util.concurrent.ExecutionException;
3232

3333
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFutureThrows;
@@ -85,7 +85,7 @@ private String startBlockedCleanup(String repoName) throws Exception {
8585
logger.info("--> creating a garbage data blob");
8686
final PlainActionFuture<Void> garbageFuture = PlainActionFuture.newFuture();
8787
repository.threadPool().generic().execute(ActionRunnable.run(garbageFuture, () -> repository.blobStore()
88-
.blobContainer(repository.basePath()).writeBlob("snap-foo.dat", new ByteArrayInputStream(new byte[1]), 1, true)));
88+
.blobContainer(repository.basePath()).writeBlob("snap-foo.dat", new BytesArray(new byte[1]), true)));
8989
garbageFuture.get();
9090

9191
blockMasterFromFinalizingSnapshotOnIndexFile(repoName);
@@ -120,7 +120,7 @@ public void testCleanupOldIndexN() throws ExecutionException, InterruptedExcepti
120120
final int generation = i;
121121
repository.threadPool().generic().execute(ActionRunnable.run(createOldIndexNFuture, () -> repository.blobStore()
122122
.blobContainer(repository.basePath()).writeBlob(BlobStoreRepository.INDEX_FILE_PREFIX + generation,
123-
new ByteArrayInputStream(new byte[1]), 1, true)));
123+
new BytesArray(new byte[1]), true)));
124124
createOldIndexNFuture.get();
125125
}
126126

server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
package org.elasticsearch.common.blobstore;
2121

22+
import org.elasticsearch.common.bytes.BytesReference;
23+
2224
import java.io.IOException;
2325
import java.io.InputStream;
2426
import java.nio.file.FileAlreadyExistsException;
@@ -110,25 +112,35 @@ default long readBlobPreferredLength() {
110112
void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException;
111113

112114
/**
113-
* Reads blob content from the input stream and writes it to the container in a new blob with the given name,
114-
* using an atomic write operation if the implementation supports it.
115+
* Reads blob content from a {@link BytesReference} and writes it to the container in a new blob with the given name.
115116
*
116-
* This method assumes the container does not already contain a blob of the same blobName. If a blob by the
117-
* same name already exists, the operation will fail and an {@link IOException} will be thrown.
117+
* @param blobName
118+
* The name of the blob to write the contents of the input stream to.
119+
* @param bytes
120+
* The bytes to write
121+
* @param failIfAlreadyExists
122+
* whether to throw a FileAlreadyExistsException if the given blob already exists
123+
* @throws FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists
124+
* @throws IOException if the input stream could not be read, or the target blob could not be written to.
125+
*/
126+
default void writeBlob(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
127+
writeBlob(blobName, bytes.streamInput(), bytes.length(), failIfAlreadyExists);
128+
}
129+
130+
/**
131+
* Reads blob content from a {@link BytesReference} and writes it to the container in a new blob with the given name,
132+
* using an atomic write operation if the implementation supports it.
118133
*
119134
* @param blobName
120135
* The name of the blob to write the contents of the input stream to.
121-
* @param inputStream
122-
* The input stream from which to retrieve the bytes to write to the blob.
123-
* @param blobSize
124-
* The size of the blob to be written, in bytes. It is implementation dependent whether
125-
* this value is used in writing the blob to the repository.
136+
* @param bytes
137+
* The bytes to write
126138
* @param failIfAlreadyExists
127139
* whether to throw a FileAlreadyExistsException if the given blob already exists
128140
* @throws FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists
129141
* @throws IOException if the input stream could not be read, or the target blob could not be written to.
130142
*/
131-
void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException;
143+
void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException;
132144

133145
/**
134146
* Deletes this container and all its contents from the repository.

server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.common.blobstore.DeleteResult;
2727
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
2828
import org.elasticsearch.common.blobstore.support.PlainBlobMetadata;
29+
import org.elasticsearch.common.bytes.BytesReference;
2930
import org.elasticsearch.common.io.Streams;
3031
import org.elasticsearch.core.internal.io.IOUtils;
3132

@@ -190,12 +191,26 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b
190191
}
191192

192193
@Override
193-
public void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize, boolean failIfAlreadyExists)
194-
throws IOException {
194+
public void writeBlob(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
195+
final Path file = path.resolve(blobName);
196+
try {
197+
writeToPath(bytes, file);
198+
} catch (FileAlreadyExistsException faee) {
199+
if (failIfAlreadyExists) {
200+
throw faee;
201+
}
202+
deleteBlobsIgnoringIfNotExists(Collections.singletonList(blobName));
203+
writeToPath(bytes, file);
204+
}
205+
IOUtils.fsync(path, true);
206+
}
207+
208+
@Override
209+
public void writeBlobAtomic(final String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
195210
final String tempBlob = tempBlobName(blobName);
196211
final Path tempBlobPath = path.resolve(tempBlob);
197212
try {
198-
writeToPath(inputStream, tempBlobPath, blobSize);
213+
writeToPath(bytes, tempBlobPath);
199214
moveBlobAtomic(tempBlob, blobName, failIfAlreadyExists);
200215
} catch (IOException ex) {
201216
try {
@@ -209,6 +224,13 @@ public void writeBlobAtomic(final String blobName, final InputStream inputStream
209224
}
210225
}
211226

227+
private void writeToPath(BytesReference bytes, Path tempBlobPath) throws IOException {
228+
try (OutputStream outputStream = Files.newOutputStream(tempBlobPath, StandardOpenOption.CREATE_NEW)) {
229+
bytes.writeTo(outputStream);
230+
}
231+
IOUtils.fsync(tempBlobPath, false);
232+
}
233+
212234
private void writeToPath(InputStream inputStream, Path tempBlobPath, long blobSize) throws IOException {
213235
try (OutputStream outputStream = Files.newOutputStream(tempBlobPath, StandardOpenOption.CREATE_NEW)) {
214236
final int bufferSize = blobStore.bufferSizeInBytes();

server/src/main/java/org/elasticsearch/common/blobstore/support/FilterBlobContainer.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.common.blobstore.BlobMetadata;
2424
import org.elasticsearch.common.blobstore.BlobPath;
2525
import org.elasticsearch.common.blobstore.DeleteResult;
26+
import org.elasticsearch.common.bytes.BytesReference;
2627

2728
import java.io.IOException;
2829
import java.io.InputStream;
@@ -72,8 +73,8 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b
7273
}
7374

7475
@Override
75-
public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
76-
delegate.writeBlobAtomic(blobName, inputStream, blobSize, failIfAlreadyExists);
76+
public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
77+
delegate.writeBlobAtomic(blobName, bytes, failIfAlreadyExists);
7778
}
7879

7980
@Override

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1273,9 +1273,7 @@ public String startVerification() {
12731273
byte[] testBytes = Strings.toUTF8Bytes(seed);
12741274
BlobContainer testContainer = blobStore().blobContainer(basePath().add(testBlobPrefix(seed)));
12751275
BytesArray bytes = new BytesArray(testBytes);
1276-
try (InputStream stream = bytes.streamInput()) {
1277-
testContainer.writeBlobAtomic("master.dat", stream, bytes.length(), true);
1278-
}
1276+
testContainer.writeBlobAtomic("master.dat", new BytesArray(testBytes), true);
12791277
return seed;
12801278
}
12811279
} catch (Exception exp) {
@@ -1880,11 +1878,9 @@ private long latestGeneration(Collection<String> rootBlobs) {
18801878

18811879
private void writeAtomic(BlobContainer container, final String blobName, final BytesReference bytesRef,
18821880
boolean failIfAlreadyExists) throws IOException {
1883-
try (InputStream stream = bytesRef.streamInput()) {
1884-
logger.trace(() ->
1885-
new ParameterizedMessage("[{}] Writing [{}] to {} atomically", metadata.name(), blobName, container.path()));
1886-
container.writeBlobAtomic(blobName, stream, bytesRef.length(), failIfAlreadyExists);
1887-
}
1881+
logger.trace(() ->
1882+
new ParameterizedMessage("[{}] Writing [{}] to {} atomically", metadata.name(), blobName, container.path()));
1883+
container.writeBlobAtomic(blobName, bytesRef, failIfAlreadyExists);
18881884
}
18891885

18901886
@Override
@@ -2291,10 +2287,7 @@ public void verify(String seed, DiscoveryNode localNode) {
22912287
} else {
22922288
BlobContainer testBlobContainer = blobStore().blobContainer(basePath().add(testBlobPrefix(seed)));
22932289
try {
2294-
BytesArray bytes = new BytesArray(seed);
2295-
try (InputStream stream = bytes.streamInput()) {
2296-
testBlobContainer.writeBlob("data-" + localNode.getId() + ".dat", stream, bytes.length(), true);
2297-
}
2290+
testBlobContainer.writeBlob("data-" + localNode.getId() + ".dat", new BytesArray(seed), true);
22982291
} catch (Exception exp) {
22992292
throw new RepositoryVerificationException(metadata.name(), "store location [" + blobStore() +
23002293
"] is not accessible on the node [" + localNode + "]", exp);

0 commit comments

Comments
 (0)