Skip to content

Commit 61e0fb1

Browse files
authored
Mulitpart download workaround (#4945)
* Make Transfer Manager work by default with MultipartS3AsyncClient.
1 parent e715bb6 commit 61e0fb1

File tree

5 files changed

+114
-22
lines changed

5 files changed

+114
-22
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "feature",
3+
"category": "S3 Transfer Manager",
4+
"contributor": "",
5+
"description": "Make Transfer Manager work by default with S3AsyncClient when multipart configuration is enabled."
6+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "feature",
3+
"category": "S3 Transfer Manager",
4+
"contributor": "",
5+
"description": "Enable multipart configuration by default when creating a new S3TranferManager instance using the .create() method"
6+
}

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/GenericS3TransferManager.java

+29-21
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import software.amazon.awssdk.core.exception.SdkClientException;
3434
import software.amazon.awssdk.core.exception.SdkException;
3535
import software.amazon.awssdk.core.internal.async.FileAsyncRequestBody;
36+
import software.amazon.awssdk.services.s3.DelegatingS3AsyncClient;
3637
import software.amazon.awssdk.services.s3.S3AsyncClient;
3738
import software.amazon.awssdk.services.s3.internal.multipart.MultipartS3AsyncClient;
3839
import software.amazon.awssdk.services.s3.internal.resource.S3AccessPointResource;
@@ -110,11 +111,11 @@ class GenericS3TransferManager implements S3TransferManager {
110111
}
111112

112113
@SdkTestInternalApi
113-
GenericS3TransferManager(S3AsyncClient s3CrtAsyncClient,
114+
GenericS3TransferManager(S3AsyncClient s3AsyncClient,
114115
UploadDirectoryHelper uploadDirectoryHelper,
115116
TransferManagerConfiguration configuration,
116117
DownloadDirectoryHelper downloadDirectoryHelper) {
117-
this.s3AsyncClient = s3CrtAsyncClient;
118+
this.s3AsyncClient = s3AsyncClient;
118119
this.isDefaultS3AsyncClient = false;
119120
this.transferConfiguration = configuration;
120121
this.uploadDirectoryHelper = uploadDirectoryHelper;
@@ -138,13 +139,13 @@ public Upload upload(UploadRequest uploadRequest) {
138139
try {
139140
assertNotUnsupportedArn(uploadRequest.putObjectRequest().bucket(), "upload");
140141

141-
CompletableFuture<PutObjectResponse> crtFuture =
142+
CompletableFuture<PutObjectResponse> future =
142143
s3AsyncClient.putObject(uploadRequest.putObjectRequest(), requestBody);
143144

144-
// Forward upload cancellation to CRT future
145-
CompletableFutureUtils.forwardExceptionTo(returnFuture, crtFuture);
145+
// Forward upload cancellation to future
146+
CompletableFutureUtils.forwardExceptionTo(returnFuture, future);
146147

147-
CompletableFutureUtils.forwardTransformedResultTo(crtFuture, returnFuture,
148+
CompletableFutureUtils.forwardTransformedResultTo(future, returnFuture,
148149
r -> CompletedUpload.builder()
149150
.response(r)
150151
.build());
@@ -298,13 +299,12 @@ public <ResultT> Download<ResultT> download(DownloadRequest<ResultT> downloadReq
298299
try {
299300
assertNotUnsupportedArn(downloadRequest.getObjectRequest().bucket(), "download");
300301

301-
CompletableFuture<ResultT> crtFuture =
302-
s3AsyncClient.getObject(downloadRequest.getObjectRequest(), responseTransformer);
302+
CompletableFuture<ResultT> future = doGetObject(downloadRequest.getObjectRequest(), responseTransformer);
303303

304-
// Forward download cancellation to CRT future
305-
CompletableFutureUtils.forwardExceptionTo(returnFuture, crtFuture);
304+
// Forward download cancellation to future
305+
CompletableFutureUtils.forwardExceptionTo(returnFuture, future);
306306

307-
CompletableFutureUtils.forwardTransformedResultTo(crtFuture, returnFuture,
307+
CompletableFutureUtils.forwardTransformedResultTo(future, returnFuture,
308308
r -> CompletedDownload.builder()
309309
.result(r)
310310
.build());
@@ -341,14 +341,12 @@ private TransferProgressUpdater doDownloadFile(
341341

342342
assertNotUnsupportedArn(downloadRequest.getObjectRequest().bucket(), "download");
343343

344-
CompletableFuture<GetObjectResponse> crtFuture =
345-
s3AsyncClient.getObject(downloadRequest.getObjectRequest(),
346-
responseTransformer);
344+
CompletableFuture<GetObjectResponse> future = doGetObject(downloadRequest.getObjectRequest(), responseTransformer);
347345

348-
// Forward download cancellation to CRT future
349-
CompletableFutureUtils.forwardExceptionTo(returnFuture, crtFuture);
346+
// Forward download cancellation to future
347+
CompletableFutureUtils.forwardExceptionTo(returnFuture, future);
350348

351-
CompletableFutureUtils.forwardTransformedResultTo(crtFuture, returnFuture,
349+
CompletableFutureUtils.forwardTransformedResultTo(future, returnFuture,
352350
res -> CompletedFileDownload.builder()
353351
.response(res)
354352
.build());
@@ -450,13 +448,13 @@ public Copy copy(CopyRequest copyRequest) {
450448
assertNotUnsupportedArn(copyRequest.copyObjectRequest().sourceBucket(), "copy sourceBucket");
451449
assertNotUnsupportedArn(copyRequest.copyObjectRequest().destinationBucket(), "copy destinationBucket");
452450

453-
CompletableFuture<CopyObjectResponse> crtFuture =
451+
CompletableFuture<CopyObjectResponse> future =
454452
s3AsyncClient.copyObject(copyRequest.copyObjectRequest());
455453

456-
// Forward transfer cancellation to CRT future
457-
CompletableFutureUtils.forwardExceptionTo(returnFuture, crtFuture);
454+
// Forward transfer cancellation to future
455+
CompletableFutureUtils.forwardExceptionTo(returnFuture, future);
458456

459-
CompletableFutureUtils.forwardTransformedResultTo(crtFuture, returnFuture,
457+
CompletableFutureUtils.forwardTransformedResultTo(future, returnFuture,
460458
r -> CompletedCopy.builder()
461459
.response(r)
462460
.build());
@@ -511,4 +509,14 @@ private static boolean isMrapArn(Arn arn) {
511509

512510
return !s3EndpointResource.region().isPresent();
513511
}
512+
513+
// TODO remove once MultipartS3AsyncClient is complete
514+
private <ResultT> CompletableFuture<ResultT> doGetObject(
515+
GetObjectRequest getObjectRequest, AsyncResponseTransformer<GetObjectResponse, ResultT> asyncResponseTransformer) {
516+
S3AsyncClient clientToUse = s3AsyncClient;
517+
if (s3AsyncClient instanceof MultipartS3AsyncClient) {
518+
clientToUse = (S3AsyncClient) ((DelegatingS3AsyncClient) s3AsyncClient).delegate();
519+
}
520+
return clientToUse.getObject(getObjectRequest, asyncResponseTransformer);
521+
}
514522
}

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/TransferManagerFactory.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import software.amazon.awssdk.core.internal.util.ClassLoaderHelper;
2222
import software.amazon.awssdk.services.s3.S3AsyncClient;
2323
import software.amazon.awssdk.services.s3.internal.crt.S3CrtAsyncClient;
24+
import software.amazon.awssdk.services.s3.internal.multipart.MultipartS3AsyncClient;
2425
import software.amazon.awssdk.transfer.s3.S3TransferManager;
2526
import software.amazon.awssdk.utils.Logger;
2627

@@ -56,6 +57,10 @@ public static S3TransferManager createTransferManager(DefaultBuilder tmBuilder)
5657
log.warn(() -> "The provided DefaultS3AsyncClient is not an instance of S3CrtAsyncClient, and thus multipart"
5758
+ " upload/download feature is not enabled and resumable file upload is not supported. To benefit "
5859
+ "from maximum throughput, consider using S3AsyncClient.crtBuilder().build() instead.");
60+
} else if (s3AsyncClient instanceof MultipartS3AsyncClient) {
61+
log.warn(() -> "The provided S3AsyncClient is an instance of MultipartS3AsyncClient, and thus multipart"
62+
+ " download feature is not enabled. To benefit from all features, "
63+
+ "consider using S3AsyncClient.crtBuilder().build() instead.");
5964
} else {
6065
log.debug(() -> "The provided S3AsyncClient is not an instance of S3CrtAsyncClient, and thus multipart"
6166
+ " upload/download feature may not be enabled and resumable file upload may not be supported.");
@@ -68,7 +73,7 @@ private static Supplier<S3AsyncClient> defaultS3AsyncClient() {
6873
if (crtInClasspath()) {
6974
return S3AsyncClient::crtCreate;
7075
}
71-
return S3AsyncClient::create;
76+
return S3AsyncClient.builder().multipartEnabled(true)::build;
7277
}
7378

7479
private static boolean crtInClasspath() {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.transfer.s3.internal;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
import static org.mockito.ArgumentMatchers.any;
20+
import static org.mockito.Mockito.mock;
21+
import static org.mockito.Mockito.when;
22+
23+
import java.nio.file.Paths;
24+
import java.util.concurrent.CompletableFuture;
25+
import org.junit.jupiter.api.BeforeEach;
26+
import org.junit.jupiter.api.Test;
27+
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
28+
import software.amazon.awssdk.services.s3.S3AsyncClient;
29+
import software.amazon.awssdk.services.s3.internal.multipart.MultipartS3AsyncClient;
30+
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
31+
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
32+
import software.amazon.awssdk.services.s3.multipart.MultipartConfiguration;
33+
import software.amazon.awssdk.transfer.s3.S3TransferManager;
34+
import software.amazon.awssdk.transfer.s3.model.CompletedFileDownload;
35+
36+
class MultipartDownloadJavaBasedTest {
37+
private S3AsyncClient mockDelegate;
38+
private MultipartS3AsyncClient s3Multi;
39+
private S3TransferManager tm;
40+
private UploadDirectoryHelper uploadDirectoryHelper;
41+
private DownloadDirectoryHelper downloadDirectoryHelper;
42+
private TransferManagerConfiguration configuration;
43+
44+
@BeforeEach
45+
public void methodSetup() {
46+
mockDelegate = mock(S3AsyncClient.class);
47+
s3Multi = MultipartS3AsyncClient.create(mockDelegate, MultipartConfiguration.builder().build());
48+
uploadDirectoryHelper = mock(UploadDirectoryHelper.class);
49+
configuration = mock(TransferManagerConfiguration.class);
50+
downloadDirectoryHelper = mock(DownloadDirectoryHelper.class);
51+
tm = new GenericS3TransferManager(s3Multi, uploadDirectoryHelper, configuration, downloadDirectoryHelper);
52+
}
53+
54+
@Test
55+
void usingMultipartDownload_shouldNotThrowException() {
56+
GetObjectResponse response = GetObjectResponse.builder().build();
57+
when(mockDelegate.getObject(any(GetObjectRequest.class), any(AsyncResponseTransformer.class)))
58+
.thenReturn(CompletableFuture.completedFuture(response));
59+
60+
CompletedFileDownload completedFileDownload = tm.downloadFile(d -> d.getObjectRequest(g -> g.bucket("bucket")
61+
.key("key"))
62+
.destination(Paths.get(".")))
63+
.completionFuture()
64+
.join();
65+
assertThat(completedFileDownload.response()).isEqualTo(response);
66+
}
67+
}

0 commit comments

Comments
 (0)