Draft for testing: Supply Content-MD5 to S3 uploads #117863

Draft · wants to merge 3 commits into base: main

15 changes: 15 additions & 0 deletions .buildkite/pipelines/pull-request/s3.yml
@@ -0,0 +1,15 @@
steps:
- label: third-party / s3
command: |
export amazon_s3_bucket=elasticsearch-ci.us-west-2
export amazon_s3_base_path=$BUILDKITE_BRANCH

.ci/scripts/run-gradle.sh s3ThirdPartyTest
env:
USE_3RD_PARTY_S3_CREDENTIALS: "true"
timeout_in_minutes: 30
agents:
provider: gcp
image: family/elasticsearch-ubuntu-2004
machineType: n2-standard-8
buildDirectory: /dev/shm/bk
@@ -109,7 +109,8 @@ protected Settings nodeSettings() {
protected void createRepository(String repoName) {
Settings.Builder settings = Settings.builder()
.put("bucket", System.getProperty("test.s3.bucket"))
.put("base_path", System.getProperty("test.s3.base", "testpath"));
.put("base_path", System.getProperty("test.s3.base", "testpath"))
.put("buffer_size", S3Repository.MIN_PART_SIZE_USING_MULTIPART);
final String endpoint = USE_FIXTURE ? minio.getAddress() : System.getProperty("test.s3.endpoint");
if (endpoint != null) {
settings.put("endpoint", endpoint);
@@ -69,8 +69,11 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
@@ -189,13 +192,18 @@ private void flushBuffer(boolean lastPart) throws IOException {
}
}
assert lastPart == false || successful : "must only write last part if successful";
final var byteStream = buffer.bytes().streamInput();
byteStream.mark(0);
final var md5 = md5DigestOfInputStream(byteStream, Long.MAX_VALUE);
byteStream.reset();
final UploadPartRequest uploadRequest = createPartUploadRequest(
purpose,
buffer.bytes().streamInput(),
byteStream,
uploadId.get(),
parts.size() + 1,
absoluteBlobKey,
buffer.size(),
md5,
lastPart
);
final UploadPartResult uploadResponse;
@@ -260,6 +268,7 @@ private UploadPartRequest createPartUploadRequest(
int number,
String blobName,
long size,
byte[] partMd5,
boolean lastPart
) {
final UploadPartRequest uploadRequest = new UploadPartRequest();
@@ -270,6 +279,7 @@
uploadRequest.setInputStream(stream);
S3BlobStore.configureRequestForMetrics(uploadRequest, blobStore, Operation.PUT_MULTIPART_OBJECT, purpose);
uploadRequest.setPartSize(size);
uploadRequest.setMd5Digest(Base64.getEncoder().encodeToString(partMd5));
uploadRequest.setLastPart(lastPart);
return uploadRequest;
}
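
For reference, the value passed to setMd5Digest above is the form S3 expects for Content-MD5 (RFC 1864): the base64 encoding of the raw 16-byte MD5 digest, not the hex string. If the bytes S3 receives hash differently, it rejects the part with a BadDigest error. A minimal standalone sketch, not part of this change:

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Base64;

class ContentMd5Sketch {
    public static void main(String[] args) throws Exception {
        byte[] body = "hello".getBytes(StandardCharsets.UTF_8);
        // Content-MD5 is the base64 of the raw 16-byte digest, not of its hex form
        byte[] digest = MessageDigest.getInstance("MD5").digest(body);
        System.out.println("Content-MD5: " + Base64.getEncoder().encodeToString(digest));
        // prints: Content-MD5: XUFAKrxLKna5cZ2REBfFkg==
    }
}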
@@ -457,8 +467,18 @@ void executeSingleUpload(
if (blobSize > s3BlobStore.bufferSizeInBytes()) {
throw new IllegalArgumentException("Upload request size [" + blobSize + "] can't be larger than buffer size");
}
// mark/reset support is required so the stream can be re-read after the MD5 calculation
if (input.markSupported() == false) {
throw new IllegalArgumentException("input stream mark not supported");
}

final ObjectMetadata md = new ObjectMetadata();

input.mark(0);
final byte[] md5 = md5DigestOfInputStream(input, Long.MAX_VALUE);
input.reset();
md.setContentMD5(Base64.getEncoder().encodeToString(md5));

md.setContentLength(blobSize);
if (s3BlobStore.serverSideEncryption()) {
md.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
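
Because the single-upload path now digests the stream before the SDK reads it, callers must pass a stream that supports mark/reset. A minimal sketch of the two-pass round trip, assuming a ByteArrayInputStream-like source (which ignores the mark readlimit; a general stream would need a readlimit covering every byte read before reset()):

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.security.MessageDigest;

class MarkResetSketch {
    public static void main(String[] args) throws Exception {
        InputStream in = new ByteArrayInputStream(new byte[1024]);
        in.mark(0); // ByteArrayInputStream ignores the readlimit; a general stream does not
        MessageDigest md5 = MessageDigest.getInstance("MD5");
        byte[] buf = new byte[8192];
        int n;
        while ((n = in.read(buf)) > 0) {
            md5.update(buf, 0, n); // first pass: digest only
        }
        byte[] digest = md5.digest();
        in.reset(); // second pass starts from the beginning for the actual PUT
        System.out.println(digest.length + " byte digest, stream rewound: " + in.available() + " bytes available");
    }
}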
@@ -485,7 +505,6 @@ void executeMultipartUpload(
final InputStream input,
final long blobSize
) throws IOException {

ensureMultiPartUploadSize(blobSize);
final long partSize = s3BlobStore.bufferSizeInBytes();
final Tuple<Long, Long> multiparts = numberOfMultiparts(blobSize, partSize);
@@ -498,6 +517,19 @@
final long lastPartSize = multiparts.v2();
assert blobSize == (((nbParts - 1) * partSize) + lastPartSize) : "blobSize does not match multipart sizes";

// mark/reset support is required so the stream can be re-read after the MD5 calculation
if (input.markSupported() == false) {
throw new IllegalArgumentException("input stream mark not supported");
}

final byte[][] md5s = new byte[nbParts][];
input.mark(0);
for (int i = 0; i < nbParts - 1; i++) {
md5s[i] = md5DigestOfInputStream(input, partSize);
}
md5s[nbParts - 1] = md5DigestOfInputStream(input, lastPartSize);
input.reset();

final SetOnce<String> uploadId = new SetOnce<>();
final String bucketName = s3BlobStore.bucket();
boolean success = false;
@@ -525,6 +557,7 @@
i,
blobName,
lastPart ? lastPartSize : partSize,
md5s[i - 1],
lastPart
);
bytesCount += uploadRequest.getPartSize();
@@ -564,6 +597,28 @@ void executeMultipartUpload(
}
}

// Calculates the MD5 digest of at most {remaining} bytes read from the given InputStream
private byte[] md5DigestOfInputStream(final InputStream inputStream, long remaining) throws IOException {
try {
final MessageDigest md5 = MessageDigest.getInstance("MD5");
// update in chunks to bound memory usage while amortizing read cost
byte[] buffer = new byte[65536];
int bytesRead;
do {
final int toRead = (int) Math.min(remaining, buffer.length);
bytesRead = inputStream.read(buffer, 0, toRead);
if (bytesRead > 0) {
md5.update(buffer, 0, bytesRead);
remaining -= bytesRead;
}
} while (bytesRead > 0);

return md5.digest();
} catch (NoSuchAlgorithmException e) {
throw new IOException(e);
}
}

// non-static, package private for testing
void ensureMultiPartUploadSize(final long blobSize) {
if (blobSize > MAX_FILE_SIZE_USING_MULTIPART.getBytes()) {
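
A quick way to sanity-check the windowed digesting: hashing a stream in part-sized windows should produce the same digests as hashing each slice independently. A hypothetical self-test sketch, with md5OfWindow standing in for the private helper above:

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.Random;

class WindowedMd5Sketch {
    // same contract as md5DigestOfInputStream: digest up to `remaining` bytes, leave the rest unread
    static byte[] md5OfWindow(InputStream in, long remaining) throws IOException {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] buf = new byte[8192];
            int n;
            while (remaining > 0 && (n = in.read(buf, 0, (int) Math.min(remaining, buf.length))) > 0) {
                md5.update(buf, 0, n);
                remaining -= n;
            }
            return md5.digest();
        } catch (NoSuchAlgorithmException e) {
            throw new IOException(e);
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] blob = new byte[5 * 3 + 2]; // three 5-byte "parts" plus a 2-byte last part
        new Random(42).nextBytes(blob);
        InputStream in = new ByteArrayInputStream(blob);
        for (int offset = 0; offset < blob.length; offset += 5) {
            int len = Math.min(5, blob.length - offset);
            byte[] windowed = md5OfWindow(in, len);
            byte[] direct = MessageDigest.getInstance("MD5").digest(Arrays.copyOfRange(blob, offset, offset + len));
            if (Arrays.equals(windowed, direct) == false) {
                throw new AssertionError("digest mismatch for part at offset " + offset);
            }
        }
        System.out.println("windowed digests match per-slice digests");
    }
}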
@@ -213,7 +213,7 @@ public void testExecuteMultipartUpload() throws IOException {
final ArgumentCaptor<CompleteMultipartUploadRequest> compArgCaptor = ArgumentCaptor.forClass(CompleteMultipartUploadRequest.class);
when(client.completeMultipartUpload(compArgCaptor.capture())).thenReturn(new CompleteMultipartUploadResult());

final ByteArrayInputStream inputStream = new ByteArrayInputStream(new byte[0]);
final ByteArrayInputStream inputStream = new ByteArrayInputStream(new byte[1]);
final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore);
blobContainer.executeMultipartUpload(randomPurpose(), blobStore, blobName, inputStream, blobSize);

@@ -290,9 +290,12 @@ public void testIndexLatest() throws Exception {

public void testReadFromPositionWithLength() {
final var blobName = randomIdentifier();
final var blobBytes = randomBytesReference(randomIntBetween(100, 2_000));
// forcing multipart temporarily
final var blobBytes = randomBytesReference(randomIntBetween(25 * 1024 * 1024 + 100, 25 * 1024 * 1024 + 2_000));

final var repository = getRepository();
logger.info("---> uploading blob of size {}", blobBytes.length());
logger.info("repository buffer size: {}", repository.getReadBufferSizeInBytes());
executeOnBlobStore(repository, blobStore -> {
blobStore.writeBlob(randomPurpose(), blobName, blobBytes, true);
return null;
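
For context, the forced sizes above push the blob past the multipart threshold. The part-count arithmetic that executeMultipartUpload asserts then works out as in this sketch (the 5 MiB part size is an assumption for illustration, matching S3's multipart minimum):

class PartCountSketch {
    public static void main(String[] args) {
        long blobSize = 25L * 1024 * 1024 + 100; // just above the size range forced in the test
        long partSize = 5L * 1024 * 1024;        // assumed part/buffer size
        long fullParts = blobSize / partSize;
        long remainder = blobSize % partSize;
        long nbParts = remainder == 0 ? fullParts : fullParts + 1; // 6 parts here
        long lastPartSize = remainder == 0 ? partSize : remainder; // 100-byte tail
        assert blobSize == (nbParts - 1) * partSize + lastPartSize; // the invariant asserted in the PR
        System.out.println(nbParts + " parts, last part " + lastPartSize + " bytes");
    }
}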