diff --git a/services/storage/src/simcore_service_storage/api/rpc/_async_jobs.py b/services/storage/src/simcore_service_storage/api/rpc/_async_jobs.py index 1e5369a7820..594dd5bdbeb 100644 --- a/services/storage/src/simcore_service_storage/api/rpc/_async_jobs.py +++ b/services/storage/src/simcore_service_storage/api/rpc/_async_jobs.py @@ -110,7 +110,6 @@ async def list_jobs( app: FastAPI, filter_: str, job_id_data: AsyncJobNameData ) -> list[AsyncJobGet]: assert app # nosec - try: task_uuids = await get_celery_client(app).get_task_uuids( task_context=job_id_data.model_dump(), diff --git a/services/storage/src/simcore_service_storage/api/rpc/_data_export.py b/services/storage/src/simcore_service_storage/api/rpc/_data_export.py index 7fe6612e5e3..ac4f591dbe5 100644 --- a/services/storage/src/simcore_service_storage/api/rpc/_data_export.py +++ b/services/storage/src/simcore_service_storage/api/rpc/_data_export.py @@ -1,3 +1,5 @@ +import logging + from celery.exceptions import CeleryError # type: ignore[import-untyped] from fastapi import FastAPI from models_library.api_schemas_rpc_async_jobs.async_jobs import ( @@ -19,6 +21,8 @@ from ...modules.datcore_adapter.datcore_adapter_exceptions import DatcoreAdapterError from ...simcore_s3_dsm import SimcoreS3DataManager +_logger = logging.getLogger(__name__) + router = RPCRouter() diff --git a/services/storage/src/simcore_service_storage/api/rpc/_paths.py b/services/storage/src/simcore_service_storage/api/rpc/_paths.py index 34ea3deeedb..b91042cf43b 100644 --- a/services/storage/src/simcore_service_storage/api/rpc/_paths.py +++ b/services/storage/src/simcore_service_storage/api/rpc/_paths.py @@ -1,3 +1,4 @@ +import logging from pathlib import Path from fastapi import FastAPI @@ -11,6 +12,7 @@ from ...modules.celery import get_celery_client from .._worker_tasks._paths import compute_path_size as remote_compute_path_size +_logger = logging.getLogger(__name__) router = RPCRouter() @@ -23,7 +25,6 @@ async def compute_path_size( path: Path, ) -> AsyncJobGet: assert app # nosec - task_uuid = await get_celery_client(app).send_task( remote_compute_path_size.__name__, task_context=job_id_data.model_dump(), diff --git a/services/storage/src/simcore_service_storage/modules/celery/client.py b/services/storage/src/simcore_service_storage/modules/celery/client.py index d5b1d1b88af..23388d71d54 100644 --- a/services/storage/src/simcore_service_storage/modules/celery/client.py +++ b/services/storage/src/simcore_service_storage/modules/celery/client.py @@ -38,7 +38,7 @@ _CELERY_TASK_ID_KEY_ENCODING = "utf-8" _MIN_PROGRESS_VALUE = 0.0 -_MAX_PROGRESS_VALUE = 100.0 +_MAX_PROGRESS_VALUE = 1.0 def _build_context_prefix(task_context: TaskContext) -> list[str]: @@ -109,8 +109,12 @@ def _get_progress_report( TaskState.ERROR, TaskState.SUCCESS, ): - return ProgressReport(actual_value=_MAX_PROGRESS_VALUE) - return ProgressReport(actual_value=_MIN_PROGRESS_VALUE) + return ProgressReport( + actual_value=_MAX_PROGRESS_VALUE, total=_MAX_PROGRESS_VALUE + ) + return ProgressReport( + actual_value=_MIN_PROGRESS_VALUE, total=_MAX_PROGRESS_VALUE + ) def _get_state(self, task_context: TaskContext, task_uuid: TaskUUID) -> TaskState: task_id = _build_task_id(task_context, task_uuid) diff --git a/services/storage/src/simcore_service_storage/modules/celery/models.py b/services/storage/src/simcore_service_storage/modules/celery/models.py index 6b72a6e0019..94f961a0e29 100644 --- a/services/storage/src/simcore_service_storage/modules/celery/models.py +++ b/services/storage/src/simcore_service_storage/modules/celery/models.py @@ -1,5 +1,5 @@ from enum import StrEnum, auto -from typing import Any, Final, Self, TypeAlias +from typing import Any, Self, TypeAlias from uuid import UUID from models_library.progress_bar import ProgressReport @@ -9,9 +9,6 @@ TaskID: TypeAlias = str TaskUUID: TypeAlias = UUID -_MIN_PROGRESS: Final[float] = 0.0 -_MAX_PROGRESS: Final[float] = 100.0 - class TaskState(StrEnum): PENDING = auto() @@ -36,13 +33,15 @@ def is_done(self) -> bool: @model_validator(mode="after") def _check_consistency(self) -> Self: value = self.progress_report.actual_value + min_value = 0.0 + max_value = self.progress_report.total valid_states = { - TaskState.PENDING: value == _MIN_PROGRESS, - TaskState.RUNNING: _MIN_PROGRESS <= value <= _MAX_PROGRESS, - TaskState.SUCCESS: value == _MAX_PROGRESS, - TaskState.ABORTED: value == _MAX_PROGRESS, - TaskState.ERROR: value == _MAX_PROGRESS, + TaskState.PENDING: value == min_value, + TaskState.RUNNING: min_value <= value <= max_value, + TaskState.SUCCESS: value == max_value, + TaskState.ABORTED: value == max_value, + TaskState.ERROR: value == max_value, } if not valid_states.get(self.task_state, True): diff --git a/services/storage/src/simcore_service_storage/modules/celery/tasks.py b/services/storage/src/simcore_service_storage/modules/celery/tasks.py index 014151acd74..881285e5a20 100644 --- a/services/storage/src/simcore_service_storage/modules/celery/tasks.py +++ b/services/storage/src/simcore_service_storage/modules/celery/tasks.py @@ -13,6 +13,7 @@ def export_data(task: Task, files: list[StorageFileID]): _logger.info("Exporting files: %s", files) + assert len(files) > 0 for n, file in enumerate(files, start=1): with log_context( _logger, @@ -23,7 +24,7 @@ def export_data(task: Task, files: list[StorageFileID]): get_celery_worker(task.app).set_task_progress( task_name=task.name, task_id=task.request.id, - report=ProgressReport(actual_value=n / len(files) * 100), + report=ProgressReport(actual_value=n / len(files), total=1), ) time.sleep(10) return "done" diff --git a/services/storage/tests/unit/modules/celery/test_celery.py b/services/storage/tests/unit/modules/celery/test_celery.py index 77dd5cf3e2a..c6c4e53135f 100644 --- a/services/storage/tests/unit/modules/celery/test_celery.py +++ b/services/storage/tests/unit/modules/celery/test_celery.py @@ -41,7 +41,7 @@ def sleep_for(seconds: float) -> None: worker.set_task_progress( task_name=task_name, task_id=task_id, - report=ProgressReport(actual_value=n / len(files) * 10), + report=ProgressReport(actual_value=n / len(files), total=1.0), ) await asyncio.get_event_loop().run_in_executor(None, sleep_for, 1) diff --git a/services/storage/tests/unit/test_data_export.py b/services/storage/tests/unit/test_data_export.py index 9b971b40739..6e4c2ca412a 100644 --- a/services/storage/tests/unit/test_data_export.py +++ b/services/storage/tests/unit/test_data_export.py @@ -454,7 +454,7 @@ async def test_get_data_export_status_error( "get_task_status_object": TaskStatus( task_uuid=TaskUUID(_faker.uuid4()), task_state=TaskState.SUCCESS, - progress_report=ProgressReport(actual_value=100), + progress_report=ProgressReport(actual_value=1, total=1), ), "get_task_result_object": "result", "get_task_uuids_object": [AsyncJobId(_faker.uuid4())], @@ -489,7 +489,7 @@ async def test_get_data_export_result_success( "get_task_status_object": TaskStatus( task_uuid=TaskUUID(_faker.uuid4()), task_state=TaskState.RUNNING, - progress_report=ProgressReport(actual_value=50), + progress_report=ProgressReport(actual_value=0.5, total=1.0), ), "get_task_result_object": _faker.text(), "get_task_uuids_object": [AsyncJobId(_faker.uuid4())], @@ -501,7 +501,7 @@ async def test_get_data_export_result_success( "get_task_status_object": TaskStatus( task_uuid=TaskUUID(_faker.uuid4()), task_state=TaskState.ABORTED, - progress_report=ProgressReport(actual_value=100), + progress_report=ProgressReport(actual_value=1.0, total=1.0), ), "get_task_result_object": _faker.text(), "get_task_uuids_object": [AsyncJobId(_faker.uuid4())], @@ -513,7 +513,7 @@ async def test_get_data_export_result_success( "get_task_status_object": TaskStatus( task_uuid=TaskUUID(_faker.uuid4()), task_state=TaskState.ERROR, - progress_report=ProgressReport(actual_value=100), + progress_report=ProgressReport(actual_value=1.0, total=1.0), ), "get_task_result_object": _faker.text(), "get_task_uuids_object": [AsyncJobId(_faker.uuid4())],