Skip to content

♻️ Further cleanup of async jobs framework #7424

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ async def result(
exc_type = ""
exc_msg = ""
with log_catch(logger=_logger, reraise=False):
task_error = TaskError.model_validate_json(_result)
task_error = TaskError.model_validate(_result)
exc_type = task_error.exc_type
exc_msg = task_error.exc_msg
raise JobError(job_id=job_id, exc_type=exc_type, exc_msg=exc_msg)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
_CELERY_TASK_ID_KEY_ENCODING = "utf-8"

_MIN_PROGRESS_VALUE = 0.0
_MAX_PROGRESS_VALUE = 100.0
_MAX_PROGRESS_VALUE = 1.0


def _build_context_prefix(task_context: TaskContext) -> list[str]:
Expand Down Expand Up @@ -99,8 +99,12 @@ def _get_progress_report(
TaskState.ERROR,
TaskState.SUCCESS,
):
return ProgressReport(actual_value=_MAX_PROGRESS_VALUE)
return ProgressReport(actual_value=_MIN_PROGRESS_VALUE)
return ProgressReport(
actual_value=_MAX_PROGRESS_VALUE, total=_MAX_PROGRESS_VALUE
)
return ProgressReport(
actual_value=_MIN_PROGRESS_VALUE, total=_MAX_PROGRESS_VALUE
)

def _get_state(self, task_context: TaskContext, task_uuid: TaskUUID) -> TaskState:
task_id = _build_task_id(task_context, task_uuid)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from enum import StrEnum, auto
from typing import Any, Final, Self, TypeAlias
from typing import Any, Self, TypeAlias
from uuid import UUID

from models_library.progress_bar import ProgressReport
Expand All @@ -9,9 +9,6 @@
TaskID: TypeAlias = str
TaskUUID: TypeAlias = UUID

_MIN_PROGRESS: Final[float] = 0.0
_MAX_PROGRESS: Final[float] = 100.0


class TaskState(StrEnum):
PENDING = auto()
Expand All @@ -36,13 +33,15 @@ def is_done(self) -> bool:
@model_validator(mode="after")
def _check_consistency(self) -> Self:
value = self.progress_report.actual_value
min_value = 0.0
max_value = self.progress_report.total

valid_states = {
TaskState.PENDING: value == _MIN_PROGRESS,
TaskState.RUNNING: _MIN_PROGRESS <= value <= _MAX_PROGRESS,
TaskState.SUCCESS: value == _MAX_PROGRESS,
TaskState.ABORTED: value == _MAX_PROGRESS,
TaskState.ERROR: value == _MAX_PROGRESS,
TaskState.PENDING: value == min_value,
TaskState.RUNNING: min_value <= value <= max_value,
TaskState.SUCCESS: value == max_value,
TaskState.ABORTED: value == max_value,
TaskState.ERROR: value == max_value,
}

if not valid_states.get(self.task_state, True):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

def export_data(task: Task, files: list[StorageFileID]):
_logger.info("Exporting files: %s", files)
assert len(files) > 0
for n, file in enumerate(files, start=1):
with log_context(
_logger,
Expand All @@ -23,7 +24,7 @@ def export_data(task: Task, files: list[StorageFileID]):
get_celery_worker(task.app).set_task_progress(
task_name=task.name,
task_id=task.request.id,
report=ProgressReport(actual_value=n / len(files) * 100),
report=ProgressReport(actual_value=n, total=len(files)),
)
time.sleep(10)
return "done"
Loading