✨Storage: batchDelete #7450

Merged · 10 commits · Apr 1, 2025
14 changes: 14 additions & 0 deletions api/specs/web-server/_storage.py
@@ -22,6 +22,7 @@
PresignedLink,
)
from models_library.api_schemas_webserver.storage import (
BatchDeletePathsBodyParams,
DataExportPost,
ListPathsQueryParams,
StorageLocationPathParams,
@@ -80,6 +81,19 @@ async def compute_path_size(_path: Annotated[StoragePathComputeSizeParams, Depen
"""Compute the size of a path"""


@router.post(
"/storage/locations/{location_id}/-/paths:batchDelete",
response_model=Envelope[TaskGet],
status_code=status.HTTP_202_ACCEPTED,
description="Deletes Paths",
)
async def batch_delete_paths(
_path: Annotated[StorageLocationPathParams, Depends()],
_body: Annotated[BatchDeletePathsBodyParams, Depends()],
):
"""deletes files/folders if user has the rights to"""


@router.get(
"/storage/locations/{location_id}/datasets",
response_model=Envelope[list[DatasetMetaData]],
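For orientation, a hypothetical client call against the new endpoint (host, port and IDs are made up, auth is omitted; the request body is a JSON array of path strings, matching the OpenAPI excerpt near the end of this diff):

import httpx

resp = httpx.post(
    "http://localhost:9081/v0/storage/locations/0/-/paths:batchDelete",
    json=["<project_uuid>/<node_id>/outputs/output.txt"],  # JSON array of paths
)
assert resp.status_code == 202  # accepted: an enveloped TaskGet is returned for polling
task = resp.json()["data"]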
@@ -36,6 +36,10 @@ class ListPathsQueryParams(InputSchema, CursorQueryParameters):
] = DEFAULT_NUMBER_OF_PATHS_PER_PAGE


class BatchDeletePathsBodyParams(InputSchema):
paths: set[Path]


class DataExportPost(InputSchema):
paths: list[StorageFileID]

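A minimal sketch (not part of the diff) of how the new body model behaves — the set[Path] field coerces strings to Path and deduplicates:

from pathlib import Path

body = BatchDeletePathsBodyParams.model_validate(
    {"paths": ["<project_uuid>/a.txt", "<project_uuid>/a.txt"]}
)
assert body.paths == {Path("<project_uuid>/a.txt")}  # duplicates collapse into one entry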
@@ -32,3 +32,23 @@ async def compute_path_size(
path=path,
)
return async_job_rpc_get, job_id_data


async def delete_paths(
client: RabbitMQRPCClient,
*,
user_id: UserID,
product_name: ProductName,
location_id: LocationID,
paths: set[Path],
) -> tuple[AsyncJobGet, AsyncJobNameData]:
job_id_data = AsyncJobNameData(user_id=user_id, product_name=product_name)
async_job_rpc_get = await submit(
rabbitmq_rpc_client=client,
rpc_namespace=STORAGE_RPC_NAMESPACE,
method_name=RPCMethodName("delete_paths"),
job_id_data=job_id_data,
location_id=location_id,
paths=paths,
)
return async_job_rpc_get, job_id_data
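A hypothetical call site for the new RPC helper (variable names are assumptions; it mirrors how compute_path_size is consumed elsewhere):

async_job, job_name_data = await delete_paths(
    rpc_client,  # an existing RabbitMQRPCClient
    user_id=user_id,
    product_name=product_name,
    location_id=location_id,
    paths={Path("<project_uuid>/file_to_remove.txt")},
)
# async_job.job_id can then be polled via the generic async-jobs RPC interface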
@@ -2,11 +2,13 @@
from pathlib import Path

from celery import Task # type: ignore[import-untyped]
-from models_library.projects_nodes_io import LocationID
+from models_library.projects_nodes_io import LocationID, StorageFileID
from models_library.users import UserID
-from pydantic import ByteSize
+from pydantic import ByteSize, TypeAdapter
from servicelib.logging_utils import log_context
from servicelib.utils import limited_gather

from ...constants import MAX_CONCURRENT_S3_TASKS
from ...dsm import get_dsm_provider
from ...modules.celery.models import TaskId
from ...modules.celery.utils import get_fastapi_app
@@ -25,3 +27,26 @@ async def compute_path_size(
):
dsm = get_dsm_provider(get_fastapi_app(task.app)).get(location_id)
return await dsm.compute_path_size(user_id, path=Path(path))


async def delete_paths(
task: Task,
task_id: TaskId,
user_id: UserID,
location_id: LocationID,
paths: set[Path],
) -> None:
assert task_id # nosec
with log_context(
_logger,
logging.INFO,
msg=f"delete {paths=} in {location_id=} for {user_id=}",
):
dsm = get_dsm_provider(get_fastapi_app(task.app)).get(location_id)
files_ids: set[StorageFileID] = {
TypeAdapter(StorageFileID).validate_python(f"{path}") for path in paths
}
await limited_gather(
*[dsm.delete_file(user_id, file_id) for file_id in files_ids],
limit=MAX_CONCURRENT_S3_TASKS,
)
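The deletions run concurrently, capped at MAX_CONCURRENT_S3_TASKS. A rough asyncio-only equivalent of that limited_gather call, for illustration (the real servicelib helper may differ in error handling):

import asyncio

async def bounded_gather(*coros, limit: int):
    semaphore = asyncio.Semaphore(limit)

    async def _run(coro):
        async with semaphore:  # at most `limit` coroutines run at once
            return await coro

    return await asyncio.gather(*(_run(c) for c in coros))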
@@ -7,7 +7,7 @@
from ...modules.celery._task import define_task
from ...modules.celery.tasks import export_data
from ._files import complete_upload_file
-from ._paths import compute_path_size
+from ._paths import compute_path_size, delete_paths
from ._simcore_s3 import deep_copy_files_from_project

_logger = logging.getLogger(__name__)
@@ -22,5 +22,6 @@ def setup_worker_tasks(app: Celery) -> None:
):
define_task(app, export_data)
define_task(app, compute_path_size)
define_task(app, delete_paths)
define_task(app, complete_upload_file)
define_task(app, deep_copy_files_from_project)
19 changes: 18 additions & 1 deletion services/storage/src/simcore_service_storage/api/rpc/_paths.py
@@ -11,6 +11,7 @@

from ...modules.celery import get_celery_client
from .._worker_tasks._paths import compute_path_size as remote_compute_path_size
from .._worker_tasks._paths import delete_paths as remote_delete_paths

_logger = logging.getLogger(__name__)
router = RPCRouter()
@@ -20,7 +21,6 @@
async def compute_path_size(
app: FastAPI,
job_id_data: AsyncJobNameData,
-    # user_id: UserID,
location_id: LocationID,
path: Path,
) -> AsyncJobGet:
@@ -33,3 +33,20 @@ async def compute_path_size(
)

return AsyncJobGet(job_id=task_uuid)


@router.expose(reraise_if_error_type=None)
async def delete_paths(
app: FastAPI,
job_id_data: AsyncJobNameData,
location_id: LocationID,
paths: set[Path],
) -> AsyncJobGet:
task_uuid = await get_celery_client(app).send_task(
remote_delete_paths.__name__,
task_context=job_id_data.model_dump(),
user_id=job_id_data.user_id,
location_id=location_id,
paths=paths,
)
return AsyncJobGet(job_id=task_uuid)
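Note that dispatch is name-based: the string handed to send_task is the worker coroutine's __name__, which presumably matches the key under which define_task registered it in setup_worker_tasks above. A one-line check of that assumed invariant:

assert remote_delete_paths.__name__ == "delete_paths"  # must match the worker registration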
@@ -59,3 +59,4 @@ def register_celery_types() -> None:
_register_pydantic_types(FileMetaData)
_register_pydantic_types(FoldersBody)
_register_pydantic_types(TaskError)
register_type(set, _class_full_name(set), encoder=list, decoder=set)
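Celery's JSON transport has no native set type, hence the encoder/decoder pair registered above. A self-contained sketch of the same round-trip:

import json
from pathlib import Path

paths = {Path("a.txt"), Path("b.txt")}
wire = json.dumps(sorted(str(p) for p in paths))  # sets are not JSON-serializable; ship a list
restored = {Path(p) for p in json.loads(wire)}    # decode back into a set
assert restored == paths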
144 changes: 141 additions & 3 deletions services/storage/tests/unit/test_rpc_handlers_paths.py
@@ -7,7 +7,6 @@
# pylint:disable=unused-variable


-import asyncio
import datetime
import random
from pathlib import Path
@@ -31,7 +30,10 @@
from servicelib.rabbitmq.rpc_interfaces.async_jobs.async_jobs import (
wait_and_get_result,
)
-from servicelib.rabbitmq.rpc_interfaces.storage.paths import compute_path_size
+from servicelib.rabbitmq.rpc_interfaces.storage.paths import (
+    compute_path_size,
+    delete_paths,
+)
from simcore_service_storage.modules.celery.worker import CeleryTaskQueueWorker
from simcore_service_storage.simcore_s3_dsm import SimcoreS3DataManager

@@ -74,7 +76,6 @@ async def _assert_compute_path_size(
location_id=location_id,
path=path,
)
-    await asyncio.sleep(1)
async for job_composed_result in wait_and_get_result(
storage_rpc_client,
rpc_namespace=STORAGE_RPC_NAMESPACE,
@@ -91,6 +92,39 @@ async def _assert_compute_path_size(
return received_size

pytest.fail("Job did not finish")
return ByteSize(0) # for mypy


async def _assert_delete_paths(
storage_rpc_client: RabbitMQRPCClient,
location_id: LocationID,
user_id: UserID,
product_name: ProductName,
*,
paths: set[Path],
) -> None:
async_job, async_job_name = await delete_paths(
storage_rpc_client,
product_name=product_name,
user_id=user_id,
location_id=location_id,
paths=paths,
)
async for job_composed_result in wait_and_get_result(
storage_rpc_client,
rpc_namespace=STORAGE_RPC_NAMESPACE,
method_name=RPCMethodName(compute_path_size.__name__),
job_id=async_job.job_id,
job_id_data=AsyncJobNameData(user_id=user_id, product_name=product_name),
client_timeout=datetime.timedelta(seconds=120),
):
if job_composed_result.done:
response = await job_composed_result.result()
assert isinstance(response, AsyncJobResult)
assert response.result is None
return

pytest.fail("Job did not finish")


@pytest.mark.parametrize(
@@ -246,3 +280,107 @@ async def test_path_compute_size_inexistent_path(
expected_total_size=0,
product_name=product_name,
)


@pytest.mark.parametrize(
"location_id",
[SimcoreS3DataManager.get_location_id()],
ids=[SimcoreS3DataManager.get_location_name()],
indirect=True,
)
async def test_delete_paths_empty_set(
initialized_app: FastAPI,
storage_rabbitmq_rpc_client: RabbitMQRPCClient,
user_id: UserID,
location_id: LocationID,
product_name: ProductName,
with_storage_celery_worker: CeleryTaskQueueWorker,
):
await _assert_delete_paths(
storage_rabbitmq_rpc_client,
location_id,
user_id,
product_name,
paths=set(),
)


@pytest.mark.parametrize(
"location_id",
[SimcoreS3DataManager.get_location_id()],
ids=[SimcoreS3DataManager.get_location_name()],
indirect=True,
)
@pytest.mark.parametrize(
"project_params",
[
ProjectWithFilesParams(
num_nodes=1,
allowed_file_sizes=(TypeAdapter(ByteSize).validate_python("1b"),),
workspace_files_count=15,
)
],
ids=str,
)
async def test_delete_paths(
initialized_app: FastAPI,
storage_rabbitmq_rpc_client: RabbitMQRPCClient,
user_id: UserID,
location_id: LocationID,
with_random_project_with_files: tuple[
dict[str, Any],
dict[NodeID, dict[SimcoreS3FileID, FileIDDict]],
],
project_params: ProjectWithFilesParams,
product_name: ProductName,
with_storage_celery_worker: CeleryTaskQueueWorker,
):
assert (
len(project_params.allowed_file_sizes) == 1
), "test preconditions are not filled! allowed file sizes should have only 1 option for this test"
project, list_of_files = with_random_project_with_files

total_num_files = sum(
len(files_in_node) for files_in_node in list_of_files.values()
)

# get size of a full project
expected_total_size = project_params.allowed_file_sizes[0] * total_num_files
path = Path(project["uuid"])
await _assert_compute_path_size(
storage_rabbitmq_rpc_client,
location_id,
user_id,
path=path,
expected_total_size=expected_total_size,
product_name=product_name,
)

# now select multiple random files to delete
selected_paths = random.sample(
list(
list_of_files[
NodeID(random.choice(list(project["workbench"]))) # noqa: S311
]
),
round(project_params.workspace_files_count / 2),
)

await _assert_delete_paths(
storage_rabbitmq_rpc_client,
location_id,
user_id,
product_name,
paths=set({Path(_) for _ in selected_paths}),
)

# the size is reduced by the amount of deleted files
await _assert_compute_path_size(
storage_rabbitmq_rpc_client,
location_id,
user_id,
path=path,
expected_total_size=expected_total_size
- len(selected_paths) * project_params.allowed_file_sizes[0],
product_name=product_name,
)
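To make the test's bookkeeping concrete under the parametrization above (15 workspace files of 1 byte each), a quick sanity check of the arithmetic:

assert round(15 / 2) == 8  # Python rounds half to even, so 8 paths get deleted
expected_drop = 8 * 1      # 8 deleted paths × 1-byte files = 8 bytes less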
2 changes: 1 addition & 1 deletion services/web/server/VERSION
@@ -1 +1 @@
-0.61.4
+0.61.5
2 changes: 1 addition & 1 deletion services/web/server/setup.cfg
@@ -1,5 +1,5 @@
[bumpversion]
-current_version = 0.61.4
+current_version = 0.61.5
commit = True
message = services/webserver api version: {current_version} → {new_version}
tag = False
@@ -2,7 +2,7 @@ openapi: 3.1.0
info:
title: simcore-service-webserver
description: Main service with an interface (http-API & websockets) to the web front-end
-version: 0.61.4
+version: 0.61.5
servers:
- url: ''
description: webserver
@@ -6168,6 +6168,38 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/Envelope_TaskGet_'
/v0/storage/locations/{location_id}/-/paths:batchDelete:
post:
tags:
- storage
summary: Batch Delete Paths
description: Deletes Paths
operationId: batch_delete_paths
parameters:
- name: location_id
in: path
required: true
schema:
type: integer
title: Location Id
requestBody:
required: true
content:
application/json:
schema:
type: array
uniqueItems: true
items:
type: string
format: path
title: Paths
responses:
'202':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/Envelope_TaskGet_'
/v0/storage/locations/{location_id}/datasets:
get:
tags: