diff --git a/api/specs/web-server/_storage.py b/api/specs/web-server/_storage.py
index 848fac10a7b..7eae53bf742 100644
--- a/api/specs/web-server/_storage.py
+++ b/api/specs/web-server/_storage.py
@@ -26,6 +26,7 @@
     StorageAsyncJobResult,
     StorageAsyncJobStatus,
     StorageLocationPathParams,
+    StoragePathComputeSizeParams,
 )
 from models_library.generics import Envelope
 from models_library.projects_nodes_io import LocationID
@@ -68,6 +69,15 @@ async def list_storage_paths(
     """Lists the files/directories in WorkingDirectory"""
 
 
+@router.post(
+    "/storage/locations/{location_id}/paths/{path}:size",
+    response_model=Envelope[StorageAsyncJobGet],
+    status_code=status.HTTP_202_ACCEPTED,
+)
+async def compute_path_size(_path: Annotated[StoragePathComputeSizeParams, Depends()]):
+    """Compute the size of a path"""
+
+
 @router.get(
     "/storage/locations/{location_id}/datasets",
     response_model=Envelope[list[DatasetMetaData]],
diff --git a/packages/models-library/src/models_library/api_schemas_webserver/storage.py b/packages/models-library/src/models_library/api_schemas_webserver/storage.py
index f192ba51355..07659121a42 100644
--- a/packages/models-library/src/models_library/api_schemas_webserver/storage.py
+++ b/packages/models-library/src/models_library/api_schemas_webserver/storage.py
@@ -1,10 +1,6 @@
 from pathlib import Path
 from typing import Annotated, Any
 
-from models_library.api_schemas_storage.storage_schemas import (
-    DEFAULT_NUMBER_OF_PATHS_PER_PAGE,
-    MAX_NUMBER_OF_PATHS_PER_PAGE,
-)
 from pydantic import BaseModel, Field
 
 from ..api_schemas_rpc_async_jobs.async_jobs import (
@@ -14,6 +10,10 @@
     AsyncJobStatus,
 )
 from ..api_schemas_storage.data_export_async_jobs import DataExportTaskStartInput
+from ..api_schemas_storage.storage_schemas import (
+    DEFAULT_NUMBER_OF_PATHS_PER_PAGE,
+    MAX_NUMBER_OF_PATHS_PER_PAGE,
+)
 from ..progress_bar import ProgressReport
 from ..projects_nodes_io import LocationID, StorageFileID
 from ..rest_pagination import (
@@ -26,6 +26,10 @@ class StorageLocationPathParams(BaseModel):
     location_id: LocationID
 
 
+class StoragePathComputeSizeParams(StorageLocationPathParams):
+    path: Path
+
+
 class ListPathsQueryParams(InputSchema, CursorQueryParameters):
     file_filter: Path | None = None
diff --git a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/async_jobs/async_jobs.py b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/async_jobs/async_jobs.py
index a7246350773..022da445b83 100644
--- a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/async_jobs/async_jobs.py
+++ b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/async_jobs/async_jobs.py
@@ -23,7 +23,7 @@ async def abort(
     *,
     rpc_namespace: RPCNamespace,
     job_id: AsyncJobId,
-    job_id_data: AsyncJobNameData
+    job_id_data: AsyncJobNameData,
 ) -> AsyncJobAbort:
     result = await rabbitmq_rpc_client.request(
         rpc_namespace,
@@ -41,7 +41,7 @@ async def get_status(
     *,
     rpc_namespace: RPCNamespace,
     job_id: AsyncJobId,
-    job_id_data: AsyncJobNameData
+    job_id_data: AsyncJobNameData,
 ) -> AsyncJobStatus:
     result = await rabbitmq_rpc_client.request(
         rpc_namespace,
@@ -59,7 +59,7 @@ async def get_result(
     *,
     rpc_namespace: RPCNamespace,
     job_id: AsyncJobId,
-    job_id_data: AsyncJobNameData
+    job_id_data: AsyncJobNameData,
 ) -> AsyncJobResult:
     result = await rabbitmq_rpc_client.request(
         rpc_namespace,
@@ -77,7 +77,7 @@ async def list_jobs(
     *,
     rpc_namespace: RPCNamespace,
     filter_: str,
-    job_id_data: AsyncJobNameData
+    job_id_data: AsyncJobNameData,
 ) -> list[AsyncJobGet]:
     result: list[AsyncJobGet] = await rabbitmq_rpc_client.request(
         rpc_namespace,
@@ -95,7 +95,7 @@ async def submit_job(
     rpc_namespace: RPCNamespace,
     method_name: str,
     job_id_data: AsyncJobNameData,
-    **kwargs
+    **kwargs,
 ) -> AsyncJobGet:
     result = await rabbitmq_rpc_client.request(
         rpc_namespace,
@@ -104,5 +104,5 @@ async def submit_job(
         **kwargs,
         timeout_s=_DEFAULT_TIMEOUT_S,
     )
-    assert isinstance(result, AsyncJobGet)
+    assert isinstance(result, AsyncJobGet)  # nosec
     return result
diff --git a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/storage/__init__.py b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/storage/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/storage/paths.py b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/storage/paths.py
new file mode 100644
index 00000000000..d924a94fbe7
--- /dev/null
+++ b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/storage/paths.py
@@ -0,0 +1,33 @@
+from pathlib import Path
+
+from models_library.api_schemas_rpc_async_jobs.async_jobs import (
+    AsyncJobNameData,
+)
+from models_library.api_schemas_storage import STORAGE_RPC_NAMESPACE
+from models_library.api_schemas_webserver.storage import StorageAsyncJobGet
+from models_library.projects_nodes_io import LocationID
+from models_library.rabbitmq_basic_types import RPCMethodName
+from models_library.users import UserID
+
+from ..._client_rpc import RabbitMQRPCClient
+from ..async_jobs.async_jobs import submit_job
+
+
+async def compute_path_size(
+    client: RabbitMQRPCClient,
+    *,
+    user_id: UserID,
+    product_name: str,
+    location_id: LocationID,
+    path: Path,
+) -> tuple[StorageAsyncJobGet, AsyncJobNameData]:
+    job_id_data = AsyncJobNameData(user_id=user_id, product_name=product_name)
+    async_job_rpc_get = await submit_job(
+        rabbitmq_rpc_client=client,
+        rpc_namespace=STORAGE_RPC_NAMESPACE,
+        method_name=RPCMethodName("compute_path_size"),
+        job_id_data=job_id_data,
+        location_id=location_id,
+        path=path,
+    )
+    return StorageAsyncJobGet.from_rpc_schema(async_job_rpc_get), job_id_data
diff --git a/services/storage/docker/boot.sh b/services/storage/docker/boot.sh
index 1a62904bb35..fc1ea38c59c 100755
--- a/services/storage/docker/boot.sh
+++ b/services/storage/docker/boot.sh
@@ -42,10 +42,32 @@ APP_LOG_LEVEL=${STORAGE_LOGLEVEL:-${LOG_LEVEL:-${LOGLEVEL:-INFO}}}
 SERVER_LOG_LEVEL=$(echo "${APP_LOG_LEVEL}" | tr '[:upper:]' '[:lower:]')
 echo "$INFO" "Log-level app/server: $APP_LOG_LEVEL/$SERVER_LOG_LEVEL"
 
-if [ "${SC_BOOT_MODE}" = "debug" ]; then
-  reload_dir_packages=$(find /devel/packages -maxdepth 3 -type d -path "*/src/*" ! -path "*.*" -exec echo '--reload-dir {} \' \;)
+if [ "${STORAGE_WORKER_MODE}" = "true" ]; then
+  if [ "${SC_BOOT_MODE}" = "debug" ]; then
+    exec watchmedo auto-restart \
+      --directory /devel/packages \
+      --directory services/storage \
+      --pattern "*.py" \
+      --recursive \
+      -- \
+      celery \
+      --app=simcore_service_storage.modules.celery.worker_main:app \
+      worker --pool=threads \
+      --loglevel="${SERVER_LOG_LEVEL}" \
+      --concurrency="${CELERY_CONCURRENCY}"
+  else
+    exec celery \
+      --app=simcore_service_storage.modules.celery.worker_main:app \
+      worker --pool=threads \
+      --loglevel="${SERVER_LOG_LEVEL}" \
+      --hostname="${HOSTNAME}" \
+      --concurrency="${CELERY_CONCURRENCY}"
+  fi
+else
+  if [ "${SC_BOOT_MODE}" = "debug" ]; then
+    reload_dir_packages=$(find /devel/packages -maxdepth 3 -type d -path "*/src/*" ! -path "*.*" -exec echo '--reload-dir {} \' \;)
 
-  exec sh -c "
+    exec sh -c "
     cd services/storage/src/simcore_service_storage && \
     python -m debugpy --listen 0.0.0.0:${STORAGE_REMOTE_DEBUGGING_PORT} -m uvicorn main:app \
       --host 0.0.0.0 \
@@ -55,14 +77,6 @@
       --reload-dir . \
      --log-level \"${SERVER_LOG_LEVEL}\"
     "
-else
-  if [ "${STORAGE_WORKER_MODE}" = "true" ]; then
-    exec celery \
-      --app=simcore_service_storage.modules.celery.worker_main:app \
-      worker --pool=threads \
-      --loglevel="${SERVER_LOG_LEVEL}" \
-      --hostname="${HOSTNAME}" \
-      --concurrency="${CELERY_CONCURRENCY}"
   else
     exec uvicorn simcore_service_storage.main:app \
       --host 0.0.0.0 \
diff --git a/services/storage/requirements/_base.in b/services/storage/requirements/_base.in
index 44da01ea789..cf0ccfdba89 100644
--- a/services/storage/requirements/_base.in
+++ b/services/storage/requirements/_base.in
@@ -29,3 +29,4 @@ pydantic[dotenv]
 tenacity
 typer
 types-aiobotocore[s3] # s3 storage
+watchdog
diff --git a/services/storage/requirements/_base.txt b/services/storage/requirements/_base.txt
index 0e84c66d32e..99c101b84d6 100644
--- a/services/storage/requirements/_base.txt
+++ b/services/storage/requirements/_base.txt
@@ -984,6 +984,8 @@ vine==5.1.0
     # via
     #   amqp
     #   celery
     #   kombu
+watchdog==6.0.0
+    # via -r requirements/_base.in
 watchfiles==1.0.4
     # via uvicorn
 wcwidth==0.2.13
diff --git a/services/storage/requirements/_test.txt b/services/storage/requirements/_test.txt
index 962a86a4d56..886d2d165e3 100644
--- a/services/storage/requirements/_test.txt
+++ b/services/storage/requirements/_test.txt
@@ -374,7 +374,6 @@ setuptools==75.8.2
     # via
     #   moto
     #   pytest-celery
-    # via moto
 simcore-service-storage-sdk @ git+https://github.com/ITISFoundation/osparc-simcore.git@cfdf4f86d844ebb362f4f39e9c6571d561b72897#subdirectory=services/storage/client-sdk/python
     # via -r requirements/_test.in
 six==1.17.0
diff --git a/services/storage/requirements/_tools.txt b/services/storage/requirements/_tools.txt
index 24925ca1725..d57a2d475d5 100644
--- a/services/storage/requirements/_tools.txt
+++ b/services/storage/requirements/_tools.txt
@@ -87,6 +87,8 @@ typing-extensions==4.12.2
 virtualenv==20.29.2
     # via pre-commit
 watchdog==6.0.0
-    # via -r requirements/_tools.in
+    # via
+    #   -c requirements/_base.txt
+    #   -r requirements/_tools.in
 wheel==0.45.1
     # via pip-tools
diff --git a/services/storage/src/simcore_service_storage/api/_worker_tasks/__init__.py b/services/storage/src/simcore_service_storage/api/_worker_tasks/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/services/storage/src/simcore_service_storage/api/_worker_tasks/_paths.py b/services/storage/src/simcore_service_storage/api/_worker_tasks/_paths.py
new file mode 100644
index 00000000000..f8208221f9f
--- /dev/null
+++ b/services/storage/src/simcore_service_storage/api/_worker_tasks/_paths.py
@@ -0,0 +1,25 @@
+import logging
+from pathlib import Path
+
+from celery import Task  # type: ignore[import-untyped]
+from models_library.projects_nodes_io import LocationID
+from models_library.users import UserID
+from pydantic import ByteSize
+from servicelib.logging_utils import log_context
+
+from ...dsm import get_dsm_provider
+from ...modules.celery.utils import get_fastapi_app
+
+_logger = logging.getLogger(__name__)
+
+
+async def compute_path_size(
+    task: Task, user_id: UserID, location_id: LocationID, path: Path
+) -> ByteSize:
+    with log_context(
+        _logger,
+        logging.INFO,
+        msg=f"computing path size {user_id=}, {location_id=}, {path=}",
+    ):
+        dsm = get_dsm_provider(get_fastapi_app(task.app)).get(location_id)
+        return await dsm.compute_path_size(user_id, path=Path(path))
diff --git a/services/storage/src/simcore_service_storage/api/_worker_tasks/tasks.py b/services/storage/src/simcore_service_storage/api/_worker_tasks/tasks.py
new file mode 100644
index 00000000000..557013de976
--- /dev/null
+++ b/services/storage/src/simcore_service_storage/api/_worker_tasks/tasks.py
@@ -0,0 +1,22 @@
+import logging
+
+from celery import Celery  # type: ignore[import-untyped]
+from servicelib.logging_utils import log_context
+
+from ...modules.celery._celery_types import register_celery_types
+from ...modules.celery._task import define_task
+from ...modules.celery.tasks import export_data
+from ._paths import compute_path_size
+
+_logger = logging.getLogger(__name__)
+
+
+def setup_worker_tasks(app: Celery) -> None:
+    register_celery_types()
+    with log_context(
+        _logger,
+        logging.INFO,
+        msg="Storage setup Worker Tasks",
+    ):
+        define_task(app, export_data)
+        define_task(app, compute_path_size)
diff --git a/services/storage/src/simcore_service_storage/api/rpc/_paths.py b/services/storage/src/simcore_service_storage/api/rpc/_paths.py
new file mode 100644
index 00000000000..34ea3deeedb
--- /dev/null
+++ b/services/storage/src/simcore_service_storage/api/rpc/_paths.py
@@ -0,0 +1,35 @@
+from pathlib import Path
+
+from fastapi import FastAPI
+from models_library.api_schemas_rpc_async_jobs.async_jobs import (
+    AsyncJobGet,
+    AsyncJobNameData,
+)
+from models_library.projects_nodes_io import LocationID
+from servicelib.rabbitmq import RPCRouter
+
+from ...modules.celery import get_celery_client
+from .._worker_tasks._paths import compute_path_size as remote_compute_path_size
+
+router = RPCRouter()
+
+
+@router.expose(reraise_if_error_type=None)
+async def compute_path_size(
+    app: FastAPI,
+    job_id_data: AsyncJobNameData,
+    # user_id: UserID,
+    location_id: LocationID,
+    path: Path,
+) -> AsyncJobGet:
+    assert app  # nosec
+
+    task_uuid = await get_celery_client(app).send_task(
+        remote_compute_path_size.__name__,
+        task_context=job_id_data.model_dump(),
+        user_id=job_id_data.user_id,
+        location_id=location_id,
+        path=path,
+    )
+
+    return AsyncJobGet(job_id=task_uuid)
diff --git a/services/storage/src/simcore_service_storage/api/rpc/routes.py b/services/storage/src/simcore_service_storage/api/rpc/routes.py
index 812ce296adf..799a2b4e839 100644
--- a/services/storage/src/simcore_service_storage/api/rpc/routes.py
+++ b/services/storage/src/simcore_service_storage/api/rpc/routes.py
@@ -6,12 +6,16 @@
 from servicelib.rabbitmq import RPCRouter
 
 from ...modules.rabbitmq import get_rabbitmq_rpc_server
-from . import _async_jobs, _data_export
+from . import _async_jobs, _data_export, _paths
 
 _logger = logging.getLogger(__name__)
 
 
-ROUTERS: list[RPCRouter] = [_data_export.router, _async_jobs.router]
+ROUTERS: list[RPCRouter] = [
+    _async_jobs.router,
+    _data_export.router,
+    _paths.router,
+]
 
 
 def setup_rpc_api_routes(app: FastAPI) -> None:
diff --git a/services/storage/src/simcore_service_storage/core/application.py b/services/storage/src/simcore_service_storage/core/application.py
index d843bd72994..aecc95bd499 100644
--- a/services/storage/src/simcore_service_storage/core/application.py
+++ b/services/storage/src/simcore_service_storage/core/application.py
@@ -33,6 +33,7 @@
 from ..dsm import setup_dsm
 from ..dsm_cleaner import setup_dsm_cleaner
 from ..exceptions.handlers import set_exception_handlers
+from ..modules.celery import setup_celery_client
 from ..modules.db import setup_db
 from ..modules.long_running_tasks import setup_rest_api_long_running_tasks_for_uploads
 from ..modules.rabbitmq import setup as setup_rabbitmq
@@ -53,7 +54,7 @@
 _logger = logging.getLogger(__name__)
 
 
-def create_app(settings: ApplicationSettings) -> FastAPI:
+def create_app(settings: ApplicationSettings) -> FastAPI:  # noqa: C901
     # keep mostly quiet noisy loggers
     quiet_level: int = max(
         min(logging.root.level + _LOG_LEVEL_STEP, logging.CRITICAL), logging.WARNING
@@ -86,6 +87,7 @@ def create_app(settings: ApplicationSettings) -> FastAPI:
     setup_rabbitmq(app)
     if not settings.STORAGE_WORKER_MODE:
         setup_rpc_api_routes(app)
+        setup_celery_client(app)
     setup_rest_api_long_running_tasks_for_uploads(app)
     setup_rest_api_routes(app, API_VTAG)
     set_exception_handlers(app)
diff --git a/services/storage/src/simcore_service_storage/main.py b/services/storage/src/simcore_service_storage/main.py
index 6270aa4786c..abf94338662 100644
--- a/services/storage/src/simcore_service_storage/main.py
+++ b/services/storage/src/simcore_service_storage/main.py
@@ -5,7 +5,6 @@
 from servicelib.logging_utils import config_all_loggers
 from simcore_service_storage.core.application import create_app
 from simcore_service_storage.core.settings import ApplicationSettings
-from simcore_service_storage.modules.celery import setup_celery
 
 _settings = ApplicationSettings.create_from_envs()
 
@@ -20,7 +19,4 @@
 _logger = logging.getLogger(__name__)
 
-fastapi_app = create_app(_settings)
-setup_celery(fastapi_app)
-
-app = fastapi_app
+app = create_app(_settings)
 
diff --git a/services/storage/src/simcore_service_storage/modules/celery/__init__.py b/services/storage/src/simcore_service_storage/modules/celery/__init__.py
index abfb72bae65..fc6ed86c7b5 100644
--- a/services/storage/src/simcore_service_storage/modules/celery/__init__.py
+++ b/services/storage/src/simcore_service_storage/modules/celery/__init__.py
@@ -4,23 +4,27 @@
 from fastapi import FastAPI
 
 from ...core.settings import get_application_settings
+from ._celery_types import register_celery_types
 from ._common import create_app
 from .client import CeleryTaskQueueClient
 
 _logger = logging.getLogger(__name__)
 
 
-def setup_celery(app: FastAPI) -> None:
+def setup_celery_client(app: FastAPI) -> None:
     async def on_startup() -> None:
         celery_settings = get_application_settings(app).STORAGE_CELERY
         assert celery_settings  # nosec
         celery_app = create_app(celery_settings)
         app.state.celery_client = CeleryTaskQueueClient(celery_app)
 
+        register_celery_types()
+
     app.add_event_handler("startup", on_startup)
 
 
 def get_celery_client(app: FastAPI) -> CeleryTaskQueueClient:
+    assert hasattr(app.state, "celery_client")  # nosec
     celery_client = app.state.celery_client
     assert isinstance(celery_client, CeleryTaskQueueClient)
     return celery_client
diff --git a/services/storage/src/simcore_service_storage/modules/celery/_celery_types.py b/services/storage/src/simcore_service_storage/modules/celery/_celery_types.py
new file mode 100644
index 00000000000..e3742f6fc64
--- /dev/null
+++ b/services/storage/src/simcore_service_storage/modules/celery/_celery_types.py
@@ -0,0 +1,29 @@
+from pathlib import Path
+
+from kombu.utils.json import register_type  # type: ignore[import-untyped]
+
+
+def _path_encoder(obj):
+    if isinstance(obj, Path):
+        return {"__path__": True, "path": str(obj)}
+    return obj
+
+
+# Define how Path objects are deserialized
+def _path_decoder(obj):
+    if "__path__" in obj:
+        return Path(obj["path"])
+    return obj
+
+
+def _class_full_name(clz: type) -> str:
+    return ".".join([clz.__module__, clz.__qualname__])
+
+
+def register_celery_types() -> None:
+    register_type(
+        Path,
+        _class_full_name(Path),
+        _path_encoder,
+        _path_decoder,
+    )
diff --git a/services/storage/src/simcore_service_storage/modules/celery/_common.py b/services/storage/src/simcore_service_storage/modules/celery/_common.py
index 52bb638772a..2007aae2c39 100644
--- a/services/storage/src/simcore_service_storage/modules/celery/_common.py
+++ b/services/storage/src/simcore_service_storage/modules/celery/_common.py
@@ -1,17 +1,9 @@
 import logging
-import traceback
-from collections.abc import Callable
-from functools import wraps
-from typing import Any
 
-from celery import Celery, Task  # type: ignore[import-untyped]
-from celery.contrib.abortable import AbortableTask  # type: ignore[import-untyped]
-from celery.exceptions import Ignore  # type: ignore[import-untyped]
+from celery import Celery  # type: ignore[import-untyped]
 from settings_library.celery import CelerySettings
 from settings_library.redis import RedisDatabase
 
-from .models import TaskError, TaskState
-
 _logger = logging.getLogger(__name__)
 
 
@@ -32,40 +24,3 @@ def create_app(celery_settings: CelerySettings) -> Celery:
     app.conf.worker_send_task_events = True  # enable tasks monitoring
 
     return app
-
-
-def error_handling(func: Callable[..., Any]) -> Callable[..., Any]:
-    @wraps(func)
-    def wrapper(task: Task, *args: Any, **kwargs: Any) -> Any:
-        try:
-            return func(task, *args, **kwargs)
-        except Exception as exc:
-            exc_type = type(exc).__name__
-            exc_message = f"{exc}"
-            exc_traceback = traceback.format_exc().split("\n")
-
-            _logger.exception(
-                "Task %s failed with exception: %s",
-                task.request.id,
-                exc_message,
-            )
-
-            task.update_state(
-                state=TaskState.ERROR.upper(),
-                meta=TaskError(
-                    exc_type=exc_type,
-                    exc_msg=exc_message,
-                ).model_dump(mode="json"),
-                traceback=exc_traceback,
-            )
-            raise Ignore from exc  # ignore doing state updates
-
-    return wrapper
-
-
-def define_task(app: Celery, fn: Callable, task_name: str | None = None):
-    app.task(
-        name=task_name or fn.__name__,
-        bind=True,
-        base=AbortableTask,
-    )(error_handling(fn))
diff --git a/services/storage/src/simcore_service_storage/modules/celery/_task.py b/services/storage/src/simcore_service_storage/modules/celery/_task.py
new file mode 100644
index 00000000000..f89ca963c86
--- /dev/null
+++ b/services/storage/src/simcore_service_storage/modules/celery/_task.py
@@ -0,0 +1,81 @@
+import asyncio
+import logging
+import traceback
+from collections.abc import Callable, Coroutine
+from functools import wraps
+from typing import Any, ParamSpec, TypeVar
+
+from celery import (  # type: ignore[import-untyped]
+    Celery,
+    Task,
+)
+from celery.contrib.abortable import AbortableTask  # type: ignore[import-untyped]
+from celery.exceptions import Ignore  # type: ignore[import-untyped]
+
+from . import get_event_loop
+from .models import TaskError, TaskState
+from .utils import get_fastapi_app
+
+_logger = logging.getLogger(__name__)
+
+
+def error_handling(func: Callable[..., Any]) -> Callable[..., Any]:
+    @wraps(func)
+    def wrapper(task: Task, *args: Any, **kwargs: Any) -> Any:
+        try:
+            return func(task, *args, **kwargs)
+        except Exception as exc:
+            exc_type = type(exc).__name__
+            exc_message = f"{exc}"
+            exc_traceback = traceback.format_exc().split("\n")
+
+            _logger.exception(
+                "Task %s failed with exception: %s",
+                task.request.id,
+                exc_message,
+            )
+
+            task.update_state(
+                state=TaskState.ERROR.upper(),
+                meta=TaskError(
+                    exc_type=exc_type,
+                    exc_msg=exc_message,
+                ).model_dump(mode="json"),
+                traceback=exc_traceback,
+            )
+            raise Ignore from exc  # ignore doing state updates
+
+    return wrapper
+
+
+T = TypeVar("T")
+P = ParamSpec("P")
+R = TypeVar("R")
+
+
+def _async_task_wrapper(
+    app: Celery,
+) -> Callable[[Callable[P, Coroutine[Any, Any, R]]], Callable[P, R]]:
+    def decorator(coro: Callable[P, Coroutine[Any, Any, R]]) -> Callable[P, R]:
+        @wraps(coro)
+        def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
+            fastapi_app = get_fastapi_app(app)
+            return asyncio.run_coroutine_threadsafe(
+                coro(*args, **kwargs), get_event_loop(fastapi_app)
+            ).result()
+
+        return wrapper
+
+    return decorator
+
+
+def define_task(app: Celery, fn: Callable, task_name: str | None = None):
+    wrapped_fn = error_handling(fn)
+    if asyncio.iscoroutinefunction(fn):
+        wrapped_fn = _async_task_wrapper(app)(fn)
+
+    app.task(
+        name=task_name or fn.__name__,
+        bind=True,
+        base=AbortableTask,
+    )(wrapped_fn)
diff --git a/services/storage/src/simcore_service_storage/modules/celery/worker_main.py b/services/storage/src/simcore_service_storage/modules/celery/worker_main.py
index 873384095d0..99b9a53676e 100644
--- a/services/storage/src/simcore_service_storage/modules/celery/worker_main.py
+++ b/services/storage/src/simcore_service_storage/modules/celery/worker_main.py
@@ -4,15 +4,14 @@
 from celery.signals import worker_init, worker_shutdown  # type: ignore[import-untyped]
 from servicelib.logging_utils import config_all_loggers
+from simcore_service_storage.api._worker_tasks.tasks import setup_worker_tasks
 
 from ...core.settings import ApplicationSettings
 from ._common import create_app as create_celery_app
-from ._common import define_task
 from .signals import (
     on_worker_init,
     on_worker_shutdown,
 )
-from .tasks import export_data
 
 _settings = ApplicationSettings.create_from_envs()
 
@@ -31,4 +30,5 @@
 worker_init.connect(on_worker_init)
 worker_shutdown.connect(on_worker_shutdown)
 
-define_task(app, export_data)
+
+setup_worker_tasks(app)
diff --git a/services/storage/tests/conftest.py b/services/storage/tests/conftest.py
index b766c765521..03c73f5bfd0 100644
--- a/services/storage/tests/conftest.py
+++ b/services/storage/tests/conftest.py
@@ -57,6 +57,7 @@
 from pytest_simcore.helpers.typing_env import EnvVarsDict
 from servicelib.aiohttp import status
 from servicelib.utils import limited_gather
+from settings_library.rabbit import RabbitSettings
 from simcore_postgres_database.models.tokens import tokens
 from simcore_postgres_database.storage_models import file_meta_data, projects, users
 from simcore_service_storage.core.application import create_app
@@ -92,6 +93,7 @@
     "pytest_simcore.openapi_specs",
     "pytest_simcore.postgres_service",
     "pytest_simcore.pytest_global_environs",
+    "pytest_simcore.rabbit_service",
     "pytest_simcore.repository_paths",
     "pytest_simcore.simcore_storage_data_models",
     "pytest_simcore.simcore_storage_datcore_adapter",
@@ -165,7 +167,6 @@ def app_environment(
     mock_env_devel_environment: EnvVarsDict,
     monkeypatch: pytest.MonkeyPatch,
     external_envfile_dict: EnvVarsDict,
-    mock_rabbit_setup: MockerFixture,
 ) -> EnvVarsDict:
     if external_envfile_dict:
         delenvs_from_dict(monkeypatch, mock_env_devel_environment, raising=False)
@@ -175,9 +176,22 @@ def app_environment(
     return mock_env_devel_environment | envs
 
 
+@pytest.fixture
+def disabled_rabbitmq(app_environment: EnvVarsDict, monkeypatch: pytest.MonkeyPatch):
+    monkeypatch.setenv("STORAGE_RABBITMQ", "null")
+
+
+@pytest.fixture
+def enabled_rabbitmq(
+    app_environment: EnvVarsDict, rabbit_service: RabbitSettings
+) -> RabbitSettings:
+    return rabbit_service
+
+
 @pytest.fixture
 def app_settings(
     app_environment: EnvVarsDict,
+    enabled_rabbitmq: RabbitSettings,
     sqlalchemy_async_engine: AsyncEngine,
     postgres_host_config: dict[str, str],
     mocked_s3_server_envs: EnvVarsDict,
diff --git a/services/storage/tests/unit/modules/celery/test_celery.py b/services/storage/tests/unit/modules/celery/test_celery.py
index 097e5b269ab..77dd5cf3e2a 100644
--- a/services/storage/tests/unit/modules/celery/test_celery.py
+++ b/services/storage/tests/unit/modules/celery/test_celery.py
@@ -12,7 +12,7 @@
 from pydantic import TypeAdapter, ValidationError
 from servicelib.logging_utils import log_context
 from simcore_service_storage.modules.celery import get_event_loop
-from simcore_service_storage.modules.celery._common import define_task
+from simcore_service_storage.modules.celery._task import define_task
 from simcore_service_storage.modules.celery.client import CeleryTaskQueueClient
 from simcore_service_storage.modules.celery.models import (
     TaskContext,
@@ -72,7 +72,7 @@ def dreamer_task(task: AbortableTask) -> list[int]:
         if task.is_aborted():
             _logger.warning("Alarm clock")
             return numbers
-        numbers.append(randint(1, 90))
+        numbers.append(randint(1, 90))  # noqa: S311
         time.sleep(1)
     return numbers
 
@@ -88,7 +88,7 @@ def _(celery_app: Celery) -> None:
 
 
 @pytest.mark.usefixtures("celery_worker")
-async def test_sumitting_task_calling_async_function_results_with_success_state(
+async def test_submitting_task_calling_async_function_results_with_success_state(
     celery_client: CeleryTaskQueueClient,
 ):
     task_context = TaskContext(user_id=42)
diff --git a/services/storage/tests/unit/test__legacy_storage_sdk_compatibility.py b/services/storage/tests/unit/test__legacy_storage_sdk_compatibility.py
index f66443d9681..0531f02c6b1 100644
--- a/services/storage/tests/unit/test__legacy_storage_sdk_compatibility.py
+++ b/services/storage/tests/unit/test__legacy_storage_sdk_compatibility.py
@@ -39,7 +39,7 @@
 )
 from yarl import URL
 
-pytest_simcore_core_services_selection = ["postgres"]
+pytest_simcore_core_services_selection = ["postgres", "rabbit"]
 pytest_simcore_ops_services_selection = [
     "adminer",
 ]
diff --git a/services/storage/tests/unit/test__worker_tasks_paths.py b/services/storage/tests/unit/test__worker_tasks_paths.py
new file mode 100644
index 00000000000..c8d508d4129
--- /dev/null
+++ b/services/storage/tests/unit/test__worker_tasks_paths.py
@@ -0,0 +1,216 @@
+# pylint:disable=no-name-in-module
+# pylint:disable=protected-access
+# pylint:disable=redefined-outer-name
+# pylint:disable=too-many-arguments
+# pylint:disable=too-many-positional-arguments
+# pylint:disable=unused-argument
+# pylint:disable=unused-variable
+
+
+import random
+from pathlib import Path
+from typing import Any, TypeAlias
+
+import httpx
+import pytest
+from celery import Celery, Task
+from faker import Faker
+from fastapi import FastAPI
+from models_library.projects_nodes_io import LocationID, NodeID, SimcoreS3FileID
+from models_library.users import UserID
+from pydantic import ByteSize, TypeAdapter
+from pytest_simcore.helpers.storage_utils import FileIDDict, ProjectWithFilesParams
+from simcore_service_storage.api._worker_tasks._paths import compute_path_size
+from simcore_service_storage.modules.celery.utils import set_fastapi_app
+from simcore_service_storage.simcore_s3_dsm import SimcoreS3DataManager
+
+pytest_simcore_core_services_selection = ["postgres"]
+pytest_simcore_ops_services_selection = ["adminer"]
+
+_IsFile: TypeAlias = bool
+
+
+def _filter_and_group_paths_one_level_deeper(
+    paths: list[Path], prefix: Path
+) -> list[tuple[Path, _IsFile]]:
+    relative_paths = (path for path in paths if path.is_relative_to(prefix))
+    return sorted(
+        {
+            (
+                (path, len(path.relative_to(prefix).parts) == 1)
+                if len(path.relative_to(prefix).parts) == 1
+                else (prefix / path.relative_to(prefix).parts[0], False)
+            )
+            for path in relative_paths
+        },
+        key=lambda x: x[0],
+    )
+
+
+async def _assert_compute_path_size(
+    celery_task: Task,
+    location_id: LocationID,
+    user_id: UserID,
+    *,
+    path: Path,
+    expected_total_size: int,
+) -> ByteSize:
+    response = await compute_path_size(
+        celery_task, user_id=user_id, location_id=location_id, path=path
+    )
+    assert isinstance(response, ByteSize)
+    assert response == expected_total_size
+    return response
+
+
+@pytest.fixture
+def fake_celery_task(celery_app: Celery, initialized_app: FastAPI) -> Task:
+    celery_task = Task()
+    celery_task.app = celery_app
+    set_fastapi_app(celery_app, initialized_app)
+    return celery_task
+
+
+@pytest.mark.parametrize(
+    "location_id",
+    [SimcoreS3DataManager.get_location_id()],
+    ids=[SimcoreS3DataManager.get_location_name()],
+    indirect=True,
+)
+@pytest.mark.parametrize(
+    "project_params",
+    [
+        ProjectWithFilesParams(
+            num_nodes=5,
+            allowed_file_sizes=(TypeAdapter(ByteSize).validate_python("1b"),),
+            workspace_files_count=10,
+        )
+    ],
+    ids=str,
+)
+async def test_path_compute_size(
+    fake_celery_task: Task,
+    location_id: LocationID,
+    user_id: UserID,
+    with_random_project_with_files: tuple[
+        dict[str, Any],
+        dict[NodeID, dict[SimcoreS3FileID, FileIDDict]],
+    ],
+    project_params: ProjectWithFilesParams,
+):
+    assert (
+        len(project_params.allowed_file_sizes) == 1
+    ), "test preconditions are not fulfilled! allowed file sizes should have only 1 option for this test"
+    project, list_of_files = with_random_project_with_files
+
+    total_num_files = sum(
+        len(files_in_node) for files_in_node in list_of_files.values()
+    )
+
+    # get size of a full project
+    expected_total_size = project_params.allowed_file_sizes[0] * total_num_files
+    path = Path(project["uuid"])
+    await _assert_compute_path_size(
+        fake_celery_task,
+        location_id,
+        user_id,
+        path=path,
+        expected_total_size=expected_total_size,
+    )
+
+    # get size of one of the nodes
+    selected_node_id = NodeID(random.choice(list(project["workbench"])))  # noqa: S311
+    path = Path(project["uuid"]) / f"{selected_node_id}"
+    selected_node_s3_keys = [
+        Path(s3_object_id) for s3_object_id in list_of_files[selected_node_id]
+    ]
+    expected_total_size = project_params.allowed_file_sizes[0] * len(
+        selected_node_s3_keys
+    )
+    await _assert_compute_path_size(
+        fake_celery_task,
+        location_id,
+        user_id,
+        path=path,
+        expected_total_size=expected_total_size,
+    )
+
+    # get size of the outputs of one of the nodes
+    path = Path(project["uuid"]) / f"{selected_node_id}" / "outputs"
+    selected_node_s3_keys = [
+        Path(s3_object_id)
+        for s3_object_id in list_of_files[selected_node_id]
+        if s3_object_id.startswith(f"{path}")
+    ]
+    expected_total_size = project_params.allowed_file_sizes[0] * len(
+        selected_node_s3_keys
+    )
+    await _assert_compute_path_size(
+        fake_celery_task,
+        location_id,
+        user_id,
+        path=path,
+        expected_total_size=expected_total_size,
+    )
+
+    # get size of workspace in one of the nodes (this is semi-cached in the DB)
+    path = Path(project["uuid"]) / f"{selected_node_id}" / "workspace"
+    selected_node_s3_keys = [
+        Path(s3_object_id)
+        for s3_object_id in list_of_files[selected_node_id]
+        if s3_object_id.startswith(f"{path}")
+    ]
+    expected_total_size = project_params.allowed_file_sizes[0] * len(
+        selected_node_s3_keys
+    )
+    workspace_total_size = await _assert_compute_path_size(
+        fake_celery_task,
+        location_id,
+        user_id,
+        path=path,
+        expected_total_size=expected_total_size,
+    )
+
+    # get size of folders inside the workspace
+    folders_inside_workspace = [
+        p[0]
+        for p in _filter_and_group_paths_one_level_deeper(selected_node_s3_keys, path)
+        if p[1] is False
+    ]
+    accumulated_subfolder_size = 0
+    for workspace_subfolder in folders_inside_workspace:
+        selected_node_s3_keys = [
+            Path(s3_object_id)
+            for s3_object_id in list_of_files[selected_node_id]
+            if s3_object_id.startswith(f"{workspace_subfolder}")
+        ]
+        expected_total_size = project_params.allowed_file_sizes[0] * len(
+            selected_node_s3_keys
+        )
+        accumulated_subfolder_size += await _assert_compute_path_size(
+            fake_celery_task,
+            location_id,
+            user_id,
+            path=workspace_subfolder,
+            expected_total_size=expected_total_size,
+        )
+
+    assert workspace_total_size == accumulated_subfolder_size
+
+
+async def test_path_compute_size_inexistent_path(
+    fake_celery_task: Task,
+    initialized_app: FastAPI,
+    client: httpx.AsyncClient,
+    location_id: LocationID,
+    user_id: UserID,
+    faker: Faker,
+    fake_datcore_tokens: tuple[str, str],
+):
+    await _assert_compute_path_size(
+        fake_celery_task,
+        location_id,
+        user_id,
+        path=Path(faker.file_path(absolute=False)),
+        expected_total_size=0,
+    )
diff --git a/services/storage/tests/unit/test_dsm_dsmcleaner.py b/services/storage/tests/unit/test_dsm_dsmcleaner.py
index 023cd5ee422..2f48b85bd03 100644
--- a/services/storage/tests/unit/test_dsm_dsmcleaner.py
+++ b/services/storage/tests/unit/test_dsm_dsmcleaner.py
@@ -34,7 +34,7 @@
 from simcore_service_storage.simcore_s3_dsm import SimcoreS3DataManager
 from sqlalchemy.ext.asyncio import AsyncEngine
 
-pytest_simcore_core_services_selection = ["postgres"]
+pytest_simcore_core_services_selection = ["postgres", "rabbit"]
 pytest_simcore_ops_services_selection = ["adminer"]
 
 _faker: Faker = Faker()
diff --git a/services/storage/tests/unit/test_rpc_handlers_paths.py b/services/storage/tests/unit/test_rpc_handlers_paths.py
new file mode 100644
index 00000000000..ee6787b22a3
--- /dev/null
+++ b/services/storage/tests/unit/test_rpc_handlers_paths.py
@@ -0,0 +1,71 @@
+# pylint:disable=no-name-in-module
+# pylint:disable=protected-access
+# pylint:disable=redefined-outer-name
+# pylint:disable=too-many-arguments
+# pylint:disable=too-many-positional-arguments
+# pylint:disable=unused-argument
+# pylint:disable=unused-variable
+
+
+from collections.abc import Awaitable, Callable
+from pathlib import Path
+from unittest import mock
+
+import pytest
+from faker import Faker
+from fastapi import FastAPI
+from models_library.projects_nodes_io import LocationID
+from models_library.users import UserID
+from pytest_mock import MockerFixture
+from servicelib.rabbitmq._client_rpc import RabbitMQRPCClient
+from servicelib.rabbitmq.rpc_interfaces.storage.paths import compute_path_size
+from simcore_service_storage.simcore_s3_dsm import SimcoreS3DataManager
+
+pytest_simcore_core_services_selection = ["postgres", "rabbit"]
+pytest_simcore_ops_services_selection = ["adminer"]
+
+
+@pytest.fixture
+async def storage_rabbitmq_rpc_client(
+    rabbitmq_rpc_client: Callable[[str], Awaitable[RabbitMQRPCClient]],
+) -> RabbitMQRPCClient:
+    rpc_client = await rabbitmq_rpc_client("pytest_storage_rpc_client")
+    assert rpc_client
+    return rpc_client
+
+
+@pytest.fixture
+async def mock_celery_send_task(mocker: MockerFixture, faker: Faker) -> mock.AsyncMock:
+    def mocked_send_task(*args, **kwargs):
+        return faker.uuid4()
+
+    return mocker.patch(
+        "simcore_service_storage.modules.celery.client.CeleryTaskQueueClient.send_task",
+        side_effect=mocked_send_task,
+    )
+
+
+@pytest.mark.parametrize(
+    "location_id",
+    [SimcoreS3DataManager.get_location_id()],
+    ids=[SimcoreS3DataManager.get_location_name()],
+    indirect=True,
+)
+async def test_path_compute_size_calls_in_celery(
+    initialized_app: FastAPI,
+    storage_rabbitmq_rpc_client: RabbitMQRPCClient,
+    location_id: LocationID,
+    user_id: UserID,
+    faker: Faker,
+    mock_celery_send_task: mock.AsyncMock,
+):
+    received, job_id_data = await compute_path_size(
+        storage_rabbitmq_rpc_client,
+        user_id=user_id,
+        product_name=faker.name(),
+        location_id=location_id,
+        path=Path(faker.file_path(absolute=False)),
+    )
+    mock_celery_send_task.assert_called_once()
+    assert received
+    assert job_id_data
diff --git a/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml b/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml
index a2a2f0735c0..19343593caa 100644
--- a/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml
+++ b/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml
@@ -6058,6 +6058,34 @@ paths:
           application/json:
             schema:
               $ref: '#/components/schemas/CursorPage___T_Customized_PathMetaDataGet_'
+  /v0/storage/locations/{location_id}/paths/{path}:size:
+    post:
+      tags:
+      - storage
+      summary: Compute Path Size
+      description: Compute the size of a path
+      operationId: compute_path_size
+      parameters:
+      - name: location_id
+        in: path
+        required: true
+        schema:
+          type: integer
+          title: Location Id
+      - name: path
+        in: path
+        required: true
+        schema:
+          type: string
+          format: path
+          title: Path
+      responses:
+        '202':
+          description: Successful Response
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/Envelope_StorageAsyncJobGet_'
   /v0/storage/locations/{location_id}/datasets:
     get:
       tags:
diff --git a/services/web/server/src/simcore_service_webserver/storage/_rest.py b/services/web/server/src/simcore_service_webserver/storage/_rest.py
index 669bad63f21..6e244cef4c0 100644
--- a/services/web/server/src/simcore_service_webserver/storage/_rest.py
+++ b/services/web/server/src/simcore_service_webserver/storage/_rest.py
@@ -23,6 +23,7 @@
     StorageAsyncJobGet,
     StorageAsyncJobResult,
     StorageAsyncJobStatus,
+    StoragePathComputeSizeParams,
 )
 from models_library.projects_nodes_io import LocationID
 from models_library.utils.change_case import camel_to_snake
@@ -44,6 +45,9 @@
     list_jobs,
     submit_job,
 )
+from servicelib.rabbitmq.rpc_interfaces.storage.paths import (
+    compute_path_size as remote_compute_path_size,
+)
 from servicelib.request_keys import RQT_USERID_KEY
 from servicelib.rest_responses import unwrap_envelope
 from yarl import URL
@@ -174,6 +178,33 @@ async def list_paths(request: web.Request) -> web.Response:
     return create_data_response(payload, status=resp_status)
 
 
+@routes.post(
+    f"{_storage_locations_prefix}/{{location_id}}/paths/{{path}}:size",
+    name="compute_path_size",
+)
+@login_required
+@permission_required("storage.files.*")
+async def compute_path_size(request: web.Request) -> web.Response:
+    req_ctx = RequestContext.model_validate(request)
+    path_params = parse_request_path_parameters_as(
+        StoragePathComputeSizeParams, request
+    )
+
+    rabbitmq_rpc_client = get_rabbitmq_rpc_client(request.app)
+    async_job, _ = await remote_compute_path_size(
+        rabbitmq_rpc_client,
+        user_id=req_ctx.user_id,
+        product_name=req_ctx.product_name,
+        location_id=path_params.location_id,
+        path=path_params.path,
+    )
+
+    return create_data_response(
+        async_job,
+        status=status.HTTP_202_ACCEPTED,
+    )
+
+
 @routes.get(
     _storage_locations_prefix + "/{location_id}/datasets", name="list_datasets_metadata"
 )
@@ -472,7 +503,6 @@ async def get_async_jobs(request: web.Request) -> web.Response:
 @permission_required("storage.files.*")
 @handle_data_export_exceptions
 async def get_async_job_status(request: web.Request) -> web.Response:
-
     class _PathParams(BaseModel):
         job_id: UUID
 
diff --git a/services/web/server/tests/unit/with_dbs/01/storage/conftest.py b/services/web/server/tests/unit/with_dbs/01/storage/conftest.py
index d3bf9a09ed5..051a2d23423 100644
--- a/services/web/server/tests/unit/with_dbs/01/storage/conftest.py
+++ b/services/web/server/tests/unit/with_dbs/01/storage/conftest.py
@@ -9,6 +9,7 @@
 from pathlib import Path
 from threading import Thread
 from typing import Annotated
+from urllib.parse import quote
 
 import pytest
 import uvicorn
@@ -189,11 +190,15 @@ async def upload_file(
     abort_url = (
         URL(f"{request.url}")
         .with_path(
-            request.app.url_path_for(
-                "abort_upload_file",
-                location_id=f"{location_id}",
-                file_id=file_id,
-            )
+            quote(
+                request.app.url_path_for(
+                    "abort_upload_file",
+                    location_id=f"{location_id}",
+                    file_id=file_id,
+                ),
+                safe=":/",
+            ),
+            encoded=True,
         )
         .with_query(user_id=user_id)
     )
@@ -201,11 +206,15 @@ async def upload_file(
     complete_url = (
         URL(f"{request.url}")
         .with_path(
-            request.app.url_path_for(
-                "complete_upload_file",
-                location_id=f"{location_id}",
-                file_id=file_id,
-            )
+            quote(
+                request.app.url_path_for(
+                    "complete_upload_file",
+                    location_id=f"{location_id}",
+                    file_id=file_id,
+                ),
+                safe=":/",
+            ),
+            encoded=True,
         )
         .with_query(user_id=user_id)
     )
@@ -234,8 +243,7 @@ async def complete_upload_file(
         file_id: StorageFileID,
         body: FileUploadCompletionBody,
         request: Request,
-    ):
-        ...
+    ): ...
 
     @router.post(
         "/locations/{location_id}/files/{file_id:path}:abort",
@@ -246,8 +254,7 @@ async def abort_upload_file(
         location_id: LocationID,
        file_id: StorageFileID,
         request: Request,
-    ):
-        ...
+    ): ...
 
     app.include_router(router)
 
diff --git a/services/web/server/tests/unit/with_dbs/01/storage/test_storage.py b/services/web/server/tests/unit/with_dbs/01/storage/test_storage.py
index 13d124b0f64..9eb5d42407f 100644
--- a/services/web/server/tests/unit/with_dbs/01/storage/test_storage.py
+++ b/services/web/server/tests/unit/with_dbs/01/storage/test_storage.py
@@ -3,6 +3,7 @@
 # pylint: disable=unused-variable
 # pylint: disable=too-many-arguments
 
+from collections.abc import Callable
 from typing import Any
 from urllib.parse import quote
 
@@ -10,6 +11,7 @@
 from aiohttp.test_utils import TestClient
 from faker import Faker
 from fastapi_pagination.cursor import CursorPage
+from models_library.api_schemas_rpc_async_jobs.async_jobs import AsyncJobGet, AsyncJobId
 from models_library.api_schemas_storage.storage_schemas import (
     DatasetMetaDataGet,
     FileLocation,
@@ -17,10 +19,15 @@
     FileUploadSchema,
     PathMetaDataGet,
 )
+from models_library.api_schemas_webserver.storage import StorageAsyncJobGet
 from models_library.projects_nodes_io import LocationID, StorageFileID
 from pydantic import TypeAdapter
+from pytest_mock import MockerFixture
 from pytest_simcore.helpers.assert_checks import assert_status
 from servicelib.aiohttp import status
+from servicelib.rabbitmq.rpc_interfaces.async_jobs.async_jobs import (
+    submit_job,
+)
 from simcore_postgres_database.models.users import UserRole
 
 API_VERSION = "v0"
@@ -46,7 +53,7 @@ async def test_list_storage_locations(
     url = "/v0/storage/locations"
     assert url.startswith(PREFIX)
 
-    resp = await client.get(url, params={"user_id": logged_user["id"]})
+    resp = await client.get(url)
 
     data, error = await assert_status(resp, expected)
     if not error:
@@ -74,12 +81,74 @@ async def test_list_storage_paths(
     assert client.app
     url = client.app.router["list_storage_paths"].url_for(location_id=f"{location_id}")
 
-    resp = await client.get(f"{url}", params={"user_id": logged_user["id"]})
+    resp = await client.get(f"{url}")
 
     data, error = await assert_status(resp, expected)
     if not error:
         TypeAdapter(CursorPage[PathMetaDataGet]).validate_python(data)
 
 
+_faker = Faker()
+
+
+@pytest.fixture
+def create_storage_paths_rpc_client_mock(
+    mocker: MockerFixture,
+) -> Callable[[str, Any], None]:
+    def _(method: str, result_or_exception: Any):
+        def side_effect(*args, **kwargs):
+            if isinstance(result_or_exception, Exception):
+                raise result_or_exception
+
+            return result_or_exception
+
+        for fct in (f"servicelib.rabbitmq.rpc_interfaces.storage.paths.{method}",):
+            mocker.patch(fct, side_effect=side_effect)
+
+    return _
+
+
+@pytest.mark.parametrize(
+    "user_role,expected",
+    [
+        (UserRole.ANONYMOUS, status.HTTP_401_UNAUTHORIZED),
+        (UserRole.GUEST, status.HTTP_202_ACCEPTED),
+        (UserRole.USER, status.HTTP_202_ACCEPTED),
+        (UserRole.TESTER, status.HTTP_202_ACCEPTED),
+    ],
+)
+@pytest.mark.parametrize(
+    "backend_result_or_exception",
+    [
+        AsyncJobGet(job_id=AsyncJobId(f"{_faker.uuid4()}")),
+    ],
+    ids=lambda x: type(x).__name__,
+)
+async def test_compute_path_size(
+    client: TestClient,
+    logged_user: dict[str, Any],
+    expected: int,
+    location_id: LocationID,
+    faker: Faker,
+    create_storage_paths_rpc_client_mock: Callable[[str, Any], None],
+    backend_result_or_exception: Any,
+):
+    create_storage_paths_rpc_client_mock(
+        submit_job.__name__,
+        backend_result_or_exception,
+    )
+
+    assert client.app
+    url = client.app.router["compute_path_size"].url_for(
+        location_id=f"{location_id}",
+        path=quote(faker.file_path(absolute=False), safe=""),
+    )
+
+    resp = await client.post(f"{url}")
+    data, error = await assert_status(resp, expected)
+    if not error:
+        TypeAdapter(StorageAsyncJobGet).validate_python(data)
+
+
 @pytest.mark.parametrize(
     "user_role,expected",
     [
@@ -101,7 +170,7 @@ async def test_list_datasets_metadata(
 
     assert url == str(_url)
 
-    resp = await client.get(url, params={"user_id": logged_user["id"]})
+    resp = await client.get(url)
 
     data, error = await assert_status(resp, expected)
     if not error:
@@ -134,7 +203,7 @@ async def test_list_dataset_files_metadata(
 
     assert url == str(_url)
 
-    resp = await client.get(url, params={"user_id": logged_user["id"]})
+    resp = await client.get(url)
 
     data, error = await assert_status(resp, expected)
     if not error:
@@ -169,7 +238,7 @@ async def test_storage_file_meta(
 
     assert url.startswith(PREFIX)
 
-    resp = await client.get(url, params={"user_id": logged_user["id"]})
+    resp = await client.get(url)
 
     data, error = await assert_status(resp, expected)
     if not error:
@@ -200,7 +269,7 @@ async def test_storage_list_filter(
 
     assert url.startswith(PREFIX)
 
-    resp = await client.get(url, params={"user_id": logged_user["id"]})
+    resp = await client.get(url)
 
     data, error = await assert_status(resp, expected)
     if not error:
@@ -213,7 +282,7 @@ async def test_storage_list_filter(
 @pytest.fixture
 def file_id(faker: Faker) -> StorageFileID:
     return TypeAdapter(StorageFileID).validate_python(
-        f"{faker.uuid4()}/{faker.uuid4()}/{faker.file_name()} with spaces.dat"
+        f"{faker.uuid4()}/{faker.uuid4()}/{faker.file_name()} with spaces().dat"
     )
 
 
@@ -236,7 +305,7 @@ async def test_upload_file(
 
     assert url.startswith(PREFIX)
 
-    resp = await client.put(url, params={"user_id": logged_user["id"]})
+    resp = await client.put(url)
 
     data, error = await assert_status(resp, expected)
     if not error:
         assert not error
@@ -244,10 +313,7 @@ async def test_upload_file(
         file_upload_schema = FileUploadSchema.model_validate(data)
 
         # let's abort
-        resp = await client.post(
-            f"{file_upload_schema.links.abort_upload.path}",
-            params={"user_id": logged_user["id"]},
-        )
+        resp = await client.post(f"{file_upload_schema.links.abort_upload.path}")
         data, error = await assert_status(resp, status.HTTP_204_NO_CONTENT)
         assert not error
         assert not data
diff --git a/services/web/server/tests/unit/with_dbs/01/storage/test_storage_rpc.py b/services/web/server/tests/unit/with_dbs/01/storage/test_storage_rpc.py
index 910836c0245..ba7b910b24d 100644
--- a/services/web/server/tests/unit/with_dbs/01/storage/test_storage_rpc.py
+++ b/services/web/server/tests/unit/with_dbs/01/storage/test_storage_rpc.py
@@ -1,8 +1,9 @@
 # pylint: disable=redefined-outer-name
 # pylint: disable=unused-argument
+from collections.abc import Callable
 from datetime import datetime
 from pathlib import Path
-from typing import Any, Callable
+from typing import Any
 
 import pytest
 from aiohttp.test_utils import TestClient
@@ -53,10 +54,8 @@ def side_effect(*args, **kwargs):
             return result_or_exception
 
-        mocker.patch(
-            f"simcore_service_webserver.storage._rest.{method}",
-            side_effect=side_effect,
-        )
+        for fct in (f"simcore_service_webserver.storage._rest.{method}",):
+            mocker.patch(fct, side_effect=side_effect)
 
     return _
 
 
diff --git a/tests/e2e-playwright/tests/conftest.py b/tests/e2e-playwright/tests/conftest.py
index d2c7fb6e87d..7df7c63c5e6 100644
--- a/tests/e2e-playwright/tests/conftest.py
+++ b/tests/e2e-playwright/tests/conftest.py
@@ -458,13 +458,11 @@ def _select_service_version(page: Page, *, version: str) -> None:
         # since https://github.com/ITISFoundation/osparc-simcore/pull/7060
         with log_context(logging.INFO, msg=f"selecting version {version}"):
             page.get_by_test_id("serviceSelectBox").click(timeout=5 * SECOND)
-            with page.expect_response(
-                re.compile(r"/catalog/services/[^/]+/(?P<version>.+)"),
-                timeout=1.5 * 5 * SECOND,
-            ):
-                page.get_by_test_id(f"serviceVersionItem_{version}").click(
-                    timeout=5 * SECOND
-                )
+            page.get_by_test_id(f"serviceVersionItem_{version}").click(
+                timeout=5 * SECOND
+            )
+            # the response is cached, so the best we can do is wait here a bit
+            page.wait_for_timeout(2 * SECOND)
 
     except TimeoutError:
         # we try the non robust way
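
Taken together, the changes above wire the new compute_path_size flow end to end: the webserver REST handler (POST /v0/storage/locations/{location_id}/paths/{path}:size, returning 202 Accepted) forwards the request over RabbitMQ RPC to the storage service, which enqueues the actual computation as a Celery task on the worker and immediately returns an async-job handle. The following minimal Python sketch shows how a service could call the new servicelib RPC helper directly; the broker settings, user id, product name, and example path are illustrative assumptions, not part of this diff:

import asyncio
from pathlib import Path

from servicelib.rabbitmq._client_rpc import RabbitMQRPCClient
from servicelib.rabbitmq.rpc_interfaces.storage.paths import compute_path_size
from settings_library.rabbit import RabbitSettings


async def main() -> None:
    # assumption: RabbitMQ connection settings are provided via environment variables
    rpc_client = await RabbitMQRPCClient.create(
        client_name="compute-path-size-demo",  # hypothetical client name
        settings=RabbitSettings.create_from_envs(),
    )
    try:
        # submits the Celery task on the storage worker and returns immediately;
        # job_id_data is the AsyncJobNameData used as the task context
        async_job_get, job_id_data = await compute_path_size(
            rpc_client,
            user_id=1,  # hypothetical user
            product_name="osparc",  # hypothetical product
            location_id=0,  # the simcore S3 location (SimcoreS3DataManager)
            path=Path("my-project-uuid"),  # hypothetical project-level path
        )
        print(f"accepted: {async_job_get.job_id=} {job_id_data=}")
    finally:
        await rpc_client.close()


if __name__ == "__main__":
    asyncio.run(main())

The result is then retrieved through the pre-existing async-jobs machinery (get_status/get_result over RPC, or the corresponding webserver endpoints), which this diff leaves unchanged apart from trailing-comma formatting.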