diff --git a/packages/service-library/src/servicelib/fastapi/http_error.py b/packages/service-library/src/servicelib/fastapi/http_error.py index 8640fbf2dbb..2cc9814dc8f 100644 --- a/packages/service-library/src/servicelib/fastapi/http_error.py +++ b/packages/service-library/src/servicelib/fastapi/http_error.py @@ -1,3 +1,4 @@ +import logging from collections.abc import Awaitable, Callable from typing import TypeVar @@ -10,6 +11,9 @@ from fastapi.responses import JSONResponse from pydantic import ValidationError +from ..logging_errors import create_troubleshotting_log_kwargs +from ..status_codes_utils import is_5xx_server_error + validation_error_response_definition["properties"] = { "errors": { "title": "Validation errors", @@ -21,6 +25,8 @@ TException = TypeVar("TException") +_logger = logging.getLogger(__name__) + def make_http_error_handler_for_exception( status_code: int, @@ -36,12 +42,24 @@ def make_http_error_handler_for_exception( SEE https://docs.python.org/3/library/exceptions.html#concrete-exceptions """ - async def _http_error_handler(_: Request, exc: Exception) -> JSONResponse: + async def _http_error_handler(request: Request, exc: Exception) -> JSONResponse: assert isinstance(exc, exception_cls) # nosec error_content = { "errors": error_extractor(exc) if error_extractor else [f"{exc}"] } + if is_5xx_server_error(status_code): + _logger.exception( + create_troubleshotting_log_kwargs( + "Unexpected error happened in the Resource Usage Tracker. Please contact support.", + error=exc, + error_context={ + "request": request, + "request.method": f"{request.method}", + }, + ) + ) + return JSONResponse( content=jsonable_encoder( {"error": error_content} if envelope_error else error_content diff --git a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/storage/paths.py b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/storage/paths.py index a549b8fcffc..0d03e83a1f6 100644 --- a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/storage/paths.py +++ b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/storage/paths.py @@ -5,6 +5,7 @@ AsyncJobNameData, ) from models_library.api_schemas_storage import STORAGE_RPC_NAMESPACE +from models_library.products import ProductName from models_library.projects_nodes_io import LocationID from models_library.rabbitmq_basic_types import RPCMethodName from models_library.users import UserID @@ -17,7 +18,7 @@ async def compute_path_size( client: RabbitMQRPCClient, *, user_id: UserID, - product_name: str, + product_name: ProductName, location_id: LocationID, path: Path, ) -> tuple[AsyncJobGet, AsyncJobNameData]: diff --git a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/storage/simcore_s3.py b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/storage/simcore_s3.py new file mode 100644 index 00000000000..a56b91a9af9 --- /dev/null +++ b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/storage/simcore_s3.py @@ -0,0 +1,29 @@ +from models_library.api_schemas_rpc_async_jobs.async_jobs import ( + AsyncJobGet, + AsyncJobNameData, +) +from models_library.api_schemas_storage import STORAGE_RPC_NAMESPACE +from models_library.api_schemas_storage.storage_schemas import FoldersBody +from models_library.rabbitmq_basic_types import RPCMethodName +from models_library.users import UserID + +from ..._client_rpc import RabbitMQRPCClient +from ..async_jobs.async_jobs import submit + + +async def copy_folders_from_project( + client: RabbitMQRPCClient, + *, + user_id: UserID, 
+ product_name: str, + body: FoldersBody, +) -> tuple[AsyncJobGet, AsyncJobNameData]: + job_id_data = AsyncJobNameData(user_id=user_id, product_name=product_name) + async_job_rpc_get = await submit( + rabbitmq_rpc_client=client, + rpc_namespace=STORAGE_RPC_NAMESPACE, + method_name=RPCMethodName("copy_folders_from_project"), + job_id_data=job_id_data, + body=body, + ) + return async_job_rpc_get, job_id_data diff --git a/packages/simcore-sdk/tests/integration/test_node_data_data_manager.py b/packages/simcore-sdk/tests/integration/test_node_data_data_manager.py index 59806926025..a25e95aa715 100644 --- a/packages/simcore-sdk/tests/integration/test_node_data_data_manager.py +++ b/packages/simcore-sdk/tests/integration/test_node_data_data_manager.py @@ -32,6 +32,7 @@ "rabbit", "redis", "storage", + "sto-worker", ] pytest_simcore_ops_services_selection = [ diff --git a/packages/simcore-sdk/tests/integration/test_node_ports_common_aws_s3_cli.py b/packages/simcore-sdk/tests/integration/test_node_ports_common_aws_s3_cli.py index 018b047b3a8..9de63cb4fed 100644 --- a/packages/simcore-sdk/tests/integration/test_node_ports_common_aws_s3_cli.py +++ b/packages/simcore-sdk/tests/integration/test_node_ports_common_aws_s3_cli.py @@ -28,6 +28,7 @@ "rabbit", "redis", "storage", + "sto-worker", ] pytest_simcore_ops_services_selection = [ diff --git a/packages/simcore-sdk/tests/integration/test_node_ports_common_filemanager.py b/packages/simcore-sdk/tests/integration/test_node_ports_common_filemanager.py index b7368cffd65..da7bef85cbe 100644 --- a/packages/simcore-sdk/tests/integration/test_node_ports_common_filemanager.py +++ b/packages/simcore-sdk/tests/integration/test_node_ports_common_filemanager.py @@ -37,6 +37,7 @@ "rabbit", "redis", "storage", + "sto-worker", ] pytest_simcore_ops_services_selection = ["minio", "adminer"] diff --git a/packages/simcore-sdk/tests/integration/test_node_ports_common_r_clone.py b/packages/simcore-sdk/tests/integration/test_node_ports_common_r_clone.py index 3beb4c6e0f2..598d7d653e7 100644 --- a/packages/simcore-sdk/tests/integration/test_node_ports_common_r_clone.py +++ b/packages/simcore-sdk/tests/integration/test_node_ports_common_r_clone.py @@ -29,6 +29,7 @@ "rabbit", "redis", "storage", + "sto-worker", ] pytest_simcore_ops_services_selection = [ diff --git a/packages/simcore-sdk/tests/integration/test_node_ports_v2_nodeports2.py b/packages/simcore-sdk/tests/integration/test_node_ports_v2_nodeports2.py index 2affe04e190..f9f189d01c4 100644 --- a/packages/simcore-sdk/tests/integration/test_node_ports_v2_nodeports2.py +++ b/packages/simcore-sdk/tests/integration/test_node_ports_v2_nodeports2.py @@ -48,6 +48,7 @@ "rabbit", "redis", "storage", + "sto-worker", ] pytest_simcore_ops_services_selection = [ @@ -280,7 +281,6 @@ async def test_port_file_accessors( request: pytest.FixtureRequest, constant_uuid4: None, ): - if item_value == "symlink_path": item_value = request.getfixturevalue("symlink_path") if config_value == "config_value_symlink_path": diff --git a/services/director-v2/tests/integration/02/test_dynamic_sidecar_nodeports_integration.py b/services/director-v2/tests/integration/02/test_dynamic_sidecar_nodeports_integration.py index 1d867be004a..6f8f6769f54 100644 --- a/services/director-v2/tests/integration/02/test_dynamic_sidecar_nodeports_integration.py +++ b/services/director-v2/tests/integration/02/test_dynamic_sidecar_nodeports_integration.py @@ -110,6 +110,7 @@ "rabbit", "redis", "storage", + "sto-worker", "redis", ] @@ -934,15 +935,15 @@ async def 
test_nodeports_integration( `aioboto` instead of `docker` or `storage-data_manager API`. """ # STEP 1 - dynamic_services_urls: dict[ - str, str - ] = await _start_and_wait_for_dynamic_services_ready( - director_v2_client=async_client, - product_name=osparc_product_name, - user_id=current_user["id"], - workbench_dynamic_services=workbench_dynamic_services, - current_study=current_study, - catalog_url=services_endpoint["catalog"], + dynamic_services_urls: dict[str, str] = ( + await _start_and_wait_for_dynamic_services_ready( + director_v2_client=async_client, + product_name=osparc_product_name, + user_id=current_user["id"], + workbench_dynamic_services=workbench_dynamic_services, + current_study=current_study, + catalog_url=services_endpoint["catalog"], + ) ) # STEP 2 diff --git a/services/dynamic-sidecar/tests/integration/test_modules_long_running_tasks.py b/services/dynamic-sidecar/tests/integration/test_modules_long_running_tasks.py index e205946c90d..b7d45d90654 100644 --- a/services/dynamic-sidecar/tests/integration/test_modules_long_running_tasks.py +++ b/services/dynamic-sidecar/tests/integration/test_modules_long_running_tasks.py @@ -50,6 +50,7 @@ "rabbit", "redis", "storage", + "sto-worker", ] pytest_simcore_ops_services_selection = [ @@ -179,7 +180,6 @@ async def restore_legacy_state_archives( node_id: NodeID, state_paths_to_legacy_archives: dict[Path, Path], ) -> None: - tasks = [] for legacy_archive_zip in state_paths_to_legacy_archives.values(): s3_path = f"{project_id}/{node_id}/{legacy_archive_zip.name}" diff --git a/services/storage/VERSION b/services/storage/VERSION index a918a2aa18d..faef31a4357 100644 --- a/services/storage/VERSION +++ b/services/storage/VERSION @@ -1 +1 @@ -0.6.0 +0.7.0 diff --git a/services/storage/openapi.json b/services/storage/openapi.json index 2c5676b08bc..b9a9e0021df 100644 --- a/services/storage/openapi.json +++ b/services/storage/openapi.json @@ -1185,60 +1185,6 @@ } } }, - "/v0/simcore-s3/folders": { - "post": { - "tags": [ - "simcore-s3" - ], - "summary": "Copy Folders From Project", - "operationId": "copy_folders_from_project_v0_simcore_s3_folders_post", - "parameters": [ - { - "name": "user_id", - "in": "query", - "required": true, - "schema": { - "type": "integer", - "exclusiveMinimum": true, - "title": "User Id", - "minimum": 0 - } - } - ], - "requestBody": { - "required": true, - "content": { - "application/json": { - "schema": { - "$ref": "#/components/schemas/FoldersBody" - } - } - } - }, - "responses": { - "202": { - "description": "Successful Response", - "content": { - "application/json": { - "schema": { - "$ref": "#/components/schemas/Envelope_TaskGet_" - } - } - } - }, - "422": { - "description": "Validation Error", - "content": { - "application/json": { - "schema": { - "$ref": "#/components/schemas/HTTPValidationError" - } - } - } - } - } - } - }, "/v0/simcore-s3/folders/{folder_id}": { "delete": { "tags": [ @@ -1861,31 +1807,6 @@ "type": "object", "title": "Envelope[S3Settings]" }, - "Envelope_TaskGet_": { - "properties": { - "data": { - "anyOf": [ - { - "$ref": "#/components/schemas/TaskGet" - }, - { - "type": "null" - } - ] - }, - "error": { - "anyOf": [ - {}, - { - "type": "null" - } - ], - "title": "Error" - } - }, - "type": "object", - "title": "Envelope[TaskGet]" - }, "Envelope_dict_": { "properties": { "data": { @@ -2417,31 +2338,6 @@ ], "title": "FileUploadSchema" }, - "FoldersBody": { - "properties": { - "source": { - "type": "object", - "title": "Source" - }, - "destination": { - "type": "object", - "title": 
"Destination" - }, - "nodes_map": { - "additionalProperties": { - "type": "string", - "format": "uuid" - }, - "propertyNames": { - "format": "uuid" - }, - "type": "object", - "title": "Nodes Map" - } - }, - "type": "object", - "title": "FoldersBody" - }, "HTTPValidationError": { "properties": { "errors": { @@ -2639,39 +2535,6 @@ ], "title": "SoftCopyBody" }, - "TaskGet": { - "properties": { - "task_id": { - "type": "string", - "title": "Task Id" - }, - "task_name": { - "type": "string", - "title": "Task Name" - }, - "status_href": { - "type": "string", - "title": "Status Href" - }, - "result_href": { - "type": "string", - "title": "Result Href" - }, - "abort_href": { - "type": "string", - "title": "Abort Href" - } - }, - "type": "object", - "required": [ - "task_id", - "task_name", - "status_href", - "result_href", - "abort_href" - ], - "title": "TaskGet" - }, "UploadedPart": { "properties": { "number": { diff --git a/services/storage/setup.cfg b/services/storage/setup.cfg index a185ddfb0a4..a0fbc3b4cac 100644 --- a/services/storage/setup.cfg +++ b/services/storage/setup.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 0.6.0 +current_version = 0.7.0 commit = True message = services/storage api version: {current_version} → {new_version} tag = False diff --git a/services/storage/src/simcore_service_storage/api/_worker_tasks/_files.py b/services/storage/src/simcore_service_storage/api/_worker_tasks/_files.py new file mode 100644 index 00000000000..871e8a7bcbc --- /dev/null +++ b/services/storage/src/simcore_service_storage/api/_worker_tasks/_files.py @@ -0,0 +1,37 @@ +import logging + +from celery import Task # type: ignore[import-untyped] +from models_library.api_schemas_storage.storage_schemas import ( + FileUploadCompletionBody, +) +from models_library.projects_nodes_io import LocationID, StorageFileID +from models_library.users import UserID +from servicelib.logging_utils import log_context + +from ...dsm import get_dsm_provider +from ...models import FileMetaData +from ...modules.celery.models import TaskId +from ...modules.celery.utils import get_fastapi_app + +_logger = logging.getLogger(__name__) + + +async def complete_upload_file( + task: Task, + task_id: TaskId, + user_id: UserID, + location_id: LocationID, + file_id: StorageFileID, + body: FileUploadCompletionBody, +) -> FileMetaData: + assert task_id # nosec + with log_context( + _logger, + logging.INFO, + msg=f"completing upload of file {user_id=}, {location_id=}, {file_id=}", + ): + dsm = get_dsm_provider(get_fastapi_app(task.app)).get(location_id) + # NOTE: completing a multipart upload on AWS can take up to several minutes + # if it returns slow we return a 202 - Accepted, the client will have to check later + # for completeness + return await dsm.complete_file_upload(file_id, user_id, body.parts) diff --git a/services/storage/src/simcore_service_storage/api/_worker_tasks/_simcore_s3.py b/services/storage/src/simcore_service_storage/api/_worker_tasks/_simcore_s3.py new file mode 100644 index 00000000000..508cc2e0d01 --- /dev/null +++ b/services/storage/src/simcore_service_storage/api/_worker_tasks/_simcore_s3.py @@ -0,0 +1,55 @@ +import functools +import logging +from typing import Any + +from celery import Task # type: ignore[import-untyped] +from models_library.api_schemas_storage.storage_schemas import FoldersBody +from models_library.progress_bar import ProgressReport +from models_library.users import UserID +from servicelib.logging_utils import log_context +from servicelib.progress_bar import ProgressBarData + +from 
...dsm import get_dsm_provider +from ...modules.celery.models import TaskId +from ...modules.celery.utils import get_celery_worker, get_fastapi_app +from ...simcore_s3_dsm import SimcoreS3DataManager + +_logger = logging.getLogger(__name__) + + +def _task_progress_cb(task: Task, task_id: TaskId, report: ProgressReport) -> None: + worker = get_celery_worker(task.app) + assert task.name # nosec + worker.set_task_progress( + task_name=task.name, + task_id=task_id, + report=report, + ) + + +async def deep_copy_files_from_project( + task: Task, task_id: TaskId, user_id: UserID, body: FoldersBody +) -> dict[str, Any]: + with log_context( + _logger, + logging.INFO, + msg=f"copying {body.source['uuid']} -> {body.destination['uuid']} with {task.request.id}", + ): + dsm = get_dsm_provider(get_fastapi_app(task.app)).get( + SimcoreS3DataManager.get_location_id() + ) + assert isinstance(dsm, SimcoreS3DataManager) # nosec + async with ProgressBarData( + num_steps=1, + description="copying files", + progress_report_cb=functools.partial(_task_progress_cb, task, task_id), + ) as task_progress: + await dsm.deep_copy_project_simcore_s3( + user_id, + body.source, + body.destination, + body.nodes_map, + task_progress=task_progress, + ) + + return body.destination diff --git a/services/storage/src/simcore_service_storage/api/_worker_tasks/tasks.py b/services/storage/src/simcore_service_storage/api/_worker_tasks/tasks.py index 557013de976..3848e4114a4 100644 --- a/services/storage/src/simcore_service_storage/api/_worker_tasks/tasks.py +++ b/services/storage/src/simcore_service_storage/api/_worker_tasks/tasks.py @@ -6,7 +6,9 @@ from ...modules.celery._celery_types import register_celery_types from ...modules.celery._task import define_task from ...modules.celery.tasks import export_data +from ._files import complete_upload_file from ._paths import compute_path_size +from ._simcore_s3 import deep_copy_files_from_project _logger = logging.getLogger(__name__) @@ -20,3 +22,5 @@ def setup_worker_tasks(app: Celery) -> None: ): define_task(app, export_data) define_task(app, compute_path_size) + define_task(app, complete_upload_file) + define_task(app, deep_copy_files_from_project) diff --git a/services/storage/src/simcore_service_storage/api/rest/_files.py b/services/storage/src/simcore_service_storage/api/rest/_files.py index c0b6a4f4a7c..253fa99627a 100644 --- a/services/storage/src/simcore_service_storage/api/rest/_files.py +++ b/services/storage/src/simcore_service_storage/api/rest/_files.py @@ -1,9 +1,9 @@ -import asyncio import logging -from typing import Annotated, cast +from typing import Annotated, Final, cast from urllib.parse import quote -from fastapi import APIRouter, Depends, Header, HTTPException, Request +from fastapi import APIRouter, Depends, Header, Request +from models_library.api_schemas_rpc_async_jobs.async_jobs import AsyncJobNameData from models_library.api_schemas_storage.storage_schemas import ( FileMetaDataGet, FileMetaDataGetv010, @@ -34,9 +34,11 @@ StorageQueryParamsBase, UploadLinks, ) -from ...modules.long_running_tasks import get_completed_upload_tasks +from ...modules.celery.client import CeleryTaskQueueClient +from ...modules.celery.models import TaskUUID from ...simcore_s3_dsm import SimcoreS3DataManager -from ...utils.utils import create_upload_completion_task_name +from .._worker_tasks._files import complete_upload_file as remote_complete_upload_file +from .dependencies.celery import get_celery_client _logger = logging.getLogger(__name__) @@ -257,27 +259,39 @@ async def 
abort_upload_file( await dsm.abort_file_upload(query_params.user_id, file_id) +_UNDEFINED_PRODUCT_NAME_FOR_WORKER_TASKS: Final[str] = ( + "undefinedproduct" # NOTE: this is used to keep backwards compatibility with user of these APIs +) + + @router.post( "/locations/{location_id}/files/{file_id:path}:complete", response_model=Envelope[FileUploadCompleteResponse], status_code=status.HTTP_202_ACCEPTED, ) async def complete_upload_file( + celery_client: Annotated[CeleryTaskQueueClient, Depends(get_celery_client)], query_params: Annotated[StorageQueryParamsBase, Depends()], location_id: LocationID, file_id: StorageFileID, body: FileUploadCompletionBody, request: Request, ): - dsm = get_dsm_provider(request.app).get(location_id) # NOTE: completing a multipart upload on AWS can take up to several minutes # if it returns slow we return a 202 - Accepted, the client will have to check later # for completeness - task = asyncio.create_task( - dsm.complete_file_upload(file_id, query_params.user_id, body.parts), - name=create_upload_completion_task_name(query_params.user_id, file_id), + async_job_name_data = AsyncJobNameData( + user_id=query_params.user_id, + product_name=_UNDEFINED_PRODUCT_NAME_FOR_WORKER_TASKS, # NOTE: I would need to change the API here + ) + task_uuid = await celery_client.send_task( + remote_complete_upload_file.__name__, + task_context=async_job_name_data.model_dump(), + user_id=async_job_name_data.user_id, + location_id=location_id, + file_id=file_id, + body=body, ) - get_completed_upload_tasks(request.app)[task.get_name()] = task route = ( URL(f"{request.url}") @@ -287,7 +301,7 @@ async def complete_upload_file( "is_completed_upload_file", location_id=f"{location_id}", file_id=file_id, - future_id=task.get_name(), + future_id=f"{task_uuid}", ), safe=":/", ), @@ -310,48 +324,39 @@ async def complete_upload_file( response_model=Envelope[FileUploadCompleteFutureResponse], ) async def is_completed_upload_file( + celery_client: Annotated[CeleryTaskQueueClient, Depends(get_celery_client)], query_params: Annotated[StorageQueryParamsBase, Depends()], location_id: LocationID, file_id: StorageFileID, future_id: str, - request: Request, ): # NOTE: completing a multipart upload on AWS can take up to several minutes # therefore we wait a bit to see if it completes fast and return a 204 # if it returns slow we return a 202 - Accepted, the client will have to check later # for completeness - task_name = create_upload_completion_task_name(query_params.user_id, file_id) - assert task_name == future_id # nosec # NOTE: fastapi auto-decode path parameters - # first check if the task is in the app - if task := get_completed_upload_tasks(request.app).get(task_name): - if task.done(): - new_fmd: FileMetaData = task.result() - get_completed_upload_tasks(request.app).pop(task_name) - response = FileUploadCompleteFutureResponse( - state=FileUploadCompleteState.OK, e_tag=new_fmd.entity_tag - ) - else: - # the task is still running - response = FileUploadCompleteFutureResponse( - state=FileUploadCompleteState.NOK - ) - return Envelope[FileUploadCompleteFutureResponse](data=response) - # there is no task, either wrong call or storage was restarted - # we try to get the file to see if it exists in S3 - dsm = get_dsm_provider(request.app).get(location_id) - if fmd := await dsm.get_file( + async_job_name_data = AsyncJobNameData( user_id=query_params.user_id, - file_id=file_id, - ): - return Envelope[FileUploadCompleteFutureResponse]( - data=FileUploadCompleteFutureResponse( - state=FileUploadCompleteState.OK, 
e_tag=fmd.entity_tag - ) - ) - raise HTTPException( - status.HTTP_404_NOT_FOUND, - detail="Not found. Upload could not be completed. Please try again and contact support if it fails again.", + product_name=_UNDEFINED_PRODUCT_NAME_FOR_WORKER_TASKS, # NOTE: I would need to change the API here ) + task_status = await celery_client.get_task_status( + task_context=async_job_name_data.model_dump(), task_uuid=TaskUUID(future_id) + ) + # first check if the task is in the app + if task_status.is_done: + task_result = await celery_client.get_task_result( + task_context=async_job_name_data.model_dump(), task_uuid=TaskUUID(future_id) + ) + assert isinstance(task_result, FileMetaData), f"{task_result=}" # nosec + new_fmd = task_result + assert new_fmd.location_id == location_id # nosec + assert new_fmd.file_id == file_id # nosec + response = FileUploadCompleteFutureResponse( + state=FileUploadCompleteState.OK, e_tag=new_fmd.entity_tag + ) + else: + # the task is still running + response = FileUploadCompleteFutureResponse(state=FileUploadCompleteState.NOK) + return Envelope[FileUploadCompleteFutureResponse](data=response) @router.delete( diff --git a/services/storage/src/simcore_service_storage/api/rest/_simcore_s3.py b/services/storage/src/simcore_service_storage/api/rest/_simcore_s3.py index 29b199e6feb..d4026a9f33a 100644 --- a/services/storage/src/simcore_service_storage/api/rest/_simcore_s3.py +++ b/services/storage/src/simcore_service_storage/api/rest/_simcore_s3.py @@ -1,22 +1,14 @@ -import asyncio import logging -from typing import Annotated, Any, cast +from typing import Annotated, cast -from fastapi import APIRouter, Depends, FastAPI, Request -from models_library.api_schemas_long_running_tasks.base import TaskProgress -from models_library.api_schemas_long_running_tasks.tasks import TaskGet +from fastapi import APIRouter, Depends, Request from models_library.api_schemas_storage.storage_schemas import ( FileMetaDataGet, - FoldersBody, ) from models_library.generics import Envelope from models_library.projects import ProjectID from servicelib.aiohttp import status -from servicelib.fastapi.long_running_tasks._dependencies import get_tasks_manager -from servicelib.logging_utils import log_context -from servicelib.long_running_tasks._task import start_task from settings_library.s3 import S3Settings -from yarl import URL from ...dsm import get_dsm_provider from ...models import ( @@ -50,70 +42,6 @@ async def get_or_create_temporary_s3_access( return Envelope[S3Settings](data=s3_settings) -async def _copy_folders_from_project( - progress: TaskProgress, - app: FastAPI, - query_params: StorageQueryParamsBase, - body: FoldersBody, -) -> Envelope[dict[str, Any]]: - dsm = cast( - SimcoreS3DataManager, - get_dsm_provider(app).get(SimcoreS3DataManager.get_location_id()), - ) - with log_context( - _logger, - logging.INFO, - msg=f"copying {body.source['uuid']} -> {body.destination['uuid']}", - ): - await dsm.deep_copy_project_simcore_s3( - query_params.user_id, - body.source, - body.destination, - body.nodes_map, - task_progress=progress, - ) - - return Envelope[dict[str, Any]](data=body.destination) - - -@router.post( - "/simcore-s3/folders", - response_model=Envelope[TaskGet], - status_code=status.HTTP_202_ACCEPTED, -) -async def copy_folders_from_project( - query_params: Annotated[StorageQueryParamsBase, Depends()], - body: FoldersBody, - request: Request, -): - task_id = None - try: - task_id = start_task( - get_tasks_manager(request), - _copy_folders_from_project, - app=request.app, - 
query_params=query_params, - body=body, - ) - relative_url = URL(f"{request.url}").relative() - - return Envelope[TaskGet]( - data=TaskGet( - task_id=task_id, - task_name=f"{request.method} {relative_url}", - status_href=f"{request.url_for('get_task_status', task_id=task_id)}", - result_href=f"{request.url_for('get_task_result', task_id=task_id)}", - abort_href=f"{request.url_for('cancel_and_delete_task', task_id=task_id)}", - ) - ) - except asyncio.CancelledError: - if task_id: - await get_tasks_manager(request).cancel_task( - task_id, with_task_context=None - ) - raise - - @router.delete( "/simcore-s3/folders/{folder_id}", status_code=status.HTTP_204_NO_CONTENT, diff --git a/services/storage/src/simcore_service_storage/api/rpc/_simcore_s3.py b/services/storage/src/simcore_service_storage/api/rpc/_simcore_s3.py new file mode 100644 index 00000000000..b477668cf4c --- /dev/null +++ b/services/storage/src/simcore_service_storage/api/rpc/_simcore_s3.py @@ -0,0 +1,28 @@ +from fastapi import FastAPI +from models_library.api_schemas_rpc_async_jobs.async_jobs import ( + AsyncJobGet, + AsyncJobNameData, +) +from models_library.api_schemas_storage.storage_schemas import FoldersBody +from servicelib.rabbitmq._rpc_router import RPCRouter + +from ...modules.celery import get_celery_client +from .._worker_tasks._simcore_s3 import deep_copy_files_from_project + +router = RPCRouter() + + +@router.expose(reraise_if_error_type=None) +async def copy_folders_from_project( + app: FastAPI, + job_id_data: AsyncJobNameData, + body: FoldersBody, +) -> AsyncJobGet: + task_uuid = await get_celery_client(app).send_task( + deep_copy_files_from_project.__name__, + task_context=job_id_data.model_dump(), + user_id=job_id_data.user_id, + body=body, + ) + + return AsyncJobGet(job_id=task_uuid) diff --git a/services/storage/src/simcore_service_storage/api/rpc/routes.py b/services/storage/src/simcore_service_storage/api/rpc/routes.py index 799a2b4e839..8cb3c8a95e0 100644 --- a/services/storage/src/simcore_service_storage/api/rpc/routes.py +++ b/services/storage/src/simcore_service_storage/api/rpc/routes.py @@ -6,7 +6,7 @@ from servicelib.rabbitmq import RPCRouter from ...modules.rabbitmq import get_rabbitmq_rpc_server -from . import _async_jobs, _data_export, _paths +from . 
import _async_jobs, _data_export, _paths, _simcore_s3 _logger = logging.getLogger(__name__) @@ -15,6 +15,7 @@ _async_jobs.router, _data_export.router, _paths.router, + _simcore_s3.router, ] diff --git a/services/storage/src/simcore_service_storage/modules/celery/models.py b/services/storage/src/simcore_service_storage/modules/celery/models.py index 6f2193b2da6..2cb3f0cafe1 100644 --- a/services/storage/src/simcore_service_storage/modules/celery/models.py +++ b/services/storage/src/simcore_service_storage/modules/celery/models.py @@ -3,7 +3,7 @@ from uuid import UUID from models_library.progress_bar import ProgressReport -from pydantic import BaseModel, model_validator +from pydantic import BaseModel TaskContext: TypeAlias = dict[str, Any] TaskID: TypeAlias = str @@ -56,7 +56,7 @@ class TaskStatus(BaseModel): def is_done(self) -> bool: return self.task_state in _TASK_DONE - @model_validator(mode="after") + # @model_validator(mode="after") This does not work MB def _check_consistency(self) -> Self: value = self.progress_report.actual_value min_value = 0.0 diff --git a/services/storage/src/simcore_service_storage/modules/long_running_tasks.py b/services/storage/src/simcore_service_storage/modules/long_running_tasks.py index d0c929f7adc..229c1bd3fef 100644 --- a/services/storage/src/simcore_service_storage/modules/long_running_tasks.py +++ b/services/storage/src/simcore_service_storage/modules/long_running_tasks.py @@ -1,5 +1,3 @@ -import asyncio - from fastapi import FastAPI from servicelib.fastapi.long_running_tasks._server import setup @@ -11,10 +9,3 @@ def setup_rest_api_long_running_tasks_for_uploads(app: FastAPI) -> None: app, router_prefix=f"/{API_VTAG}/futures", ) - - app.state.completed_upload_tasks = {} - - -def get_completed_upload_tasks(app: FastAPI) -> dict[str, asyncio.Task]: - assert isinstance(app.state.completed_upload_tasks, dict) # nosec - return app.state.completed_upload_tasks diff --git a/services/storage/src/simcore_service_storage/simcore_s3_dsm.py b/services/storage/src/simcore_service_storage/simcore_s3_dsm.py index 6448cc0cc59..8e3c8026f97 100644 --- a/services/storage/src/simcore_service_storage/simcore_s3_dsm.py +++ b/services/storage/src/simcore_service_storage/simcore_s3_dsm.py @@ -35,9 +35,9 @@ ) from models_library.users import UserID from pydantic import AnyUrl, ByteSize, NonNegativeInt, TypeAdapter, ValidationError -from servicelib.aiohttp.long_running_tasks.server import TaskProgress from servicelib.fastapi.client_session import get_client_session from servicelib.logging_utils import log_context +from servicelib.progress_bar import ProgressBarData from servicelib.utils import ensure_ends_with, limited_gather from simcore_postgres_database.utils_repos import transaction_context from sqlalchemy.ext.asyncio import AsyncEngine @@ -77,7 +77,7 @@ from .modules.db.projects import ProjectRepository from .modules.db.tokens import TokenRepository from .modules.s3 import get_s3_client -from .utils.s3_utils import S3TransferDataCB, update_task_progress +from .utils.s3_utils import S3TransferDataCB from .utils.simcore_s3_dsm_utils import ( compute_file_id_prefix, expand_directory, @@ -751,7 +751,7 @@ async def deep_copy_project_simcore_s3( src_project: dict[str, Any], dst_project: dict[str, Any], node_mapping: dict[NodeID, NodeID], - task_progress: TaskProgress | None = None, + task_progress: ProgressBarData, ) -> None: src_project_uuid: ProjectID = ProjectID(src_project["uuid"]) dst_project_uuid: ProjectID = ProjectID(dst_project["uuid"]) @@ -761,7 +761,7 @@ async def 
deep_copy_project_simcore_s3( msg=f"{src_project_uuid} -> {dst_project_uuid}: " "Step 1: check access rights (read of src and write of dst)", ): - update_task_progress(task_progress, "Checking study access rights...") + task_progress.description = "Checking study access rights..." for prj_uuid in [src_project_uuid, dst_project_uuid]: if not await ProjectRepository.instance( @@ -789,8 +789,8 @@ async def deep_copy_project_simcore_s3( msg=f"{src_project_uuid} -> {dst_project_uuid}:" " Step 2: collect what to copy", ): - update_task_progress( - task_progress, f"Collecting files of '{src_project['name']}'..." + task_progress.description = ( + f"Collecting files of '{src_project['name']}'..." ) src_project_files = await FileMetaDataRepository.instance( @@ -811,71 +811,73 @@ async def deep_copy_project_simcore_s3( limit=_MAX_PARALLEL_S3_CALLS, ) total_num_of_files = sum(n for _, n in sizes_and_num_files) - src_project_total_data_size: ByteSize = TypeAdapter( - ByteSize - ).validate_python(sum(n for n, _ in sizes_and_num_files)) - with log_context( - _logger, - logging.INFO, - msg=f"{src_project_uuid} -> {dst_project_uuid}:" - " Step 3.1: prepare copy tasks for files referenced from simcore", - ): - copy_tasks = [] - s3_transfered_data_cb = S3TransferDataCB( - task_progress, - src_project_total_data_size, - task_progress_message_prefix=f"Copying {total_num_of_files} files to '{dst_project['name']}'", + src_project_total_data_size = TypeAdapter(ByteSize).validate_python( + sum(n for n, _ in sizes_and_num_files) ) - for src_fmd in src_project_files: - if not src_fmd.node_id or (src_fmd.location_id != self.location_id): - msg = ( - "This is not foreseen, stem from old decisions, and needs to " - f"be implemented if needed. Faulty metadata: {src_fmd=}" - ) - raise NotImplementedError(msg) - - if new_node_id := node_mapping.get(src_fmd.node_id): - copy_tasks.append( - self._copy_path_s3_s3( - user_id, - src_fmd=src_fmd, - dst_file_id=TypeAdapter(SimcoreS3FileID).validate_python( - f"{dst_project_uuid}/{new_node_id}/{src_fmd.object_name.split('/', maxsplit=2)[-1]}" - ), - bytes_transfered_cb=s3_transfered_data_cb.copy_transfer_cb, + + async with S3TransferDataCB( + task_progress, + src_project_total_data_size, + task_progress_message_prefix=f"Copying {total_num_of_files} files to '{dst_project['name']}'", + ) as s3_transfered_data_cb: + with log_context( + _logger, + logging.INFO, + msg=f"{src_project_uuid} -> {dst_project_uuid}:" + " Step 3.1: prepare copy tasks for files referenced from simcore", + ): + copy_tasks = [] + for src_fmd in src_project_files: + if not src_fmd.node_id or (src_fmd.location_id != self.location_id): + msg = ( + "This is not foreseen, stem from old decisions, and needs to " + f"be implemented if needed. 
Faulty metadata: {src_fmd=}" ) - ) - with log_context( - _logger, - logging.INFO, - msg=f"{src_project_uuid} -> {dst_project_uuid}:" - " Step 3.1: prepare copy tasks for files referenced from DAT-CORE", - ): - for node_id, node in dst_project.get("workbench", {}).items(): - copy_tasks.extend( - [ - self._copy_file_datcore_s3( - user_id=user_id, - source_uuid=output["path"], - dest_project_id=dst_project_uuid, - dest_node_id=NodeID(node_id), - file_storage_link=output, - bytes_transfered_cb=s3_transfered_data_cb.upload_transfer_cb, + raise NotImplementedError(msg) + + if new_node_id := node_mapping.get(src_fmd.node_id): + copy_tasks.append( + self._copy_path_s3_s3( + user_id, + src_fmd=src_fmd, + dst_file_id=TypeAdapter( + SimcoreS3FileID + ).validate_python( + f"{dst_project_uuid}/{new_node_id}/{src_fmd.object_name.split('/', maxsplit=2)[-1]}" + ), + bytes_transfered_cb=s3_transfered_data_cb.copy_transfer_cb, + ) ) - for output in node.get("outputs", {}).values() - if isinstance(output, dict) - and (int(output.get("store", self.location_id)) == DATCORE_ID) - ] - ) - with log_context( - _logger, - logging.INFO, - msg=f"{src_project_uuid} -> {dst_project_uuid}: Step 3.3: effective copying {len(copy_tasks)} files", - ): - await limited_gather(*copy_tasks, limit=MAX_CONCURRENT_S3_TASKS) - - # ensure the full size is reported - s3_transfered_data_cb.finalize_transfer() + with log_context( + _logger, + logging.INFO, + msg=f"{src_project_uuid} -> {dst_project_uuid}:" + " Step 3.1: prepare copy tasks for files referenced from DAT-CORE", + ): + for node_id, node in dst_project.get("workbench", {}).items(): + copy_tasks.extend( + [ + self._copy_file_datcore_s3( + user_id=user_id, + source_uuid=output["path"], + dest_project_id=dst_project_uuid, + dest_node_id=NodeID(node_id), + file_storage_link=output, + bytes_transfered_cb=s3_transfered_data_cb.upload_transfer_cb, + ) + for output in node.get("outputs", {}).values() + if isinstance(output, dict) + and ( + int(output.get("store", self.location_id)) == DATCORE_ID + ) + ] + ) + with log_context( + _logger, + logging.INFO, + msg=f"{src_project_uuid} -> {dst_project_uuid}: Step 3.3: effective copying {len(copy_tasks)} files", + ): + await limited_gather(*copy_tasks, limit=MAX_CONCURRENT_S3_TASKS) async def _get_size_and_num_files( self, fmd: FileMetaDataAtDB diff --git a/services/storage/src/simcore_service_storage/utils/s3_utils.py b/services/storage/src/simcore_service_storage/utils/s3_utils.py index f40d33d531f..3fcb17d0c45 100644 --- a/services/storage/src/simcore_service_storage/utils/s3_utils.py +++ b/services/storage/src/simcore_service_storage/utils/s3_utils.py @@ -1,51 +1,61 @@ +import asyncio +import datetime import logging from collections import defaultdict from dataclasses import dataclass, field from pydantic import ByteSize, TypeAdapter -from servicelib.aiohttp.long_running_tasks.server import ( - ProgressMessage, - ProgressPercent, - TaskProgress, -) +from servicelib.async_utils import cancel_wait_task +from servicelib.background_task import create_periodic_task +from servicelib.progress_bar import ProgressBarData _logger = logging.getLogger(__name__) -def update_task_progress( - task_progress: TaskProgress | None, - message: ProgressMessage | None = None, - progress: ProgressPercent | None = None, -) -> None: - _logger.debug("%s [%s]", message or "", progress or "n/a") - if task_progress: - task_progress.update(message=message, percent=progress) - - @dataclass class S3TransferDataCB: - task_progress: TaskProgress | None + 
task_progress: ProgressBarData total_bytes_to_transfer: ByteSize task_progress_message_prefix: str = "" _total_bytes_copied: int = 0 _file_total_bytes_copied: dict[str, int] = field( default_factory=lambda: defaultdict(int) ) + _update_task_event: asyncio.Event = field(default_factory=asyncio.Event) + _async_update_periodic_task: asyncio.Task | None = None def __post_init__(self) -> None: + self._async_update_periodic_task = create_periodic_task( + self._async_update, + interval=datetime.timedelta(seconds=0.2), + task_name="s3_transfer_cb_update", + ) self._update() - def _update(self) -> None: - update_task_progress( - self.task_progress, + async def __aenter__(self) -> "S3TransferDataCB": + return self + + async def __aexit__(self, exc_type, exc_value, traceback) -> None: + self.finalize_transfer() + await asyncio.sleep(0) + assert self._async_update_periodic_task # nosec + await cancel_wait_task(self._async_update_periodic_task) + + async def _async_update(self) -> None: + await self._update_task_event.wait() + self._update_task_event.clear() + self.task_progress.description = ( f"{self.task_progress_message_prefix} - " - f"{self.total_bytes_to_transfer.human_readable()}", - ProgressPercent( - min(self._total_bytes_copied, self.total_bytes_to_transfer) - / (self.total_bytes_to_transfer or 1) - ), + f"{self.total_bytes_to_transfer.human_readable()}" + ) + await self.task_progress.set_( + min(self._total_bytes_copied, self.total_bytes_to_transfer) + / (self.total_bytes_to_transfer or 1) ) + def _update(self) -> None: + self._update_task_event.set() + def finalize_transfer(self) -> None: self._total_bytes_copied = ( self.total_bytes_to_transfer - self._total_bytes_copied diff --git a/services/storage/src/simcore_service_storage/utils/utils.py b/services/storage/src/simcore_service_storage/utils/utils.py index 36fef50d268..fd53b03da85 100644 --- a/services/storage/src/simcore_service_storage/utils/utils.py +++ b/services/storage/src/simcore_service_storage/utils/utils.py @@ -1,4 +1,3 @@ -import hashlib import logging from pathlib import Path @@ -6,8 +5,6 @@ import httpx from aiohttp.typedefs import StrOrURL from aws_library.s3 import UploadID -from models_library.projects_nodes_io import StorageFileID -from models_library.users import UserID from ..constants import MAX_CHUNK_SIZE, S3_UNDEFINED_OR_EXTERNAL_MULTIPART_ID from ..models import FileMetaData, FileMetaDataAtDB @@ -68,11 +65,6 @@ def is_file_entry_valid(file_metadata: FileMetaData | FileMetaDataAtDB) -> bool: ) -def create_upload_completion_task_name(user_id: UserID, file_id: StorageFileID) -> str: - the_hash = hashlib.sha256(f"{user_id}_{file_id}".encode()).hexdigest() - return f"upload_complete_task_{the_hash}" - - def is_valid_managed_multipart_upload(upload_id: UploadID | None) -> bool: """the upload ID is valid (created by storage service) AND internally managed by storage (e.g. 
PRESIGNED multipart upload) diff --git a/services/storage/tests/conftest.py b/services/storage/tests/conftest.py index cfd4917703a..16b32ca80ad 100644 --- a/services/storage/tests/conftest.py +++ b/services/storage/tests/conftest.py @@ -76,9 +76,6 @@ on_worker_shutdown, ) from simcore_service_storage.modules.celery.worker import CeleryTaskQueueWorker -from simcore_service_storage.modules.long_running_tasks import ( - get_completed_upload_tasks, -) from simcore_service_storage.modules.s3 import get_s3_client from simcore_service_storage.simcore_s3_dsm import SimcoreS3DataManager from sqlalchemy import literal_column @@ -481,8 +478,8 @@ async def with_versioning_enabled( async def create_empty_directory( create_simcore_file_id: Callable[[ProjectID, NodeID, str], SimcoreS3FileID], create_upload_file_link_v2: Callable[..., Awaitable[FileUploadSchema]], - initialized_app: FastAPI, client: httpx.AsyncClient, + with_storage_celery_worker: CeleryTaskQueueWorker, ) -> Callable[[str, ProjectID, NodeID], Awaitable[SimcoreS3FileID]]: async def _directory_creator( dir_name: str, project_id: ProjectID, node_id: NodeID @@ -515,8 +512,6 @@ async def _directory_creator( assert file_upload_complete_response state_url = URL(f"{file_upload_complete_response.links.state}").relative() - # check that it finished updating - get_completed_upload_tasks(initialized_app).clear() # now check for the completion async for attempt in AsyncRetrying( reraise=True, @@ -1023,8 +1018,6 @@ async def with_storage_celery_worker_controller( ) as worker: worker_init.send(sender=worker) - # NOTE: wait for worker to be ready (sic) - await asyncio.sleep(1) yield worker worker_shutdown.send(sender=worker) diff --git a/services/storage/tests/unit/test_handlers_files.py b/services/storage/tests/unit/test_handlers_files.py index 3304e226dd9..7362211c3b5 100644 --- a/services/storage/tests/unit/test_handlers_files.py +++ b/services/storage/tests/unit/test_handlers_files.py @@ -30,12 +30,10 @@ FileUploadCompleteFutureResponse, FileUploadCompleteResponse, FileUploadCompleteState, - FileUploadCompletionBody, FileUploadSchema, LinkType, PresignedLink, SoftCopyBody, - UploadedPart, ) from models_library.projects import ProjectID from models_library.projects_nodes_io import LocationID, NodeID, SimcoreS3FileID @@ -47,7 +45,7 @@ from pytest_simcore.helpers.httpx_assert_checks import assert_status from pytest_simcore.helpers.logging_tools import log_context from pytest_simcore.helpers.parametrizations import byte_size_ids -from pytest_simcore.helpers.s3 import upload_file_part, upload_file_to_presigned_link +from pytest_simcore.helpers.s3 import upload_file_part from pytest_simcore.helpers.storage_utils import FileIDDict, ProjectWithFilesParams from pytest_simcore.helpers.storage_utils_file_meta_data import ( assert_file_meta_data_in_db, @@ -55,9 +53,7 @@ from servicelib.aiohttp import status from simcore_service_storage.constants import S3_UNDEFINED_OR_EXTERNAL_MULTIPART_ID from simcore_service_storage.models import FileDownloadResponse, S3BucketName, UploadID -from simcore_service_storage.modules.long_running_tasks import ( - get_completed_upload_tasks, -) +from simcore_service_storage.modules.celery.worker import CeleryTaskQueueWorker from simcore_service_storage.simcore_s3_dsm import SimcoreS3DataManager from sqlalchemy.ext.asyncio import AsyncEngine from tenacity.asyncio import AsyncRetrying @@ -608,10 +604,7 @@ def complex_file_name(faker: Faker) -> str: "file_size", [ (TypeAdapter(ByteSize).validate_python("1Mib")), - 
(TypeAdapter(ByteSize).validate_python("500Mib")), - pytest.param( - TypeAdapter(ByteSize).validate_python("5Gib"), marks=pytest.mark.heavy_load - ), + (TypeAdapter(ByteSize).validate_python("127Mib")), ], ids=byte_size_ids, ) @@ -623,114 +616,6 @@ async def test_upload_real_file( await upload_file(file_size, complex_file_name) -@pytest.mark.parametrize( - "location_id", - [SimcoreS3DataManager.get_location_id()], - ids=[SimcoreS3DataManager.get_location_name()], - indirect=True, -) -@pytest.mark.parametrize( - "file_size", - [ - (TypeAdapter(ByteSize).validate_python("1Mib")), - (TypeAdapter(ByteSize).validate_python("117Mib")), - ], - ids=byte_size_ids, -) -async def test_upload_real_file_with_emulated_storage_restart_after_completion_was_called( - complex_file_name: str, - file_size: ByteSize, - initialized_app: FastAPI, - client: httpx.AsyncClient, - user_id: UserID, - project_id: ProjectID, - node_id: NodeID, - location_id: LocationID, - create_simcore_file_id: Callable[[ProjectID, NodeID, str], SimcoreS3FileID], - create_file_of_size: Callable[[ByteSize, str | None], Path], - create_upload_file_link_v2: Callable[..., Awaitable[FileUploadSchema]], - sqlalchemy_async_engine: AsyncEngine, - storage_s3_client: SimcoreS3API, - storage_s3_bucket: S3BucketName, -): - """what does that mean? - storage runs the completion tasks in the background, - if after running the completion task, storage restarts then the task is lost. - Nevertheless the client still has a reference to the completion future and shall be able - to ask for its status""" - - file = create_file_of_size(file_size, complex_file_name) - file_id = create_simcore_file_id(project_id, node_id, complex_file_name) - file_upload_link = await create_upload_file_link_v2( - file_id, link_type="PRESIGNED", file_size=file_size - ) - # upload the file - part_to_etag: list[UploadedPart] = await upload_file_to_presigned_link( - file, file_upload_link - ) - # complete the upload - complete_url = URL(f"{file_upload_link.links.complete_upload}").relative() - response = await client.post( - f"{complete_url}", - json=jsonable_encoder(FileUploadCompletionBody(parts=part_to_etag)), - ) - response.raise_for_status() - file_upload_complete_response, error = assert_status( - response, status.HTTP_202_ACCEPTED, FileUploadCompleteResponse - ) - assert not error - assert file_upload_complete_response - state_url = URL(f"{file_upload_complete_response.links.state}").relative() - - # here we do not check now for the state completion. 
instead we simulate a restart where the tasks disappear - get_completed_upload_tasks(initialized_app).clear() - # now check for the completion - completion_etag = None - async for attempt in AsyncRetrying( - reraise=True, - wait=wait_fixed(1), - stop=stop_after_delay(60), - retry=retry_if_exception_type(AssertionError), - ): - with ( - attempt, - log_context( - logging.INFO, - f"waiting for upload completion {state_url=}, {attempt.retry_state.attempt_number}", - ) as ctx, - ): - response = await client.post(f"{state_url}") - future, error = assert_status( - response, status.HTTP_200_OK, FileUploadCompleteFutureResponse - ) - assert not error - assert future - assert future.state == FileUploadCompleteState.OK - assert future.e_tag is not None - completion_etag = future.e_tag - ctx.logger.info( - "%s", - f"--> done waiting, data is completely uploaded [{attempt.retry_state.retry_object.statistics}]", - ) - # check the entry in db now has the correct file size, and the upload id is gone - await assert_file_meta_data_in_db( - sqlalchemy_async_engine, - file_id=file_id, - expected_entry_exists=True, - expected_file_size=file_size, - expected_upload_id=False, - expected_upload_expiration_date=False, - expected_sha256_checksum=None, - ) - # check the file is in S3 for real - s3_metadata = await storage_s3_client.get_object_metadata( - bucket=storage_s3_bucket, object_key=file_id - ) - assert s3_metadata.size == file_size - assert s3_metadata.last_modified - assert s3_metadata.e_tag == completion_etag - - @pytest.mark.parametrize( "location_id", [SimcoreS3DataManager.get_location_id()], @@ -751,7 +636,7 @@ async def test_upload_of_single_presigned_link_lazily_update_database_on_get( get_file_meta_data: Callable[..., Awaitable[FileMetaDataGet]], s3_client: S3Client, ): - file_size = TypeAdapter(ByteSize).validate_python("500Mib") + file_size = TypeAdapter(ByteSize).validate_python("127Mib") file_name = faker.file_name() # create a file file = create_file_of_size(file_size, file_name) @@ -798,6 +683,7 @@ async def test_upload_real_file_with_s3_client( node_id: NodeID, faker: Faker, s3_client: S3Client, + with_storage_celery_worker: CeleryTaskQueueWorker, ): file_size = TypeAdapter(ByteSize).validate_python("500Mib") file_name = faker.file_name() diff --git a/services/storage/tests/unit/test_handlers_simcore_s3.py b/services/storage/tests/unit/test_handlers_simcore_s3.py index ff43db81f48..6f361c1d7e9 100644 --- a/services/storage/tests/unit/test_handlers_simcore_s3.py +++ b/services/storage/tests/unit/test_handlers_simcore_s3.py @@ -5,80 +5,38 @@ # pylint:disable=no-name-in-module # pylint:disable=too-many-nested-blocks -import asyncio -import logging import sys from collections.abc import Awaitable, Callable -from copy import deepcopy from pathlib import Path -from typing import Any, Literal +from typing import Literal import httpx import pytest -import sqlalchemy as sa -from aws_library.s3 import SimcoreS3API from faker import Faker from fastapi import FastAPI from models_library.api_schemas_storage.storage_schemas import ( FileMetaDataGet, - FoldersBody, ) from models_library.basic_types import SHA256Str from models_library.projects import ProjectID -from models_library.projects_nodes_io import NodeID, NodeIDStr, SimcoreS3FileID +from models_library.projects_nodes_io import SimcoreS3FileID from models_library.users import UserID from models_library.utils.fastapi_encoders import jsonable_encoder from pydantic import ByteSize, TypeAdapter from pytest_simcore.helpers.fastapi import 
url_from_operation_id from pytest_simcore.helpers.httpx_assert_checks import assert_status -from pytest_simcore.helpers.logging_tools import log_context -from pytest_simcore.helpers.storage_utils import ( - FileIDDict, - ProjectWithFilesParams, - get_updated_project, -) -from pytest_simcore.helpers.storage_utils_file_meta_data import ( - assert_file_meta_data_in_db, -) -from pytest_simcore.helpers.storage_utils_project import clone_project_data from servicelib.aiohttp import status -from servicelib.fastapi.long_running_tasks.client import long_running_task_request from settings_library.s3 import S3Settings -from simcore_postgres_database.storage_models import file_meta_data from simcore_service_storage.models import SearchFilesQueryParams from simcore_service_storage.simcore_s3_dsm import SimcoreS3DataManager -from sqlalchemy.ext.asyncio import AsyncEngine -from yarl import URL -pytest_simcore_core_services_selection = ["postgres"] -pytest_simcore_ops_services_selection = ["adminer", "minio"] +pytest_simcore_core_services_selection = ["postgres", "rabbit"] +pytest_simcore_ops_services_selection = ["adminer"] CURRENT_DIR = Path(sys.argv[0] if __name__ == "__main__" else __file__).resolve().parent -@pytest.fixture -def mock_datcore_download(mocker, client): - # Use to mock downloading from DATCore - async def _fake_download_to_file_or_raise(session, url, dest_path): - with log_context(logging.INFO, f"Faking download: {url} -> {dest_path}"): - Path(dest_path).write_text( - "FAKE: test_create_and_delete_folders_from_project" - ) - - mocker.patch( - "simcore_service_storage.simcore_s3_dsm.download_to_file_or_raise", - side_effect=_fake_download_to_file_or_raise, - autospec=True, - ) - - mocker.patch( - "simcore_service_storage.simcore_s3_dsm.datcore_adapter.get_file_download_presigned_link", - autospec=True, - return_value=URL("https://httpbin.org/image"), - ) - - async def test_simcore_s3_access_returns_default( initialized_app: FastAPI, client: httpx.AsyncClient ): @@ -92,368 +50,6 @@ async def test_simcore_s3_access_returns_default( assert received_settings -async def _request_copy_folders( - initialized_app: FastAPI, - client: httpx.AsyncClient, - user_id: UserID, - source_project: dict[str, Any], - dst_project: dict[str, Any], - nodes_map: dict[NodeID, NodeID], -) -> dict[str, Any]: - url = url_from_operation_id( - client, initialized_app, "copy_folders_from_project" - ).with_query(user_id=user_id) - - with log_context( - logging.INFO, - f"Copying folders from {source_project['uuid']} to {dst_project['uuid']}", - ) as ctx: - async for lr_task in long_running_task_request( - client, - url, - json=jsonable_encoder( - FoldersBody( - source=source_project, destination=dst_project, nodes_map=nodes_map - ) - ), - ): - ctx.logger.info("%s", f"<-- current state is {lr_task.progress=}") - if lr_task.done(): - return await lr_task.result() - - pytest.fail(reason="Copy folders failed!") - - -async def test_copy_folders_from_non_existing_project( - initialized_app: FastAPI, - client: httpx.AsyncClient, - user_id: UserID, - create_project: Callable[[], Awaitable[dict[str, Any]]], - faker: Faker, -): - src_project = await create_project() - incorrect_src_project = deepcopy(src_project) - incorrect_src_project["uuid"] = faker.uuid4() - dst_project = await create_project() - incorrect_dst_project = deepcopy(dst_project) - incorrect_dst_project["uuid"] = faker.uuid4() - - with pytest.raises(httpx.HTTPStatusError, match="404") as exc_info: - await _request_copy_folders( - initialized_app, - client, - 
user_id, - incorrect_src_project, - dst_project, - nodes_map={}, - ) - assert_status( - exc_info.value.response, - status.HTTP_404_NOT_FOUND, - None, - expected_msg=f"{incorrect_src_project['uuid']} was not found", - ) - - with pytest.raises(httpx.HTTPStatusError, match="404") as exc_info: - await _request_copy_folders( - initialized_app, - client, - user_id, - src_project, - incorrect_dst_project, - nodes_map={}, - ) - assert_status( - exc_info.value.response, - status.HTTP_404_NOT_FOUND, - None, - expected_msg=f"{incorrect_dst_project['uuid']} was not found", - ) - - -async def test_copy_folders_from_empty_project( - initialized_app: FastAPI, - client: httpx.AsyncClient, - user_id: UserID, - create_project: Callable[[], Awaitable[dict[str, Any]]], - sqlalchemy_async_engine: AsyncEngine, - storage_s3_client: SimcoreS3API, -): - # we will copy from src to dst - src_project = await create_project() - dst_project = await create_project() - - data = await _request_copy_folders( - initialized_app, - client, - user_id, - src_project, - dst_project, - nodes_map={}, - ) - assert data == jsonable_encoder(dst_project) - # check there is nothing in the dst project - async with sqlalchemy_async_engine.connect() as conn: - num_entries = await conn.scalar( - sa.select(sa.func.count()) - .select_from(file_meta_data) - .where(file_meta_data.c.project_id == dst_project["uuid"]) - ) - assert num_entries == 0 - - -@pytest.fixture -def short_dsm_cleaner_interval(monkeypatch: pytest.MonkeyPatch) -> int: - monkeypatch.setenv("STORAGE_CLEANER_INTERVAL_S", "1") - return 1 - - -@pytest.mark.parametrize( - "location_id", - [SimcoreS3DataManager.get_location_id()], - ids=[SimcoreS3DataManager.get_location_name()], - indirect=True, -) -@pytest.mark.parametrize( - "project_params", - [ - ProjectWithFilesParams( - num_nodes=1, - allowed_file_sizes=(TypeAdapter(ByteSize).validate_python("210Mib"),), - allowed_file_checksums=( - TypeAdapter(SHA256Str).validate_python( - "0b3216d95ec5a36c120ba16c88911dcf5ff655925d0fbdbc74cf95baf86de6fc" - ), - ), - workspace_files_count=0, - ), - ], - ids=str, -) -async def test_copy_folders_from_valid_project_with_one_large_file( - initialized_app: FastAPI, - short_dsm_cleaner_interval: int, - client: httpx.AsyncClient, - user_id: UserID, - create_project: Callable[[], Awaitable[dict[str, Any]]], - sqlalchemy_async_engine: AsyncEngine, - random_project_with_files: Callable[ - [ProjectWithFilesParams], - Awaitable[ - tuple[dict[str, Any], dict[NodeID, dict[SimcoreS3FileID, FileIDDict]]] - ], - ], - project_params: ProjectWithFilesParams, -): - # 1. create a src project with 1 large file - src_project, src_projects_list = await random_project_with_files(project_params) - # 2. 
create a dst project without files - dst_project, nodes_map = clone_project_data(src_project) - dst_project = await create_project(**dst_project) - # copy the project files - data = await _request_copy_folders( - initialized_app, - client, - user_id, - src_project, - dst_project, - nodes_map={NodeID(i): NodeID(j) for i, j in nodes_map.items()}, - ) - assert data == jsonable_encoder( - await get_updated_project(sqlalchemy_async_engine, dst_project["uuid"]) - ) - # check that file meta data was effectively copied - for src_node_id in src_projects_list: - dst_node_id = nodes_map.get( - TypeAdapter(NodeIDStr).validate_python(f"{src_node_id}") - ) - assert dst_node_id - for src_file_id, src_file in src_projects_list[src_node_id].items(): - path: Any = src_file["path"] - assert isinstance(path, Path) - checksum: Any = src_file["sha256_checksum"] - assert isinstance(checksum, str) - await assert_file_meta_data_in_db( - sqlalchemy_async_engine, - file_id=TypeAdapter(SimcoreS3FileID).validate_python( - f"{src_file_id}".replace( - f"{src_project['uuid']}", dst_project["uuid"] - ).replace(f"{src_node_id}", f"{dst_node_id}") - ), - expected_entry_exists=True, - expected_file_size=path.stat().st_size, - expected_upload_id=None, - expected_upload_expiration_date=None, - expected_sha256_checksum=TypeAdapter(SHA256Str).validate_python( - checksum - ), - ) - - -@pytest.mark.parametrize( - "location_id", - [SimcoreS3DataManager.get_location_id()], - ids=[SimcoreS3DataManager.get_location_name()], - indirect=True, -) -@pytest.mark.parametrize( - "project_params", - [ - ProjectWithFilesParams( - num_nodes=12, - allowed_file_sizes=( - TypeAdapter(ByteSize).validate_python("7Mib"), - TypeAdapter(ByteSize).validate_python("110Mib"), - TypeAdapter(ByteSize).validate_python("1Mib"), - ), - allowed_file_checksums=( - TypeAdapter(SHA256Str).validate_python( - "311e2e130d83cfea9c3b7560699c221b0b7f9e5d58b02870bd52b695d8b4aabd" - ), - TypeAdapter(SHA256Str).validate_python( - "08e297db979d3c84f6b072c2a1e269e8aa04e82714ca7b295933a0c9c0f62b2e" - ), - TypeAdapter(SHA256Str).validate_python( - "488f3b57932803bbf644593bd46d95599b1d4da1d63bc020d7ebe6f1c255f7f3" - ), - ), - workspace_files_count=0, - ), - ], - ids=str, -) -async def test_copy_folders_from_valid_project( - short_dsm_cleaner_interval: int, - initialized_app: FastAPI, - client: httpx.AsyncClient, - user_id: UserID, - create_project: Callable[[], Awaitable[dict[str, Any]]], - create_simcore_file_id: Callable[[ProjectID, NodeID, str], SimcoreS3FileID], - sqlalchemy_async_engine: AsyncEngine, - random_project_with_files: Callable[ - [ProjectWithFilesParams], - Awaitable[ - tuple[dict[str, Any], dict[NodeID, dict[SimcoreS3FileID, FileIDDict]]] - ], - ], - project_params: ProjectWithFilesParams, -): - # 1. create a src project with some files - src_project, src_projects_list = await random_project_with_files(project_params) - # 2. 
create a dst project without files - dst_project, nodes_map = clone_project_data(src_project) - dst_project = await create_project(**dst_project) - # copy the project files - data = await _request_copy_folders( - initialized_app, - client, - user_id, - src_project, - dst_project, - nodes_map={NodeID(i): NodeID(j) for i, j in nodes_map.items()}, - ) - assert data == jsonable_encoder( - await get_updated_project(sqlalchemy_async_engine, dst_project["uuid"]) - ) - - # check that file meta data was effectively copied - for src_node_id in src_projects_list: - dst_node_id = nodes_map.get( - TypeAdapter(NodeIDStr).validate_python(f"{src_node_id}") - ) - assert dst_node_id - for src_file_id, src_file in src_projects_list[src_node_id].items(): - path: Any = src_file["path"] - assert isinstance(path, Path) - checksum: Any = src_file["sha256_checksum"] - assert isinstance(checksum, str) - await assert_file_meta_data_in_db( - sqlalchemy_async_engine, - file_id=TypeAdapter(SimcoreS3FileID).validate_python( - f"{src_file_id}".replace( - f"{src_project['uuid']}", dst_project["uuid"] - ).replace(f"{src_node_id}", f"{dst_node_id}") - ), - expected_entry_exists=True, - expected_file_size=path.stat().st_size, - expected_upload_id=None, - expected_upload_expiration_date=None, - expected_sha256_checksum=TypeAdapter(SHA256Str).validate_python( - checksum - ), - ) - - -async def _create_and_delete_folders_from_project( - user_id: UserID, - project: dict[str, Any], - initialized_app: FastAPI, - client: httpx.AsyncClient, - project_db_creator: Callable, - check_list_files: bool, -) -> None: - destination_project, nodes_map = clone_project_data(project) - await project_db_creator(**destination_project) - - # creating a copy - data = await _request_copy_folders( - initialized_app, - client, - user_id, - project, - destination_project, - nodes_map={NodeID(i): NodeID(j) for i, j in nodes_map.items()}, - ) - - # data should be equal to the destination project, and all store entries should point to simcore.s3 - # NOTE: data is jsonized where destination project is not! 
- assert jsonable_encoder(destination_project) == data - - project_id = data["uuid"] - - # list data to check all is here - - if check_list_files: - url = url_from_operation_id( - client, - initialized_app, - "list_files_metadata", - location_id=f"{SimcoreS3DataManager.get_location_id()}", - ).with_query(user_id=f"{user_id}", uuid_filter=f"{project_id}") - - resp = await client.get(f"{url}") - data, error = assert_status(resp, status.HTTP_200_OK, list[FileMetaDataGet]) - assert not error - # DELETING - url = url_from_operation_id( - client, - initialized_app, - "delete_folders_of_project", - folder_id=project_id, - ).with_query(user_id=f"{user_id}") - resp = await client.delete(f"{url}") - assert_status(resp, status.HTTP_204_NO_CONTENT, None) - - # list data is gone - if check_list_files: - url = url_from_operation_id( - client, - initialized_app, - "list_files_metadata", - location_id=f"{SimcoreS3DataManager.get_location_id()}", - ).with_query(user_id=f"{user_id}", uuid_filter=f"{project_id}") - resp = await client.get(f"{url}") - data, error = assert_status(resp, status.HTTP_200_OK, list[FileMetaDataGet]) - assert not error - assert not data - - -@pytest.fixture -def set_log_levels_for_noisy_libraries() -> None: - # Reduce the log level for 'werkzeug' - logging.getLogger("werkzeug").setLevel(logging.WARNING) - - async def test_connect_to_external( set_log_levels_for_noisy_libraries: None, initialized_app: FastAPI, @@ -472,101 +68,6 @@ async def test_connect_to_external( print(data) -@pytest.mark.parametrize( - "location_id", - [SimcoreS3DataManager.get_location_id()], - ids=[SimcoreS3DataManager.get_location_name()], - indirect=True, -) -@pytest.mark.parametrize( - "project_params", - [ - ProjectWithFilesParams( - num_nodes=3, - allowed_file_sizes=( - TypeAdapter(ByteSize).validate_python("7Mib"), - TypeAdapter(ByteSize).validate_python("110Mib"), - TypeAdapter(ByteSize).validate_python("1Mib"), - ), - workspace_files_count=0, - ) - ], -) -async def test_create_and_delete_folders_from_project( - set_log_levels_for_noisy_libraries: None, - initialized_app: FastAPI, - client: httpx.AsyncClient, - user_id: UserID, - create_project: Callable[..., Awaitable[dict[str, Any]]], - with_random_project_with_files: tuple[ - dict[str, Any], - dict[NodeID, dict[SimcoreS3FileID, dict[str, Path | str]]], - ], - mock_datcore_download, -): - project_in_db, _ = with_random_project_with_files - await _create_and_delete_folders_from_project( - user_id, - project_in_db, - initialized_app, - client, - create_project, - check_list_files=True, - ) - - -@pytest.mark.flaky(max_runs=3) -@pytest.mark.parametrize( - "location_id", - [SimcoreS3DataManager.get_location_id()], - ids=[SimcoreS3DataManager.get_location_name()], - indirect=True, -) -@pytest.mark.parametrize( - "project_params", - [ - ProjectWithFilesParams( - num_nodes=3, - allowed_file_sizes=( - TypeAdapter(ByteSize).validate_python("7Mib"), - TypeAdapter(ByteSize).validate_python("110Mib"), - TypeAdapter(ByteSize).validate_python("1Mib"), - ), - workspace_files_count=0, - ) - ], -) -@pytest.mark.parametrize("num_concurrent_calls", [50]) -async def test_create_and_delete_folders_from_project_burst( - set_log_levels_for_noisy_libraries: None, - initialized_app: FastAPI, - client: httpx.AsyncClient, - user_id: UserID, - with_random_project_with_files: tuple[ - dict[str, Any], - dict[NodeID, dict[SimcoreS3FileID, dict[str, Path | str]]], - ], - create_project: Callable[..., Awaitable[dict[str, Any]]], - mock_datcore_download, - num_concurrent_calls: int, -): 
- project_in_db, _ = with_random_project_with_files - # NOTE: here the point is to NOT have a limit on the number of calls!! - await asyncio.gather( - *[ - _create_and_delete_folders_from_project( - user_id, - project_in_db, - initialized_app, - client, - create_project, - check_list_files=False, - ) - for _ in range(num_concurrent_calls) - ] - ) - - @pytest.fixture async def uploaded_file_ids( faker: Faker, diff --git a/services/storage/tests/unit/test_rpc_handlers_simcore_s3.py b/services/storage/tests/unit/test_rpc_handlers_simcore_s3.py new file mode 100644 index 00000000000..846fea718ed --- /dev/null +++ b/services/storage/tests/unit/test_rpc_handlers_simcore_s3.py @@ -0,0 +1,503 @@ +# pylint:disable=no-name-in-module +# pylint:disable=protected-access +# pylint:disable=redefined-outer-name +# pylint:disable=too-many-arguments +# pylint:disable=too-many-positional-arguments +# pylint:disable=unused-argument +# pylint:disable=unused-variable + +import asyncio +import datetime +import logging +from collections.abc import Awaitable, Callable +from copy import deepcopy +from pathlib import Path +from typing import Any + +import httpx +import pytest +import sqlalchemy as sa +from faker import Faker +from fastapi import FastAPI +from fastapi.encoders import jsonable_encoder +from models_library.api_schemas_rpc_async_jobs.async_jobs import AsyncJobResult +from models_library.api_schemas_rpc_async_jobs.exceptions import JobError +from models_library.api_schemas_storage import STORAGE_RPC_NAMESPACE +from models_library.api_schemas_storage.storage_schemas import ( + FileMetaDataGet, + FoldersBody, +) +from models_library.basic_types import SHA256Str +from models_library.products import ProductName +from models_library.projects_nodes_io import NodeID, NodeIDStr, SimcoreS3FileID +from models_library.users import UserID +from pydantic import ByteSize, TypeAdapter +from pytest_simcore.helpers.fastapi import url_from_operation_id +from pytest_simcore.helpers.httpx_assert_checks import assert_status +from pytest_simcore.helpers.logging_tools import log_context +from pytest_simcore.helpers.storage_utils import ( + FileIDDict, + ProjectWithFilesParams, + get_updated_project, +) +from pytest_simcore.helpers.storage_utils_file_meta_data import ( + assert_file_meta_data_in_db, +) +from pytest_simcore.helpers.storage_utils_project import clone_project_data +from servicelib.aiohttp import status +from servicelib.rabbitmq._client_rpc import RabbitMQRPCClient +from servicelib.rabbitmq.rpc_interfaces.async_jobs.async_jobs import wait_and_get_result +from servicelib.rabbitmq.rpc_interfaces.storage.simcore_s3 import ( + copy_folders_from_project, +) +from simcore_postgres_database.storage_models import file_meta_data +from simcore_service_storage.modules.celery.worker import CeleryTaskQueueWorker +from simcore_service_storage.simcore_s3_dsm import SimcoreS3DataManager +from sqlalchemy.ext.asyncio import AsyncEngine +from yarl import URL + +pytest_simcore_core_services_selection = ["postgres", "rabbit"] +pytest_simcore_ops_services_selection = ["adminer"] + + +async def _request_copy_folders( + rpc_client: RabbitMQRPCClient, + user_id: UserID, + product_name: ProductName, + source_project: dict[str, Any], + dst_project: dict[str, Any], + nodes_map: dict[NodeID, NodeID], + *, + client_timeout: datetime.timedelta = datetime.timedelta(seconds=60), +) -> dict[str, Any]: + with log_context( + logging.INFO, + f"Copying folders from {source_project['uuid']} to {dst_project['uuid']}", + ) as ctx: + async_job_get, 
async_job_name = await copy_folders_from_project( + rpc_client, + user_id=user_id, + product_name=product_name, + body=FoldersBody( + source=source_project, destination=dst_project, nodes_map=nodes_map + ), + ) + + async for async_job_result in wait_and_get_result( + rpc_client, + rpc_namespace=STORAGE_RPC_NAMESPACE, + method_name=copy_folders_from_project.__name__, + job_id=async_job_get.job_id, + job_id_data=async_job_name, + client_timeout=client_timeout, + ): + ctx.logger.info("%s", f"<-- current state is {async_job_result=}") + if async_job_result.done: + result = await async_job_result.result() + assert isinstance(result, AsyncJobResult) + return result.result + + pytest.fail(reason="Copy folders failed!") + + +async def test_copy_folders_from_non_existing_project( + initialized_app: FastAPI, + storage_rabbitmq_rpc_client: RabbitMQRPCClient, + user_id: UserID, + product_name: ProductName, + create_project: Callable[..., Awaitable[dict[str, Any]]], + faker: Faker, + with_storage_celery_worker: CeleryTaskQueueWorker, +): + src_project = await create_project() + incorrect_src_project = deepcopy(src_project) + incorrect_src_project["uuid"] = faker.uuid4() + dst_project = await create_project() + incorrect_dst_project = deepcopy(dst_project) + incorrect_dst_project["uuid"] = faker.uuid4() + + with pytest.raises( + JobError, match=f"Project {incorrect_src_project['uuid']} was not found" + ): + await _request_copy_folders( + storage_rabbitmq_rpc_client, + user_id, + product_name, + incorrect_src_project, + dst_project, + nodes_map={}, + ) + + with pytest.raises( + JobError, match=f"Project {incorrect_dst_project['uuid']} was not found" + ): + await _request_copy_folders( + storage_rabbitmq_rpc_client, + user_id, + product_name, + src_project, + incorrect_dst_project, + nodes_map={}, + ) + + +async def test_copy_folders_from_empty_project( + initialized_app: FastAPI, + storage_rabbitmq_rpc_client: RabbitMQRPCClient, + user_id: UserID, + product_name: ProductName, + create_project: Callable[[], Awaitable[dict[str, Any]]], + sqlalchemy_async_engine: AsyncEngine, + with_storage_celery_worker: CeleryTaskQueueWorker, +): + # we will copy from src to dst + src_project = await create_project() + dst_project = await create_project() + + data = await _request_copy_folders( + storage_rabbitmq_rpc_client, + user_id, + product_name, + src_project, + dst_project, + nodes_map={}, + ) + assert data == jsonable_encoder(dst_project) + # check there is nothing in the dst project + async with sqlalchemy_async_engine.connect() as conn: + num_entries = await conn.scalar( + sa.select(sa.func.count()) + .select_from(file_meta_data) + .where(file_meta_data.c.project_id == dst_project["uuid"]) + ) + assert num_entries == 0 + + +@pytest.fixture +def short_dsm_cleaner_interval(monkeypatch: pytest.MonkeyPatch) -> int: + monkeypatch.setenv("STORAGE_CLEANER_INTERVAL_S", "1") + return 1 + + +@pytest.mark.parametrize( + "location_id", + [SimcoreS3DataManager.get_location_id()], + ids=[SimcoreS3DataManager.get_location_name()], + indirect=True, +) +@pytest.mark.parametrize( + "project_params", + [ + ProjectWithFilesParams( + num_nodes=1, + allowed_file_sizes=(TypeAdapter(ByteSize).validate_python("210Mib"),), + allowed_file_checksums=( + TypeAdapter(SHA256Str).validate_python( + "0b3216d95ec5a36c120ba16c88911dcf5ff655925d0fbdbc74cf95baf86de6fc" + ), + ), + workspace_files_count=0, + ), + ], + ids=str, +) +async def test_copy_folders_from_valid_project_with_one_large_file( + initialized_app: FastAPI, + 
short_dsm_cleaner_interval: int, + storage_rabbitmq_rpc_client: RabbitMQRPCClient, + user_id: UserID, + product_name: ProductName, + create_project: Callable[[], Awaitable[dict[str, Any]]], + sqlalchemy_async_engine: AsyncEngine, + random_project_with_files: Callable[ + [ProjectWithFilesParams], + Awaitable[ + tuple[dict[str, Any], dict[NodeID, dict[SimcoreS3FileID, FileIDDict]]] + ], + ], + project_params: ProjectWithFilesParams, +): + # 1. create a src project with 1 large file + src_project, src_projects_list = await random_project_with_files(project_params) + # 2. create a dst project without files + dst_project, nodes_map = clone_project_data(src_project) + dst_project = await create_project(**dst_project) + # copy the project files + data = await _request_copy_folders( + storage_rabbitmq_rpc_client, + user_id, + product_name, + src_project, + dst_project, + nodes_map={NodeID(i): NodeID(j) for i, j in nodes_map.items()}, + ) + assert data == jsonable_encoder( + await get_updated_project(sqlalchemy_async_engine, dst_project["uuid"]) + ) + # check that file meta data was effectively copied + for src_node_id in src_projects_list: + dst_node_id = nodes_map.get( + TypeAdapter(NodeIDStr).validate_python(f"{src_node_id}") + ) + assert dst_node_id + for src_file_id, src_file in src_projects_list[src_node_id].items(): + path: Any = src_file["path"] + assert isinstance(path, Path) + checksum: Any = src_file["sha256_checksum"] + assert isinstance(checksum, str) + await assert_file_meta_data_in_db( + sqlalchemy_async_engine, + file_id=TypeAdapter(SimcoreS3FileID).validate_python( + f"{src_file_id}".replace( + f"{src_project['uuid']}", dst_project["uuid"] + ).replace(f"{src_node_id}", f"{dst_node_id}") + ), + expected_entry_exists=True, + expected_file_size=path.stat().st_size, + expected_upload_id=None, + expected_upload_expiration_date=None, + expected_sha256_checksum=TypeAdapter(SHA256Str).validate_python( + checksum + ), + ) + + +@pytest.mark.parametrize( + "location_id", + [SimcoreS3DataManager.get_location_id()], + ids=[SimcoreS3DataManager.get_location_name()], + indirect=True, +) +@pytest.mark.parametrize( + "project_params", + [ + ProjectWithFilesParams( + num_nodes=12, + allowed_file_sizes=( + TypeAdapter(ByteSize).validate_python("7Mib"), + TypeAdapter(ByteSize).validate_python("110Mib"), + TypeAdapter(ByteSize).validate_python("1Mib"), + ), + allowed_file_checksums=( + TypeAdapter(SHA256Str).validate_python( + "311e2e130d83cfea9c3b7560699c221b0b7f9e5d58b02870bd52b695d8b4aabd" + ), + TypeAdapter(SHA256Str).validate_python( + "08e297db979d3c84f6b072c2a1e269e8aa04e82714ca7b295933a0c9c0f62b2e" + ), + TypeAdapter(SHA256Str).validate_python( + "488f3b57932803bbf644593bd46d95599b1d4da1d63bc020d7ebe6f1c255f7f3" + ), + ), + workspace_files_count=0, + ), + ], + ids=str, +) +async def test_copy_folders_from_valid_project( + short_dsm_cleaner_interval: int, + initialized_app: FastAPI, + storage_rabbitmq_rpc_client: RabbitMQRPCClient, + user_id: UserID, + product_name: ProductName, + create_project: Callable[[], Awaitable[dict[str, Any]]], + sqlalchemy_async_engine: AsyncEngine, + random_project_with_files: Callable[ + [ProjectWithFilesParams], + Awaitable[ + tuple[dict[str, Any], dict[NodeID, dict[SimcoreS3FileID, FileIDDict]]] + ], + ], + project_params: ProjectWithFilesParams, +): + # 1. create a src project with some files + src_project, src_projects_list = await random_project_with_files(project_params) + # 2. 
create a dst project without files + dst_project, nodes_map = clone_project_data(src_project) + dst_project = await create_project(**dst_project) + # copy the project files + data = await _request_copy_folders( + storage_rabbitmq_rpc_client, + user_id, + product_name, + src_project, + dst_project, + nodes_map={NodeID(i): NodeID(j) for i, j in nodes_map.items()}, + ) + assert data == jsonable_encoder( + await get_updated_project(sqlalchemy_async_engine, dst_project["uuid"]) + ) + + # check that file meta data was effectively copied + for src_node_id in src_projects_list: + dst_node_id = nodes_map.get( + TypeAdapter(NodeIDStr).validate_python(f"{src_node_id}") + ) + assert dst_node_id + for src_file_id, src_file in src_projects_list[src_node_id].items(): + path: Any = src_file["path"] + assert isinstance(path, Path) + checksum: Any = src_file["sha256_checksum"] + assert isinstance(checksum, str) + await assert_file_meta_data_in_db( + sqlalchemy_async_engine, + file_id=TypeAdapter(SimcoreS3FileID).validate_python( + f"{src_file_id}".replace( + f"{src_project['uuid']}", dst_project["uuid"] + ).replace(f"{src_node_id}", f"{dst_node_id}") + ), + expected_entry_exists=True, + expected_file_size=path.stat().st_size, + expected_upload_id=None, + expected_upload_expiration_date=None, + expected_sha256_checksum=TypeAdapter(SHA256Str).validate_python( + checksum + ), + ) + + +async def _create_and_delete_folders_from_project( + rpc_client: RabbitMQRPCClient, + client: httpx.AsyncClient, + user_id: UserID, + product_name: ProductName, + project: dict[str, Any], + initialized_app: FastAPI, + project_db_creator: Callable, + check_list_files: bool, + *, + client_timeout: datetime.timedelta = datetime.timedelta(seconds=60), +) -> None: + destination_project, nodes_map = clone_project_data(project) + await project_db_creator(**destination_project) + + # creating a copy + data = await _request_copy_folders( + rpc_client, + user_id, + product_name, + project, + destination_project, + nodes_map={NodeID(i): NodeID(j) for i, j in nodes_map.items()}, + client_timeout=client_timeout, + ) + + # data should be equal to the destination project, and all store entries should point to simcore.s3 + # NOTE: data is jsonized where destination project is not! 
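+    # i.e. the RPC job result is returned JSON-encoded (UUIDs, datetimes as strings),
+    # so the in-memory destination project must be encoded the same way before comparing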
+ assert jsonable_encoder(destination_project) == data + + project_id = data["uuid"] + + # list data to check all is here + + if check_list_files: + url = url_from_operation_id( + client, + initialized_app, + "list_files_metadata", + location_id=f"{SimcoreS3DataManager.get_location_id()}", + ).with_query(user_id=f"{user_id}", uuid_filter=f"{project_id}") + + resp = await client.get(f"{url}") + data, error = assert_status(resp, status.HTTP_200_OK, list[FileMetaDataGet]) + assert not error + # DELETING + url = url_from_operation_id( + client, + initialized_app, + "delete_folders_of_project", + folder_id=project_id, + ).with_query(user_id=f"{user_id}") + resp = await client.delete(f"{url}") + assert_status(resp, status.HTTP_204_NO_CONTENT, None) + + # list data is gone + if check_list_files: + url = url_from_operation_id( + client, + initialized_app, + "list_files_metadata", + location_id=f"{SimcoreS3DataManager.get_location_id()}", + ).with_query(user_id=f"{user_id}", uuid_filter=f"{project_id}") + resp = await client.get(f"{url}") + data, error = assert_status(resp, status.HTTP_200_OK, list[FileMetaDataGet]) + assert not error + assert not data + + +@pytest.fixture +def mock_datcore_download(mocker, client): + # Use to mock downloading from DATCore + async def _fake_download_to_file_or_raise(session, url, dest_path): + with log_context(logging.INFO, f"Faking download: {url} -> {dest_path}"): + Path(dest_path).write_text( + "FAKE: test_create_and_delete_folders_from_project" + ) + + mocker.patch( + "simcore_service_storage.simcore_s3_dsm.download_to_file_or_raise", + side_effect=_fake_download_to_file_or_raise, + autospec=True, + ) + + mocker.patch( + "simcore_service_storage.simcore_s3_dsm.datcore_adapter.get_file_download_presigned_link", + autospec=True, + return_value=URL("https://httpbin.org/image"), + ) + + +@pytest.mark.parametrize( + "location_id", + [SimcoreS3DataManager.get_location_id()], + ids=[SimcoreS3DataManager.get_location_name()], + indirect=True, +) +@pytest.mark.parametrize( + "project_params", + [ + ProjectWithFilesParams( + num_nodes=3, + allowed_file_sizes=( + TypeAdapter(ByteSize).validate_python("7Mib"), + TypeAdapter(ByteSize).validate_python("110Mib"), + TypeAdapter(ByteSize).validate_python("1Mib"), + ), + workspace_files_count=0, + ) + ], +) +@pytest.mark.parametrize("num_concurrent_calls", [1], ids=str) +async def test_create_and_delete_folders_from_project( + set_log_levels_for_noisy_libraries: None, + initialized_app: FastAPI, + storage_rabbitmq_rpc_client: RabbitMQRPCClient, + client: httpx.AsyncClient, + user_id: UserID, + product_name: ProductName, + with_random_project_with_files: tuple[ + dict[str, Any], + dict[NodeID, dict[SimcoreS3FileID, dict[str, Path | str]]], + ], + create_project: Callable[..., Awaitable[dict[str, Any]]], + mock_datcore_download, + num_concurrent_calls: int, +): + project_in_db, _ = with_random_project_with_files + # NOTE: here the point is to NOT have a limit on the number of calls!! 
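+    # fire num_concurrent_calls copy+delete cycles at once over the storage RPC
+    # interface; a generous 300s client timeout is used since each cycle waits for
+    # its async copy job to complete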
+ await asyncio.gather( + *[ + _create_and_delete_folders_from_project( + storage_rabbitmq_rpc_client, + client, + user_id, + product_name, + project_in_db, + initialized_app, + create_project, + check_list_files=False, + client_timeout=datetime.timedelta(seconds=300), + ) + for _ in range(num_concurrent_calls) + ] + ) diff --git a/services/web/server/src/simcore_service_webserver/projects/_crud_api_create.py b/services/web/server/src/simcore_service_webserver/projects/_crud_api_create.py index 4bb6f9b938a..73f068f9cee 100644 --- a/services/web/server/src/simcore_service_webserver/projects/_crud_api_create.py +++ b/services/web/server/src/simcore_service_webserver/projects/_crud_api_create.py @@ -8,6 +8,7 @@ from jsonschema import ValidationError as JsonSchemaValidationError from models_library.api_schemas_long_running_tasks.base import ProgressPercent from models_library.api_schemas_webserver.projects import ProjectGet +from models_library.products import ProductName from models_library.projects import ProjectID from models_library.projects_access import Owner from models_library.projects_nodes_io import NodeID @@ -71,6 +72,7 @@ async def _prepare_project_copy( app: web.Application, *, user_id: UserID, + product_name: ProductName, src_project_uuid: ProjectID, as_template: bool, deep_copy: bool, @@ -121,6 +123,7 @@ async def _prepare_project_copy( new_project, nodes_map, user_id, + product_name, task_progress, ) return new_project, copy_project_nodes_coro, copy_file_coro @@ -155,6 +158,7 @@ async def _copy_files_from_source_project( new_project: ProjectDict, nodes_map: NodesMap, user_id: UserID, + product_name: str, task_progress: TaskProgress, ): _projects_repository = ProjectDBAPI.get_from_app_context(app) @@ -168,20 +172,26 @@ async def _copy_files_from_source_project( async def _copy() -> None: starting_value = task_progress.percent - async for long_running_task in copy_data_folders_from_project( - app, source_project, new_project, nodes_map, user_id + async for async_job_composed_result in copy_data_folders_from_project( + app, + source_project=source_project, + destination_project=new_project, + nodes_map=nodes_map, + user_id=user_id, + product_name=product_name, ): task_progress.update( - message=long_running_task.progress.message, + message=async_job_composed_result.status.progress.composed_message, percent=TypeAdapter(ProgressPercent).validate_python( ( starting_value - + long_running_task.progress.percent * (1.0 - starting_value) + + async_job_composed_result.status.progress.percent_value + * (1.0 - starting_value) ), ), ) - if long_running_task.done(): - await long_running_task.result() + if async_job_composed_result.done: + await async_job_composed_result.result() if needs_lock_source_project: await with_project_locked( @@ -311,6 +321,7 @@ async def create_project( # pylint: disable=too-many-arguments,too-many-branche ) = await _prepare_project_copy( request.app, user_id=user_id, + product_name=product_name, src_project_uuid=from_study, as_template=as_template, deep_copy=copy_data, diff --git a/services/web/server/src/simcore_service_webserver/storage/api.py b/services/web/server/src/simcore_service_webserver/storage/api.py index 9d65ac3faf3..868dd63ad93 100644 --- a/services/web/server/src/simcore_service_webserver/storage/api.py +++ b/services/web/server/src/simcore_service_webserver/storage/api.py @@ -1,34 +1,38 @@ """Storage subsystem's API: responsible of communication with storage service""" -import asyncio +import datetime import logging import urllib.parse from 
collections.abc import AsyncGenerator from typing import Any, Final from aiohttp import ClientError, ClientSession, ClientTimeout, web +from models_library.api_schemas_rpc_async_jobs.async_jobs import AsyncJobNameData +from models_library.api_schemas_storage import STORAGE_RPC_NAMESPACE from models_library.api_schemas_storage.storage_schemas import ( FileLocation, FileLocationArray, FileMetaDataGet, + FoldersBody, PresignedLink, ) from models_library.generics import Envelope +from models_library.products import ProductName from models_library.projects import ProjectID from models_library.projects_nodes_io import LocationID, NodeID, SimCoreFileLink from models_library.users import UserID -from models_library.utils.fastapi_encoders import jsonable_encoder from pydantic import ByteSize, HttpUrl, TypeAdapter from servicelib.aiohttp.client_session import get_client_session -from servicelib.aiohttp.long_running_tasks.client import ( - LRTask, - long_running_task_request, -) from servicelib.logging_utils import get_log_record_extra, log_context +from servicelib.rabbitmq.rpc_interfaces.async_jobs.async_jobs import ( + AsyncJobComposedResult, + submit_and_wait, +) from yarl import URL from ..projects.models import ProjectDict from ..projects.utils import NodesMap +from ..rabbitmq import get_rabbitmq_rpc_client from .settings import StorageSettings, get_plugin_settings _logger = logging.getLogger(__name__) @@ -100,27 +104,30 @@ async def get_project_total_size_simcore_s3( async def copy_data_folders_from_project( app: web.Application, + *, source_project: ProjectDict, destination_project: ProjectDict, nodes_map: NodesMap, user_id: UserID, -) -> AsyncGenerator[LRTask, None]: - session, api_endpoint = _get_storage_client(app) - _logger.debug("Copying %d nodes", len(nodes_map)) - # /simcore-s3/folders: - async for lr_task in long_running_task_request( - session, - (api_endpoint / "simcore-s3/folders").with_query(user_id=user_id), - json=jsonable_encoder( - { - "source": source_project, - "destination": destination_project, - "nodes_map": nodes_map, - } - ), - client_timeout=_TOTAL_TIMEOUT_TO_COPY_DATA_SECS, - ): - yield lr_task + product_name: ProductName, +) -> AsyncGenerator[AsyncJobComposedResult, None]: + with log_context(_logger, logging.DEBUG, msg=f"copy {nodes_map=}"): + rabbitmq_client = get_rabbitmq_rpc_client(app) + async for job_composed_result in submit_and_wait( + rabbitmq_client, + method_name="copy_folders_from_project", + rpc_namespace=STORAGE_RPC_NAMESPACE, + job_id_data=AsyncJobNameData(user_id=user_id, product_name=product_name), + body=TypeAdapter(FoldersBody).validate_python( + { + "source": source_project, + "destination": destination_project, + "nodes_map": nodes_map, + }, + ), + client_timeout=datetime.timedelta(seconds=_TOTAL_TIMEOUT_TO_COPY_DATA_SECS), + ): + yield job_composed_result async def _delete(session, target_url): @@ -164,7 +171,7 @@ async def is_healthy(app: web.Application) -> bool: timeout=ClientTimeout(total=2, connect=1), ) return True - except (ClientError, asyncio.TimeoutError) as err: + except (TimeoutError, ClientError) as err: # ClientResponseError, ClientConnectionError, ClientPayloadError, InValidURL _logger.debug("Storage is NOT healthy: %s", err) return False diff --git a/services/web/server/src/simcore_service_webserver/studies_dispatcher/_studies_access.py b/services/web/server/src/simcore_service_webserver/studies_dispatcher/_studies_access.py index 691f6c4df69..5a132352463 100644 --- 
a/services/web/server/src/simcore_service_webserver/studies_dispatcher/_studies_access.py +++ b/services/web/server/src/simcore_service_webserver/studies_dispatcher/_studies_access.py @@ -195,19 +195,20 @@ async def copy_study_to_account( ) async for lr_task in copy_data_folders_from_project( request.app, - template_project, - project, - nodes_map, - user["id"], + source_project=template_project, + destination_project=project, + nodes_map=nodes_map, + user_id=user["id"], + product_name=product_name, ): _logger.info( "copying %s into %s for %s: %s", f"{template_project['uuid']=}", f"{project['uuid']}", f"{user['id']}", - f"{lr_task.progress=}", + f"{lr_task.status.progress=}", ) - if lr_task.done(): + if lr_task.done: await lr_task.result() await create_or_update_pipeline( request.app, user["id"], project["uuid"], product_name diff --git a/services/web/server/tests/unit/with_dbs/02/test_projects_cancellations.py b/services/web/server/tests/unit/with_dbs/02/test_projects_cancellations.py index fe43672ebfc..07cb83015d0 100644 --- a/services/web/server/tests/unit/with_dbs/02/test_projects_cancellations.py +++ b/services/web/server/tests/unit/with_dbs/02/test_projects_cancellations.py @@ -11,6 +11,9 @@ import pytest from aiohttp.test_utils import TestClient +from faker import Faker +from models_library.api_schemas_rpc_async_jobs.async_jobs import AsyncJobStatus +from models_library.progress_bar import ProgressReport from pydantic import ByteSize, TypeAdapter from pytest_simcore.helpers.assert_checks import assert_status from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict @@ -20,8 +23,10 @@ MockedStorageSubsystem, standard_role_response, ) -from servicelib.aiohttp.long_running_tasks.client import LRTask -from servicelib.aiohttp.long_running_tasks.server import TaskGet, TaskProgress +from servicelib.aiohttp.long_running_tasks.server import TaskGet +from servicelib.rabbitmq.rpc_interfaces.async_jobs.async_jobs import ( + AsyncJobComposedResult, +) from simcore_postgres_database.models.users import UserRole from simcore_service_webserver._meta import api_version_prefix from simcore_service_webserver.application_settings import get_application_settings @@ -46,16 +51,22 @@ def app_environment( @pytest.fixture async def slow_storage_subsystem_mock( - storage_subsystem_mock: MockedStorageSubsystem, + storage_subsystem_mock: MockedStorageSubsystem, faker: Faker ) -> MockedStorageSubsystem: # requests storage to copy data - async def _very_slow_copy_of_data(*args): + async def _very_slow_copy_of_data(*args, **kwargs): await asyncio.sleep(30) - async def _mock_result(): - ... + async def _mock_result(): ... 
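+        # emulate a single, already-completed copy job as reported over the async-jobs RPC:
+        # the status is done with full progress and the (empty) result is attached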
- yield LRTask(progress=TaskProgress(), _result=_mock_result()) + yield AsyncJobComposedResult( + AsyncJobStatus( + job_id=faker.uuid4(cast_to=None), + progress=ProgressReport(actual_value=1), + done=True, + ), + _mock_result(), + ) storage_subsystem_mock.copy_data_folders_from_project.side_effect = ( _very_slow_copy_of_data diff --git a/services/web/server/tests/unit/with_dbs/04/studies_dispatcher/test_studies_dispatcher_studies_access.py b/services/web/server/tests/unit/with_dbs/04/studies_dispatcher/test_studies_dispatcher_studies_access.py index 16dfde75956..a049ae257db 100644 --- a/services/web/server/tests/unit/with_dbs/04/studies_dispatcher/test_studies_dispatcher_studies_access.py +++ b/services/web/server/tests/unit/with_dbs/04/studies_dispatcher/test_studies_dispatcher_studies_access.py @@ -18,7 +18,10 @@ from aiohttp import ClientResponse, ClientSession, web from aiohttp.test_utils import TestClient, TestServer from faker import Faker +from models_library.api_schemas_rpc_async_jobs.async_jobs import AsyncJobStatus +from models_library.progress_bar import ProgressReport from models_library.projects_state import ProjectLocked, ProjectStatus +from models_library.users import UserID from pytest_mock import MockerFixture from pytest_simcore.aioresponses_mocker import AioResponsesMock from pytest_simcore.helpers.assert_checks import assert_status @@ -26,15 +29,17 @@ from pytest_simcore.helpers.webserver_parametrizations import MockedStorageSubsystem from pytest_simcore.helpers.webserver_projects import NewProject, delete_all_projects from servicelib.aiohttp import status -from servicelib.aiohttp.long_running_tasks.client import LRTask -from servicelib.aiohttp.long_running_tasks.server import TaskProgress from servicelib.common_headers import UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE +from servicelib.rabbitmq.rpc_interfaces.async_jobs.async_jobs import ( + AsyncJobComposedResult, +) from servicelib.rest_responses import unwrap_envelope from settings_library.utils_session import DEFAULT_SESSION_COOKIE_NAME from simcore_service_webserver.projects._projects_service import ( submit_delete_project_task, ) from simcore_service_webserver.projects.models import ProjectDict +from simcore_service_webserver.projects.utils import NodesMap from simcore_service_webserver.users.api import ( delete_user_without_projects, get_user_role, @@ -143,7 +148,7 @@ def mocks_on_projects_api(mocker: MockerFixture) -> None: @pytest.fixture async def storage_subsystem_mock_override( - storage_subsystem_mock: MockedStorageSubsystem, mocker: MockerFixture + storage_subsystem_mock: MockedStorageSubsystem, mocker: MockerFixture, faker: Faker ) -> None: """ Mocks functions that require storage client @@ -160,21 +165,37 @@ async def storage_subsystem_mock_override( ) async def _mock_copy_data_from_project( - app, src_prj, dst_prj, nodes_map, user_id - ) -> AsyncGenerator[LRTask, None]: + app: web.Application, + *, + source_project: ProjectDict, + destination_project: ProjectDict, + nodes_map: NodesMap, + user_id: UserID, + product_name: str, + ) -> AsyncGenerator[AsyncJobComposedResult, None]: print( - f"MOCK copying data project {src_prj['uuid']} -> {dst_prj['uuid']} " + f"MOCK copying data project {source_project['uuid']} -> {destination_project['uuid']} " f"with {len(nodes_map)} s3 objects by user={user_id}" ) - yield LRTask(TaskProgress(message="pytest mocked fct, started")) + yield AsyncJobComposedResult( + AsyncJobStatus( + job_id=faker.uuid4(cast_to=None), + progress=ProgressReport(actual_value=0), + done=False, 
+ ) + ) async def _mock_result(): return None - yield LRTask( - TaskProgress(message="pytest mocked fct, finished", percent=1.0), - _result=_mock_result(), + yield AsyncJobComposedResult( + AsyncJobStatus( + job_id=faker.uuid4(cast_to=None), + progress=ProgressReport(actual_value=1), + done=True, + ), + _mock_result(), ) mock.side_effect = _mock_copy_data_from_project diff --git a/services/web/server/tests/unit/with_dbs/conftest.py b/services/web/server/tests/unit/with_dbs/conftest.py index b08db8f8702..aadf58c6101 100644 --- a/services/web/server/tests/unit/with_dbs/conftest.py +++ b/services/web/server/tests/unit/with_dbs/conftest.py @@ -8,7 +8,14 @@ import sys import textwrap import warnings -from collections.abc import AsyncIterable, AsyncIterator, Awaitable, Callable, Iterator +from collections.abc import ( + AsyncGenerator, + AsyncIterable, + AsyncIterator, + Awaitable, + Callable, + Iterator, +) from copy import deepcopy from decimal import Decimal from pathlib import Path @@ -30,8 +37,11 @@ from aiopg.sa import create_engine from faker import Faker from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet +from models_library.api_schemas_rpc_async_jobs.async_jobs import AsyncJobStatus from models_library.products import ProductName +from models_library.progress_bar import ProgressReport from models_library.services_enums import ServiceState +from models_library.users import UserID from pydantic import ByteSize, TypeAdapter from pytest_docker.plugin import Services from pytest_mock import MockerFixture @@ -44,9 +54,10 @@ from pytest_simcore.helpers.webserver_projects import NewProject from redis import Redis from servicelib.aiohttp.application_keys import APP_AIOPG_ENGINE_KEY -from servicelib.aiohttp.long_running_tasks.client import LRTask -from servicelib.aiohttp.long_running_tasks.server import ProgressPercent, TaskProgress from servicelib.common_aiopg_utils import DSN +from servicelib.rabbitmq.rpc_interfaces.async_jobs.async_jobs import ( + AsyncJobComposedResult, +) from settings_library.email import SMTPSettings from settings_library.redis import RedisDatabase, RedisSettings from simcore_postgres_database.models.groups_extra_properties import ( @@ -63,6 +74,7 @@ from simcore_service_webserver.constants import INDEX_RESOURCE_NAME from simcore_service_webserver.db.plugin import get_database_engine from simcore_service_webserver.projects.models import ProjectDict +from simcore_service_webserver.projects.utils import NodesMap from simcore_service_webserver.statics._constants import ( FRONTEND_APP_DEFAULT, FRONTEND_APPS_AVAILABLE, @@ -336,29 +348,47 @@ def add_index_route(app: web.Application) -> None: @pytest.fixture -async def storage_subsystem_mock(mocker: MockerFixture) -> MockedStorageSubsystem: +async def storage_subsystem_mock( + mocker: MockerFixture, faker: Faker +) -> MockedStorageSubsystem: """ Patches client calls to storage service Patched functions are exposed within projects but call storage subsystem """ - async def _mock_copy_data_from_project(app, src_prj, dst_prj, nodes_map, user_id): + async def _mock_copy_data_from_project( + app: web.Application, + *, + source_project: ProjectDict, + destination_project: ProjectDict, + nodes_map: NodesMap, + user_id: UserID, + product_name: str, + ) -> AsyncGenerator[AsyncJobComposedResult, None]: print( - f"MOCK copying data project {src_prj['uuid']} -> {dst_prj['uuid']} " + f"MOCK copying data project {source_project['uuid']} -> {destination_project['uuid']} " f"with {len(nodes_map)} s3 objects by 
user={user_id}" ) - yield LRTask(TaskProgress(message="pytest mocked fct, started")) + yield AsyncJobComposedResult( + AsyncJobStatus( + job_id=faker.uuid4(cast_to=None), + progress=ProgressReport(actual_value=0), + done=False, + ) + ) - async def _mock_result(): + async def _mock_result() -> None: return None - yield LRTask( - TaskProgress( - message="pytest mocked fct, finished", percent=ProgressPercent(1.0) + yield AsyncJobComposedResult( + AsyncJobStatus( + job_id=faker.uuid4(cast_to=None), + progress=ProgressReport(actual_value=1), + done=True, ), - _result=_mock_result(), + _mock_result(), ) mock = mocker.patch( @@ -723,7 +753,6 @@ async def app_products_names( priority = 1 for name in FRONTEND_APPS_AVAILABLE: if name != FRONTEND_APP_DEFAULT: - async with asyncpg_engine.begin() as conn: result = await conn.execute( products.insert().values(