diff --git a/.changes/next-release/feature-AmazonS3-c6aced9.json b/.changes/next-release/feature-AmazonS3-c6aced9.json new file mode 100644 index 000000000000..b54448b35f21 --- /dev/null +++ b/.changes/next-release/feature-AmazonS3-c6aced9.json @@ -0,0 +1,6 @@ +{ + "category": "Amazon S3", + "contributor": "", + "type": "feature", + "description": "Enable TransferListener when uploading with TransferManager with Java-based S3Client with multipart enabled" +} diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/listener/PublisherListener.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/listener/PublisherListener.java index bbee3e203447..da8a4861cfd0 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/listener/PublisherListener.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/listener/PublisherListener.java @@ -45,6 +45,10 @@ static SdkPublisher wrap(SdkPublisher delegate, PublisherListener l return new NotifyingPublisher<>(delegate, listener); } + static NoOpPublisherListener noOp() { + return NoOpPublisherListener.getInstance(); + } + @SdkInternalApi final class NotifyingPublisher implements SdkPublisher { private static final Logger log = Logger.loggerFor(NotifyingPublisher.class); @@ -72,4 +76,17 @@ static void invoke(Runnable runnable, String callbackName) { } } } + + @SdkInternalApi + final class NoOpPublisherListener implements PublisherListener { + + private static final NoOpPublisherListener NO_OP_PUBLISHER_LISTENER = new NoOpPublisherListener(); + + private NoOpPublisherListener() { + } + + static NoOpPublisherListener getInstance() { + return NO_OP_PUBLISHER_LISTENER; + } + } } diff --git a/pom.xml b/pom.xml index 0855e447ef13..6e0e3bd00284 100644 --- a/pom.xml +++ b/pom.xml @@ -659,7 +659,8 @@ iam-policy-builder - s3 + + s3-control sqs rds diff --git a/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3IntegrationTestBase.java b/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3IntegrationTestBase.java index eb45d1f7370d..26f963177190 100644 --- a/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3IntegrationTestBase.java +++ b/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3IntegrationTestBase.java @@ -68,8 +68,9 @@ public static void setUpForAllIntegTests() throws Exception { Log.initLoggingToStdout(Log.LogLevel.Warn); System.setProperty("aws.crt.debugnative", "true"); s3 = s3ClientBuilder().build(); - // TODO - enable multipart once TransferListener fixed for MultipartClient - s3Async = s3AsyncClientBuilder().build(); + s3Async = s3AsyncClientBuilder() + .multipartEnabled(true) + .build(); s3CrtAsync = S3CrtAsyncClient.builder() .credentialsProvider(CREDENTIALS_PROVIDER_CHAIN) .region(DEFAULT_REGION) diff --git a/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerUploadIntegrationTest.java b/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerUploadIntegrationTest.java index 2966a2bff8a1..4598e388af39 100644 --- a/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerUploadIntegrationTest.java +++ b/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerUploadIntegrationTest.java @@ -23,9 +23,9 @@ import java.nio.file.Files; import java.util.HashMap; import java.util.Map; -import java.util.UUID; import java.util.concurrent.CancellationException; import java.util.stream.Stream; +import org.apache.commons.lang3.RandomStringUtils; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.params.ParameterizedTest; @@ -97,22 +97,22 @@ void upload_file_SentCorrectly(S3TransferManager transferManager) throws IOExcep assertThat(obj.response().responseMetadata().requestId()).isNotNull(); assertThat(obj.response().metadata()).containsEntry("foobar", "FOO BAR"); assertThat(fileUpload.progress().snapshot().sdkResponse()).isPresent(); - assertListenerForSuccessfulTransferComplete(transferListener); + assertListenerForSuccessfulTransferComplete(transferListener); } private static void assertListenerForSuccessfulTransferComplete(CaptureTransferListener transferListener) { assertThat(transferListener.isTransferInitiated()).isTrue(); assertThat(transferListener.isTransferComplete()).isTrue(); assertThat(transferListener.getRatioTransferredList()).isNotEmpty(); - assertThat(transferListener.getRatioTransferredList().contains(0.0)); - assertThat(transferListener.getRatioTransferredList().contains(100.0)); + assertThat(transferListener.getRatioTransferredList()).contains(0.0); + assertThat(transferListener.getRatioTransferredList()).contains(1.0); assertThat(transferListener.getExceptionCaught()).isNull(); } @ParameterizedTest @MethodSource("transferManagers") void upload_asyncRequestBodyFromString_SentCorrectly(S3TransferManager transferManager) throws IOException { - String content = UUID.randomUUID().toString(); + String content = RandomStringUtils.randomAscii(OBJ_SIZE); CaptureTransferListener transferListener = new CaptureTransferListener(); Upload upload = diff --git a/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerUploadPauseResumeIntegrationTest.java b/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerUploadPauseResumeIntegrationTest.java index e872080f2e32..0e995048e1ae 100644 --- a/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerUploadPauseResumeIntegrationTest.java +++ b/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerUploadPauseResumeIntegrationTest.java @@ -35,7 +35,6 @@ import software.amazon.awssdk.core.waiters.AsyncWaiter; import software.amazon.awssdk.core.waiters.Waiter; import software.amazon.awssdk.core.waiters.WaiterAcceptor; -import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.ListMultipartUploadsResponse; import software.amazon.awssdk.services.s3.model.ListPartsResponse; import software.amazon.awssdk.services.s3.model.NoSuchUploadException; @@ -57,19 +56,12 @@ public class S3TransferManagerUploadPauseResumeIntegrationTest extends S3Integra private static File smallFile; private static ScheduledExecutorService executorService; - // TODO - switch to tmJava from TestBase once TransferListener fixed for MultipartClient - protected static S3TransferManager tmJavaMpu; - @BeforeAll public static void setup() throws Exception { createBucket(BUCKET); largeFile = new RandomTempFile(LARGE_OBJ_SIZE); smallFile = new RandomTempFile(SMALL_OBJ_SIZE); executorService = Executors.newScheduledThreadPool(3); - - // TODO - switch to tmJava from TestBase once TransferListener fixed for MultipartClient - S3AsyncClient s3AsyncMpu = s3AsyncClientBuilder().multipartEnabled(true).build(); - tmJavaMpu = S3TransferManager.builder().s3Client(s3AsyncMpu).build(); } @AfterAll @@ -82,10 +74,10 @@ public static void cleanup() { private static Stream transferManagers() { return Stream.of( - Arguments.of(tmJavaMpu, tmJavaMpu), + Arguments.of(tmJava, tmJava), Arguments.of(tmCrt, tmCrt), - Arguments.of(tmCrt, tmJavaMpu), - Arguments.of(tmJavaMpu, tmCrt) + Arguments.of(tmCrt, tmJava), + Arguments.of(tmJava, tmCrt) ); } diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/GenericS3TransferManager.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/GenericS3TransferManager.java index ba10ac39e79e..9f4412a804ac 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/GenericS3TransferManager.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/GenericS3TransferManager.java @@ -15,8 +15,9 @@ package software.amazon.awssdk.transfer.s3.internal; -import static software.amazon.awssdk.services.s3.multipart.S3PauseResumeExecutionAttribute.PAUSE_OBSERVABLE; -import static software.amazon.awssdk.services.s3.multipart.S3PauseResumeExecutionAttribute.RESUME_TOKEN; +import static software.amazon.awssdk.services.s3.multipart.S3MultipartExecutionAttribute.JAVA_PROGRESS_LISTENER; +import static software.amazon.awssdk.services.s3.multipart.S3MultipartExecutionAttribute.PAUSE_OBSERVABLE; +import static software.amazon.awssdk.services.s3.multipart.S3MultipartExecutionAttribute.RESUME_TOKEN; import static software.amazon.awssdk.transfer.s3.SizeConstant.MB; import static software.amazon.awssdk.transfer.s3.internal.utils.ResumableRequestConverter.toDownloadFileRequestAndTransformer; @@ -136,11 +137,18 @@ public Upload upload(UploadRequest uploadRequest) { requestBody = progressUpdater.wrapRequestBody(requestBody); progressUpdater.registerCompletion(returnFuture); + PutObjectRequest putObjectRequest = uploadRequest.putObjectRequest(); + if (isS3ClientMultipartEnabled()) { + Consumer attachProgressListener = + b -> b.putExecutionAttribute(JAVA_PROGRESS_LISTENER, progressUpdater.multipartClientProgressListener()); + putObjectRequest = attachSdkAttribute(uploadRequest.putObjectRequest(), attachProgressListener); + } + try { assertNotUnsupportedArn(uploadRequest.putObjectRequest().bucket(), "upload"); CompletableFuture future = - s3AsyncClient.putObject(uploadRequest.putObjectRequest(), requestBody); + s3AsyncClient.putObject(putObjectRequest, requestBody); // Forward upload cancellation to future CompletableFutureUtils.forwardExceptionTo(returnFuture, future); @@ -166,24 +174,26 @@ public FileUpload uploadFile(UploadFileRequest uploadFileRequest) { .chunkSizeInBytes(DEFAULT_FILE_UPLOAD_CHUNK_SIZE) .build(); + CompletableFuture returnFuture = new CompletableFuture<>(); + + TransferProgressUpdater progressUpdater = new TransferProgressUpdater(uploadFileRequest, + requestBody.contentLength().orElse(null)); + progressUpdater.transferInitiated(); + requestBody = progressUpdater.wrapRequestBody(requestBody); + progressUpdater.registerCompletion(returnFuture); + PutObjectRequest putObjectRequest = uploadFileRequest.putObjectRequest(); PauseObservable pauseObservable; if (isS3ClientMultipartEnabled()) { pauseObservable = new PauseObservable(); - Consumer attachPauseObservable = - b -> b.putExecutionAttribute(PAUSE_OBSERVABLE, pauseObservable); - putObjectRequest = attachSdkAttribute(uploadFileRequest.putObjectRequest(), attachPauseObservable); + Consumer attachObservableAndListener = + b -> b.putExecutionAttribute(PAUSE_OBSERVABLE, pauseObservable) + .putExecutionAttribute(JAVA_PROGRESS_LISTENER, progressUpdater.multipartClientProgressListener()); + putObjectRequest = attachSdkAttribute(uploadFileRequest.putObjectRequest(), attachObservableAndListener); } else { pauseObservable = null; } - CompletableFuture returnFuture = new CompletableFuture<>(); - - TransferProgressUpdater progressUpdater = new TransferProgressUpdater(uploadFileRequest, - requestBody.contentLength().orElse(null)); - progressUpdater.transferInitiated(); - requestBody = progressUpdater.wrapRequestBody(requestBody); - progressUpdater.registerCompletion(returnFuture); try { assertNotUnsupportedArn(putObjectRequest.bucket(), "upload"); diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferProgressUpdater.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferProgressUpdater.java index 0c0c9f0d65d8..67970031c9aa 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferProgressUpdater.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferProgressUpdater.java @@ -111,6 +111,34 @@ private void endOfStreamFutureCompleted() { }); } + /** + * Progress listener for Java-based S3Client with multipart enabled. + */ + public PublisherListener multipartClientProgressListener() { + + return new PublisherListener() { + @Override + public void publisherSubscribe(Subscriber subscriber) { + resetBytesTransferred(); + } + + @Override + public void subscriberOnNext(Long contentLength) { + incrementBytesTransferred(contentLength); + } + + @Override + public void subscriberOnError(Throwable t) { + transferFailed(t); + } + + @Override + public void subscriberOnComplete() { + endOfStreamFuture.complete(null); + } + }; + } + public PublisherListener crtProgressListener() { return new PublisherListener() { diff --git a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/S3JavaMultipartTransferProgressListenerTest.java b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/S3JavaMultipartTransferProgressListenerTest.java new file mode 100644 index 000000000000..22e57473dbef --- /dev/null +++ b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/S3JavaMultipartTransferProgressListenerTest.java @@ -0,0 +1,212 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.transfer.s3.internal; + +import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; +import static com.github.tomakehurst.wiremock.client.WireMock.any; +import static com.github.tomakehurst.wiremock.client.WireMock.anyUrl; +import static com.github.tomakehurst.wiremock.client.WireMock.post; +import static com.github.tomakehurst.wiremock.client.WireMock.stubFor; +import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; + +import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo; +import com.github.tomakehurst.wiremock.junit5.WireMockTest; +import java.io.IOException; +import java.net.URI; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletionException; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.ArgumentMatchers; +import org.mockito.Mockito; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.model.NoSuchBucketException; +import software.amazon.awssdk.services.s3.model.S3Exception; +import software.amazon.awssdk.testutils.RandomTempFile; +import software.amazon.awssdk.transfer.s3.CaptureTransferListener; +import software.amazon.awssdk.transfer.s3.S3TransferManager; +import software.amazon.awssdk.transfer.s3.model.FileUpload; +import software.amazon.awssdk.transfer.s3.progress.LoggingTransferListener; +import software.amazon.awssdk.transfer.s3.progress.TransferListener; + +@WireMockTest +public class S3JavaMultipartTransferProgressListenerTest { + + public static final String ERROR_CODE = "NoSuchBucket"; + public static final String ERROR_MESSAGE = "We encountered an internal error. Please try again."; + public static final String ERROR_BODY = "\n" + + "\n" + + " " + ERROR_CODE + "\n" + + " " + ERROR_MESSAGE + "\n" + + ""; + private static final String EXAMPLE_BUCKET = "Example-Bucket"; + private static final String TEST_KEY = "16mib_file.dat"; + private static final int OBJ_SIZE = 16 * 1024 * 1024; + private static RandomTempFile testFile; + private static URI testEndpoint; + + @BeforeAll + public static void init(WireMockRuntimeInfo wm) throws IOException { + testEndpoint = URI.create(wm.getHttpBaseUrl()); + testFile = new RandomTempFile(TEST_KEY, OBJ_SIZE); + } + + private static S3AsyncClient s3AsyncClient(boolean multipartEnabled) { + return S3AsyncClient.builder() + .multipartEnabled(multipartEnabled) + .region(Region.US_EAST_1) + .endpointOverride(testEndpoint) + .credentialsProvider( + StaticCredentialsProvider.create(AwsBasicCredentials.create("key", "secret"))) + .build(); + } + + private static void assertMockOnFailure(TransferListener transferListenerMock) { + Mockito.verify(transferListenerMock, times(1)).transferFailed(ArgumentMatchers.any()); + Mockito.verify(transferListenerMock, times(1)).transferInitiated(ArgumentMatchers.any()); + Mockito.verify(transferListenerMock, times(0)).transferComplete(ArgumentMatchers.any()); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void listeners_reports_ErrorsWithValidPayload(boolean multipartEnabled) throws InterruptedException { + S3AsyncClient s3Async = s3AsyncClient(multipartEnabled); + + TransferListener transferListenerMock = mock(TransferListener.class); + stubFor(any(anyUrl()).willReturn(aResponse().withStatus(404).withBody(ERROR_BODY))); + S3TransferManager tm = new GenericS3TransferManager(s3Async, mock(UploadDirectoryHelper.class), + mock(TransferManagerConfiguration.class), + mock(DownloadDirectoryHelper.class)); + CaptureTransferListener transferListener = new CaptureTransferListener(); + + FileUpload fileUpload = + tm.uploadFile(u -> u.putObjectRequest(p -> p.bucket(EXAMPLE_BUCKET).key(TEST_KEY)) + .source(testFile) + .addTransferListener(LoggingTransferListener.create()) + .addTransferListener(transferListener) + .addTransferListener(transferListenerMock) + .build()); + + assertThatExceptionOfType(CompletionException.class).isThrownBy(() -> fileUpload.completionFuture().join()); + Thread.sleep(500); + assertThat(transferListener.getExceptionCaught()).isInstanceOf(NoSuchBucketException.class); + assertThat(transferListener.isTransferComplete()).isFalse(); + assertThat(transferListener.isTransferInitiated()).isTrue(); + + assertMockOnFailure(transferListenerMock); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void listeners_reports_ErrorsWithValidInValidPayload(boolean multipartEnabled) throws InterruptedException { + S3AsyncClient s3Async = s3AsyncClient(multipartEnabled); + + TransferListener transferListenerMock = mock(TransferListener.class); + stubFor(any(anyUrl()).willReturn(aResponse().withStatus(404).withBody("?"))); + S3TransferManager tm = new GenericS3TransferManager(s3Async, mock(UploadDirectoryHelper.class), + mock(TransferManagerConfiguration.class), + mock(DownloadDirectoryHelper.class)); + CaptureTransferListener transferListener = new CaptureTransferListener(); + + FileUpload fileUpload = + tm.uploadFile(u -> u.putObjectRequest(p -> p.bucket(EXAMPLE_BUCKET).key(TEST_KEY)) + .source(testFile) + .addTransferListener(LoggingTransferListener.create()) + .addTransferListener(transferListener) + .addTransferListener(transferListenerMock) + .build()); + + assertThatExceptionOfType(CompletionException.class).isThrownBy(() -> fileUpload.completionFuture().join()); + Thread.sleep(500); + + assertThat(transferListener.getExceptionCaught()).isInstanceOf(S3Exception.class); + assertThat(transferListener.isTransferComplete()).isFalse(); + assertThat(transferListener.isTransferInitiated()).isTrue(); + assertMockOnFailure(transferListenerMock); + + } + + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void listeners_reports_ErrorsWhenCancelled(boolean multipartEnabled) throws InterruptedException { + S3AsyncClient s3Async = s3AsyncClient(multipartEnabled); + + TransferListener transferListenerMock = mock(TransferListener.class); + stubFor(any(anyUrl()).willReturn(aResponse().withStatus(200).withBody("{}"))); + S3TransferManager tm = new GenericS3TransferManager(s3Async, mock(UploadDirectoryHelper.class), + mock(TransferManagerConfiguration.class), + mock(DownloadDirectoryHelper.class)); + CaptureTransferListener transferListener = new CaptureTransferListener(); + + tm.uploadFile(u -> u.putObjectRequest(p -> p.bucket(EXAMPLE_BUCKET).key(TEST_KEY)) + .source(testFile) + .addTransferListener(LoggingTransferListener.create()) + .addTransferListener(transferListener) + .addTransferListener(transferListenerMock) + .build()).completionFuture().cancel(true); + + Thread.sleep(500); + + assertThat(transferListener.getExceptionCaught()).isInstanceOf(CancellationException.class); + assertThat(transferListener.isTransferComplete()).isFalse(); + assertThat(transferListener.isTransferInitiated()).isTrue(); + assertMockOnFailure(transferListenerMock); + + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void listeners_reports_ProgressWhenSuccess(boolean multipartEnabled) throws InterruptedException { + S3AsyncClient s3Async = s3AsyncClient(multipartEnabled); + + TransferListener transferListenerMock = mock(TransferListener.class); + String createMpuUrl = "/" + EXAMPLE_BUCKET + "/" + TEST_KEY + "?uploads"; + String createMpuResponse = "1234"; + stubFor(post(urlEqualTo(createMpuUrl)).willReturn(aResponse().withStatus(200).withBody(createMpuResponse))); + stubFor(any(anyUrl()).atPriority(6).willReturn(aResponse().withStatus(200).withBody(""))); + S3TransferManager tm = new GenericS3TransferManager(s3Async, mock(UploadDirectoryHelper.class), + mock(TransferManagerConfiguration.class), + mock(DownloadDirectoryHelper.class)); + CaptureTransferListener transferListener = new CaptureTransferListener(); + + tm.uploadFile(u -> u.putObjectRequest(p -> p.bucket(EXAMPLE_BUCKET).key(TEST_KEY)) + .source(testFile) + .addTransferListener(LoggingTransferListener.create()) + .addTransferListener(transferListener) + .addTransferListener(transferListenerMock) + .build()).completionFuture().join(); + + Thread.sleep(500); + assertThat(transferListener.getExceptionCaught()).isNull(); + assertThat(transferListener.isTransferComplete()).isTrue(); + assertThat(transferListener.isTransferInitiated()).isTrue(); + Mockito.verify(transferListenerMock, times(0)).transferFailed(ArgumentMatchers.any()); + Mockito.verify(transferListenerMock, times(1)).transferInitiated(ArgumentMatchers.any()); + Mockito.verify(transferListenerMock, times(1)).transferComplete(ArgumentMatchers.any()); + + int numTimesBytesTransferred = multipartEnabled ? 2 : 1; + Mockito.verify(transferListenerMock, times(numTimesBytesTransferred)).bytesTransferred(ArgumentMatchers.any()); + } +} diff --git a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/S3TransferManagerUploadPauseAndResumeTest.java b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/S3TransferManagerUploadPauseAndResumeTest.java index a79a8b4a5083..dccc03e44dfa 100644 --- a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/S3TransferManagerUploadPauseAndResumeTest.java +++ b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/S3TransferManagerUploadPauseAndResumeTest.java @@ -22,7 +22,7 @@ import static org.mockito.Mockito.when; import static software.amazon.awssdk.core.interceptor.SdkInternalExecutionAttribute.SDK_HTTP_EXECUTION_ATTRIBUTES; import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.CRT_PAUSE_RESUME_TOKEN; -import static software.amazon.awssdk.services.s3.multipart.S3PauseResumeExecutionAttribute.RESUME_TOKEN; +import static software.amazon.awssdk.services.s3.multipart.S3MultipartExecutionAttribute.RESUME_TOKEN; import static software.amazon.awssdk.transfer.s3.SizeConstant.MB; import java.io.File; diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/CopyObjectHelper.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/CopyObjectHelper.java index afb1ca0e4e8c..fbfc2850e7c1 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/CopyObjectHelper.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/CopyObjectHelper.java @@ -146,8 +146,7 @@ private void doCopyInParts(CopyObjectRequest copyObjectRequest, optimalPartSize); CompletableFutureUtils.allOfExceptionForwarded(futures.toArray(new CompletableFuture[0])) .thenCompose(ignore -> completeMultipartUpload(copyObjectRequest, uploadId, completedParts)) - .handle(genericMultipartHelper.handleExceptionOrResponse(copyObjectRequest, returnFuture, - uploadId)) + .handle(genericMultipartHelper.handleExceptionOrResponse(copyObjectRequest, returnFuture, uploadId)) .exceptionally(throwable -> { genericMultipartHelper.handleException(returnFuture, () -> "Unexpected exception occurred", throwable); diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/GenericMultipartHelper.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/GenericMultipartHelper.java index 1906408a59b4..bc4d1eda7373 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/GenericMultipartHelper.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/GenericMultipartHelper.java @@ -16,6 +16,7 @@ package software.amazon.awssdk.services.s3.internal.multipart; import static software.amazon.awssdk.services.s3.internal.multipart.SdkPojoConversionUtils.toCompleteMultipartUploadRequest; +import static software.amazon.awssdk.services.s3.multipart.S3MultipartExecutionAttribute.JAVA_PROGRESS_LISTENER; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -23,6 +24,7 @@ import java.util.function.Function; import java.util.function.Supplier; import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.core.async.listener.PublisherListener; import software.amazon.awssdk.core.exception.SdkClientException; import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.services.s3.S3AsyncClient; @@ -90,10 +92,11 @@ public CompletableFuture completeMultipartUploa return s3AsyncClient.completeMultipartUpload(completeMultipartUploadRequest); } - public BiFunction handleExceptionOrResponse( - RequestT request, - CompletableFuture returnFuture, - String uploadId) { + public BiFunction handleExceptionOrResponse(RequestT request, + CompletableFuture returnFuture, String uploadId) { + PublisherListener progressListener = request.overrideConfiguration() + .map(c -> c.executionAttributes().getAttribute(JAVA_PROGRESS_LISTENER)) + .orElseGet(PublisherListener::noOp); return (completeMultipartUploadResponse, throwable) -> { if (throwable != null) { @@ -103,6 +106,7 @@ public BiFunction handleExcept } else { returnFuture.complete(responseConverter.apply( completeMultipartUploadResponse)); + progressListener.subscriberOnComplete(); } return null; diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/KnownContentLengthAsyncRequestBodySubscriber.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/KnownContentLengthAsyncRequestBodySubscriber.java index 59be53e13642..4fe64a7bca05 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/KnownContentLengthAsyncRequestBodySubscriber.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/KnownContentLengthAsyncRequestBodySubscriber.java @@ -15,6 +15,8 @@ package software.amazon.awssdk.services.s3.internal.multipart; +import static software.amazon.awssdk.services.s3.multipart.S3MultipartExecutionAttribute.JAVA_PROGRESS_LISTENER; + import java.util.Collection; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -27,6 +29,7 @@ import org.reactivestreams.Subscription; import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.core.async.listener.PublisherListener; import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse; import software.amazon.awssdk.services.s3.model.CompletedPart; import software.amazon.awssdk.services.s3.model.PutObjectRequest; @@ -58,6 +61,7 @@ public class KnownContentLengthAsyncRequestBodySubscriber implements Subscriber< private final CompletableFuture returnFuture; private final Map completedParts; private final Map existingParts; + private final PublisherListener progressListener; private Subscription subscription; private volatile boolean isDone; private volatile boolean isPaused; @@ -75,6 +79,9 @@ public class KnownContentLengthAsyncRequestBodySubscriber implements Subscriber< this.numExistingParts = NumericUtils.saturatedCast(mpuRequestContext.numPartsCompleted()); this.completedParts = new ConcurrentHashMap<>(); this.multipartUploadHelper = multipartUploadHelper; + this.progressListener = putObjectRequest.overrideConfiguration().map(c -> c.executionAttributes() + .getAttribute(JAVA_PROGRESS_LISTENER)) + .orElseGet(PublisherListener::noOp); } private int determinePartCount(long contentLength, long partSize) { @@ -138,6 +145,7 @@ public void onNext(AsyncRequestBody asyncRequestBody) { partNumber.getAndIncrement(); asyncRequestBody.subscribe(new CancelledSubscriber<>()); subscription.request(1); + asyncRequestBody.contentLength().ifPresent(progressListener::subscriberOnNext); return; } @@ -149,7 +157,7 @@ public void onNext(AsyncRequestBody asyncRequestBody) { Consumer completedPartConsumer = completedPart -> completedParts.put(completedPart.partNumber(), completedPart); multipartUploadHelper.sendIndividualUploadPartRequest(uploadId, completedPartConsumer, futures, - Pair.of(uploadRequest, asyncRequestBody)) + Pair.of(uploadRequest, asyncRequestBody), progressListener) .whenComplete((r, t) -> { if (t != null) { if (shouldFailRequest()) { @@ -195,8 +203,7 @@ private void completeMultipartUploadIfFinished(int requestsInFlight) { } else { parts = existingParts.values().toArray(new CompletedPart[0]); } - completeMpuFuture = multipartUploadHelper.completeMultipartUpload(returnFuture, uploadId, parts, - putObjectRequest); + completeMpuFuture = multipartUploadHelper.completeMultipartUpload(returnFuture, uploadId, parts, putObjectRequest); } } diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartUploadHelper.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartUploadHelper.java index c5bb7fe286cc..71cc364915cc 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartUploadHelper.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartUploadHelper.java @@ -19,10 +19,12 @@ import static software.amazon.awssdk.services.s3.internal.multipart.SdkPojoConversionUtils.toAbortMultipartUploadRequest; import java.util.Collection; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.core.async.listener.PublisherListener; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse; import software.amazon.awssdk.services.s3.model.CompletedPart; @@ -94,18 +96,22 @@ CompletableFuture completeMultipartUpload(Compl CompletableFuture sendIndividualUploadPartRequest(String uploadId, Consumer completedPartsConsumer, Collection> futures, - Pair requestPair) { + Pair requestPair, + PublisherListener progressListener) { UploadPartRequest uploadPartRequest = requestPair.left(); Integer partNumber = uploadPartRequest.partNumber(); + Optional contentLength = requestPair.right().contentLength(); log.debug(() -> "Sending uploadPartRequest: " + uploadPartRequest.partNumber() + " uploadId: " + uploadId + " " - + "contentLength " + requestPair.right().contentLength()); + + "contentLength " + contentLength); CompletableFuture uploadPartFuture = s3AsyncClient.uploadPart(uploadPartRequest, requestPair.right()); CompletableFuture convertFuture = - uploadPartFuture.thenApply(uploadPartResponse -> convertUploadPartResponse(completedPartsConsumer, partNumber, - uploadPartResponse)); + uploadPartFuture.thenApply(uploadPartResponse -> { + contentLength.ifPresent(progressListener::subscriberOnNext); + return convertUploadPartResponse(completedPartsConsumer, partNumber, uploadPartResponse); + }); futures.add(convertFuture); CompletableFutureUtils.forwardExceptionTo(convertFuture, uploadPartFuture); return convertFuture; diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithKnownContentLengthHelper.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithKnownContentLengthHelper.java index 9cb1aa62a100..05051ab25cac 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithKnownContentLengthHelper.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithKnownContentLengthHelper.java @@ -15,8 +15,8 @@ package software.amazon.awssdk.services.s3.internal.multipart; -import static software.amazon.awssdk.services.s3.multipart.S3PauseResumeExecutionAttribute.PAUSE_OBSERVABLE; -import static software.amazon.awssdk.services.s3.multipart.S3PauseResumeExecutionAttribute.RESUME_TOKEN; +import static software.amazon.awssdk.services.s3.multipart.S3MultipartExecutionAttribute.PAUSE_OBSERVABLE; +import static software.amazon.awssdk.services.s3.multipart.S3MultipartExecutionAttribute.RESUME_TOKEN; import java.util.Map; import java.util.concurrent.CompletableFuture; diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithUnknownContentLengthHelper.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithUnknownContentLengthHelper.java index 0c8c3c70b516..18e3a9291d58 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithUnknownContentLengthHelper.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithUnknownContentLengthHelper.java @@ -16,6 +16,8 @@ package software.amazon.awssdk.services.s3.internal.multipart; +import static software.amazon.awssdk.services.s3.multipart.S3MultipartExecutionAttribute.JAVA_PROGRESS_LISTENER; + import java.util.Collection; import java.util.Comparator; import java.util.Queue; @@ -28,6 +30,7 @@ import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.core.async.SdkPublisher; +import software.amazon.awssdk.core.async.listener.PublisherListener; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.CompletedPart; import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse; @@ -116,6 +119,7 @@ private class UnknownContentLengthAsyncRequestBodySubscriber implements Subscrib private final long maximumChunkSizeInByte; private final PutObjectRequest putObjectRequest; private final CompletableFuture returnFuture; + private final PublisherListener progressListener; private Subscription subscription; private AsyncRequestBody firstRequestBody; @@ -128,6 +132,9 @@ private class UnknownContentLengthAsyncRequestBodySubscriber implements Subscrib this.maximumChunkSizeInByte = maximumChunkSizeInByte; this.putObjectRequest = putObjectRequest; this.returnFuture = returnFuture; + this.progressListener = putObjectRequest.overrideConfiguration() + .map(c -> c.executionAttributes().getAttribute(JAVA_PROGRESS_LISTENER)) + .orElseGet(PublisherListener::noOp); } @Override @@ -193,7 +200,7 @@ public void onNext(AsyncRequestBody asyncRequestBody) { private void sendUploadPartRequest(String uploadId, AsyncRequestBody asyncRequestBody) { multipartUploadHelper.sendIndividualUploadPartRequest(uploadId, completedParts::add, futures, - uploadPart(asyncRequestBody)) + uploadPart(asyncRequestBody), progressListener) .whenComplete((r, t) -> { if (t != null) { if (failureActionInitiated.compareAndSet(false, true)) { diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/multipart/S3PauseResumeExecutionAttribute.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/multipart/S3MultipartExecutionAttribute.java similarity index 78% rename from services/s3/src/main/java/software/amazon/awssdk/services/s3/multipart/S3PauseResumeExecutionAttribute.java rename to services/s3/src/main/java/software/amazon/awssdk/services/s3/multipart/S3MultipartExecutionAttribute.java index 3aae35725557..7bfacd9d387e 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/multipart/S3PauseResumeExecutionAttribute.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/multipart/S3MultipartExecutionAttribute.java @@ -16,11 +16,14 @@ package software.amazon.awssdk.services.s3.multipart; import software.amazon.awssdk.annotations.SdkProtectedApi; +import software.amazon.awssdk.core.async.listener.PublisherListener; import software.amazon.awssdk.core.interceptor.ExecutionAttribute; import software.amazon.awssdk.core.interceptor.SdkExecutionAttribute; @SdkProtectedApi -public final class S3PauseResumeExecutionAttribute extends SdkExecutionAttribute { +public final class S3MultipartExecutionAttribute extends SdkExecutionAttribute { public static final ExecutionAttribute RESUME_TOKEN = new ExecutionAttribute<>("ResumeToken"); public static final ExecutionAttribute PAUSE_OBSERVABLE = new ExecutionAttribute<>("PauseObservable"); + public static final ExecutionAttribute> JAVA_PROGRESS_LISTENER = + new ExecutionAttribute<>("JavaProgressListener"); } diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/KnownContentLengthAsyncRequestBodySubscriberTest.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/KnownContentLengthAsyncRequestBodySubscriberTest.java index 0ffbab391b3c..180754b66388 100644 --- a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/KnownContentLengthAsyncRequestBodySubscriberTest.java +++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/KnownContentLengthAsyncRequestBodySubscriberTest.java @@ -103,7 +103,8 @@ private S3ResumeToken configureSubscriberAndPause(int numExistingParts, KnownContentLengthAsyncRequestBodySubscriber subscriber = subscriber(putObjectRequest, asyncRequestBody, existingParts); when(multipartUploadHelper.completeMultipartUpload(any(CompletableFuture.class), any(String.class), - any(CompletedPart[].class), any(PutObjectRequest.class))).thenReturn(completeMpuFuture); + any(CompletedPart[].class), any(PutObjectRequest.class))) + .thenReturn(completeMpuFuture); subscriber.onComplete(); return subscriber.pause(); } @@ -139,4 +140,4 @@ private void verifyResumeToken(S3ResumeToken s3ResumeToken, int numExistingParts assertThat(s3ResumeToken.totalNumParts()).isEqualTo(TOTAL_NUM_PARTS); assertThat(s3ResumeToken.numPartsCompleted()).isEqualTo(numExistingParts); } -} +} \ No newline at end of file diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/UploadObjectHelperTest.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/UploadObjectHelperTest.java index d17ab358c527..f96ddc4ddc48 100644 --- a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/UploadObjectHelperTest.java +++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/UploadObjectHelperTest.java @@ -28,8 +28,8 @@ import static software.amazon.awssdk.services.s3.internal.multipart.MpuTestUtils.stubSuccessfulCompleteMultipartCall; import static software.amazon.awssdk.services.s3.internal.multipart.MpuTestUtils.stubSuccessfulCreateMultipartCall; import static software.amazon.awssdk.services.s3.internal.multipart.MpuTestUtils.stubSuccessfulUploadPartCalls; -import static software.amazon.awssdk.services.s3.multipart.S3PauseResumeExecutionAttribute.PAUSE_OBSERVABLE; -import static software.amazon.awssdk.services.s3.multipart.S3PauseResumeExecutionAttribute.RESUME_TOKEN; +import static software.amazon.awssdk.services.s3.multipart.S3MultipartExecutionAttribute.PAUSE_OBSERVABLE; +import static software.amazon.awssdk.services.s3.multipart.S3MultipartExecutionAttribute.RESUME_TOKEN; import java.io.IOException; import java.nio.ByteBuffer;