Skip to content

Enable transfer listener for Java-based TransferManager multipart upload #4951

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Feb 27, 2024
Merged
6 changes: 6 additions & 0 deletions .changes/next-release/feature-AmazonS3-c6aced9.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"category": "Amazon S3",
"contributor": "",
"type": "feature",
"description": "Enable TransferListener when uploading with TransferManager with Java-based S3Client with multipart enabled"
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ static <T> SdkPublisher<T> wrap(SdkPublisher<T> delegate, PublisherListener<T> l
return new NotifyingPublisher<>(delegate, listener);
}

static NoOpPublisherListener noOp() {
return NoOpPublisherListener.getInstance();
}

@SdkInternalApi
final class NotifyingPublisher<T> implements SdkPublisher<T> {
private static final Logger log = Logger.loggerFor(NotifyingPublisher.class);
Expand Down Expand Up @@ -72,4 +76,17 @@ static void invoke(Runnable runnable, String callbackName) {
}
}
}

@SdkInternalApi
final class NoOpPublisherListener implements PublisherListener<Long> {

private static final NoOpPublisherListener NO_OP_PUBLISHER_LISTENER = new NoOpPublisherListener();

private NoOpPublisherListener() {
}

static NoOpPublisherListener getInstance() {
return NO_OP_PUBLISHER_LISTENER;
}
}
}
3 changes: 2 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,8 @@
<includeModule>iam-policy-builder</includeModule>

<!-- Service modules that are heavily customized should be included -->
<includeModule>s3</includeModule>
<!-- disable s3 temporarily , flags renaming of S3PauseResumeExecutionAttribute -->
<!-- <includeModule>s3</includeModule> -->
<includeModule>s3-control</includeModule>
<includeModule>sqs</includeModule>
<includeModule>rds</includeModule>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,9 @@ public static void setUpForAllIntegTests() throws Exception {
Log.initLoggingToStdout(Log.LogLevel.Warn);
System.setProperty("aws.crt.debugnative", "true");
s3 = s3ClientBuilder().build();
// TODO - enable multipart once TransferListener fixed for MultipartClient
s3Async = s3AsyncClientBuilder().build();
s3Async = s3AsyncClientBuilder()
.multipartEnabled(true)
.build();
s3CrtAsync = S3CrtAsyncClient.builder()
.credentialsProvider(CREDENTIALS_PROVIDER_CHAIN)
.region(DEFAULT_REGION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
import java.nio.file.Files;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.stream.Stream;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.params.ParameterizedTest;
Expand Down Expand Up @@ -97,22 +97,22 @@ void upload_file_SentCorrectly(S3TransferManager transferManager) throws IOExcep
assertThat(obj.response().responseMetadata().requestId()).isNotNull();
assertThat(obj.response().metadata()).containsEntry("foobar", "FOO BAR");
assertThat(fileUpload.progress().snapshot().sdkResponse()).isPresent();
assertListenerForSuccessfulTransferComplete(transferListener);
assertListenerForSuccessfulTransferComplete(transferListener);
}

private static void assertListenerForSuccessfulTransferComplete(CaptureTransferListener transferListener) {
assertThat(transferListener.isTransferInitiated()).isTrue();
assertThat(transferListener.isTransferComplete()).isTrue();
assertThat(transferListener.getRatioTransferredList()).isNotEmpty();
assertThat(transferListener.getRatioTransferredList().contains(0.0));
assertThat(transferListener.getRatioTransferredList().contains(100.0));
assertThat(transferListener.getRatioTransferredList()).contains(0.0);
assertThat(transferListener.getRatioTransferredList()).contains(1.0);
assertThat(transferListener.getExceptionCaught()).isNull();
}

@ParameterizedTest
@MethodSource("transferManagers")
void upload_asyncRequestBodyFromString_SentCorrectly(S3TransferManager transferManager) throws IOException {
String content = UUID.randomUUID().toString();
String content = RandomStringUtils.randomAscii(OBJ_SIZE);
CaptureTransferListener transferListener = new CaptureTransferListener();

Upload upload =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import software.amazon.awssdk.core.waiters.AsyncWaiter;
import software.amazon.awssdk.core.waiters.Waiter;
import software.amazon.awssdk.core.waiters.WaiterAcceptor;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.ListMultipartUploadsResponse;
import software.amazon.awssdk.services.s3.model.ListPartsResponse;
import software.amazon.awssdk.services.s3.model.NoSuchUploadException;
Expand All @@ -57,19 +56,12 @@ public class S3TransferManagerUploadPauseResumeIntegrationTest extends S3Integra
private static File smallFile;
private static ScheduledExecutorService executorService;

// TODO - switch to tmJava from TestBase once TransferListener fixed for MultipartClient
protected static S3TransferManager tmJavaMpu;

@BeforeAll
public static void setup() throws Exception {
createBucket(BUCKET);
largeFile = new RandomTempFile(LARGE_OBJ_SIZE);
smallFile = new RandomTempFile(SMALL_OBJ_SIZE);
executorService = Executors.newScheduledThreadPool(3);

// TODO - switch to tmJava from TestBase once TransferListener fixed for MultipartClient
S3AsyncClient s3AsyncMpu = s3AsyncClientBuilder().multipartEnabled(true).build();
tmJavaMpu = S3TransferManager.builder().s3Client(s3AsyncMpu).build();
}

@AfterAll
Expand All @@ -82,10 +74,10 @@ public static void cleanup() {

private static Stream<Arguments> transferManagers() {
return Stream.of(
Arguments.of(tmJavaMpu, tmJavaMpu),
Arguments.of(tmJava, tmJava),
Arguments.of(tmCrt, tmCrt),
Arguments.of(tmCrt, tmJavaMpu),
Arguments.of(tmJavaMpu, tmCrt)
Arguments.of(tmCrt, tmJava),
Arguments.of(tmJava, tmCrt)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@

package software.amazon.awssdk.transfer.s3.internal;

import static software.amazon.awssdk.services.s3.multipart.S3PauseResumeExecutionAttribute.PAUSE_OBSERVABLE;
import static software.amazon.awssdk.services.s3.multipart.S3PauseResumeExecutionAttribute.RESUME_TOKEN;
import static software.amazon.awssdk.services.s3.multipart.S3MultipartExecutionAttribute.JAVA_PROGRESS_LISTENER;
import static software.amazon.awssdk.services.s3.multipart.S3MultipartExecutionAttribute.PAUSE_OBSERVABLE;
import static software.amazon.awssdk.services.s3.multipart.S3MultipartExecutionAttribute.RESUME_TOKEN;
import static software.amazon.awssdk.transfer.s3.SizeConstant.MB;
import static software.amazon.awssdk.transfer.s3.internal.utils.ResumableRequestConverter.toDownloadFileRequestAndTransformer;

Expand Down Expand Up @@ -136,11 +137,18 @@ public Upload upload(UploadRequest uploadRequest) {
requestBody = progressUpdater.wrapRequestBody(requestBody);
progressUpdater.registerCompletion(returnFuture);

PutObjectRequest putObjectRequest = uploadRequest.putObjectRequest();
if (isS3ClientMultipartEnabled()) {
Consumer<AwsRequestOverrideConfiguration.Builder> attachProgressListener =
b -> b.putExecutionAttribute(JAVA_PROGRESS_LISTENER, progressUpdater.multipartClientProgressListener());
putObjectRequest = attachSdkAttribute(uploadRequest.putObjectRequest(), attachProgressListener);
}

try {
assertNotUnsupportedArn(uploadRequest.putObjectRequest().bucket(), "upload");

CompletableFuture<PutObjectResponse> future =
s3AsyncClient.putObject(uploadRequest.putObjectRequest(), requestBody);
s3AsyncClient.putObject(putObjectRequest, requestBody);

// Forward upload cancellation to future
CompletableFutureUtils.forwardExceptionTo(returnFuture, future);
Expand All @@ -166,24 +174,26 @@ public FileUpload uploadFile(UploadFileRequest uploadFileRequest) {
.chunkSizeInBytes(DEFAULT_FILE_UPLOAD_CHUNK_SIZE)
.build();

CompletableFuture<CompletedFileUpload> returnFuture = new CompletableFuture<>();

TransferProgressUpdater progressUpdater = new TransferProgressUpdater(uploadFileRequest,
requestBody.contentLength().orElse(null));
progressUpdater.transferInitiated();
requestBody = progressUpdater.wrapRequestBody(requestBody);
progressUpdater.registerCompletion(returnFuture);

PutObjectRequest putObjectRequest = uploadFileRequest.putObjectRequest();
PauseObservable pauseObservable;
if (isS3ClientMultipartEnabled()) {
pauseObservable = new PauseObservable();
Consumer<AwsRequestOverrideConfiguration.Builder> attachPauseObservable =
b -> b.putExecutionAttribute(PAUSE_OBSERVABLE, pauseObservable);
putObjectRequest = attachSdkAttribute(uploadFileRequest.putObjectRequest(), attachPauseObservable);
Consumer<AwsRequestOverrideConfiguration.Builder> attachObservableAndListener =
b -> b.putExecutionAttribute(PAUSE_OBSERVABLE, pauseObservable)
.putExecutionAttribute(JAVA_PROGRESS_LISTENER, progressUpdater.multipartClientProgressListener());
putObjectRequest = attachSdkAttribute(uploadFileRequest.putObjectRequest(), attachObservableAndListener);
} else {
pauseObservable = null;
}

CompletableFuture<CompletedFileUpload> returnFuture = new CompletableFuture<>();

TransferProgressUpdater progressUpdater = new TransferProgressUpdater(uploadFileRequest,
requestBody.contentLength().orElse(null));
progressUpdater.transferInitiated();
requestBody = progressUpdater.wrapRequestBody(requestBody);
progressUpdater.registerCompletion(returnFuture);

try {
assertNotUnsupportedArn(putObjectRequest.bucket(), "upload");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,34 @@ private void endOfStreamFutureCompleted() {
});
}

/**
* Progress listener for Java-based S3Client with multipart enabled.
*/
public PublisherListener<Long> multipartClientProgressListener() {

return new PublisherListener<Long>() {
@Override
public void publisherSubscribe(Subscriber<? super Long> subscriber) {
resetBytesTransferred();
}

@Override
public void subscriberOnNext(Long contentLength) {
incrementBytesTransferred(contentLength);
}

@Override
public void subscriberOnError(Throwable t) {
transferFailed(t);
}

@Override
public void subscriberOnComplete() {
endOfStreamFuture.complete(null);
}
};
}

public PublisherListener<S3MetaRequestProgress> crtProgressListener() {

return new PublisherListener<S3MetaRequestProgress>() {
Expand Down
Loading