Commit 7202315

⚗️ Remove asserts in async jobs (#7434)
1 parent d7e6fd3 commit 7202315

3 files changed: +48 −55 lines changed

services/storage/src/simcore_service_storage/api/rpc/_async_jobs.py (+2 −28)

@@ -14,43 +14,24 @@
 from models_library.api_schemas_rpc_async_jobs.exceptions import (
     JobAbortedError,
     JobError,
-    JobMissingError,
     JobNotDoneError,
     JobSchedulerError,
 )
 from servicelib.logging_utils import log_catch
 from servicelib.rabbitmq import RPCRouter

 from ...modules.celery import get_celery_client
-from ...modules.celery.client import CeleryTaskQueueClient
 from ...modules.celery.models import TaskError, TaskState

 _logger = logging.getLogger(__name__)
 router = RPCRouter()


-async def _assert_job_exists(
-    *,
-    job_id: AsyncJobId,
-    job_id_data: AsyncJobNameData,
-    celery_client: CeleryTaskQueueClient,
-) -> None:
-    """Raises JobMissingError if job doesn't exist"""
-    job_ids = await celery_client.get_task_uuids(
-        task_context=job_id_data.model_dump(),
-    )
-    if job_id not in job_ids:
-        raise JobMissingError(job_id=f"{job_id}")
-
-
-@router.expose(reraise_if_error_type=(JobSchedulerError, JobMissingError))
+@router.expose(reraise_if_error_type=(JobSchedulerError,))
 async def cancel(app: FastAPI, job_id: AsyncJobId, job_id_data: AsyncJobNameData):
     assert app  # nosec
     assert job_id_data  # nosec
     try:
-        await _assert_job_exists(
-            job_id=job_id, job_id_data=job_id_data, celery_client=get_celery_client(app)
-        )
         await get_celery_client(app).abort_task(
             task_context=job_id_data.model_dump(),
             task_uuid=job_id,
@@ -59,17 +40,14 @@ async def cancel(app: FastAPI, job_id: AsyncJobId, job_id_data: AsyncJobNameData
         raise JobSchedulerError(exc=f"{exc}") from exc


-@router.expose(reraise_if_error_type=(JobSchedulerError, JobMissingError))
+@router.expose(reraise_if_error_type=(JobSchedulerError,))
 async def status(
     app: FastAPI, job_id: AsyncJobId, job_id_data: AsyncJobNameData
 ) -> AsyncJobStatus:
     assert app  # nosec
     assert job_id_data  # nosec

     try:
-        await _assert_job_exists(
-            job_id=job_id, job_id_data=job_id_data, celery_client=get_celery_client(app)
-        )
         task_status = await get_celery_client(app).get_task_status(
             task_context=job_id_data.model_dump(),
             task_uuid=job_id,
@@ -90,7 +68,6 @@ async def status(
         JobNotDoneError,
         JobAbortedError,
         JobSchedulerError,
-        JobMissingError,
     )
 )
 async def result(
@@ -101,9 +78,6 @@ async def result(
     assert job_id_data  # nosec

     try:
-        await _assert_job_exists(
-            job_id=job_id, job_id_data=job_id_data, celery_client=get_celery_client(app)
-        )
         _status = await get_celery_client(app).get_task_status(
             task_context=job_id_data.model_dump(),
             task_uuid=job_id,
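
Net effect of this file's hunks: the `_assert_job_exists` pre-check (and with it `JobMissingError`) is removed, so `cancel`, `status`, and `result` go straight to the Celery client and rely on the remaining entries of `reraise_if_error_type` for error propagation. For reference, a sketch of the patched `cancel` assembled from the hunks above, using the module's existing imports; the `except CeleryError` line is an assumption, since the diff shows only the `raise` inside it:

# Sketch of `cancel` after this commit, assembled from the hunks above.
@router.expose(reraise_if_error_type=(JobSchedulerError,))
async def cancel(app: FastAPI, job_id: AsyncJobId, job_id_data: AsyncJobNameData):
    assert app  # nosec
    assert job_id_data  # nosec
    try:
        await get_celery_client(app).abort_task(
            task_context=job_id_data.model_dump(),
            task_uuid=job_id,
        )
    except CeleryError as exc:  # assumed: only the raise line appears in the diff
        raise JobSchedulerError(exc=f"{exc}") from exc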

services/storage/tests/unit/test__worker_tasks_paths.py (+32 −20)

@@ -21,6 +21,7 @@
 from pydantic import ByteSize, TypeAdapter
 from pytest_simcore.helpers.storage_utils import FileIDDict, ProjectWithFilesParams
 from simcore_service_storage.api._worker_tasks._paths import compute_path_size
+from simcore_service_storage.modules.celery.models import TaskId
 from simcore_service_storage.modules.celery.utils import set_fastapi_app
 from simcore_service_storage.simcore_s3_dsm import SimcoreS3DataManager

@@ -48,15 +49,20 @@ def _filter_and_group_paths_one_level_deeper(


 async def _assert_compute_path_size(
+    *,
     celery_task: Task,
+    task_id: TaskId,
     location_id: LocationID,
     user_id: UserID,
-    *,
     path: Path,
     expected_total_size: int,
 ) -> ByteSize:
     response = await compute_path_size(
-        celery_task, user_id=user_id, location_id=location_id, path=path
+        celery_task,
+        task_id=task_id,
+        user_id=user_id,
+        location_id=location_id,
+        path=path,
     )
     assert isinstance(response, ByteSize)
     assert response == expected_total_size
@@ -111,9 +117,10 @@ async def test_path_compute_size(
     expected_total_size = project_params.allowed_file_sizes[0] * total_num_files
     path = Path(project["uuid"])
     await _assert_compute_path_size(
-        fake_celery_task,
-        location_id,
-        user_id,
+        celery_task=fake_celery_task,
+        task_id=TaskId("fake_task"),
+        location_id=location_id,
+        user_id=user_id,
         path=path,
         expected_total_size=expected_total_size,
     )
@@ -128,9 +135,10 @@ async def test_path_compute_size(
         selected_node_s3_keys
     )
     await _assert_compute_path_size(
-        fake_celery_task,
-        location_id,
-        user_id,
+        celery_task=fake_celery_task,
+        task_id=TaskId("fake_task"),
+        location_id=location_id,
+        user_id=user_id,
         path=path,
         expected_total_size=expected_total_size,
     )
@@ -146,9 +154,10 @@ async def test_path_compute_size(
         selected_node_s3_keys
     )
     await _assert_compute_path_size(
-        fake_celery_task,
-        location_id,
-        user_id,
+        celery_task=fake_celery_task,
+        task_id=TaskId("fake_task"),
+        location_id=location_id,
+        user_id=user_id,
         path=path,
         expected_total_size=expected_total_size,
     )
@@ -164,9 +173,10 @@ async def test_path_compute_size(
         selected_node_s3_keys
     )
     workspace_total_size = await _assert_compute_path_size(
-        fake_celery_task,
-        location_id,
-        user_id,
+        celery_task=fake_celery_task,
+        task_id=TaskId("fake_task"),
+        location_id=location_id,
+        user_id=user_id,
         path=path,
         expected_total_size=expected_total_size,
     )
@@ -188,9 +198,10 @@ async def test_path_compute_size(
         selected_node_s3_keys
     )
     accumulated_subfolder_size += await _assert_compute_path_size(
-        fake_celery_task,
-        location_id,
-        user_id,
+        celery_task=fake_celery_task,
+        task_id=TaskId("fake_task"),
+        location_id=location_id,
+        user_id=user_id,
         path=workspace_subfolder,
         expected_total_size=expected_total_size,
     )
@@ -208,9 +219,10 @@ async def test_path_compute_size_inexistent_path(
     fake_datcore_tokens: tuple[str, str],
 ):
     await _assert_compute_path_size(
-        fake_celery_task,
-        location_id,
-        user_id,
+        celery_task=fake_celery_task,
+        task_id=TaskId("fake_task"),
+        location_id=location_id,
+        user_id=user_id,
         path=Path(faker.file_path(absolute=False)),
         expected_total_size=0,
     )
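
The helper now leads with a bare `*`, making every parameter keyword-only (including the new `task_id`), which is why all six call sites are rewritten with named arguments. A minimal, self-contained illustration of that Python rule, with invented names:

def helper(*, task_id: str, path: str) -> str:
    # Parameters declared after a bare `*` can only be passed by keyword.
    return f"{task_id}:{path}"

helper(task_id="fake_task", path="workspace")  # OK
# helper("fake_task", "workspace")  # TypeError: takes 0 positional arguments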

services/storage/tests/unit/test_data_export.py (+14 −7)

@@ -22,7 +22,6 @@
 from models_library.api_schemas_rpc_async_jobs.exceptions import (
     JobAbortedError,
     JobError,
-    JobMissingError,
     JobNotDoneError,
     JobSchedulerError,
 )
@@ -343,7 +342,6 @@ async def test_abort_data_export_success(
 @pytest.mark.parametrize(
     "mock_celery_client, expected_exception_type",
     [
-        ({"abort_task_object": None, "get_task_uuids_object": []}, JobMissingError),
         (
             {
                 "abort_task_object": CeleryError("error"),
@@ -377,6 +375,14 @@ async def test_abort_data_export_error(
 @pytest.mark.parametrize(
     "mock_celery_client",
     [
+        {
+            "get_task_status_object": TaskStatus(
+                task_uuid=TaskUUID(_faker.uuid4()),
+                task_state=TaskState.PENDING,
+                progress_report=ProgressReport(actual_value=0),
+            ),
+            "get_task_uuids_object": [],
+        },
         {
             "get_task_status_object": TaskStatus(
                 task_uuid=TaskUUID(_faker.uuid4()),
@@ -411,10 +417,6 @@ async def test_get_data_export_status(
 @pytest.mark.parametrize(
     "mock_celery_client, expected_exception_type",
     [
-        (
-            {"get_task_status_object": None, "get_task_uuids_object": []},
-            JobMissingError,
-        ),
         (
             {
                 "get_task_status_object": CeleryError("error"),
@@ -528,9 +530,14 @@ async def test_get_data_export_result_success(
         ),
         (
             {
+                "get_task_status_object": TaskStatus(
+                    task_uuid=TaskUUID(_faker.uuid4()),
+                    task_state=TaskState.PENDING,
+                    progress_report=ProgressReport(actual_value=0.0),
+                ),
                 "get_task_uuids_object": [],
             },
-            JobMissingError,
+            JobNotDoneError,
         ),
     ],
     indirect=["mock_celery_client"],

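With the existence pre-check gone, a job with an empty `get_task_uuids_object` no longer maps to `JobMissingError`: the status test now expects the mocked PENDING state to be returned, and the result test expects `JobNotDoneError`, as the last hunk encodes. These cases all use pytest's indirect parametrization (`indirect=["mock_celery_client"]`), which routes each parameter set through the fixture rather than into the test function. A minimal, self-contained sketch of that mechanism, with invented names:

import pytest

@pytest.fixture
def mock_client(request):
    # With indirect parametrization, the parametrize value arrives here
    # as request.param instead of being passed to the test directly.
    return {"configured_with": request.param}

@pytest.mark.parametrize(
    "mock_client, expected",
    [("pending", "pending"), ("done", "done")],
    indirect=["mock_client"],  # only mock_client goes through the fixture
)
def test_indirect(mock_client, expected):
    assert mock_client["configured_with"] == expected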