Skip to content

♻️ Further cleanup of async jobs framework #7424

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ async def list_jobs(
app: FastAPI, filter_: str, job_id_data: AsyncJobNameData
) -> list[AsyncJobGet]:
assert app # nosec

try:
task_uuids = await get_celery_client(app).get_task_uuids(
task_context=job_id_data.model_dump(),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import logging

from celery.exceptions import CeleryError # type: ignore[import-untyped]
from fastapi import FastAPI
from models_library.api_schemas_rpc_async_jobs.async_jobs import (
Expand All @@ -19,6 +21,8 @@
from ...modules.datcore_adapter.datcore_adapter_exceptions import DatcoreAdapterError
from ...simcore_s3_dsm import SimcoreS3DataManager

_logger = logging.getLogger(__name__)

router = RPCRouter()


Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
from pathlib import Path

from fastapi import FastAPI
Expand All @@ -11,6 +12,7 @@
from ...modules.celery import get_celery_client
from .._worker_tasks._paths import compute_path_size as remote_compute_path_size

_logger = logging.getLogger(__name__)
router = RPCRouter()


Expand All @@ -23,7 +25,6 @@ async def compute_path_size(
path: Path,
) -> AsyncJobGet:
assert app # nosec

task_uuid = await get_celery_client(app).send_task(
remote_compute_path_size.__name__,
task_context=job_id_data.model_dump(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
_CELERY_TASK_ID_KEY_ENCODING = "utf-8"

_MIN_PROGRESS_VALUE = 0.0
_MAX_PROGRESS_VALUE = 100.0
_MAX_PROGRESS_VALUE = 1.0


def _build_context_prefix(task_context: TaskContext) -> list[str]:
Expand Down Expand Up @@ -109,8 +109,12 @@ def _get_progress_report(
TaskState.ERROR,
TaskState.SUCCESS,
):
return ProgressReport(actual_value=_MAX_PROGRESS_VALUE)
return ProgressReport(actual_value=_MIN_PROGRESS_VALUE)
return ProgressReport(
actual_value=_MAX_PROGRESS_VALUE, total=_MAX_PROGRESS_VALUE
)
return ProgressReport(
actual_value=_MIN_PROGRESS_VALUE, total=_MAX_PROGRESS_VALUE
)

def _get_state(self, task_context: TaskContext, task_uuid: TaskUUID) -> TaskState:
task_id = _build_task_id(task_context, task_uuid)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from enum import StrEnum, auto
from typing import Any, Final, Self, TypeAlias
from typing import Any, Self, TypeAlias
from uuid import UUID

from models_library.progress_bar import ProgressReport
Expand All @@ -9,9 +9,6 @@
TaskID: TypeAlias = str
TaskUUID: TypeAlias = UUID

_MIN_PROGRESS: Final[float] = 0.0
_MAX_PROGRESS: Final[float] = 100.0


class TaskState(StrEnum):
PENDING = auto()
Expand All @@ -36,13 +33,15 @@ def is_done(self) -> bool:
@model_validator(mode="after")
def _check_consistency(self) -> Self:
value = self.progress_report.actual_value
min_value = 0.0
max_value = self.progress_report.total

valid_states = {
TaskState.PENDING: value == _MIN_PROGRESS,
TaskState.RUNNING: _MIN_PROGRESS <= value <= _MAX_PROGRESS,
TaskState.SUCCESS: value == _MAX_PROGRESS,
TaskState.ABORTED: value == _MAX_PROGRESS,
TaskState.ERROR: value == _MAX_PROGRESS,
TaskState.PENDING: value == min_value,
TaskState.RUNNING: min_value <= value <= max_value,
TaskState.SUCCESS: value == max_value,
TaskState.ABORTED: value == max_value,
TaskState.ERROR: value == max_value,
}

if not valid_states.get(self.task_state, True):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

def export_data(task: Task, files: list[StorageFileID]):
_logger.info("Exporting files: %s", files)
assert len(files) > 0
for n, file in enumerate(files, start=1):
with log_context(
_logger,
Expand All @@ -23,7 +24,7 @@ def export_data(task: Task, files: list[StorageFileID]):
get_celery_worker(task.app).set_task_progress(
task_name=task.name,
task_id=task.request.id,
report=ProgressReport(actual_value=n / len(files) * 100),
report=ProgressReport(actual_value=n / len(files), total=1),
)
time.sleep(10)
return "done"
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def sleep_for(seconds: float) -> None:
worker.set_task_progress(
task_name=task_name,
task_id=task_id,
report=ProgressReport(actual_value=n / len(files) * 10),
report=ProgressReport(actual_value=n / len(files), total=1.0),
)
await asyncio.get_event_loop().run_in_executor(None, sleep_for, 1)

Expand Down
8 changes: 4 additions & 4 deletions services/storage/tests/unit/test_data_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ async def test_get_data_export_status_error(
"get_task_status_object": TaskStatus(
task_uuid=TaskUUID(_faker.uuid4()),
task_state=TaskState.SUCCESS,
progress_report=ProgressReport(actual_value=100),
progress_report=ProgressReport(actual_value=1, total=1),
),
"get_task_result_object": "result",
"get_task_uuids_object": [AsyncJobId(_faker.uuid4())],
Expand Down Expand Up @@ -489,7 +489,7 @@ async def test_get_data_export_result_success(
"get_task_status_object": TaskStatus(
task_uuid=TaskUUID(_faker.uuid4()),
task_state=TaskState.RUNNING,
progress_report=ProgressReport(actual_value=50),
progress_report=ProgressReport(actual_value=0.5, total=1.0),
),
"get_task_result_object": _faker.text(),
"get_task_uuids_object": [AsyncJobId(_faker.uuid4())],
Expand All @@ -501,7 +501,7 @@ async def test_get_data_export_result_success(
"get_task_status_object": TaskStatus(
task_uuid=TaskUUID(_faker.uuid4()),
task_state=TaskState.ABORTED,
progress_report=ProgressReport(actual_value=100),
progress_report=ProgressReport(actual_value=1.0, total=1.0),
),
"get_task_result_object": _faker.text(),
"get_task_uuids_object": [AsyncJobId(_faker.uuid4())],
Expand All @@ -513,7 +513,7 @@ async def test_get_data_export_result_success(
"get_task_status_object": TaskStatus(
task_uuid=TaskUUID(_faker.uuid4()),
task_state=TaskState.ERROR,
progress_report=ProgressReport(actual_value=100),
progress_report=ProgressReport(actual_value=1.0, total=1.0),
),
"get_task_result_object": _faker.text(),
"get_task_uuids_object": [AsyncJobId(_faker.uuid4())],
Expand Down
Loading