diff --git a/api/specs/web-server/_storage.py b/api/specs/web-server/_storage.py
index 56a175d7552..893a25169cb 100644
--- a/api/specs/web-server/_storage.py
+++ b/api/specs/web-server/_storage.py
@@ -22,6 +22,7 @@
     PresignedLink,
 )
 from models_library.api_schemas_webserver.storage import (
+    BatchDeletePathsBodyParams,
     DataExportPost,
     ListPathsQueryParams,
     StorageLocationPathParams,
@@ -80,6 +81,19 @@ async def compute_path_size(_path: Annotated[StoragePathComputeSizeParams, Depen
     """Compute the size of a path"""
 
 
+@router.post(
+    "/storage/locations/{location_id}/-/paths:batchDelete",
+    response_model=Envelope[TaskGet],
+    status_code=status.HTTP_202_ACCEPTED,
+    description="Deletes Paths",
+)
+async def batch_delete_paths(
+    _path: Annotated[StorageLocationPathParams, Depends()],
+    _body: BatchDeletePathsBodyParams,
+):
+    """deletes files/folders if user has the rights to"""
+
+
 @router.get(
     "/storage/locations/{location_id}/datasets",
     response_model=Envelope[list[DatasetMetaData]],
diff --git a/packages/models-library/src/models_library/api_schemas_webserver/storage.py b/packages/models-library/src/models_library/api_schemas_webserver/storage.py
index 3049bf4d0bd..a867a872312 100644
--- a/packages/models-library/src/models_library/api_schemas_webserver/storage.py
+++ b/packages/models-library/src/models_library/api_schemas_webserver/storage.py
@@ -36,6 +36,10 @@ class ListPathsQueryParams(InputSchema, CursorQueryParameters):
     ] = DEFAULT_NUMBER_OF_PATHS_PER_PAGE
 
 
+class BatchDeletePathsBodyParams(InputSchema):
+    paths: set[Path]
+
+
 class DataExportPost(InputSchema):
     paths: list[StorageFileID]
 
diff --git a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/storage/paths.py b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/storage/paths.py
index 0d03e83a1f6..c1049bfc1bb 100644
--- a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/storage/paths.py
+++ b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/storage/paths.py
@@ -32,3 +32,23 @@ async def compute_path_size(
         path=path,
     )
     return async_job_rpc_get, job_id_data
+
+
+async def delete_paths(
+    client: RabbitMQRPCClient,
+    *,
+    user_id: UserID,
+    product_name: ProductName,
+    location_id: LocationID,
+    paths: set[Path],
+) -> tuple[AsyncJobGet, AsyncJobNameData]:
+    job_id_data = AsyncJobNameData(user_id=user_id, product_name=product_name)
+    async_job_rpc_get = await submit(
+        rabbitmq_rpc_client=client,
+        rpc_namespace=STORAGE_RPC_NAMESPACE,
+        method_name=RPCMethodName("delete_paths"),
+        job_id_data=job_id_data,
+        location_id=location_id,
+        paths=paths,
+    )
+    return async_job_rpc_get, job_id_data
diff --git a/services/storage/src/simcore_service_storage/api/_worker_tasks/_paths.py b/services/storage/src/simcore_service_storage/api/_worker_tasks/_paths.py
index fae0bdc770c..2f3d05da547 100644
--- a/services/storage/src/simcore_service_storage/api/_worker_tasks/_paths.py
+++ b/services/storage/src/simcore_service_storage/api/_worker_tasks/_paths.py
@@ -2,11 +2,13 @@
 from pathlib import Path
 
 from celery import Task  # type: ignore[import-untyped]
-from models_library.projects_nodes_io import LocationID
+from models_library.projects_nodes_io import LocationID, StorageFileID
 from models_library.users import UserID
-from pydantic import ByteSize
+from pydantic import ByteSize, TypeAdapter
 from servicelib.logging_utils import log_context
+from servicelib.utils import limited_gather
 
+from ...constants import MAX_CONCURRENT_S3_TASKS
 from ...dsm import get_dsm_provider
 from ...modules.celery.models import TaskId
 from ...modules.celery.utils import get_fastapi_app
@@ -25,3 +27,26 @@ async def compute_path_size(
     ):
         dsm = get_dsm_provider(get_fastapi_app(task.app)).get(location_id)
         return await dsm.compute_path_size(user_id, path=Path(path))
+
+
+async def delete_paths(
+    task: Task,
+    task_id: TaskId,
+    user_id: UserID,
+    location_id: LocationID,
+    paths: set[Path],
+) -> None:
+    assert task_id  # nosec
+    with log_context(
+        _logger,
+        logging.INFO,
+        msg=f"delete {paths=} in {location_id=} for {user_id=}",
+    ):
+        dsm = get_dsm_provider(get_fastapi_app(task.app)).get(location_id)
+        files_ids: set[StorageFileID] = {
+            TypeAdapter(StorageFileID).validate_python(f"{path}") for path in paths
+        }
+        await limited_gather(
+            *[dsm.delete_file(user_id, file_id) for file_id in files_ids],
+            limit=MAX_CONCURRENT_S3_TASKS,
+        )
diff --git a/services/storage/src/simcore_service_storage/api/_worker_tasks/tasks.py b/services/storage/src/simcore_service_storage/api/_worker_tasks/tasks.py
index 3848e4114a4..bf3c4ad9990 100644
--- a/services/storage/src/simcore_service_storage/api/_worker_tasks/tasks.py
+++ b/services/storage/src/simcore_service_storage/api/_worker_tasks/tasks.py
@@ -7,7 +7,7 @@
 from ...modules.celery._task import define_task
 from ...modules.celery.tasks import export_data
 from ._files import complete_upload_file
-from ._paths import compute_path_size
+from ._paths import compute_path_size, delete_paths
 from ._simcore_s3 import deep_copy_files_from_project
 
 _logger = logging.getLogger(__name__)
@@ -22,5 +22,6 @@ def setup_worker_tasks(app: Celery) -> None:
     ):
         define_task(app, export_data)
         define_task(app, compute_path_size)
+        define_task(app, delete_paths)
         define_task(app, complete_upload_file)
         define_task(app, deep_copy_files_from_project)
diff --git a/services/storage/src/simcore_service_storage/api/rpc/_paths.py b/services/storage/src/simcore_service_storage/api/rpc/_paths.py
index e975191bc57..0390156dac4 100644
--- a/services/storage/src/simcore_service_storage/api/rpc/_paths.py
+++ b/services/storage/src/simcore_service_storage/api/rpc/_paths.py
@@ -11,6 +11,7 @@
 
 from ...modules.celery import get_celery_client
 from .._worker_tasks._paths import compute_path_size as remote_compute_path_size
+from .._worker_tasks._paths import delete_paths as remote_delete_paths
 
 _logger = logging.getLogger(__name__)
 router = RPCRouter()
@@ -20,7 +21,6 @@ async def compute_path_size(
     app: FastAPI,
     job_id_data: AsyncJobNameData,
-    # user_id: UserID,
     location_id: LocationID,
     path: Path,
 ) -> AsyncJobGet:
@@ -33,3 +33,20 @@ async def compute_path_size(
     )
 
     return AsyncJobGet(job_id=task_uuid)
+
+
+@router.expose(reraise_if_error_type=None)
+async def delete_paths(
+    app: FastAPI,
+    job_id_data: AsyncJobNameData,
+    location_id: LocationID,
+    paths: set[Path],
+) -> AsyncJobGet:
+    task_uuid = await get_celery_client(app).send_task(
+        remote_delete_paths.__name__,
+        task_context=job_id_data.model_dump(),
+        user_id=job_id_data.user_id,
+        location_id=location_id,
+        paths=paths,
+    )
+    return AsyncJobGet(job_id=task_uuid)
diff --git a/services/storage/src/simcore_service_storage/modules/celery/_celery_types.py b/services/storage/src/simcore_service_storage/modules/celery/_celery_types.py
index 7fb44e087d5..ca8bdf3de8f 100644
--- a/services/storage/src/simcore_service_storage/modules/celery/_celery_types.py
+++ b/services/storage/src/simcore_service_storage/modules/celery/_celery_types.py
@@ -59,3 +59,4 @@ def register_celery_types() -> None:
     _register_pydantic_types(FileMetaData)
     _register_pydantic_types(FoldersBody)
     _register_pydantic_types(TaskError)
+    register_type(set, _class_full_name(set), encoder=list, decoder=set)
diff --git a/services/storage/tests/unit/test_rpc_handlers_paths.py b/services/storage/tests/unit/test_rpc_handlers_paths.py
index ef345c723e1..d33bb6bd996 100644
--- a/services/storage/tests/unit/test_rpc_handlers_paths.py
+++ b/services/storage/tests/unit/test_rpc_handlers_paths.py
@@ -7,7 +7,6 @@
 # pylint:disable=unused-variable
 
 
-import asyncio
 import datetime
 import random
 from pathlib import Path
@@ -31,7 +30,10 @@
 from servicelib.rabbitmq.rpc_interfaces.async_jobs.async_jobs import (
     wait_and_get_result,
 )
-from servicelib.rabbitmq.rpc_interfaces.storage.paths import compute_path_size
+from servicelib.rabbitmq.rpc_interfaces.storage.paths import (
+    compute_path_size,
+    delete_paths,
+)
 from simcore_service_storage.modules.celery.worker import CeleryTaskQueueWorker
 from simcore_service_storage.simcore_s3_dsm import SimcoreS3DataManager
 
@@ -74,7 +76,6 @@ async def _assert_compute_path_size(
         location_id=location_id,
         path=path,
     )
-    await asyncio.sleep(1)
     async for job_composed_result in wait_and_get_result(
         storage_rpc_client,
         rpc_namespace=STORAGE_RPC_NAMESPACE,
@@ -91,6 +92,39 @@
             return received_size
 
     pytest.fail("Job did not finish")
+    return ByteSize(0)  # for mypy
+
+
+async def _assert_delete_paths(
+    storage_rpc_client: RabbitMQRPCClient,
+    location_id: LocationID,
+    user_id: UserID,
+    product_name: ProductName,
+    *,
+    paths: set[Path],
+) -> None:
+    async_job, async_job_name = await delete_paths(
+        storage_rpc_client,
+        product_name=product_name,
+        user_id=user_id,
+        location_id=location_id,
+        paths=paths,
+    )
+    async for job_composed_result in wait_and_get_result(
+        storage_rpc_client,
+        rpc_namespace=STORAGE_RPC_NAMESPACE,
+        method_name=RPCMethodName(delete_paths.__name__),
+        job_id=async_job.job_id,
+        job_id_data=AsyncJobNameData(user_id=user_id, product_name=product_name),
+        client_timeout=datetime.timedelta(seconds=120),
+    ):
+        if job_composed_result.done:
+            response = await job_composed_result.result()
+            assert isinstance(response, AsyncJobResult)
+            assert response.result is None
+            return
+
+    pytest.fail("Job did not finish")
 
 
 @pytest.mark.parametrize(
@@ -246,3 +280,107 @@ async def test_path_compute_size_inexistent_path(
         expected_total_size=0,
         product_name=product_name,
     )
+
+
+@pytest.mark.parametrize(
+    "location_id",
+    [SimcoreS3DataManager.get_location_id()],
+    ids=[SimcoreS3DataManager.get_location_name()],
+    indirect=True,
+)
+async def test_delete_paths_empty_set(
+    initialized_app: FastAPI,
+    storage_rabbitmq_rpc_client: RabbitMQRPCClient,
+    user_id: UserID,
+    location_id: LocationID,
+    product_name: ProductName,
+    with_storage_celery_worker: CeleryTaskQueueWorker,
+):
+    await _assert_delete_paths(
+        storage_rabbitmq_rpc_client,
+        location_id,
+        user_id,
+        product_name,
+        paths=set(),
+    )
+
+
+@pytest.mark.parametrize(
+    "location_id",
+    [SimcoreS3DataManager.get_location_id()],
+    ids=[SimcoreS3DataManager.get_location_name()],
+    indirect=True,
+)
+@pytest.mark.parametrize(
+    "project_params",
+    [
+        ProjectWithFilesParams(
+            num_nodes=1,
+            allowed_file_sizes=(TypeAdapter(ByteSize).validate_python("1b"),),
+            workspace_files_count=15,
+        )
+    ],
+    ids=str,
+)
+async def test_delete_paths(
+    initialized_app: FastAPI,
+    storage_rabbitmq_rpc_client: RabbitMQRPCClient,
+    user_id: UserID,
+    location_id: LocationID,
+    with_random_project_with_files: tuple[
+        dict[str, Any],
+        dict[NodeID, dict[SimcoreS3FileID, FileIDDict]],
+    ],
+    project_params: ProjectWithFilesParams,
+    product_name: ProductName,
+    with_storage_celery_worker: CeleryTaskQueueWorker,
+):
+    assert (
+        len(project_params.allowed_file_sizes) == 1
+    ), "test preconditions are not filled! allowed file sizes should have only 1 option for this test"
+    project, list_of_files = with_random_project_with_files
+
+    total_num_files = sum(
+        len(files_in_node) for files_in_node in list_of_files.values()
+    )
+
+    # get size of a full project
+    expected_total_size = project_params.allowed_file_sizes[0] * total_num_files
+    path = Path(project["uuid"])
+    await _assert_compute_path_size(
+        storage_rabbitmq_rpc_client,
+        location_id,
+        user_id,
+        path=path,
+        expected_total_size=expected_total_size,
+        product_name=product_name,
+    )
+
+    # now select multiple random files to delete
+    selected_paths = random.sample(
+        list(
+            list_of_files[
+                NodeID(random.choice(list(project["workbench"])))  # noqa: S311
+            ]
+        ),
+        round(project_params.workspace_files_count / 2),
+    )
+
+    await _assert_delete_paths(
+        storage_rabbitmq_rpc_client,
+        location_id,
+        user_id,
+        product_name,
+        paths=set({Path(_) for _ in selected_paths}),
+    )
+
+    # the size is reduced by the amount of deleted files
+    await _assert_compute_path_size(
+        storage_rabbitmq_rpc_client,
+        location_id,
+        user_id,
+        path=path,
+        expected_total_size=expected_total_size
+        - len(selected_paths) * project_params.allowed_file_sizes[0],
+        product_name=product_name,
+    )
diff --git a/services/web/server/VERSION b/services/web/server/VERSION
index bf54d53ec26..62545fa777a 100644
--- a/services/web/server/VERSION
+++ b/services/web/server/VERSION
@@ -1 +1 @@
-0.61.4
+0.61.5
diff --git a/services/web/server/setup.cfg b/services/web/server/setup.cfg
index ccbfa6b24c9..ae7dc66a0ee 100644
--- a/services/web/server/setup.cfg
+++ b/services/web/server/setup.cfg
@@ -1,5 +1,5 @@
 [bumpversion]
-current_version = 0.61.4
+current_version = 0.61.5
 commit = True
 message = services/webserver api version: {current_version} → {new_version}
 tag = False
diff --git a/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml b/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml
index 6facdd9ddf1..ca947f2f17d 100644
--- a/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml
+++ b/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml
@@ -2,7 +2,7 @@ openapi: 3.1.0
 info:
   title: simcore-service-webserver
   description: Main service with an interface (http-API & websockets) to the web front-end
-  version: 0.61.4
+  version: 0.61.5
 servers:
 - url: ''
   description: webserver
@@ -6168,6 +6168,38 @@ paths:
         application/json:
           schema:
             $ref: '#/components/schemas/Envelope_TaskGet_'
+  /v0/storage/locations/{location_id}/-/paths:batchDelete:
+    post:
+      tags:
+      - storage
+      summary: Batch Delete Paths
+      description: Deletes Paths
+      operationId: batch_delete_paths
+      parameters:
+      - name: location_id
+        in: path
+        required: true
+        schema:
+          type: integer
+          title: Location Id
+      requestBody:
+        required: true
+        content:
+          application/json:
+            schema:
+              type: array
+              uniqueItems: true
+              items:
+                type: string
+                format: path
+              title: Paths
+      responses:
+        '202':
+          description: Successful Response
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/Envelope_TaskGet_'
   /v0/storage/locations/{location_id}/datasets:
     get:
       tags:
diff --git a/services/web/server/src/simcore_service_webserver/storage/_rest.py b/services/web/server/src/simcore_service_webserver/storage/_rest.py
index fbc419d9015..32729e0923b 100644
--- a/services/web/server/src/simcore_service_webserver/storage/_rest.py
+++ b/services/web/server/src/simcore_service_webserver/storage/_rest.py
@@ -13,6 +13,7 @@
     TaskGet,
 )
 from models_library.api_schemas_rpc_async_jobs.async_jobs import (
+    AsyncJobGet,
     AsyncJobNameData,
 )
 from models_library.api_schemas_storage.storage_schemas import (
@@ -22,7 +23,9 @@
     LinkType,
 )
 from models_library.api_schemas_webserver.storage import (
+    BatchDeletePathsBodyParams,
     DataExportPost,
+    StorageLocationPathParams,
     StoragePathComputeSizeParams,
 )
 from models_library.projects_nodes_io import LocationID
@@ -42,6 +45,9 @@
 from servicelib.rabbitmq.rpc_interfaces.storage.paths import (
     compute_path_size as remote_compute_path_size,
 )
+from servicelib.rabbitmq.rpc_interfaces.storage.paths import (
+    delete_paths as remote_delete_paths,
+)
 from servicelib.request_keys import RQT_USERID_KEY
 from servicelib.rest_responses import unwrap_envelope
 from yarl import URL
@@ -172,6 +178,23 @@ async def list_paths(request: web.Request) -> web.Response:
     return create_data_response(payload, status=resp_status)
 
 
+def _create_data_response_from_async_job(
+    request: web.Request,
+    async_job: AsyncJobGet,
+) -> web.Response:
+    async_job_id = f"{async_job.job_id}"
+    return create_data_response(
+        TaskGet(
+            task_id=async_job_id,
+            task_name=async_job_id,
+            status_href=f"{request.url.with_path(str(request.app.router['get_async_job_status'].url_for(task_id=async_job_id)))}",
+            abort_href=f"{request.url.with_path(str(request.app.router['abort_async_job'].url_for(task_id=async_job_id)))}",
+            result_href=f"{request.url.with_path(str(request.app.router['get_async_job_result'].url_for(task_id=async_job_id)))}",
+        ),
+        status=status.HTTP_202_ACCEPTED,
+    )
+
+
 @routes.post(
     f"{_storage_locations_prefix}/{{location_id}}/paths/{{path}}:size",
     name="compute_path_size",
@@ -193,17 +216,29 @@ async def compute_path_size(request: web.Request) -> web.Response:
         path=path_params.path,
     )
 
-    _job_id = f"{async_job.job_id}"
-    return create_data_response(
-        TaskGet(
-            task_id=_job_id,
-            task_name=_job_id,
-            status_href=f"{request.url.with_path(str(request.app.router['get_async_job_status'].url_for(task_id=_job_id)))}",
-            abort_href=f"{request.url.with_path(str(request.app.router['abort_async_job'].url_for(task_id=_job_id)))}",
-            result_href=f"{request.url.with_path(str(request.app.router['get_async_job_result'].url_for(task_id=_job_id)))}",
-        ),
-        status=status.HTTP_202_ACCEPTED,
+    return _create_data_response_from_async_job(request, async_job)
+
+
+@routes.post(
+    f"{_storage_locations_prefix}/{{location_id}}/-/paths:batchDelete",
+    name="batch_delete_paths",
+)
+@login_required
+@permission_required("storage.files.*")
+async def batch_delete_paths(request: web.Request):
+    req_ctx = RequestContext.model_validate(request)
+    path_params = parse_request_path_parameters_as(StorageLocationPathParams, request)
+    body = await parse_request_body_as(BatchDeletePathsBodyParams, request)
+
+    rabbitmq_rpc_client = get_rabbitmq_rpc_client(request.app)
+    async_job, _ = await remote_delete_paths(
+        rabbitmq_rpc_client,
+        user_id=req_ctx.user_id,
+        product_name=req_ctx.product_name,
+        location_id=path_params.location_id,
+        paths=body.paths,
     )
+    return _create_data_response_from_async_job(request, async_job)
 
 
 @routes.get(
diff --git a/services/web/server/tests/unit/with_dbs/01/storage/test_storage.py b/services/web/server/tests/unit/with_dbs/01/storage/test_storage.py
index 3ea1ec40230..2a447af81e3 100644
--- a/services/web/server/tests/unit/with_dbs/01/storage/test_storage.py
+++ b/services/web/server/tests/unit/with_dbs/01/storage/test_storage.py
@@ -44,6 +44,7 @@
 )
 from models_library.api_schemas_webserver._base import OutputSchema
 from models_library.api_schemas_webserver.storage import (
+    BatchDeletePathsBodyParams,
     DataExportPost,
 )
 from models_library.generics import Envelope
@@ -190,6 +191,51 @@
     TypeAdapter(TaskGet).validate_python(data)
 
 
+@pytest.mark.parametrize(
+    "user_role,expected",
+    [
+        (UserRole.ANONYMOUS, status.HTTP_401_UNAUTHORIZED),
+        (UserRole.GUEST, status.HTTP_202_ACCEPTED),
+        (UserRole.USER, status.HTTP_202_ACCEPTED),
+        (UserRole.TESTER, status.HTTP_202_ACCEPTED),
+    ],
+)
+@pytest.mark.parametrize(
+    "backend_result_or_exception",
+    [
+        AsyncJobGet(job_id=AsyncJobId(f"{_faker.uuid4()}")),
+    ],
+    ids=lambda x: type(x).__name__,
+)
+async def test_batch_delete_paths(
+    client: TestClient,
+    logged_user: dict[str, Any],
+    expected: int,
+    location_id: LocationID,
+    faker: Faker,
+    create_storage_paths_rpc_client_mock: Callable[[str, Any], None],
+    backend_result_or_exception: Any,
+):
+    create_storage_paths_rpc_client_mock(
+        submit.__name__,
+        backend_result_or_exception,
+    )
+
+    body = BatchDeletePathsBodyParams(
+        paths={Path(f"{faker.file_path(absolute=False)}")}
+    )
+
+    assert client.app
+    url = client.app.router["batch_delete_paths"].url_for(
+        location_id=f"{location_id}",
+    )
+
+    resp = await client.post(f"{url}", json=body.model_dump(mode="json"))
+    data, error = await assert_status(resp, expected)
+    if not error:
+        TypeAdapter(TaskGet).validate_python(data)
+
+
 @pytest.mark.parametrize(
     "user_role,expected",
     [