From 4c88efb84b68672c059e1e3a6fde0f0ac6725958 Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Wed, 23 Apr 2025 11:52:22 +0200 Subject: [PATCH 01/29] rename --- .../modules/celery/client.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/services/storage/src/simcore_service_storage/modules/celery/client.py b/services/storage/src/simcore_service_storage/modules/celery/client.py index 305731f946a..1cec33c907a 100644 --- a/services/storage/src/simcore_service_storage/modules/celery/client.py +++ b/services/storage/src/simcore_service_storage/modules/celery/client.py @@ -35,7 +35,7 @@ class CeleryTaskClient: _celery_app: Celery _celery_settings: CelerySettings - _task_store: TaskInfoStore + _task_info_store: TaskInfoStore async def submit_task( self, @@ -63,7 +63,9 @@ async def submit_task( if task_metadata.ephemeral else self._celery_settings.CELERY_RESULT_EXPIRES ) - await self._task_store.create_task(task_id, task_metadata, expiry=expiry) + await self._task_info_store.create_task( + task_id, task_metadata, expiry=expiry + ) return task_uuid @make_async() @@ -96,9 +98,9 @@ async def get_task_result( async_result = self._celery_app.AsyncResult(task_id) result = async_result.result if async_result.ready(): - task_metadata = await self._task_store.get_task_metadata(task_id) + task_metadata = await self._task_info_store.get_task_metadata(task_id) if task_metadata is not None and task_metadata.ephemeral: - await self._task_store.remove_task(task_id) + await self._task_info_store.remove_task(task_id) await self._forget_task(task_id) return result @@ -107,7 +109,7 @@ async def _get_task_progress_report( ) -> ProgressReport: if task_state in (TaskState.STARTED, TaskState.RETRY, TaskState.ABORTED): task_id = build_task_id(task_context, task_uuid) - progress = await self._task_store.get_task_progress(task_id) + progress = await self._task_info_store.get_task_progress(task_id) if progress is not None: return progress if task_state in ( @@ -153,4 +155,4 @@ async def list_tasks(self, task_context: TaskContext) -> list[Task]: logging.DEBUG, msg=f"Listing tasks: {task_context=}", ): - return await self._task_store.list_tasks(task_context) + return await self._task_info_store.list_tasks(task_context) From 391ca3b3bb80603c706cae719e64efb9d6fa1fa8 Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Wed, 23 Apr 2025 14:08:11 +0200 Subject: [PATCH 02/29] add delete endpoint --- .../rpc_interfaces/async_jobs/async_jobs.py | 24 ++++++++++-- .../storage/tests/unit/test_async_jobs.py | 2 +- .../simcore_service_webserver/tasks/_rest.py | 26 ++++++++++++- .../unit/with_dbs/01/storage/test_storage.py | 37 ++++++++++++++++++- 4 files changed, 81 insertions(+), 8 deletions(-) diff --git a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/async_jobs/async_jobs.py b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/async_jobs/async_jobs.py index f6e1954c936..1d750924731 100644 --- a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/async_jobs/async_jobs.py +++ b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/async_jobs/async_jobs.py @@ -36,7 +36,7 @@ _logger = logging.getLogger(__name__) -async def cancel( +async def abort( rabbitmq_rpc_client: RabbitMQRPCClient, *, rpc_namespace: RPCNamespace, @@ -45,7 +45,23 @@ async def cancel( ) -> None: await rabbitmq_rpc_client.request( rpc_namespace, - TypeAdapter(RPCMethodName).validate_python("cancel"), + TypeAdapter(RPCMethodName).validate_python("abort"), + job_id=job_id, + job_id_data=job_id_data, + timeout_s=_DEFAULT_TIMEOUT_S, + ) + + +async def delete( + rabbitmq_rpc_client: RabbitMQRPCClient, + *, + rpc_namespace: RPCNamespace, + job_id: AsyncJobId, + job_id_data: AsyncJobNameData, +) -> None: + await rabbitmq_rpc_client.request( + rpc_namespace, + TypeAdapter(RPCMethodName).validate_python("delete"), job_id=job_id, job_id_data=job_id_data, timeout_s=_DEFAULT_TIMEOUT_S, @@ -222,7 +238,7 @@ async def wait_and_get_result( ) except (TimeoutError, CancelledError) as error: try: - await cancel( + await abort( rabbitmq_rpc_client, rpc_namespace=rpc_namespace, job_id=job_id, @@ -254,7 +270,7 @@ async def submit_and_wait( except (TimeoutError, CancelledError) as error: if async_job_rpc_get is not None: try: - await cancel( + await abort( rabbitmq_rpc_client, rpc_namespace=rpc_namespace, job_id=async_job_rpc_get.job_id, diff --git a/services/storage/tests/unit/test_async_jobs.py b/services/storage/tests/unit/test_async_jobs.py index 95319a6533f..351424d6396 100644 --- a/services/storage/tests/unit/test_async_jobs.py +++ b/services/storage/tests/unit/test_async_jobs.py @@ -264,7 +264,7 @@ async def test_async_jobs_cancel( payload=60 * 10, # test hangs if not cancelled properly ) - await async_jobs.cancel( + await async_jobs.abort( storage_rabbitmq_rpc_client, rpc_namespace=STORAGE_RPC_NAMESPACE, job_id=async_job_get.job_id, diff --git a/services/web/server/src/simcore_service_webserver/tasks/_rest.py b/services/web/server/src/simcore_service_webserver/tasks/_rest.py index 71850d627a7..4dff5986f5a 100644 --- a/services/web/server/src/simcore_service_webserver/tasks/_rest.py +++ b/services/web/server/src/simcore_service_webserver/tasks/_rest.py @@ -136,6 +136,30 @@ async def get_async_job_status(request: web.Request) -> web.Response: @routes.delete( _task_prefix + "/{task_id}", + name="delete_async_job", +) +@login_required +@permission_required("storage.files.*") +@handle_export_data_exceptions +async def delete_async_job(request: web.Request) -> web.Response: + + _req_ctx = RequestContext.model_validate(request) + + rabbitmq_rpc_client = get_rabbitmq_rpc_client(request.app) + async_job_get = parse_request_path_parameters_as(_StorageAsyncJobId, request) + await async_jobs.delete( + rabbitmq_rpc_client=rabbitmq_rpc_client, + rpc_namespace=STORAGE_RPC_NAMESPACE, + job_id=async_job_get.task_id, + job_id_data=AsyncJobNameData( + user_id=_req_ctx.user_id, product_name=_req_ctx.product_name + ), + ) + return web.Response(status=status.HTTP_204_NO_CONTENT) + + +@routes.post( + _task_prefix + "/{task_id}:abort", name="abort_async_job", ) @login_required @@ -147,7 +171,7 @@ async def abort_async_job(request: web.Request) -> web.Response: rabbitmq_rpc_client = get_rabbitmq_rpc_client(request.app) async_job_get = parse_request_path_parameters_as(_StorageAsyncJobId, request) - await async_jobs.cancel( + await async_jobs.abort( rabbitmq_rpc_client=rabbitmq_rpc_client, rpc_namespace=STORAGE_RPC_NAMESPACE, job_id=async_job_get.task_id, diff --git a/services/web/server/tests/unit/with_dbs/01/storage/test_storage.py b/services/web/server/tests/unit/with_dbs/01/storage/test_storage.py index 73b6c3c086a..8c08e23aa77 100644 --- a/services/web/server/tests/unit/with_dbs/01/storage/test_storage.py +++ b/services/web/server/tests/unit/with_dbs/01/storage/test_storage.py @@ -550,7 +550,40 @@ async def test_abort_async_jobs( _job_id = AsyncJobId(faker.uuid4()) create_storage_rpc_client_mock( "simcore_service_webserver.tasks._rest", - f"async_jobs.{async_jobs.cancel.__name__}", + f"async_jobs.{async_jobs.abort.__name__}", + backend_result_or_exception, + ) + + response = await client.post(f"/{API_VERSION}/tasks/{_job_id}:abort") + assert response.status == expected_status + + +@pytest.mark.parametrize("user_role", _user_roles) +@pytest.mark.parametrize( + "backend_result_or_exception, expected_status", + [ + ( + AsyncJobAbort(result=True, job_id=AsyncJobId(_faker.uuid4())), + status.HTTP_204_NO_CONTENT, + ), + (JobSchedulerError(exc=_faker.text()), status.HTTP_500_INTERNAL_SERVER_ERROR), + (JobMissingError(job_id=_faker.uuid4()), status.HTTP_404_NOT_FOUND), + ], + ids=lambda x: type(x).__name__, +) +async def test_delete_async_jobs( + user_role: UserRole, + logged_user: UserInfoDict, + client: TestClient, + create_storage_rpc_client_mock: Callable[[str, str, Any], None], + faker: Faker, + backend_result_or_exception: Any, + expected_status: int, +): + _job_id = AsyncJobId(faker.uuid4()) + create_storage_rpc_client_mock( + "simcore_service_webserver.tasks._rest", + f"async_jobs.{async_jobs.delete.__name__}", backend_result_or_exception, ) @@ -647,7 +680,7 @@ async def test_get_user_async_jobs( ( "DELETE", "abort_href", - async_jobs.cancel.__name__, + async_jobs.abort.__name__, AsyncJobAbort(result=True, job_id=AsyncJobId(_faker.uuid4())), status.HTTP_204_NO_CONTENT, None, From b965df18e44549df533cec1e56118703d1c8f10c Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Wed, 23 Apr 2025 14:32:00 +0200 Subject: [PATCH 03/29] add task deletion --- .../api/rpc/_async_jobs.py | 15 ++++++++++++++- .../modules/celery/client.py | 15 ++++++++++++--- 2 files changed, 26 insertions(+), 4 deletions(-) diff --git a/services/storage/src/simcore_service_storage/api/rpc/_async_jobs.py b/services/storage/src/simcore_service_storage/api/rpc/_async_jobs.py index 080d5edf045..c5bd6614aab 100644 --- a/services/storage/src/simcore_service_storage/api/rpc/_async_jobs.py +++ b/services/storage/src/simcore_service_storage/api/rpc/_async_jobs.py @@ -32,7 +32,7 @@ @router.expose(reraise_if_error_type=(JobSchedulerError,)) -async def cancel(app: FastAPI, job_id: AsyncJobId, job_id_data: AsyncJobNameData): +async def abort(app: FastAPI, job_id: AsyncJobId, job_id_data: AsyncJobNameData): assert app # nosec assert job_id_data # nosec try: @@ -44,6 +44,19 @@ async def cancel(app: FastAPI, job_id: AsyncJobId, job_id_data: AsyncJobNameData raise JobSchedulerError(exc=f"{exc}") from exc +@router.expose(reraise_if_error_type=(JobSchedulerError,)) +async def deletet(app: FastAPI, job_id: AsyncJobId, job_id_data: AsyncJobNameData): + assert app # nosec + assert job_id_data # nosec + try: + await get_celery_client(app).delete_task( + task_context=job_id_data.model_dump(), + task_uuid=job_id, + ) + except CeleryError as exc: + raise JobSchedulerError(exc=f"{exc}") from exc + + @router.expose(reraise_if_error_type=(JobSchedulerError,)) async def status( app: FastAPI, job_id: AsyncJobId, job_id_data: AsyncJobNameData diff --git a/services/storage/src/simcore_service_storage/modules/celery/client.py b/services/storage/src/simcore_service_storage/modules/celery/client.py index 1cec33c907a..73c7a804cd6 100644 --- a/services/storage/src/simcore_service_storage/modules/celery/client.py +++ b/services/storage/src/simcore_service_storage/modules/celery/client.py @@ -78,10 +78,20 @@ async def abort_task(self, task_context: TaskContext, task_uuid: TaskUUID) -> No with log_context( _logger, logging.DEBUG, - msg=f"Abort task: {task_context=} {task_uuid=}", + msg=f"task abortion: {task_context=} {task_uuid=}", ): await self._abort_task(task_context, task_uuid) + async def delete_task(self, task_context: TaskContext, task_uuid: TaskUUID) -> None: + with log_context( + _logger, + logging.DEBUG, + msg=f"task deletion: {task_context=} {task_uuid=}", + ): + task_id = build_task_id(task_context, task_uuid) + await self._task_info_store.remove_task(task_id) + await self._forget_task(task_id) + @make_async() def _forget_task(self, task_id: TaskID) -> None: AbortableAsyncResult(task_id, app=self._celery_app).forget() @@ -100,8 +110,7 @@ async def get_task_result( if async_result.ready(): task_metadata = await self._task_info_store.get_task_metadata(task_id) if task_metadata is not None and task_metadata.ephemeral: - await self._task_info_store.remove_task(task_id) - await self._forget_task(task_id) + await self.delete_task(task_context, task_uuid) return result async def _get_task_progress_report( From 79126a8c12ff816975186826a296afc3bb1e4029 Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Wed, 23 Apr 2025 15:23:12 +0200 Subject: [PATCH 04/29] fix api specs --- api/specs/web-server/_long_running_tasks.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/api/specs/web-server/_long_running_tasks.py b/api/specs/web-server/_long_running_tasks.py index 884c81708da..9b8b3e8127b 100644 --- a/api/specs/web-server/_long_running_tasks.py +++ b/api/specs/web-server/_long_running_tasks.py @@ -53,8 +53,20 @@ def get_async_job_status( @router.delete( "/tasks/{task_id}", - name="cancel_and_delete_task", - description="Cancels and deletes a task", + name="delete_task", + description="Deletes a task", + responses=_export_data_responses, + status_code=status.HTTP_204_NO_CONTENT, +) +def selete_async_job( + _path_params: Annotated[_PathParam, Depends()], +): ... + + +@router.post( + "/tasks/{task_id}:abort", + name="abort_task", + description="Aborts a task", responses=_export_data_responses, status_code=status.HTTP_204_NO_CONTENT, ) From ae5581adb0410f0de229d5a8338da2c48bfef2f2 Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Wed, 23 Apr 2025 15:35:39 +0200 Subject: [PATCH 05/29] update openapi-spec --- .../api/v0/openapi.yaml | 45 ++++++++++++++++++- 1 file changed, 43 insertions(+), 2 deletions(-) diff --git a/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml b/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml index 73a7abf032c..c4c19009212 100644 --- a/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml +++ b/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml @@ -3127,8 +3127,49 @@ paths: delete: tags: - long-running-tasks - summary: Cancel And Delete Task - description: Cancels and deletes a task + summary: Delete Task + description: Deletes a task + operationId: selete_async_job + parameters: + - name: task_id + in: path + required: true + schema: + type: string + title: Task Id + responses: + '204': + description: Successful Response + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/EnvelopedError' + description: Not Found + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/EnvelopedError' + description: Forbidden + '410': + content: + application/json: + schema: + $ref: '#/components/schemas/EnvelopedError' + description: Gone + '500': + content: + application/json: + schema: + $ref: '#/components/schemas/EnvelopedError' + description: Internal Server Error + /v0/tasks/{task_id}:abort: + post: + tags: + - long-running-tasks + summary: Abort Task + description: Aborts a task operationId: abort_async_job parameters: - name: task_id From 3357e590e7e1cba6614e07847a0a4314391c1ec9 Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Wed, 23 Apr 2025 16:22:25 +0200 Subject: [PATCH 06/29] update method --- .../src/servicelib/fastapi/long_running_tasks/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/service-library/src/servicelib/fastapi/long_running_tasks/client.py b/packages/service-library/src/servicelib/fastapi/long_running_tasks/client.py index c82bde0fe4e..bf1dfaef2f1 100644 --- a/packages/service-library/src/servicelib/fastapi/long_running_tasks/client.py +++ b/packages/service-library/src/servicelib/fastapi/long_running_tasks/client.py @@ -106,7 +106,7 @@ async def _task_result(session: httpx.AsyncClient, result_url: URL) -> Any: @retry(**_DEFAULT_FASTAPI_RETRY_POLICY) async def _abort_task(session: httpx.AsyncClient, abort_url: URL) -> None: - response = await session.delete(f"{abort_url}") + response = await session.post(f"{abort_url}") response.raise_for_status() From 49b7f0d41c3548393d4f3de731ed9655ff14fa78 Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Wed, 23 Apr 2025 16:29:03 +0200 Subject: [PATCH 07/29] update method --- .../web/server/tests/unit/with_dbs/01/storage/test_storage.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/web/server/tests/unit/with_dbs/01/storage/test_storage.py b/services/web/server/tests/unit/with_dbs/01/storage/test_storage.py index 8c08e23aa77..3033767711a 100644 --- a/services/web/server/tests/unit/with_dbs/01/storage/test_storage.py +++ b/services/web/server/tests/unit/with_dbs/01/storage/test_storage.py @@ -678,7 +678,7 @@ async def test_get_user_async_jobs( TaskStatus, ), ( - "DELETE", + "POST", "abort_href", async_jobs.abort.__name__, AsyncJobAbort(result=True, job_id=AsyncJobId(_faker.uuid4())), From 36f7c51b6b036ec78f84a3ec77a2889339d0395a Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Wed, 23 Apr 2025 16:29:58 +0200 Subject: [PATCH 08/29] fix name --- api/specs/web-server/_long_running_tasks.py | 2 +- .../server/src/simcore_service_webserver/api/v0/openapi.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/api/specs/web-server/_long_running_tasks.py b/api/specs/web-server/_long_running_tasks.py index 9b8b3e8127b..9afba00bd00 100644 --- a/api/specs/web-server/_long_running_tasks.py +++ b/api/specs/web-server/_long_running_tasks.py @@ -58,7 +58,7 @@ def get_async_job_status( responses=_export_data_responses, status_code=status.HTTP_204_NO_CONTENT, ) -def selete_async_job( +def delete_async_job( _path_params: Annotated[_PathParam, Depends()], ): ... diff --git a/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml b/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml index c4c19009212..35b7277ecc4 100644 --- a/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml +++ b/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml @@ -3129,7 +3129,7 @@ paths: - long-running-tasks summary: Delete Task description: Deletes a task - operationId: selete_async_job + operationId: delete_async_job parameters: - name: task_id in: path From 11b44b342ffc5ccdb9d2ae4ff84867fe6186c59f Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Wed, 23 Apr 2025 16:36:44 +0200 Subject: [PATCH 09/29] revert --- .../src/servicelib/fastapi/long_running_tasks/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/service-library/src/servicelib/fastapi/long_running_tasks/client.py b/packages/service-library/src/servicelib/fastapi/long_running_tasks/client.py index bf1dfaef2f1..c82bde0fe4e 100644 --- a/packages/service-library/src/servicelib/fastapi/long_running_tasks/client.py +++ b/packages/service-library/src/servicelib/fastapi/long_running_tasks/client.py @@ -106,7 +106,7 @@ async def _task_result(session: httpx.AsyncClient, result_url: URL) -> Any: @retry(**_DEFAULT_FASTAPI_RETRY_POLICY) async def _abort_task(session: httpx.AsyncClient, abort_url: URL) -> None: - response = await session.post(f"{abort_url}") + response = await session.delete(f"{abort_url}") response.raise_for_status() From bd3149fb148e4c4545405e06f423fdad73eef943 Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Wed, 23 Apr 2025 21:47:54 +0200 Subject: [PATCH 10/29] rename --- api/specs/web-server/_long_running_tasks.py | 6 +++--- .../rabbitmq/rpc_interfaces/async_jobs/async_jobs.py | 4 ++-- .../src/simcore_service_storage/api/rpc/_async_jobs.py | 2 +- services/storage/tests/unit/test_async_jobs.py | 2 +- .../server/src/simcore_service_webserver/tasks/_rest.py | 8 ++++---- .../server/tests/unit/with_dbs/01/storage/test_storage.py | 4 ++-- 6 files changed, 13 insertions(+), 13 deletions(-) diff --git a/api/specs/web-server/_long_running_tasks.py b/api/specs/web-server/_long_running_tasks.py index 9afba00bd00..b262c219727 100644 --- a/api/specs/web-server/_long_running_tasks.py +++ b/api/specs/web-server/_long_running_tasks.py @@ -64,9 +64,9 @@ def delete_async_job( @router.post( - "/tasks/{task_id}:abort", - name="abort_task", - description="Aborts a task", + "/tasks/{task_id}:cancel", + name="cancel_task", + description="Cancels a task", responses=_export_data_responses, status_code=status.HTTP_204_NO_CONTENT, ) diff --git a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/async_jobs/async_jobs.py b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/async_jobs/async_jobs.py index 1d750924731..d04e34e265a 100644 --- a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/async_jobs/async_jobs.py +++ b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/async_jobs/async_jobs.py @@ -36,7 +36,7 @@ _logger = logging.getLogger(__name__) -async def abort( +async def cancel( rabbitmq_rpc_client: RabbitMQRPCClient, *, rpc_namespace: RPCNamespace, @@ -45,7 +45,7 @@ async def abort( ) -> None: await rabbitmq_rpc_client.request( rpc_namespace, - TypeAdapter(RPCMethodName).validate_python("abort"), + TypeAdapter(RPCMethodName).validate_python("cancel"), job_id=job_id, job_id_data=job_id_data, timeout_s=_DEFAULT_TIMEOUT_S, diff --git a/services/storage/src/simcore_service_storage/api/rpc/_async_jobs.py b/services/storage/src/simcore_service_storage/api/rpc/_async_jobs.py index c5bd6614aab..361a2ff937c 100644 --- a/services/storage/src/simcore_service_storage/api/rpc/_async_jobs.py +++ b/services/storage/src/simcore_service_storage/api/rpc/_async_jobs.py @@ -32,7 +32,7 @@ @router.expose(reraise_if_error_type=(JobSchedulerError,)) -async def abort(app: FastAPI, job_id: AsyncJobId, job_id_data: AsyncJobNameData): +async def cancel(app: FastAPI, job_id: AsyncJobId, job_id_data: AsyncJobNameData): assert app # nosec assert job_id_data # nosec try: diff --git a/services/storage/tests/unit/test_async_jobs.py b/services/storage/tests/unit/test_async_jobs.py index 351424d6396..95319a6533f 100644 --- a/services/storage/tests/unit/test_async_jobs.py +++ b/services/storage/tests/unit/test_async_jobs.py @@ -264,7 +264,7 @@ async def test_async_jobs_cancel( payload=60 * 10, # test hangs if not cancelled properly ) - await async_jobs.abort( + await async_jobs.cancel( storage_rabbitmq_rpc_client, rpc_namespace=STORAGE_RPC_NAMESPACE, job_id=async_job_get.job_id, diff --git a/services/web/server/src/simcore_service_webserver/tasks/_rest.py b/services/web/server/src/simcore_service_webserver/tasks/_rest.py index 4dff5986f5a..e1c8cae25d0 100644 --- a/services/web/server/src/simcore_service_webserver/tasks/_rest.py +++ b/services/web/server/src/simcore_service_webserver/tasks/_rest.py @@ -159,19 +159,19 @@ async def delete_async_job(request: web.Request) -> web.Response: @routes.post( - _task_prefix + "/{task_id}:abort", - name="abort_async_job", + _task_prefix + "/{task_id}:cancel", + name="cancel_async_job", ) @login_required @permission_required("storage.files.*") @handle_export_data_exceptions -async def abort_async_job(request: web.Request) -> web.Response: +async def cancel_async_job(request: web.Request) -> web.Response: _req_ctx = RequestContext.model_validate(request) rabbitmq_rpc_client = get_rabbitmq_rpc_client(request.app) async_job_get = parse_request_path_parameters_as(_StorageAsyncJobId, request) - await async_jobs.abort( + await async_jobs.cancel( rabbitmq_rpc_client=rabbitmq_rpc_client, rpc_namespace=STORAGE_RPC_NAMESPACE, job_id=async_job_get.task_id, diff --git a/services/web/server/tests/unit/with_dbs/01/storage/test_storage.py b/services/web/server/tests/unit/with_dbs/01/storage/test_storage.py index 3033767711a..780a47291a8 100644 --- a/services/web/server/tests/unit/with_dbs/01/storage/test_storage.py +++ b/services/web/server/tests/unit/with_dbs/01/storage/test_storage.py @@ -550,11 +550,11 @@ async def test_abort_async_jobs( _job_id = AsyncJobId(faker.uuid4()) create_storage_rpc_client_mock( "simcore_service_webserver.tasks._rest", - f"async_jobs.{async_jobs.abort.__name__}", + f"async_jobs.{async_jobs.cancel.__name__}", backend_result_or_exception, ) - response = await client.post(f"/{API_VERSION}/tasks/{_job_id}:abort") + response = await client.post(f"/{API_VERSION}/tasks/{_job_id}:cancel") assert response.status == expected_status From cfd9074d00f24ccb9279416ee244c503a0968a52 Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Wed, 23 Apr 2025 23:05:13 +0200 Subject: [PATCH 11/29] remove aborted task --- .../simcore_service_storage/modules/celery/client.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/services/storage/src/simcore_service_storage/modules/celery/client.py b/services/storage/src/simcore_service_storage/modules/celery/client.py index 73c7a804cd6..c47c98f85b7 100644 --- a/services/storage/src/simcore_service_storage/modules/celery/client.py +++ b/services/storage/src/simcore_service_storage/modules/celery/client.py @@ -69,10 +69,8 @@ async def submit_task( return task_uuid @make_async() - def _abort_task(self, task_context: TaskContext, task_uuid: TaskUUID) -> None: - AbortableAsyncResult( - build_task_id(task_context, task_uuid), app=self._celery_app - ).abort() + def _abort_task(self, task_id: TaskID) -> None: + AbortableAsyncResult(task_id, app=self._celery_app).abort() async def abort_task(self, task_context: TaskContext, task_uuid: TaskUUID) -> None: with log_context( @@ -80,7 +78,9 @@ async def abort_task(self, task_context: TaskContext, task_uuid: TaskUUID) -> No logging.DEBUG, msg=f"task abortion: {task_context=} {task_uuid=}", ): - await self._abort_task(task_context, task_uuid) + task_id = build_task_id(task_context, task_uuid) + await self._abort_task(task_id) + await self._task_info_store.remove_task(task_id) async def delete_task(self, task_context: TaskContext, task_uuid: TaskUUID) -> None: with log_context( From 920ff30d0cabe646c7a6c0751f6627f7bba31b4b Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Wed, 23 Apr 2025 23:40:57 +0200 Subject: [PATCH 12/29] update specs --- .../src/simcore_service_webserver/api/v0/openapi.yaml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml b/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml index 35b7277ecc4..dd7bdf6bace 100644 --- a/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml +++ b/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml @@ -3164,12 +3164,12 @@ paths: schema: $ref: '#/components/schemas/EnvelopedError' description: Internal Server Error - /v0/tasks/{task_id}:abort: + /v0/tasks/{task_id}:cancel: post: tags: - long-running-tasks - summary: Abort Task - description: Aborts a task + summary: Cancel Task + description: Cancels a task operationId: abort_async_job parameters: - name: task_id From 3b320dbeaf755cc396eb8dddcd8f6759413eab5d Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Wed, 23 Apr 2025 23:46:36 +0200 Subject: [PATCH 13/29] rename --- .../web/server/tests/unit/with_dbs/01/storage/test_storage.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/web/server/tests/unit/with_dbs/01/storage/test_storage.py b/services/web/server/tests/unit/with_dbs/01/storage/test_storage.py index 780a47291a8..c71ef895f73 100644 --- a/services/web/server/tests/unit/with_dbs/01/storage/test_storage.py +++ b/services/web/server/tests/unit/with_dbs/01/storage/test_storage.py @@ -680,7 +680,7 @@ async def test_get_user_async_jobs( ( "POST", "abort_href", - async_jobs.abort.__name__, + async_jobs.cancel.__name__, AsyncJobAbort(result=True, job_id=AsyncJobId(_faker.uuid4())), status.HTTP_204_NO_CONTENT, None, From 6403a98a23034f93c130f1b494da8a6dc8c00d4a Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Wed, 23 Apr 2025 23:47:23 +0200 Subject: [PATCH 14/29] rename --- .../storage/src/simcore_service_storage/api/rpc/_async_jobs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/storage/src/simcore_service_storage/api/rpc/_async_jobs.py b/services/storage/src/simcore_service_storage/api/rpc/_async_jobs.py index 361a2ff937c..3350fec8e7c 100644 --- a/services/storage/src/simcore_service_storage/api/rpc/_async_jobs.py +++ b/services/storage/src/simcore_service_storage/api/rpc/_async_jobs.py @@ -45,7 +45,7 @@ async def cancel(app: FastAPI, job_id: AsyncJobId, job_id_data: AsyncJobNameData @router.expose(reraise_if_error_type=(JobSchedulerError,)) -async def deletet(app: FastAPI, job_id: AsyncJobId, job_id_data: AsyncJobNameData): +async def delete(app: FastAPI, job_id: AsyncJobId, job_id_data: AsyncJobNameData): assert app # nosec assert job_id_data # nosec try: From 0bd2df394246689294f30f4caef05fc82dc039d0 Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Wed, 23 Apr 2025 23:48:51 +0200 Subject: [PATCH 15/29] rename --- .../web/server/src/simcore_service_webserver/tasks/_rest.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/services/web/server/src/simcore_service_webserver/tasks/_rest.py b/services/web/server/src/simcore_service_webserver/tasks/_rest.py index e1c8cae25d0..b67cb0cc053 100644 --- a/services/web/server/src/simcore_service_webserver/tasks/_rest.py +++ b/services/web/server/src/simcore_service_webserver/tasks/_rest.py @@ -160,12 +160,12 @@ async def delete_async_job(request: web.Request) -> web.Response: @routes.post( _task_prefix + "/{task_id}:cancel", - name="cancel_async_job", + name="abort_async_job", ) @login_required @permission_required("storage.files.*") @handle_export_data_exceptions -async def cancel_async_job(request: web.Request) -> web.Response: +async def abort_async_job(request: web.Request) -> web.Response: _req_ctx = RequestContext.model_validate(request) From 84a64fcd5c87d96d72225d3c10463f7d9ea555ba Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Thu, 24 Apr 2025 09:24:38 +0200 Subject: [PATCH 16/29] rename --- .../rabbitmq/rpc_interfaces/async_jobs/async_jobs.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/async_jobs/async_jobs.py b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/async_jobs/async_jobs.py index d04e34e265a..9bfd75f273b 100644 --- a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/async_jobs/async_jobs.py +++ b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/async_jobs/async_jobs.py @@ -238,7 +238,7 @@ async def wait_and_get_result( ) except (TimeoutError, CancelledError) as error: try: - await abort( + await cancel( rabbitmq_rpc_client, rpc_namespace=rpc_namespace, job_id=job_id, @@ -270,7 +270,7 @@ async def submit_and_wait( except (TimeoutError, CancelledError) as error: if async_job_rpc_get is not None: try: - await abort( + await cancel( rabbitmq_rpc_client, rpc_namespace=rpc_namespace, job_id=async_job_rpc_get.job_id, From 507eecd591b520a5569727e7ffe45653315cf2e6 Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Thu, 24 Apr 2025 09:28:20 +0200 Subject: [PATCH 17/29] rename --- api/specs/web-server/_long_running_tasks.py | 2 +- .../src/simcore_service_webserver/api/v0/openapi.yaml | 2 +- .../server/src/simcore_service_webserver/storage/_rest.py | 4 ++-- .../web/server/src/simcore_service_webserver/tasks/_rest.py | 6 +++--- .../server/tests/unit/with_dbs/01/storage/test_storage.py | 2 +- 5 files changed, 8 insertions(+), 8 deletions(-) diff --git a/api/specs/web-server/_long_running_tasks.py b/api/specs/web-server/_long_running_tasks.py index b262c219727..64d024a17cb 100644 --- a/api/specs/web-server/_long_running_tasks.py +++ b/api/specs/web-server/_long_running_tasks.py @@ -70,7 +70,7 @@ def delete_async_job( responses=_export_data_responses, status_code=status.HTTP_204_NO_CONTENT, ) -def abort_async_job( +def cancel_async_job( _path_params: Annotated[_PathParam, Depends()], ): ... diff --git a/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml b/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml index dd7bdf6bace..8643f5523b8 100644 --- a/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml +++ b/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml @@ -3170,7 +3170,7 @@ paths: - long-running-tasks summary: Cancel Task description: Cancels a task - operationId: abort_async_job + operationId: cancel_async_job parameters: - name: task_id in: path diff --git a/services/web/server/src/simcore_service_webserver/storage/_rest.py b/services/web/server/src/simcore_service_webserver/storage/_rest.py index 9fa80f4bc4d..b8a1f18a398 100644 --- a/services/web/server/src/simcore_service_webserver/storage/_rest.py +++ b/services/web/server/src/simcore_service_webserver/storage/_rest.py @@ -185,7 +185,7 @@ def _create_data_response_from_async_job( task_id=async_job_id, task_name=async_job_id, status_href=f"{request.url.with_path(str(request.app.router['get_async_job_status'].url_for(task_id=async_job_id)))}", - abort_href=f"{request.url.with_path(str(request.app.router['abort_async_job'].url_for(task_id=async_job_id)))}", + abort_href=f"{request.url.with_path(str(request.app.router['cancel_async_job'].url_for(task_id=async_job_id)))}", result_href=f"{request.url.with_path(str(request.app.router['get_async_job_result'].url_for(task_id=async_job_id)))}", ), status=status.HTTP_202_ACCEPTED, @@ -505,7 +505,7 @@ def allow_only_simcore(cls, v: int) -> int: task_id=_job_id, task_name=_job_id, status_href=f"{request.url.with_path(str(request.app.router['get_async_job_status'].url_for(task_id=_job_id)))}", - abort_href=f"{request.url.with_path(str(request.app.router['abort_async_job'].url_for(task_id=_job_id)))}", + abort_href=f"{request.url.with_path(str(request.app.router['cancel_async_job'].url_for(task_id=_job_id)))}", result_href=f"{request.url.with_path(str(request.app.router['get_async_job_result'].url_for(task_id=_job_id)))}", ), status=status.HTTP_202_ACCEPTED, diff --git a/services/web/server/src/simcore_service_webserver/tasks/_rest.py b/services/web/server/src/simcore_service_webserver/tasks/_rest.py index b67cb0cc053..520d92732ed 100644 --- a/services/web/server/src/simcore_service_webserver/tasks/_rest.py +++ b/services/web/server/src/simcore_service_webserver/tasks/_rest.py @@ -87,7 +87,7 @@ async def get_async_jobs(request: web.Request) -> web.Response: task_id=f"{job.job_id}", task_name=job.job_name, status_href=f"{request.url.with_path(str(request.app.router['get_async_job_status'].url_for(task_id=str(job.job_id))))}", - abort_href=f"{request.url.with_path(str(request.app.router['abort_async_job'].url_for(task_id=str(job.job_id))))}", + abort_href=f"{request.url.with_path(str(request.app.router['cancel_async_job'].url_for(task_id=str(job.job_id))))}", result_href=f"{request.url.with_path(str(request.app.router['get_async_job_result'].url_for(task_id=str(job.job_id))))}", ) for job in user_async_jobs @@ -160,12 +160,12 @@ async def delete_async_job(request: web.Request) -> web.Response: @routes.post( _task_prefix + "/{task_id}:cancel", - name="abort_async_job", + name="cancel_async_job", ) @login_required @permission_required("storage.files.*") @handle_export_data_exceptions -async def abort_async_job(request: web.Request) -> web.Response: +async def cancel_async_job(request: web.Request) -> web.Response: _req_ctx = RequestContext.model_validate(request) diff --git a/services/web/server/tests/unit/with_dbs/01/storage/test_storage.py b/services/web/server/tests/unit/with_dbs/01/storage/test_storage.py index c71ef895f73..d19b59365d8 100644 --- a/services/web/server/tests/unit/with_dbs/01/storage/test_storage.py +++ b/services/web/server/tests/unit/with_dbs/01/storage/test_storage.py @@ -538,7 +538,7 @@ async def test_get_async_jobs_status( ], ids=lambda x: type(x).__name__, ) -async def test_abort_async_jobs( +async def test_cancel_async_jobs( user_role: UserRole, logged_user: UserInfoDict, client: TestClient, From 9b7b15ce347aeda0fbab85d6041df68742c7e90b Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Thu, 24 Apr 2025 09:31:14 +0200 Subject: [PATCH 18/29] update description --- api/specs/web-server/_long_running_tasks.py | 2 +- .../server/src/simcore_service_webserver/api/v0/openapi.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/api/specs/web-server/_long_running_tasks.py b/api/specs/web-server/_long_running_tasks.py index 64d024a17cb..63d53b5598c 100644 --- a/api/specs/web-server/_long_running_tasks.py +++ b/api/specs/web-server/_long_running_tasks.py @@ -66,7 +66,7 @@ def delete_async_job( @router.post( "/tasks/{task_id}:cancel", name="cancel_task", - description="Cancels a task", + description="Cancels (aborts) a task", responses=_export_data_responses, status_code=status.HTTP_204_NO_CONTENT, ) diff --git a/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml b/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml index 8643f5523b8..7da8e2403a3 100644 --- a/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml +++ b/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml @@ -3169,7 +3169,7 @@ paths: tags: - long-running-tasks summary: Cancel Task - description: Cancels a task + description: Cancels (aborts) a task operationId: cancel_async_job parameters: - name: task_id From c30c8d846a227910e2eac52f0d71884687367517 Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Thu, 24 Apr 2025 11:05:10 +0200 Subject: [PATCH 19/29] continue --- .github/CODEOWNERS | 1 + api/specs/web-server/_long_running_tasks.py | 18 +++--------------- .../modules/celery/client.py | 1 - .../simcore_service_webserver/tasks/_rest.py | 14 ++++++++++++-- 4 files changed, 16 insertions(+), 18 deletions(-) diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 8705bf17e2e..ac15d41fcff 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -40,6 +40,7 @@ Makefile @pcrespov @sanderegg /services/static-webserver/ @GitHK /services/static-webserver/client/ @odeimaiz /services/storage/ @sanderegg +/services/storage/modules/celery @giancarloromeo /services/web/server/ @pcrespov @sanderegg @GitHK @matusdrobuliak66 /tests/e2e-frontend/ @odeimaiz /tests/e2e-playwright/ @matusdrobuliak66 diff --git a/api/specs/web-server/_long_running_tasks.py b/api/specs/web-server/_long_running_tasks.py index 63d53b5598c..620d8457f75 100644 --- a/api/specs/web-server/_long_running_tasks.py +++ b/api/specs/web-server/_long_running_tasks.py @@ -53,24 +53,12 @@ def get_async_job_status( @router.delete( "/tasks/{task_id}", - name="delete_task", - description="Deletes a task", + name="cancel_and_delete_task", + description="Cancels and deletes a task", responses=_export_data_responses, status_code=status.HTTP_204_NO_CONTENT, ) -def delete_async_job( - _path_params: Annotated[_PathParam, Depends()], -): ... - - -@router.post( - "/tasks/{task_id}:cancel", - name="cancel_task", - description="Cancels (aborts) a task", - responses=_export_data_responses, - status_code=status.HTTP_204_NO_CONTENT, -) -def cancel_async_job( +def cancel_and_delete_async_job( _path_params: Annotated[_PathParam, Depends()], ): ... diff --git a/services/storage/src/simcore_service_storage/modules/celery/client.py b/services/storage/src/simcore_service_storage/modules/celery/client.py index c47c98f85b7..8042cdcfe49 100644 --- a/services/storage/src/simcore_service_storage/modules/celery/client.py +++ b/services/storage/src/simcore_service_storage/modules/celery/client.py @@ -80,7 +80,6 @@ async def abort_task(self, task_context: TaskContext, task_uuid: TaskUUID) -> No ): task_id = build_task_id(task_context, task_uuid) await self._abort_task(task_id) - await self._task_info_store.remove_task(task_id) async def delete_task(self, task_context: TaskContext, task_uuid: TaskUUID) -> None: with log_context( diff --git a/services/web/server/src/simcore_service_webserver/tasks/_rest.py b/services/web/server/src/simcore_service_webserver/tasks/_rest.py index 520d92732ed..97e29792339 100644 --- a/services/web/server/src/simcore_service_webserver/tasks/_rest.py +++ b/services/web/server/src/simcore_service_webserver/tasks/_rest.py @@ -136,17 +136,27 @@ async def get_async_job_status(request: web.Request) -> web.Response: @routes.delete( _task_prefix + "/{task_id}", - name="delete_async_job", + name="cancel_and_delete_async_job", ) @login_required @permission_required("storage.files.*") @handle_export_data_exceptions -async def delete_async_job(request: web.Request) -> web.Response: +async def cancel_and_delete_async_job(request: web.Request) -> web.Response: _req_ctx = RequestContext.model_validate(request) rabbitmq_rpc_client = get_rabbitmq_rpc_client(request.app) async_job_get = parse_request_path_parameters_as(_StorageAsyncJobId, request) + + await async_jobs.cancel( + rabbitmq_rpc_client=rabbitmq_rpc_client, + rpc_namespace=STORAGE_RPC_NAMESPACE, + job_id=async_job_get.task_id, + job_id_data=AsyncJobNameData( + user_id=_req_ctx.user_id, product_name=_req_ctx.product_name + ), + ) + await async_jobs.delete( rabbitmq_rpc_client=rabbitmq_rpc_client, rpc_namespace=STORAGE_RPC_NAMESPACE, From 9969b7d719057d40e9b10141fc03a359edb8bcee Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Thu, 24 Apr 2025 11:14:52 +0200 Subject: [PATCH 20/29] update test --- .../unit/with_dbs/01/storage/test_storage.py | 30 +------------------ 1 file changed, 1 insertion(+), 29 deletions(-) diff --git a/services/web/server/tests/unit/with_dbs/01/storage/test_storage.py b/services/web/server/tests/unit/with_dbs/01/storage/test_storage.py index d19b59365d8..d07ab817187 100644 --- a/services/web/server/tests/unit/with_dbs/01/storage/test_storage.py +++ b/services/web/server/tests/unit/with_dbs/01/storage/test_storage.py @@ -536,9 +536,8 @@ async def test_get_async_jobs_status( (JobSchedulerError(exc=_faker.text()), status.HTTP_500_INTERNAL_SERVER_ERROR), (JobMissingError(job_id=_faker.uuid4()), status.HTTP_404_NOT_FOUND), ], - ids=lambda x: type(x).__name__, ) -async def test_cancel_async_jobs( +async def test_cancel_and_delete_async_jobs( user_role: UserRole, logged_user: UserInfoDict, client: TestClient, @@ -554,33 +553,6 @@ async def test_cancel_async_jobs( backend_result_or_exception, ) - response = await client.post(f"/{API_VERSION}/tasks/{_job_id}:cancel") - assert response.status == expected_status - - -@pytest.mark.parametrize("user_role", _user_roles) -@pytest.mark.parametrize( - "backend_result_or_exception, expected_status", - [ - ( - AsyncJobAbort(result=True, job_id=AsyncJobId(_faker.uuid4())), - status.HTTP_204_NO_CONTENT, - ), - (JobSchedulerError(exc=_faker.text()), status.HTTP_500_INTERNAL_SERVER_ERROR), - (JobMissingError(job_id=_faker.uuid4()), status.HTTP_404_NOT_FOUND), - ], - ids=lambda x: type(x).__name__, -) -async def test_delete_async_jobs( - user_role: UserRole, - logged_user: UserInfoDict, - client: TestClient, - create_storage_rpc_client_mock: Callable[[str, str, Any], None], - faker: Faker, - backend_result_or_exception: Any, - expected_status: int, -): - _job_id = AsyncJobId(faker.uuid4()) create_storage_rpc_client_mock( "simcore_service_webserver.tasks._rest", f"async_jobs.{async_jobs.delete.__name__}", From 004b02adb52e857e6aea2428eb909ac963bc93e0 Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Thu, 24 Apr 2025 11:24:11 +0200 Subject: [PATCH 21/29] rename --- .../api/v0/openapi.yaml | 47 ++----------------- .../storage/_rest.py | 2 +- .../simcore_service_webserver/tasks/_rest.py | 2 +- .../unit/with_dbs/01/storage/test_storage.py | 2 +- 4 files changed, 6 insertions(+), 47 deletions(-) diff --git a/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml b/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml index 7da8e2403a3..f5c66fb95a5 100644 --- a/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml +++ b/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml @@ -3127,50 +3127,9 @@ paths: delete: tags: - long-running-tasks - summary: Delete Task - description: Deletes a task - operationId: delete_async_job - parameters: - - name: task_id - in: path - required: true - schema: - type: string - title: Task Id - responses: - '204': - description: Successful Response - '404': - content: - application/json: - schema: - $ref: '#/components/schemas/EnvelopedError' - description: Not Found - '403': - content: - application/json: - schema: - $ref: '#/components/schemas/EnvelopedError' - description: Forbidden - '410': - content: - application/json: - schema: - $ref: '#/components/schemas/EnvelopedError' - description: Gone - '500': - content: - application/json: - schema: - $ref: '#/components/schemas/EnvelopedError' - description: Internal Server Error - /v0/tasks/{task_id}:cancel: - post: - tags: - - long-running-tasks - summary: Cancel Task - description: Cancels (aborts) a task - operationId: cancel_async_job + summary: Cancel And Delete Task + description: Cancels and deletes a task + operationId: cancel_and_delete_async_job parameters: - name: task_id in: path diff --git a/services/web/server/src/simcore_service_webserver/storage/_rest.py b/services/web/server/src/simcore_service_webserver/storage/_rest.py index b8a1f18a398..72c9bcf6f60 100644 --- a/services/web/server/src/simcore_service_webserver/storage/_rest.py +++ b/services/web/server/src/simcore_service_webserver/storage/_rest.py @@ -505,7 +505,7 @@ def allow_only_simcore(cls, v: int) -> int: task_id=_job_id, task_name=_job_id, status_href=f"{request.url.with_path(str(request.app.router['get_async_job_status'].url_for(task_id=_job_id)))}", - abort_href=f"{request.url.with_path(str(request.app.router['cancel_async_job'].url_for(task_id=_job_id)))}", + abort_href=f"{request.url.with_path(str(request.app.router['cancel_and_delete_async_job'].url_for(task_id=_job_id)))}", result_href=f"{request.url.with_path(str(request.app.router['get_async_job_result'].url_for(task_id=_job_id)))}", ), status=status.HTTP_202_ACCEPTED, diff --git a/services/web/server/src/simcore_service_webserver/tasks/_rest.py b/services/web/server/src/simcore_service_webserver/tasks/_rest.py index 97e29792339..ddb97a37629 100644 --- a/services/web/server/src/simcore_service_webserver/tasks/_rest.py +++ b/services/web/server/src/simcore_service_webserver/tasks/_rest.py @@ -87,7 +87,7 @@ async def get_async_jobs(request: web.Request) -> web.Response: task_id=f"{job.job_id}", task_name=job.job_name, status_href=f"{request.url.with_path(str(request.app.router['get_async_job_status'].url_for(task_id=str(job.job_id))))}", - abort_href=f"{request.url.with_path(str(request.app.router['cancel_async_job'].url_for(task_id=str(job.job_id))))}", + abort_href=f"{request.url.with_path(str(request.app.router['cancel_and_delete_async_job'].url_for(task_id=str(job.job_id))))}", result_href=f"{request.url.with_path(str(request.app.router['get_async_job_result'].url_for(task_id=str(job.job_id))))}", ) for job in user_async_jobs diff --git a/services/web/server/tests/unit/with_dbs/01/storage/test_storage.py b/services/web/server/tests/unit/with_dbs/01/storage/test_storage.py index d07ab817187..5f5ad25e0ff 100644 --- a/services/web/server/tests/unit/with_dbs/01/storage/test_storage.py +++ b/services/web/server/tests/unit/with_dbs/01/storage/test_storage.py @@ -650,7 +650,7 @@ async def test_get_user_async_jobs( TaskStatus, ), ( - "POST", + "DELETE", "abort_href", async_jobs.cancel.__name__, AsyncJobAbort(result=True, job_id=AsyncJobId(_faker.uuid4())), From 4001b9258c704b618e575160220d006677f3b005 Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Thu, 24 Apr 2025 11:25:54 +0200 Subject: [PATCH 22/29] update name --- .../web/server/src/simcore_service_webserver/storage/_rest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/web/server/src/simcore_service_webserver/storage/_rest.py b/services/web/server/src/simcore_service_webserver/storage/_rest.py index 72c9bcf6f60..15182d327f2 100644 --- a/services/web/server/src/simcore_service_webserver/storage/_rest.py +++ b/services/web/server/src/simcore_service_webserver/storage/_rest.py @@ -185,7 +185,7 @@ def _create_data_response_from_async_job( task_id=async_job_id, task_name=async_job_id, status_href=f"{request.url.with_path(str(request.app.router['get_async_job_status'].url_for(task_id=async_job_id)))}", - abort_href=f"{request.url.with_path(str(request.app.router['cancel_async_job'].url_for(task_id=async_job_id)))}", + abort_href=f"{request.url.with_path(str(request.app.router['cancel_and_delete_async_job'].url_for(task_id=async_job_id)))}", result_href=f"{request.url.with_path(str(request.app.router['get_async_job_result'].url_for(task_id=async_job_id)))}", ), status=status.HTTP_202_ACCEPTED, From b0fa11bd4a10305ef6b365365e9a72829a7c7dfb Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Thu, 24 Apr 2025 11:55:59 +0200 Subject: [PATCH 23/29] move forget --- .../storage/src/simcore_service_storage/modules/celery/_task.py | 1 + .../storage/src/simcore_service_storage/modules/celery/client.py | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/services/storage/src/simcore_service_storage/modules/celery/_task.py b/services/storage/src/simcore_service_storage/modules/celery/_task.py index a6f7c1a365e..e367a3a73da 100644 --- a/services/storage/src/simcore_service_storage/modules/celery/_task.py +++ b/services/storage/src/simcore_service_storage/modules/celery/_task.py @@ -67,6 +67,7 @@ async def abort_monitor(): main_task, max_delay=_DEFAULT_CANCEL_TASK_TIMEOUT.total_seconds(), ) + AbortableAsyncResult(task_id, app=app).forget() raise TaskAbortedError await asyncio.sleep( _DEFAULT_ABORT_TASK_TIMEOUT.total_seconds() diff --git a/services/storage/src/simcore_service_storage/modules/celery/client.py b/services/storage/src/simcore_service_storage/modules/celery/client.py index 8042cdcfe49..089af5fe379 100644 --- a/services/storage/src/simcore_service_storage/modules/celery/client.py +++ b/services/storage/src/simcore_service_storage/modules/celery/client.py @@ -89,7 +89,6 @@ async def delete_task(self, task_context: TaskContext, task_uuid: TaskUUID) -> N ): task_id = build_task_id(task_context, task_uuid) await self._task_info_store.remove_task(task_id) - await self._forget_task(task_id) @make_async() def _forget_task(self, task_id: TaskID) -> None: From f4aed48e1e339914c6f36d70719d21fe68116cc7 Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Thu, 24 Apr 2025 12:48:48 +0200 Subject: [PATCH 24/29] simplify --- api/specs/web-server/_long_running_tasks.py | 6 ++-- .../rpc_interfaces/async_jobs/async_jobs.py | 16 --------- .../api/rpc/_async_jobs.py | 15 +------- .../modules/celery/client.py | 18 ++++------ .../storage/tests/unit/test_modules_celery.py | 2 +- .../simcore_service_webserver/tasks/_rest.py | 36 ++----------------- .../unit/with_dbs/01/storage/test_storage.py | 8 +---- 7 files changed, 14 insertions(+), 87 deletions(-) diff --git a/api/specs/web-server/_long_running_tasks.py b/api/specs/web-server/_long_running_tasks.py index 620d8457f75..f4b7512c8aa 100644 --- a/api/specs/web-server/_long_running_tasks.py +++ b/api/specs/web-server/_long_running_tasks.py @@ -53,12 +53,12 @@ def get_async_job_status( @router.delete( "/tasks/{task_id}", - name="cancel_and_delete_task", - description="Cancels and deletes a task", + name="cancel_task", + description="Cancels a task", responses=_export_data_responses, status_code=status.HTTP_204_NO_CONTENT, ) -def cancel_and_delete_async_job( +def cancel_async_job( _path_params: Annotated[_PathParam, Depends()], ): ... diff --git a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/async_jobs/async_jobs.py b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/async_jobs/async_jobs.py index 9bfd75f273b..f6e1954c936 100644 --- a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/async_jobs/async_jobs.py +++ b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/async_jobs/async_jobs.py @@ -52,22 +52,6 @@ async def cancel( ) -async def delete( - rabbitmq_rpc_client: RabbitMQRPCClient, - *, - rpc_namespace: RPCNamespace, - job_id: AsyncJobId, - job_id_data: AsyncJobNameData, -) -> None: - await rabbitmq_rpc_client.request( - rpc_namespace, - TypeAdapter(RPCMethodName).validate_python("delete"), - job_id=job_id, - job_id_data=job_id_data, - timeout_s=_DEFAULT_TIMEOUT_S, - ) - - async def status( rabbitmq_rpc_client: RabbitMQRPCClient, *, diff --git a/services/storage/src/simcore_service_storage/api/rpc/_async_jobs.py b/services/storage/src/simcore_service_storage/api/rpc/_async_jobs.py index 3350fec8e7c..3186237eb7e 100644 --- a/services/storage/src/simcore_service_storage/api/rpc/_async_jobs.py +++ b/services/storage/src/simcore_service_storage/api/rpc/_async_jobs.py @@ -36,20 +36,7 @@ async def cancel(app: FastAPI, job_id: AsyncJobId, job_id_data: AsyncJobNameData assert app # nosec assert job_id_data # nosec try: - await get_celery_client(app).abort_task( - task_context=job_id_data.model_dump(), - task_uuid=job_id, - ) - except CeleryError as exc: - raise JobSchedulerError(exc=f"{exc}") from exc - - -@router.expose(reraise_if_error_type=(JobSchedulerError,)) -async def delete(app: FastAPI, job_id: AsyncJobId, job_id_data: AsyncJobNameData): - assert app # nosec - assert job_id_data # nosec - try: - await get_celery_client(app).delete_task( + await get_celery_client(app).cancel_task( task_context=job_id_data.model_dump(), task_uuid=job_id, ) diff --git a/services/storage/src/simcore_service_storage/modules/celery/client.py b/services/storage/src/simcore_service_storage/modules/celery/client.py index 089af5fe379..8d0441ba043 100644 --- a/services/storage/src/simcore_service_storage/modules/celery/client.py +++ b/services/storage/src/simcore_service_storage/modules/celery/client.py @@ -72,22 +72,15 @@ async def submit_task( def _abort_task(self, task_id: TaskID) -> None: AbortableAsyncResult(task_id, app=self._celery_app).abort() - async def abort_task(self, task_context: TaskContext, task_uuid: TaskUUID) -> None: + async def cancel_task(self, task_context: TaskContext, task_uuid: TaskUUID) -> None: with log_context( _logger, logging.DEBUG, - msg=f"task abortion: {task_context=} {task_uuid=}", - ): - task_id = build_task_id(task_context, task_uuid) - await self._abort_task(task_id) - - async def delete_task(self, task_context: TaskContext, task_uuid: TaskUUID) -> None: - with log_context( - _logger, - logging.DEBUG, - msg=f"task deletion: {task_context=} {task_uuid=}", + msg=f"task cancellation: {task_context=} {task_uuid=}", ): task_id = build_task_id(task_context, task_uuid) + if not (await self.get_task_status(task_context, task_uuid)).is_done: + await self._abort_task(task_id) await self._task_info_store.remove_task(task_id) @make_async() @@ -108,7 +101,8 @@ async def get_task_result( if async_result.ready(): task_metadata = await self._task_info_store.get_task_metadata(task_id) if task_metadata is not None and task_metadata.ephemeral: - await self.delete_task(task_context, task_uuid) + await self._forget_task(task_id) + await self._task_info_store.remove_task(task_id) return result async def _get_task_progress_report( diff --git a/services/storage/tests/unit/test_modules_celery.py b/services/storage/tests/unit/test_modules_celery.py index d5f3ce70b98..0a09b282950 100644 --- a/services/storage/tests/unit/test_modules_celery.py +++ b/services/storage/tests/unit/test_modules_celery.py @@ -178,7 +178,7 @@ async def test_aborting_task_results_with_aborted_state( task_context=task_context, ) - await celery_client.abort_task(task_context, task_uuid) + await celery_client.cancel_task(task_context, task_uuid) for attempt in Retrying( retry=retry_if_exception_type(AssertionError), diff --git a/services/web/server/src/simcore_service_webserver/tasks/_rest.py b/services/web/server/src/simcore_service_webserver/tasks/_rest.py index ddb97a37629..23da63cedbb 100644 --- a/services/web/server/src/simcore_service_webserver/tasks/_rest.py +++ b/services/web/server/src/simcore_service_webserver/tasks/_rest.py @@ -136,40 +136,6 @@ async def get_async_job_status(request: web.Request) -> web.Response: @routes.delete( _task_prefix + "/{task_id}", - name="cancel_and_delete_async_job", -) -@login_required -@permission_required("storage.files.*") -@handle_export_data_exceptions -async def cancel_and_delete_async_job(request: web.Request) -> web.Response: - - _req_ctx = RequestContext.model_validate(request) - - rabbitmq_rpc_client = get_rabbitmq_rpc_client(request.app) - async_job_get = parse_request_path_parameters_as(_StorageAsyncJobId, request) - - await async_jobs.cancel( - rabbitmq_rpc_client=rabbitmq_rpc_client, - rpc_namespace=STORAGE_RPC_NAMESPACE, - job_id=async_job_get.task_id, - job_id_data=AsyncJobNameData( - user_id=_req_ctx.user_id, product_name=_req_ctx.product_name - ), - ) - - await async_jobs.delete( - rabbitmq_rpc_client=rabbitmq_rpc_client, - rpc_namespace=STORAGE_RPC_NAMESPACE, - job_id=async_job_get.task_id, - job_id_data=AsyncJobNameData( - user_id=_req_ctx.user_id, product_name=_req_ctx.product_name - ), - ) - return web.Response(status=status.HTTP_204_NO_CONTENT) - - -@routes.post( - _task_prefix + "/{task_id}:cancel", name="cancel_async_job", ) @login_required @@ -181,6 +147,7 @@ async def cancel_async_job(request: web.Request) -> web.Response: rabbitmq_rpc_client = get_rabbitmq_rpc_client(request.app) async_job_get = parse_request_path_parameters_as(_StorageAsyncJobId, request) + await async_jobs.cancel( rabbitmq_rpc_client=rabbitmq_rpc_client, rpc_namespace=STORAGE_RPC_NAMESPACE, @@ -189,6 +156,7 @@ async def cancel_async_job(request: web.Request) -> web.Response: user_id=_req_ctx.user_id, product_name=_req_ctx.product_name ), ) + return web.Response(status=status.HTTP_204_NO_CONTENT) diff --git a/services/web/server/tests/unit/with_dbs/01/storage/test_storage.py b/services/web/server/tests/unit/with_dbs/01/storage/test_storage.py index 5f5ad25e0ff..b1dfdd4479b 100644 --- a/services/web/server/tests/unit/with_dbs/01/storage/test_storage.py +++ b/services/web/server/tests/unit/with_dbs/01/storage/test_storage.py @@ -537,7 +537,7 @@ async def test_get_async_jobs_status( (JobMissingError(job_id=_faker.uuid4()), status.HTTP_404_NOT_FOUND), ], ) -async def test_cancel_and_delete_async_jobs( +async def test_cancel_async_jobs( user_role: UserRole, logged_user: UserInfoDict, client: TestClient, @@ -553,12 +553,6 @@ async def test_cancel_and_delete_async_jobs( backend_result_or_exception, ) - create_storage_rpc_client_mock( - "simcore_service_webserver.tasks._rest", - f"async_jobs.{async_jobs.delete.__name__}", - backend_result_or_exception, - ) - response = await client.delete(f"/{API_VERSION}/tasks/{_job_id}") assert response.status == expected_status From c19271542100dbf4ab50f65ded6296c39e42ccda Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Thu, 24 Apr 2025 12:51:24 +0200 Subject: [PATCH 25/29] continue --- api/specs/web-server/_long_running_tasks.py | 4 ++-- .../server/src/simcore_service_webserver/api/v0/openapi.yaml | 2 +- .../web/server/src/simcore_service_webserver/storage/_rest.py | 4 ++-- .../web/server/src/simcore_service_webserver/tasks/_rest.py | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/api/specs/web-server/_long_running_tasks.py b/api/specs/web-server/_long_running_tasks.py index f4b7512c8aa..f204c1de5b4 100644 --- a/api/specs/web-server/_long_running_tasks.py +++ b/api/specs/web-server/_long_running_tasks.py @@ -53,8 +53,8 @@ def get_async_job_status( @router.delete( "/tasks/{task_id}", - name="cancel_task", - description="Cancels a task", + name="cancel_and_delete_task", + description="Cancels and deletes a task", responses=_export_data_responses, status_code=status.HTTP_204_NO_CONTENT, ) diff --git a/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml b/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml index f5c66fb95a5..3681c1e66e1 100644 --- a/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml +++ b/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml @@ -3129,7 +3129,7 @@ paths: - long-running-tasks summary: Cancel And Delete Task description: Cancels and deletes a task - operationId: cancel_and_delete_async_job + operationId: cancel_async_job parameters: - name: task_id in: path diff --git a/services/web/server/src/simcore_service_webserver/storage/_rest.py b/services/web/server/src/simcore_service_webserver/storage/_rest.py index 15182d327f2..b8a1f18a398 100644 --- a/services/web/server/src/simcore_service_webserver/storage/_rest.py +++ b/services/web/server/src/simcore_service_webserver/storage/_rest.py @@ -185,7 +185,7 @@ def _create_data_response_from_async_job( task_id=async_job_id, task_name=async_job_id, status_href=f"{request.url.with_path(str(request.app.router['get_async_job_status'].url_for(task_id=async_job_id)))}", - abort_href=f"{request.url.with_path(str(request.app.router['cancel_and_delete_async_job'].url_for(task_id=async_job_id)))}", + abort_href=f"{request.url.with_path(str(request.app.router['cancel_async_job'].url_for(task_id=async_job_id)))}", result_href=f"{request.url.with_path(str(request.app.router['get_async_job_result'].url_for(task_id=async_job_id)))}", ), status=status.HTTP_202_ACCEPTED, @@ -505,7 +505,7 @@ def allow_only_simcore(cls, v: int) -> int: task_id=_job_id, task_name=_job_id, status_href=f"{request.url.with_path(str(request.app.router['get_async_job_status'].url_for(task_id=_job_id)))}", - abort_href=f"{request.url.with_path(str(request.app.router['cancel_and_delete_async_job'].url_for(task_id=_job_id)))}", + abort_href=f"{request.url.with_path(str(request.app.router['cancel_async_job'].url_for(task_id=_job_id)))}", result_href=f"{request.url.with_path(str(request.app.router['get_async_job_result'].url_for(task_id=_job_id)))}", ), status=status.HTTP_202_ACCEPTED, diff --git a/services/web/server/src/simcore_service_webserver/tasks/_rest.py b/services/web/server/src/simcore_service_webserver/tasks/_rest.py index 23da63cedbb..a4c95a6e1cc 100644 --- a/services/web/server/src/simcore_service_webserver/tasks/_rest.py +++ b/services/web/server/src/simcore_service_webserver/tasks/_rest.py @@ -87,7 +87,7 @@ async def get_async_jobs(request: web.Request) -> web.Response: task_id=f"{job.job_id}", task_name=job.job_name, status_href=f"{request.url.with_path(str(request.app.router['get_async_job_status'].url_for(task_id=str(job.job_id))))}", - abort_href=f"{request.url.with_path(str(request.app.router['cancel_and_delete_async_job'].url_for(task_id=str(job.job_id))))}", + abort_href=f"{request.url.with_path(str(request.app.router['cancel_async_job'].url_for(task_id=str(job.job_id))))}", result_href=f"{request.url.with_path(str(request.app.router['get_async_job_result'].url_for(task_id=str(job.job_id))))}", ) for job in user_async_jobs From c0310791e9e09be42f220a3d0683a97ede9697e3 Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Thu, 24 Apr 2025 13:11:52 +0200 Subject: [PATCH 26/29] fix test --- .../web/server/tests/unit/with_dbs/01/storage/test_storage.py | 1 + 1 file changed, 1 insertion(+) diff --git a/services/web/server/tests/unit/with_dbs/01/storage/test_storage.py b/services/web/server/tests/unit/with_dbs/01/storage/test_storage.py index b1dfdd4479b..d7a0c1087b4 100644 --- a/services/web/server/tests/unit/with_dbs/01/storage/test_storage.py +++ b/services/web/server/tests/unit/with_dbs/01/storage/test_storage.py @@ -536,6 +536,7 @@ async def test_get_async_jobs_status( (JobSchedulerError(exc=_faker.text()), status.HTTP_500_INTERNAL_SERVER_ERROR), (JobMissingError(job_id=_faker.uuid4()), status.HTTP_404_NOT_FOUND), ], + ids=lambda x: type(x).__name__, ) async def test_cancel_async_jobs( user_role: UserRole, From 7fccacbec88eb7f82282ae9b32c35cf9cc3543fe Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Thu, 24 Apr 2025 14:03:47 +0200 Subject: [PATCH 27/29] move build_task_id --- .../src/simcore_service_storage/modules/celery/client.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/services/storage/src/simcore_service_storage/modules/celery/client.py b/services/storage/src/simcore_service_storage/modules/celery/client.py index 8d0441ba043..f68baf558fe 100644 --- a/services/storage/src/simcore_service_storage/modules/celery/client.py +++ b/services/storage/src/simcore_service_storage/modules/celery/client.py @@ -127,10 +127,7 @@ async def _get_task_progress_report( ) @make_async() - def _get_task_celery_state( - self, task_context: TaskContext, task_uuid: TaskUUID - ) -> TaskState: - task_id = build_task_id(task_context, task_uuid) + def _get_task_celery_state(self, task_id: TaskID) -> TaskState: return TaskState(self._celery_app.AsyncResult(task_id).state) async def get_task_status( @@ -141,7 +138,8 @@ async def get_task_status( logging.DEBUG, msg=f"Getting task status: {task_context=} {task_uuid=}", ): - task_state = await self._get_task_celery_state(task_context, task_uuid) + task_id = build_task_id(task_context, task_uuid) + task_state = await self._get_task_celery_state(task_id) return TaskStatus( task_uuid=task_uuid, task_state=task_state, From 75dc910ecd856cb76d9e479e13fe67ba5359e38f Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Thu, 24 Apr 2025 15:07:55 +0200 Subject: [PATCH 28/29] test is gone --- services/storage/tests/unit/test_async_jobs.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/services/storage/tests/unit/test_async_jobs.py b/services/storage/tests/unit/test_async_jobs.py index 95319a6533f..36f29a15bd8 100644 --- a/services/storage/tests/unit/test_async_jobs.py +++ b/services/storage/tests/unit/test_async_jobs.py @@ -277,6 +277,14 @@ async def test_async_jobs_cancel( job_id_data=job_id_data, ) + jobs = await async_jobs.list_jobs( + storage_rabbitmq_rpc_client, + rpc_namespace=STORAGE_RPC_NAMESPACE, + filter_="", # currently not used + job_id_data=job_id_data, + ) + assert async_job_get.job_id not in [job.job_id for job in jobs] + with pytest.raises(JobAbortedError): await async_jobs.result( storage_rabbitmq_rpc_client, From 12a64d242d3ff10062b0b598f2cda70fa7844311 Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Thu, 24 Apr 2025 15:14:39 +0200 Subject: [PATCH 29/29] more test --- services/storage/tests/unit/test_modules_celery.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/services/storage/tests/unit/test_modules_celery.py b/services/storage/tests/unit/test_modules_celery.py index 0a09b282950..b1819aabb44 100644 --- a/services/storage/tests/unit/test_modules_celery.py +++ b/services/storage/tests/unit/test_modules_celery.py @@ -166,7 +166,7 @@ async def test_submitting_task_with_failure_results_with_error( assert f"{raw_result}" == "Something strange happened: BOOM!" -async def test_aborting_task_results_with_aborted_state( +async def test_cancelling_a_running_task_aborts_and_deletes( celery_client: CeleryTaskClient, ): task_context = TaskContext(user_id=42) @@ -193,6 +193,8 @@ async def test_aborting_task_results_with_aborted_state( await celery_client.get_task_status(task_context, task_uuid) ).task_state == TaskState.ABORTED + assert task_uuid not in await celery_client.list_tasks(task_context) + async def test_listing_task_uuids_contains_submitted_task( celery_client: CeleryTaskClient,