From 546d35d5c740b334f25752ad3170d4eabb969575 Mon Sep 17 00:00:00 2001
From: Mads Bisgaard
Date: Tue, 25 Mar 2025 14:19:08 +0100
Subject: [PATCH 1/9] use internal max and min values

---
 .../api/rpc/_async_jobs.py   |  2 +-
 .../modules/celery/models.py | 17 ++++++++---------
 2 files changed, 9 insertions(+), 10 deletions(-)

diff --git a/services/storage/src/simcore_service_storage/api/rpc/_async_jobs.py b/services/storage/src/simcore_service_storage/api/rpc/_async_jobs.py
index ba7c920f8aa..6c928b18a3a 100644
--- a/services/storage/src/simcore_service_storage/api/rpc/_async_jobs.py
+++ b/services/storage/src/simcore_service_storage/api/rpc/_async_jobs.py
@@ -123,7 +123,7 @@ async def result(
         exc_type = ""
         exc_msg = ""
         with log_catch(logger=_logger, reraise=False):
-            task_error = TaskError.model_validate_json(_result)
+            task_error = TaskError.model_validate(_result)
             exc_type = task_error.exc_type
             exc_msg = task_error.exc_msg
         raise JobError(job_id=job_id, exc_type=exc_type, exc_msg=exc_msg)
diff --git a/services/storage/src/simcore_service_storage/modules/celery/models.py b/services/storage/src/simcore_service_storage/modules/celery/models.py
index 6b72a6e0019..94f961a0e29 100644
--- a/services/storage/src/simcore_service_storage/modules/celery/models.py
+++ b/services/storage/src/simcore_service_storage/modules/celery/models.py
@@ -1,5 +1,5 @@
 from enum import StrEnum, auto
-from typing import Any, Final, Self, TypeAlias
+from typing import Any, Self, TypeAlias
 from uuid import UUID
 
 from models_library.progress_bar import ProgressReport
@@ -9,9 +9,6 @@
 TaskID: TypeAlias = str
 TaskUUID: TypeAlias = UUID
 
-_MIN_PROGRESS: Final[float] = 0.0
-_MAX_PROGRESS: Final[float] = 100.0
-
 
 class TaskState(StrEnum):
     PENDING = auto()
@@ -36,13 +33,15 @@ def is_done(self) -> bool:
     @model_validator(mode="after")
     def _check_consistency(self) -> Self:
         value = self.progress_report.actual_value
+        min_value = 0.0
+        max_value = self.progress_report.total
 
         valid_states = {
-            TaskState.PENDING: value == _MIN_PROGRESS,
-            TaskState.RUNNING: _MIN_PROGRESS <= value <= _MAX_PROGRESS,
-            TaskState.SUCCESS: value == _MAX_PROGRESS,
-            TaskState.ABORTED: value == _MAX_PROGRESS,
-            TaskState.ERROR: value == _MAX_PROGRESS,
+            TaskState.PENDING: value == min_value,
+            TaskState.RUNNING: min_value <= value <= max_value,
+            TaskState.SUCCESS: value == max_value,
+            TaskState.ABORTED: value == max_value,
+            TaskState.ERROR: value == max_value,
         }
 
         if not valid_states.get(self.task_state, True):

From 948090c7ecc13cd6bd00e1bfd0ff3ed5ace0156c Mon Sep 17 00:00:00 2001
From: Mads Bisgaard
Date: Tue, 25 Mar 2025 14:39:12 +0100
Subject: [PATCH 2/9] further cleanup of ProgressReport

---
 .../simcore_service_storage/modules/celery/client.py | 10 +++++++---
 .../simcore_service_storage/modules/celery/tasks.py  |  3 ++-
 2 files changed, 9 insertions(+), 4 deletions(-)

diff --git a/services/storage/src/simcore_service_storage/modules/celery/client.py b/services/storage/src/simcore_service_storage/modules/celery/client.py
index b6c2a090691..498c26827c9 100644
--- a/services/storage/src/simcore_service_storage/modules/celery/client.py
+++ b/services/storage/src/simcore_service_storage/modules/celery/client.py
@@ -36,7 +36,7 @@
 _CELERY_TASK_ID_KEY_ENCODING = "utf-8"
 
 _MIN_PROGRESS_VALUE = 0.0
-_MAX_PROGRESS_VALUE = 100.0
+_MAX_PROGRESS_VALUE = 1.0
 
 
 def _build_context_prefix(task_context: TaskContext) -> list[str]:
@@ -99,8 +99,12 @@ def _get_progress_report(
             TaskState.ERROR,
             TaskState.SUCCESS,
         ):
-            return ProgressReport(actual_value=_MAX_PROGRESS_VALUE)
-        return ProgressReport(actual_value=_MIN_PROGRESS_VALUE)
+            return ProgressReport(
+                actual_value=_MAX_PROGRESS_VALUE, total=_MAX_PROGRESS_VALUE
+            )
+        return ProgressReport(
+            actual_value=_MIN_PROGRESS_VALUE, total=_MAX_PROGRESS_VALUE
+        )
 
     def _get_state(self, task_context: TaskContext, task_uuid: TaskUUID) -> TaskState:
         task_id = _build_task_id(task_context, task_uuid)
diff --git a/services/storage/src/simcore_service_storage/modules/celery/tasks.py b/services/storage/src/simcore_service_storage/modules/celery/tasks.py
index 014151acd74..c66b3a1ab32 100644
--- a/services/storage/src/simcore_service_storage/modules/celery/tasks.py
+++ b/services/storage/src/simcore_service_storage/modules/celery/tasks.py
@@ -13,6 +13,7 @@
 
 def export_data(task: Task, files: list[StorageFileID]):
     _logger.info("Exporting files: %s", files)
+    assert len(files) > 0
     for n, file in enumerate(files, start=1):
         with log_context(
             _logger,
@@ -23,7 +24,7 @@ def export_data(task: Task, files: list[StorageFileID]):
             get_celery_worker(task.app).set_task_progress(
                 task_name=task.name,
                 task_id=task.request.id,
-                report=ProgressReport(actual_value=n / len(files) * 100),
+                report=ProgressReport(actual_value=n, total=len(files)),
             )
             time.sleep(10)
     return "done"
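Note on patches 1 and 2: together they replace the hard-coded 0-100 progress scale with a ProgressReport that carries its own total, and the TaskStatus validator now checks terminal states against that total. A minimal runnable sketch of the rule the validator enforces -- the types below are simplified stand-ins, not the real models_library imports:

    from dataclasses import dataclass
    from enum import StrEnum, auto

    class TaskState(StrEnum):
        PENDING = auto()
        RUNNING = auto()
        SUCCESS = auto()
        ERROR = auto()
        ABORTED = auto()

    @dataclass
    class ProgressReport:  # stand-in for models_library.progress_bar
        actual_value: float
        total: float = 1.0

    def is_consistent(state: TaskState, report: ProgressReport) -> bool:
        # PENDING sits at 0, terminal states sit exactly at `total`,
        # RUNNING may be anywhere in [0, total] -- mirrors _check_consistency
        valid = {
            TaskState.PENDING: report.actual_value == 0.0,
            TaskState.RUNNING: 0.0 <= report.actual_value <= report.total,
            TaskState.SUCCESS: report.actual_value == report.total,
            TaskState.ABORTED: report.actual_value == report.total,
            TaskState.ERROR: report.actual_value == report.total,
        }
        return valid.get(state, True)

    assert is_consistent(TaskState.RUNNING, ProgressReport(actual_value=0.5))
    assert not is_consistent(TaskState.SUCCESS, ProgressReport(actual_value=0.5))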
From 6864fe36768c7ed24d2fde4cda4092fc39441e02 Mon Sep 17 00:00:00 2001
From: Mads Bisgaard
Date: Tue, 25 Mar 2025 15:43:41 +0100
Subject: [PATCH 3/9] fix tests

---
 services/storage/tests/unit/test_data_export.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/services/storage/tests/unit/test_data_export.py b/services/storage/tests/unit/test_data_export.py
index 05c0f99a176..af047edd3f5 100644
--- a/services/storage/tests/unit/test_data_export.py
+++ b/services/storage/tests/unit/test_data_export.py
@@ -452,7 +452,7 @@ async def test_get_data_export_status_error(
             "get_task_status_object": TaskStatus(
                 task_uuid=TaskUUID(_faker.uuid4()),
                 task_state=TaskState.SUCCESS,
-                progress_report=ProgressReport(actual_value=100),
+                progress_report=ProgressReport(actual_value=1, total=1),
             ),
             "get_task_result_object": "result",
             "get_task_uuids_object": [AsyncJobId(_faker.uuid4())],
@@ -487,7 +487,7 @@ async def test_get_data_export_result_success(
             "get_task_status_object": TaskStatus(
                 task_uuid=TaskUUID(_faker.uuid4()),
                 task_state=TaskState.RUNNING,
-                progress_report=ProgressReport(actual_value=50),
+                progress_report=ProgressReport(actual_value=0.5, total=1.0),
             ),
             "get_task_result_object": _faker.text(),
             "get_task_uuids_object": [AsyncJobId(_faker.uuid4())],
@@ -499,7 +499,7 @@
             "get_task_status_object": TaskStatus(
                 task_uuid=TaskUUID(_faker.uuid4()),
                 task_state=TaskState.ABORTED,
-                progress_report=ProgressReport(actual_value=100),
+                progress_report=ProgressReport(actual_value=1.0, total=1.0),
             ),
             "get_task_result_object": _faker.text(),
             "get_task_uuids_object": [AsyncJobId(_faker.uuid4())],
@@ -511,7 +511,7 @@
             "get_task_status_object": TaskStatus(
                 task_uuid=TaskUUID(_faker.uuid4()),
                 task_state=TaskState.ERROR,
-                progress_report=ProgressReport(actual_value=100),
+                progress_report=ProgressReport(actual_value=1.0, total=1.0),
             ),
             "get_task_result_object": _faker.text(),
             "get_task_uuids_object": [AsyncJobId(_faker.uuid4())],

From 90fd83318316cca2abf9701c58ab5bfd82535b44 Mon Sep 17 00:00:00 2001
From: Mads Bisgaard
Date: Wed, 26 Mar 2025 06:52:36 +0100
Subject: [PATCH 4/9] fix celery tests

---
 .../storage/src/simcore_service_storage/modules/celery/tasks.py | 2 +-
 services/storage/tests/unit/modules/celery/test_celery.py      | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/services/storage/src/simcore_service_storage/modules/celery/tasks.py b/services/storage/src/simcore_service_storage/modules/celery/tasks.py
index c66b3a1ab32..881285e5a20 100644
--- a/services/storage/src/simcore_service_storage/modules/celery/tasks.py
+++ b/services/storage/src/simcore_service_storage/modules/celery/tasks.py
@@ -24,7 +24,7 @@ def export_data(task: Task, files: list[StorageFileID]):
             get_celery_worker(task.app).set_task_progress(
                 task_name=task.name,
                 task_id=task.request.id,
-                report=ProgressReport(actual_value=n, total=len(files)),
+                report=ProgressReport(actual_value=n / len(files), total=1),
             )
             time.sleep(10)
     return "done"
diff --git a/services/storage/tests/unit/modules/celery/test_celery.py b/services/storage/tests/unit/modules/celery/test_celery.py
index 77dd5cf3e2a..c6c4e53135f 100644
--- a/services/storage/tests/unit/modules/celery/test_celery.py
+++ b/services/storage/tests/unit/modules/celery/test_celery.py
@@ -41,7 +41,7 @@ def sleep_for(seconds: float) -> None:
                 worker.set_task_progress(
                     task_name=task_name,
                     task_id=task_id,
-                    report=ProgressReport(actual_value=n / len(files) * 10),
+                    report=ProgressReport(actual_value=n / len(files), total=1.0),
                 )
         await asyncio.get_event_loop().run_in_executor(None, sleep_for, 1)
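Note on patch 4: both the worker task and the test now report progress as a fraction with an explicit total of 1, so the final iteration lands exactly on the total -- which is what the SUCCESS branch of the consistency check from patch 1 requires. A small pure-Python sketch of that invariant:

    files = ["a", "b", "c", "d"]
    reports = [
        {"actual_value": n / len(files), "total": 1.0}
        for n, _ in enumerate(files, start=1)
    ]
    # every intermediate report stays within [0, total] ...
    assert all(0.0 <= r["actual_value"] <= r["total"] for r in reports)
    # ... and the last report hits total exactly, as TaskState.SUCCESS requires
    assert reports[-1]["actual_value"] == reports[-1]["total"]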
From 06fde28a9f2ecce4fd198ab3e9dc63e484c2b05d Mon Sep 17 00:00:00 2001
From: Mads Bisgaard
Date: Wed, 26 Mar 2025 06:59:32 +0100
Subject: [PATCH 5/9] add log_catch to every rpc endpoint in storage

---
 .../api/rpc/_async_jobs.py                    | 136 ++++++++++--------
 .../api/rpc/_data_export.py                   |  52 ++++---
 .../simcore_service_storage/api/rpc/_paths.py |  21 +--
 3 files changed, 114 insertions(+), 95 deletions(-)

diff --git a/services/storage/src/simcore_service_storage/api/rpc/_async_jobs.py b/services/storage/src/simcore_service_storage/api/rpc/_async_jobs.py
index 6c928b18a3a..67601fabb14 100644
--- a/services/storage/src/simcore_service_storage/api/rpc/_async_jobs.py
+++ b/services/storage/src/simcore_service_storage/api/rpc/_async_jobs.py
@@ -47,16 +47,19 @@ async def _assert_job_exists(
 async def cancel(app: FastAPI, job_id: AsyncJobId, job_id_data: AsyncJobNameData):
     assert app  # nosec
     assert job_id_data  # nosec
-    try:
-        await _assert_job_exists(
-            job_id=job_id, job_id_data=job_id_data, celery_client=get_celery_client(app)
-        )
-        await get_celery_client(app).abort_task(
-            task_context=job_id_data.model_dump(),
-            task_uuid=job_id,
-        )
-    except CeleryError as exc:
-        raise JobSchedulerError(exc=f"{exc}") from exc
+    with log_catch(logger=_logger):
+        try:
+            await _assert_job_exists(
+                job_id=job_id,
+                job_id_data=job_id_data,
+                celery_client=get_celery_client(app),
+            )
+            await get_celery_client(app).abort_task(
+                task_context=job_id_data.model_dump(),
+                task_uuid=job_id,
+            )
+        except CeleryError as exc:
+            raise JobSchedulerError(exc=f"{exc}") from exc
 
 
 @router.expose(reraise_if_error_type=(JobSchedulerError, JobMissingError))
@@ -66,22 +69,25 @@ async def status(
     assert app  # nosec
     assert job_id_data  # nosec
 
-    try:
-        await _assert_job_exists(
-            job_id=job_id, job_id_data=job_id_data, celery_client=get_celery_client(app)
+    with log_catch(logger=_logger):
+        try:
+            await _assert_job_exists(
+                job_id=job_id,
+                job_id_data=job_id_data,
+                celery_client=get_celery_client(app),
+            )
+            task_status = await get_celery_client(app).get_task_status(
+                task_context=job_id_data.model_dump(),
+                task_uuid=job_id,
+            )
+        except CeleryError as exc:
+            raise JobSchedulerError(exc=f"{exc}") from exc
+
+        return AsyncJobStatus(
+            job_id=job_id,
+            progress=task_status.progress_report,
+            done=task_status.is_done,
         )
-        task_status = await get_celery_client(app).get_task_status(
-            task_context=job_id_data.model_dump(),
-            task_uuid=job_id,
-        )
-    except CeleryError as exc:
-        raise JobSchedulerError(exc=f"{exc}") from exc
-
-    return AsyncJobStatus(
-        job_id=job_id,
-        progress=task_status.progress_report,
-        done=task_status.is_done,
-    )
 
 
 @router.expose(
@@ -100,35 +106,38 @@ async def result(
     assert job_id  # nosec
     assert job_id_data  # nosec
 
-    try:
-        await _assert_job_exists(
-            job_id=job_id, job_id_data=job_id_data, celery_client=get_celery_client(app)
-        )
-        _status = await get_celery_client(app).get_task_status(
-            task_context=job_id_data.model_dump(),
-            task_uuid=job_id,
-        )
-        if not _status.is_done:
-            raise JobNotDoneError(job_id=job_id)
-        _result = await get_celery_client(app).get_task_result(
-            task_context=job_id_data.model_dump(),
-            task_uuid=job_id,
-        )
-    except CeleryError as exc:
-        raise JobSchedulerError(exc=f"{exc}") from exc
-
-    if _status.task_state == TaskState.ABORTED:
-        raise JobAbortedError(job_id=job_id)
-    if _status.task_state == TaskState.ERROR:
-        exc_type = ""
-        exc_msg = ""
-        with log_catch(logger=_logger, reraise=False):
-            task_error = TaskError.model_validate(_result)
-            exc_type = task_error.exc_type
-            exc_msg = task_error.exc_msg
-        raise JobError(job_id=job_id, exc_type=exc_type, exc_msg=exc_msg)
-
-    return AsyncJobResult(result=_result)
+    with log_catch(logger=_logger):
+        try:
+            await _assert_job_exists(
+                job_id=job_id,
+                job_id_data=job_id_data,
+                celery_client=get_celery_client(app),
+            )
+            _status = await get_celery_client(app).get_task_status(
+                task_context=job_id_data.model_dump(),
+                task_uuid=job_id,
+            )
+            if not _status.is_done:
+                raise JobNotDoneError(job_id=job_id)
+            _result = await get_celery_client(app).get_task_result(
+                task_context=job_id_data.model_dump(),
+                task_uuid=job_id,
+            )
+        except CeleryError as exc:
+            raise JobSchedulerError(exc=f"{exc}") from exc
+
+        if _status.task_state == TaskState.ABORTED:
+            raise JobAbortedError(job_id=job_id)
+        if _status.task_state == TaskState.ERROR:
+            exc_type = ""
+            exc_msg = ""
+            with log_catch(logger=_logger, reraise=False):
+                task_error = TaskError.model_validate(_result)
+                exc_type = task_error.exc_type
+                exc_msg = task_error.exc_msg
+            raise JobError(job_id=job_id, exc_type=exc_type, exc_msg=exc_msg)
+
+        return AsyncJobResult(result=_result)
 
 
 @router.expose(reraise_if_error_type=(JobSchedulerError,))
@@ -136,12 +145,13 @@ async def list_jobs(
     app: FastAPI, filter_: str, job_id_data: AsyncJobNameData
 ) -> list[AsyncJobGet]:
     assert app  # nosec
-
-    try:
-        task_uuids = await get_celery_client(app).get_task_uuids(
-            task_context=job_id_data.model_dump(),
-        )
-    except CeleryError as exc:
-        raise JobSchedulerError(exc=f"{exc}") from exc
-
-    return [AsyncJobGet(job_id=task_uuid) for task_uuid in task_uuids]
+    assert filter_  # nosec
+    with log_catch(logger=_logger):
+        try:
+            task_uuids = await get_celery_client(app).get_task_uuids(
+                task_context=job_id_data.model_dump(),
+            )
+        except CeleryError as exc:
+            raise JobSchedulerError(exc=f"{exc}") from exc
+
+        return [AsyncJobGet(job_id=task_uuid) for task_uuid in task_uuids]
diff --git a/services/storage/src/simcore_service_storage/api/rpc/_data_export.py b/services/storage/src/simcore_service_storage/api/rpc/_data_export.py
index 7fe6612e5e3..2a2015e48e1 100644
--- a/services/storage/src/simcore_service_storage/api/rpc/_data_export.py
+++ b/services/storage/src/simcore_service_storage/api/rpc/_data_export.py
@@ -1,3 +1,5 @@
+import logging
+
 from celery.exceptions import CeleryError  # type: ignore[import-untyped]
 from fastapi import FastAPI
 from models_library.api_schemas_rpc_async_jobs.async_jobs import (
@@ -10,6 +12,7 @@
     DataExportTaskStartInput,
     InvalidFileIdentifierError,
 )
+from servicelib.logging_utils import log_catch
 from servicelib.rabbitmq import RPCRouter
 
 from ...datcore_dsm import DatCoreDataManager
@@ -19,6 +22,8 @@
 from ...modules.datcore_adapter.datcore_adapter_exceptions import DatcoreAdapterError
 from ...simcore_s3_dsm import SimcoreS3DataManager
 
+_logger = logging.getLogger(__name__)
+
 router = RPCRouter()
 
 
@@ -36,30 +41,31 @@ async def start_data_export(
 ) -> AsyncJobGet:
     assert app  # nosec
 
-    dsm = get_dsm_provider(app).get(data_export_start.location_id)
+    with log_catch(_logger):
+        dsm = get_dsm_provider(app).get(data_export_start.location_id)
 
-    try:
-        for _id in data_export_start.file_and_folder_ids:
-            if isinstance(dsm, DatCoreDataManager):
-                _ = await dsm.get_file(user_id=job_id_data.user_id, file_id=_id)
-            elif isinstance(dsm, SimcoreS3DataManager):
-                await dsm.can_read_file(user_id=job_id_data.user_id, file_id=_id)
+        try:
+            for _id in data_export_start.file_and_folder_ids:
+                if isinstance(dsm, DatCoreDataManager):
+                    _ = await dsm.get_file(user_id=job_id_data.user_id, file_id=_id)
+                elif isinstance(dsm, SimcoreS3DataManager):
+                    await dsm.can_read_file(user_id=job_id_data.user_id, file_id=_id)
 
-    except (FileAccessRightError, DatcoreAdapterError) as err:
-        raise AccessRightError(
-            user_id=job_id_data.user_id,
-            file_id=_id,
-            location_id=data_export_start.location_id,
-        ) from err
+        except (FileAccessRightError, DatcoreAdapterError) as err:
+            raise AccessRightError(
+                user_id=job_id_data.user_id,
+                file_id=_id,
+                location_id=data_export_start.location_id,
+            ) from err
 
-    try:
-        task_uuid = await get_celery_client(app).send_task(
-            "export_data",
-            task_context=job_id_data.model_dump(),
-            files=data_export_start.file_and_folder_ids,  # ANE: adapt here your signature
+        try:
+            task_uuid = await get_celery_client(app).send_task(
+                "export_data",
+                task_context=job_id_data.model_dump(),
+                files=data_export_start.file_and_folder_ids,  # ANE: adapt here your signature
+            )
+        except CeleryError as exc:
+            raise JobSchedulerError(exc=f"{exc}") from exc
+        return AsyncJobGet(
+            job_id=task_uuid,
         )
-    except CeleryError as exc:
-        raise JobSchedulerError(exc=f"{exc}") from exc
-    return AsyncJobGet(
-        job_id=task_uuid,
-    )
diff --git a/services/storage/src/simcore_service_storage/api/rpc/_paths.py b/services/storage/src/simcore_service_storage/api/rpc/_paths.py
index 34ea3deeedb..1b991c9ddab 100644
--- a/services/storage/src/simcore_service_storage/api/rpc/_paths.py
+++ b/services/storage/src/simcore_service_storage/api/rpc/_paths.py
@@ -1,3 +1,4 @@
+import logging
 from pathlib import Path
 
 from fastapi import FastAPI
@@ -6,11 +7,13 @@
     AsyncJobNameData,
 )
 from models_library.projects_nodes_io import LocationID
+from servicelib.logging_utils import log_catch
 from servicelib.rabbitmq import RPCRouter
 
 from ...modules.celery import get_celery_client
 from .._worker_tasks._paths import compute_path_size as remote_compute_path_size
 
+_logger = logging.getLogger(__name__)
 router = RPCRouter()
 
 
@@ -23,13 +26,13 @@ async def compute_path_size(
     path: Path,
 ) -> AsyncJobGet:
     assert app  # nosec
+    with log_catch(logger=_logger):
+        task_uuid = await get_celery_client(app).send_task(
+            remote_compute_path_size.__name__,
+            task_context=job_id_data.model_dump(),
+            user_id=job_id_data.user_id,
+            location_id=location_id,
+            path=path,
+        )
 
-    task_uuid = await get_celery_client(app).send_task(
-        remote_compute_path_size.__name__,
-        task_context=job_id_data.model_dump(),
-        user_id=job_id_data.user_id,
-        location_id=location_id,
-        path=path,
-    )
-
-    return AsyncJobGet(job_id=task_uuid)
+        return AsyncJobGet(job_id=task_uuid)
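Note on patch 5: every RPC endpoint body is now routed through log_catch from servicelib.logging_utils. The hunks only show how it is called; below is a hedged stand-in reconstruction of its contract (log any exception escaping the block, then re-raise unless reraise=False), inferred solely from those call sites:

    import logging
    from collections.abc import Iterator
    from contextlib import contextmanager

    @contextmanager
    def log_catch(logger: logging.Logger, *, reraise: bool = True) -> Iterator[None]:
        # stand-in: the real helper lives in servicelib.logging_utils
        try:
            yield
        except Exception:  # noqa: BLE001 - deliberate catch-all for logging
            logger.exception("unhandled exception in block")
            if reraise:
                raise

    logging.basicConfig(level=logging.ERROR)
    _logger = logging.getLogger(__name__)

    with log_catch(_logger, reraise=False):
        raise ValueError("logged and swallowed")
    print("execution continues")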
From a9575a8e38a9306d70646f65d2bd71bf44e1fab7 Mon Sep 17 00:00:00 2001
From: Mads Bisgaard
Date: Wed, 26 Mar 2025 07:20:36 +0100
Subject: [PATCH 6/9] add temporary timeout for debugging async jobs

---
 .../common-library/src/common_library/async_tools.py | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/packages/common-library/src/common_library/async_tools.py b/packages/common-library/src/common_library/async_tools.py
index d92944299e7..2a8f582aa98 100644
--- a/packages/common-library/src/common_library/async_tools.py
+++ b/packages/common-library/src/common_library/async_tools.py
@@ -15,9 +15,12 @@ def decorator(func: Callable[P, R]) -> Callable[P, Awaitable[R]]:
         @functools.wraps(func)
         async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
             loop = asyncio.get_running_loop()
-            return await loop.run_in_executor(
-                executor, functools.partial(func, *args, **kwargs)
-            )
+            return await asyncio.wait_for(
+                loop.run_in_executor(
+                    executor, functools.partial(func, *args, **kwargs)
+                ),
+                timeout=2,
+            )  # wait_for is temporary for debugging async jobs
 
         return wrapper
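Note on patch 6: the wait_for wrapper is explicitly temporary, and for good reason -- cancelling the future produced by run_in_executor unblocks the awaiting coroutine but cannot stop the already-running executor thread. A short self-contained sketch of that behavior (standard asyncio semantics, not code from this repo):

    import asyncio
    import functools
    import time

    def blocking_call(seconds: float) -> str:
        time.sleep(seconds)  # stands in for any sync, blocking job
        return "done"

    async def main() -> None:
        loop = asyncio.get_running_loop()
        try:
            await asyncio.wait_for(
                loop.run_in_executor(None, functools.partial(blocking_call, 5.0)),
                timeout=2,
            )
        except asyncio.TimeoutError:
            # the coroutine is unblocked here, but the executor thread is
            # still sleeping; asyncio.run() will wait for it during shutdown
            print("timed out after 2s; worker thread keeps running")

    asyncio.run(main())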
From 3aa660f245c8c1dc08f78035d616009ac98d5735 Mon Sep 17 00:00:00 2001
From: Mads Bisgaard
Date: Wed, 26 Mar 2025 11:34:14 +0100
Subject: [PATCH 7/9] remove cancel error

---
 .../common-library/src/common_library/async_tools.py | 9 +++------
 1 file changed, 3 insertions(+), 6 deletions(-)

diff --git a/packages/common-library/src/common_library/async_tools.py b/packages/common-library/src/common_library/async_tools.py
index 2a8f582aa98..d92944299e7 100644
--- a/packages/common-library/src/common_library/async_tools.py
+++ b/packages/common-library/src/common_library/async_tools.py
@@ -15,12 +15,9 @@ def decorator(func: Callable[P, R]) -> Callable[P, Awaitable[R]]:
         @functools.wraps(func)
         async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
             loop = asyncio.get_running_loop()
-            return await asyncio.wait_for(
-                loop.run_in_executor(
-                    executor, functools.partial(func, *args, **kwargs)
-                ),
-                timeout=2,
-            )  # wait_for is temporary for debugging async jobs
+            return await loop.run_in_executor(
+                executor, functools.partial(func, *args, **kwargs)
+            )
 
         return wrapper
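Note on patch 7: it restores the plain executor hand-off. For context, the surrounding decorator in common_library.async_tools follows the usual wrap-sync-call-in-executor pattern; below is a hedged sketch of the full shape, where the outer factory name `make_async` and its exact typing are assumptions, since the hunks only show the inner decorator:

    import asyncio
    import functools
    from collections.abc import Awaitable, Callable
    from concurrent.futures import Executor
    from typing import ParamSpec, TypeVar

    P = ParamSpec("P")
    R = TypeVar("R")

    def make_async(  # name assumed; not visible in the hunks above
        executor: Executor | None = None,
    ) -> Callable[[Callable[P, R]], Callable[P, Awaitable[R]]]:
        def decorator(func: Callable[P, R]) -> Callable[P, Awaitable[R]]:
            @functools.wraps(func)
            async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
                loop = asyncio.get_running_loop()
                # None -> the running loop's default ThreadPoolExecutor
                return await loop.run_in_executor(
                    executor, functools.partial(func, *args, **kwargs)
                )
            return wrapper
        return decorator

    @make_async()
    def add(a: int, b: int) -> int:
        return a + b

    print(asyncio.run(add(1, 2)))  # -> 3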
From 7e177476b72aace3a4eead32d937df2abdcea9a2 Mon Sep 17 00:00:00 2001
From: Mads Bisgaard
Date: Wed, 26 Mar 2025 11:49:42 +0100
Subject: [PATCH 8/9] remove assert again

---
 .../storage/src/simcore_service_storage/api/rpc/_async_jobs.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/services/storage/src/simcore_service_storage/api/rpc/_async_jobs.py b/services/storage/src/simcore_service_storage/api/rpc/_async_jobs.py
index d7c2453d445..8906b51ef61 100644
--- a/services/storage/src/simcore_service_storage/api/rpc/_async_jobs.py
+++ b/services/storage/src/simcore_service_storage/api/rpc/_async_jobs.py
@@ -140,7 +140,6 @@ async def list_jobs(
     app: FastAPI, filter_: str, job_id_data: AsyncJobNameData
 ) -> list[AsyncJobGet]:
     assert app  # nosec
-    assert filter_  # nosec
     try:
         task_uuids = await get_celery_client(app).get_task_uuids(
             task_context=job_id_data.model_dump(),
From 0d954e9a1af66e66b78db33adbd143a7eb596df9 Mon Sep 17 00:00:00 2001
From: Mads Bisgaard
Date: Wed, 26 Mar 2025 13:16:14 +0100
Subject: [PATCH 9/9] make pylint happy

---
 services/storage/tests/unit/test__worker_tasks_paths.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/services/storage/tests/unit/test__worker_tasks_paths.py b/services/storage/tests/unit/test__worker_tasks_paths.py
index c8d508d4129..83ca0024e6a 100644
--- a/services/storage/tests/unit/test__worker_tasks_paths.py
+++ b/services/storage/tests/unit/test__worker_tasks_paths.py
@@ -56,7 +56,11 @@ async def _assert_compute_path_size(
     expected_total_size: int,
 ) -> ByteSize:
     response = await compute_path_size(
-        celery_task, user_id=user_id, location_id=location_id, path=path
+        celery_task,
+        task_id=celery_task.id,
+        user_id=user_id,
+        location_id=location_id,
+        path=path,
     )
     assert isinstance(response, ByteSize)
    assert response == expected_total_size
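A closing observation on the series: patches 5 and 8 converge on one uniform boundary pattern -- transport failures (CeleryError) are translated into a domain error (JobSchedulerError) inside each RPC endpoint, so callers never see Celery internals. A self-contained sketch of that pattern, with stand-in exception classes in place of the real ones:

    class CeleryError(Exception):  # stand-in for celery.exceptions.CeleryError
        ...

    class JobSchedulerError(Exception):  # stand-in for the domain error
        def __init__(self, exc: str) -> None:
            super().__init__(exc)

    def send_task(fail: bool) -> str:
        if fail:
            raise CeleryError("broker unreachable")
        return "task-uuid-1234"

    def start_job(*, fail: bool = False) -> str:
        try:
            return send_task(fail)
        except CeleryError as exc:
            # translate the transport error at the RPC boundary
            raise JobSchedulerError(exc=f"{exc}") from exc

    print(start_job())  # -> task-uuid-1234
    try:
        start_job(fail=True)
    except JobSchedulerError as err:
        print(f"domain error: {err}")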