Skip to content

Commit 08ec595

Browse files
authored
🐛 S3: when copying files, no callback is invoked if the multipart threshold is not reached (#6305)
1 parent df5a1a8 commit 08ec595

File tree

4 files changed

+25
-9
lines changed

4 files changed

+25
-9
lines changed

packages/aws-library/src/aws_library/s3/_client.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
from types_aiobotocore_s3.literals import BucketLocationConstraintType
2424
from types_aiobotocore_s3.type_defs import ObjectIdentifierTypeDef
2525

26-
from ._constants import MULTIPART_UPLOADS_MIN_TOTAL_SIZE, PRESIGNED_LINK_MAX_SIZE
26+
from ._constants import MULTIPART_COPY_THRESHOLD, MULTIPART_UPLOADS_MIN_TOTAL_SIZE
2727
from ._error_handler import s3_exception_handler, s3_exception_handler_async_gen
2828
from ._errors import S3DestinationNotEmptyError, S3KeyNotFoundError
2929
from ._models import (
@@ -407,6 +407,7 @@ async def copy_object(
407407
src_object_key: S3ObjectKey,
408408
dst_object_key: S3ObjectKey,
409409
bytes_transfered_cb: CopiedBytesTransferredCallback | None,
410+
object_metadata: S3MetaData | None = None,
410411
) -> None:
411412
"""copy a file in S3 using aioboto3 transfer manager (e.g. works >5Gb and creates multiple threads)"""
412413
copy_options: dict[str, Any] = {
@@ -415,7 +416,7 @@ async def copy_object(
415416
"Key": dst_object_key,
416417
"Config": TransferConfig(
417418
max_concurrency=self.transfer_max_concurrency,
418-
multipart_threshold=PRESIGNED_LINK_MAX_SIZE,
419+
multipart_threshold=MULTIPART_COPY_THRESHOLD,
419420
),
420421
}
421422
if bytes_transfered_cb:
@@ -424,7 +425,16 @@ async def copy_object(
424425
bytes_transfered_cb, file_name=f"{dst_object_key}"
425426
)
426427
}
428+
# NOTE: the boto3 copy function uses copy_object until 'multipart_threshold' is reached, then switches to multipart copy
429+
# copy_object provides no callbacks, so progress cannot be tracked; we therefore ensure that at least the completion
430+
# of the object is tracked
427431
await self._client.copy(**copy_options)
432+
if bytes_transfered_cb:
433+
if object_metadata is None:
434+
object_metadata = await self.get_object_metadata(
435+
bucket=bucket, object_key=dst_object_key
436+
)
437+
bytes_transfered_cb(object_metadata.size, file_name=f"{dst_object_key}")
428438

429439
@s3_exception_handler(_logger)
430440
async def copy_objects_recursively(
@@ -448,6 +458,7 @@ async def copy_objects_recursively(
448458
src_object_key=s3_object.object_key,
449459
dst_object_key=s3_object.object_key.replace(src_prefix, dst_prefix),
450460
bytes_transfered_cb=bytes_transfered_cb,
461+
object_metadata=s3_object,
451462
)
452463
async for s3_object in self._list_all_objects(
453464
bucket=bucket, prefix=src_prefix

packages/aws-library/src/aws_library/s3/_constants.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
# NOTE: AWS S3 upload limits https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
66
MULTIPART_UPLOADS_MIN_TOTAL_SIZE: Final[ByteSize] = parse_obj_as(ByteSize, "100MiB")
7-
7+
MULTIPART_COPY_THRESHOLD: Final[ByteSize] = parse_obj_as(ByteSize, "100MiB")
88

99
PRESIGNED_LINK_MAX_SIZE: Final[ByteSize] = parse_obj_as(ByteSize, "5GiB")
1010
S3_MAX_FILE_SIZE: Final[ByteSize] = parse_obj_as(ByteSize, "5TiB")

services/storage/src/simcore_service_storage/s3_utils.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,15 @@
99
TaskProgress,
1010
)
1111

12-
logger = logging.getLogger(__name__)
12+
_logger = logging.getLogger(__name__)
1313

1414

1515
def update_task_progress(
1616
task_progress: TaskProgress | None,
1717
message: ProgressMessage | None = None,
1818
progress: ProgressPercent | None = None,
1919
) -> None:
20-
logger.debug("%s [%s]", message or "", progress or "n/a")
20+
_logger.debug("%s [%s]", message or "", progress or "n/a")
2121
if task_progress:
2222
task_progress.update(message=message, percent=progress)
2323

@@ -53,7 +53,7 @@ def finalize_transfer(self) -> None:
5353
self._update()
5454

5555
def copy_transfer_cb(self, total_bytes_copied: int, *, file_name: str) -> None:
56-
logger.debug(
56+
_logger.debug(
5757
"Copied %s of %s",
5858
parse_obj_as(ByteSize, total_bytes_copied).human_readable(),
5959
file_name,
@@ -64,6 +64,11 @@ def copy_transfer_cb(self, total_bytes_copied: int, *, file_name: str) -> None:
6464
self._update()
6565

6666
def upload_transfer_cb(self, bytes_transferred: int, *, file_name: str) -> None:
67+
_logger.debug(
68+
"Uploaded %s of %s",
69+
parse_obj_as(ByteSize, bytes_transferred).human_readable(),
70+
file_name,
71+
)
6772
self._file_total_bytes_copied[file_name] += bytes_transferred
6873
self._total_bytes_copied = sum(self._file_total_bytes_copied.values())
6974
if self.total_bytes_to_transfer != 0:

services/storage/tests/unit/test_simcore_s3_dsm.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ def file_size() -> ByteSize:
2828

2929

3030
@pytest.fixture
31-
def mock_copy_transfer_cb() -> Callable[[int], None]:
32-
def copy_transfer_cb(copied_bytes: int) -> None:
31+
def mock_copy_transfer_cb() -> Callable[..., None]:
32+
def copy_transfer_cb(total_bytes_copied: int, *, file_name: str) -> None:
3333
...
3434

3535
return copy_transfer_cb
@@ -43,7 +43,7 @@ async def test__copy_path_s3_s3(
4343
upload_file: Callable[[ByteSize, str], Awaitable[tuple[Path, SimcoreS3FileID]]],
4444
file_size: ByteSize,
4545
user_id: UserID,
46-
mock_copy_transfer_cb: Callable[[int], None],
46+
mock_copy_transfer_cb: Callable[..., None],
4747
aiopg_engine: Engine,
4848
):
4949
def _get_dest_file_id(src: SimcoreS3FileID) -> SimcoreS3FileID:

0 commit comments

Comments
 (0)