Commit 0086fd0

Merge branch 'master' into further-cleanup-of-async-jobs-framework

2 parents 0d954e9 + 7202315

3 files changed: +44 −59 lines changed

services/storage/src/simcore_service_storage/api/rpc/_async_jobs.py (+2 −32)

@@ -14,45 +14,24 @@
 from models_library.api_schemas_rpc_async_jobs.exceptions import (
     JobAbortedError,
     JobError,
-    JobMissingError,
     JobNotDoneError,
     JobSchedulerError,
 )
 from servicelib.logging_utils import log_catch
 from servicelib.rabbitmq import RPCRouter
 
 from ...modules.celery import get_celery_client
-from ...modules.celery.client import CeleryTaskQueueClient
 from ...modules.celery.models import TaskError, TaskState
 
 _logger = logging.getLogger(__name__)
 router = RPCRouter()
 
 
-async def _assert_job_exists(
-    *,
-    job_id: AsyncJobId,
-    job_id_data: AsyncJobNameData,
-    celery_client: CeleryTaskQueueClient,
-) -> None:
-    """Raises JobMissingError if job doesn't exist"""
-    job_ids = await celery_client.get_task_uuids(
-        task_context=job_id_data.model_dump(),
-    )
-    if job_id not in job_ids:
-        raise JobMissingError(job_id=f"{job_id}")
-
-
-@router.expose(reraise_if_error_type=(JobSchedulerError, JobMissingError))
+@router.expose(reraise_if_error_type=(JobSchedulerError,))
 async def cancel(app: FastAPI, job_id: AsyncJobId, job_id_data: AsyncJobNameData):
     assert app  # nosec
     assert job_id_data  # nosec
     try:
-        await _assert_job_exists(
-            job_id=job_id,
-            job_id_data=job_id_data,
-            celery_client=get_celery_client(app),
-        )
         await get_celery_client(app).abort_task(
             task_context=job_id_data.model_dump(),
             task_uuid=job_id,
@@ -61,19 +40,14 @@ async def cancel(app: FastAPI, job_id: AsyncJobId, job_id_data: AsyncJobNameData
         raise JobSchedulerError(exc=f"{exc}") from exc
 
 
-@router.expose(reraise_if_error_type=(JobSchedulerError, JobMissingError))
+@router.expose(reraise_if_error_type=(JobSchedulerError,))
 async def status(
     app: FastAPI, job_id: AsyncJobId, job_id_data: AsyncJobNameData
 ) -> AsyncJobStatus:
     assert app  # nosec
     assert job_id_data  # nosec
 
     try:
-        await _assert_job_exists(
-            job_id=job_id,
-            job_id_data=job_id_data,
-            celery_client=get_celery_client(app),
-        )
         task_status = await get_celery_client(app).get_task_status(
             task_context=job_id_data.model_dump(),
             task_uuid=job_id,
@@ -94,7 +68,6 @@ async def status(
         JobNotDoneError,
         JobAbortedError,
         JobSchedulerError,
-        JobMissingError,
     )
 )
 async def result(
@@ -105,9 +78,6 @@ async def result(
     assert job_id_data  # nosec
 
     try:
-        await _assert_job_exists(
-            job_id=job_id, job_id_data=job_id_data, celery_client=get_celery_client(app)
-        )
         _status = await get_celery_client(app).get_task_status(
             task_context=job_id_data.model_dump(),
             task_uuid=job_id,
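
For context, here is a minimal sketch of the cancel handler as it reads after this change, reconstructed from the hunks above. The except clause is an assumption inferred from the unchanged surrounding lines (the tests below feed CeleryError into the mocked client); it is not shown in this diff.

@router.expose(reraise_if_error_type=(JobSchedulerError,))
async def cancel(app: FastAPI, job_id: AsyncJobId, job_id_data: AsyncJobNameData):
    assert app  # nosec
    assert job_id_data  # nosec
    try:
        # abort_task is now called directly: an unknown task id is no longer
        # pre-checked, so JobMissingError can no longer be raised here
        await get_celery_client(app).abort_task(
            task_context=job_id_data.model_dump(),
            task_uuid=job_id,
        )
    except CeleryError as exc:  # assumed from unchanged context lines
        raise JobSchedulerError(exc=f"{exc}") from exc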

services/storage/tests/unit/test__worker_tasks_paths.py (+28 −20)

@@ -21,6 +21,7 @@
 from pydantic import ByteSize, TypeAdapter
 from pytest_simcore.helpers.storage_utils import FileIDDict, ProjectWithFilesParams
 from simcore_service_storage.api._worker_tasks._paths import compute_path_size
+from simcore_service_storage.modules.celery.models import TaskId
 from simcore_service_storage.modules.celery.utils import set_fastapi_app
 from simcore_service_storage.simcore_s3_dsm import SimcoreS3DataManager
 
@@ -48,16 +49,17 @@ def _filter_and_group_paths_one_level_deeper(
 
 
 async def _assert_compute_path_size(
+    *,
     celery_task: Task,
+    task_id: TaskId,
     location_id: LocationID,
     user_id: UserID,
-    *,
     path: Path,
     expected_total_size: int,
 ) -> ByteSize:
     response = await compute_path_size(
         celery_task,
-        task_id=celery_task.id,
+        task_id=task_id,
         user_id=user_id,
         location_id=location_id,
         path=path,
@@ -115,9 +117,10 @@ async def test_path_compute_size(
     expected_total_size = project_params.allowed_file_sizes[0] * total_num_files
     path = Path(project["uuid"])
     await _assert_compute_path_size(
-        fake_celery_task,
-        location_id,
-        user_id,
+        celery_task=fake_celery_task,
+        task_id=TaskId("fake_task"),
+        location_id=location_id,
+        user_id=user_id,
         path=path,
         expected_total_size=expected_total_size,
     )
@@ -132,9 +135,10 @@ async def test_path_compute_size(
         selected_node_s3_keys
     )
     await _assert_compute_path_size(
-        fake_celery_task,
-        location_id,
-        user_id,
+        celery_task=fake_celery_task,
+        task_id=TaskId("fake_task"),
+        location_id=location_id,
+        user_id=user_id,
         path=path,
         expected_total_size=expected_total_size,
     )
@@ -150,9 +154,10 @@ async def test_path_compute_size(
         selected_node_s3_keys
     )
     await _assert_compute_path_size(
-        fake_celery_task,
-        location_id,
-        user_id,
+        celery_task=fake_celery_task,
+        task_id=TaskId("fake_task"),
+        location_id=location_id,
+        user_id=user_id,
         path=path,
         expected_total_size=expected_total_size,
     )
@@ -168,9 +173,10 @@ async def test_path_compute_size(
         selected_node_s3_keys
     )
     workspace_total_size = await _assert_compute_path_size(
-        fake_celery_task,
-        location_id,
-        user_id,
+        celery_task=fake_celery_task,
+        task_id=TaskId("fake_task"),
+        location_id=location_id,
+        user_id=user_id,
         path=path,
         expected_total_size=expected_total_size,
     )
@@ -192,9 +198,10 @@ async def test_path_compute_size(
         selected_node_s3_keys
     )
     accumulated_subfolder_size += await _assert_compute_path_size(
-        fake_celery_task,
-        location_id,
-        user_id,
+        celery_task=fake_celery_task,
+        task_id=TaskId("fake_task"),
+        location_id=location_id,
+        user_id=user_id,
         path=workspace_subfolder,
         expected_total_size=expected_total_size,
     )
@@ -212,9 +219,10 @@ async def test_path_compute_size_inexistent_path(
     fake_datcore_tokens: tuple[str, str],
 ):
     await _assert_compute_path_size(
-        fake_celery_task,
-        location_id,
-        user_id,
+        celery_task=fake_celery_task,
+        task_id=TaskId("fake_task"),
+        location_id=location_id,
+        user_id=user_id,
        path=Path(faker.file_path(absolute=False)),
         expected_total_size=0,
     )
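
The recurring edit in this file is mechanical: moving the bare * to the front of the signature makes every parameter of the helper keyword-only, which is why each call site now spells out celery_task=, task_id=, and so on. A minimal, self-contained sketch of the mechanism (hypothetical names, not from this repo):

# Everything after the bare `*` must be passed by name; positional calls
# like the old `_assert_compute_path_size(fake_celery_task, ...)` now fail.
def scaled_size(*, size: int, factor: int = 1) -> int:
    return size * factor

scaled_size(size=10, factor=2)  # OK: returns 20
# scaled_size(10, 2)            # TypeError: takes 0 positional arguments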

services/storage/tests/unit/test_data_export.py (+14 −7)

@@ -22,7 +22,6 @@
 from models_library.api_schemas_rpc_async_jobs.exceptions import (
     JobAbortedError,
     JobError,
-    JobMissingError,
     JobNotDoneError,
     JobSchedulerError,
 )
@@ -343,7 +342,6 @@ async def test_abort_data_export_success(
 @pytest.mark.parametrize(
     "mock_celery_client, expected_exception_type",
     [
-        ({"abort_task_object": None, "get_task_uuids_object": []}, JobMissingError),
         (
             {
                 "abort_task_object": CeleryError("error"),
@@ -377,6 +375,14 @@ async def test_abort_data_export_error(
 @pytest.mark.parametrize(
     "mock_celery_client",
     [
+        {
+            "get_task_status_object": TaskStatus(
+                task_uuid=TaskUUID(_faker.uuid4()),
+                task_state=TaskState.PENDING,
+                progress_report=ProgressReport(actual_value=0),
+            ),
+            "get_task_uuids_object": [],
+        },
         {
             "get_task_status_object": TaskStatus(
                 task_uuid=TaskUUID(_faker.uuid4()),
@@ -411,10 +417,6 @@ async def test_get_data_export_status(
 @pytest.mark.parametrize(
     "mock_celery_client, expected_exception_type",
     [
-        (
-            {"get_task_status_object": None, "get_task_uuids_object": []},
-            JobMissingError,
-        ),
         (
             {
                 "get_task_status_object": CeleryError("error"),
@@ -528,9 +530,14 @@ async def test_get_data_export_result_success(
         ),
         (
             {
+                "get_task_status_object": TaskStatus(
+                    task_uuid=TaskUUID(_faker.uuid4()),
+                    task_state=TaskState.PENDING,
+                    progress_report=ProgressReport(actual_value=0.0),
+                ),
                 "get_task_uuids_object": [],
             },
-            JobMissingError,
+            JobNotDoneError,
         ),
     ],
     indirect=["mock_celery_client"],

0 commit comments