Skip to content

✨Computation of path size: WebAPI + RPC to storage through Asyncjobs #7315

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 41 commits into from
Mar 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
bec5f0c
fixed tests
sanderegg Mar 11, 2025
b5407f3
prepare webserver API
sanderegg Mar 6, 2025
68f3477
preparing for RPC
sanderegg Mar 6, 2025
15f70a7
moved to RPC interface
sanderegg Mar 6, 2025
75327c8
tested webserver API
sanderegg Mar 6, 2025
e0230d5
refactored
sanderegg Mar 6, 2025
5ae04fb
fixed return value
sanderegg Mar 6, 2025
00f582f
ongoing
sanderegg Mar 6, 2025
dff0494
initial test
sanderegg Mar 6, 2025
e30ed80
enable rabbit mq by default
sanderegg Mar 6, 2025
3edd060
prepare test through RPC
sanderegg Mar 6, 2025
df5c449
ongoing
sanderegg Mar 6, 2025
b1a2bb0
prepare calls to celery
sanderegg Mar 13, 2025
be76104
rename
sanderegg Mar 13, 2025
b8864ce
added worker api entrypoint
sanderegg Mar 13, 2025
e3001cb
preparing
sanderegg Mar 13, 2025
f8fe334
setup the celery client when not worker
sanderegg Mar 13, 2025
af317f7
in devel mode we also want a worker
sanderegg Mar 14, 2025
b4b3835
celery api
sanderegg Mar 14, 2025
697bbfc
serialization
sanderegg Mar 14, 2025
9a3f2a5
ruff
sanderegg Mar 14, 2025
d44ebc9
added watchdog to auto-restart in devel mode
sanderegg Mar 14, 2025
441cb22
define task
sanderegg Mar 14, 2025
c9777f3
async tasks run as is
sanderegg Mar 14, 2025
5df4e1a
allow passing specific types
sanderegg Mar 14, 2025
7b0074a
mypy
sanderegg Mar 14, 2025
32527b5
fix dependencies
sanderegg Mar 14, 2025
a8b1842
nothing to do here
sanderegg Mar 14, 2025
7bd9642
hmm
sanderegg Mar 14, 2025
6d4030a
renaming and refactor
sanderegg Mar 16, 2025
876eea1
name
sanderegg Mar 16, 2025
bc0c209
fixed test
sanderegg Mar 16, 2025
d5b77c2
test worker task
sanderegg Mar 16, 2025
78ccf0e
clean
sanderegg Mar 16, 2025
68ca47e
mypy
sanderegg Mar 16, 2025
4921c51
fix call
sanderegg Mar 16, 2025
1344d4d
fixed test
sanderegg Mar 16, 2025
59a6dbb
fixed test
sanderegg Mar 16, 2025
a324fb3
fix unit tests
sanderegg Mar 17, 2025
57fa3d8
@GitHK review: remove unused stuff
sanderegg Mar 17, 2025
08f0994
e2e bugfix
sanderegg Mar 17, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions api/specs/web-server/_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
StorageAsyncJobResult,
StorageAsyncJobStatus,
StorageLocationPathParams,
StoragePathComputeSizeParams,
)
from models_library.generics import Envelope
from models_library.projects_nodes_io import LocationID
Expand Down Expand Up @@ -68,6 +69,15 @@ async def list_storage_paths(
"""Lists the files/directories in WorkingDirectory"""


@router.post(
    "/storage/locations/{location_id}/paths/{path}:size",
    response_model=Envelope[StorageAsyncJobGet],
    status_code=status.HTTP_202_ACCEPTED,
)
async def compute_path_size(_path: Annotated[StoragePathComputeSizeParams, Depends()]):
    """Compute the size of a path"""
    # NOTE(review): OpenAPI-spec stub only — the body is intentionally empty.
    # Responds 202 with an async-job envelope; the actual computation is
    # presumably dispatched to a background job by the real webserver handler
    # (not visible here — confirm against the webserver implementation).


@router.get(
"/storage/locations/{location_id}/datasets",
response_model=Envelope[list[DatasetMetaData]],
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
from pathlib import Path
from typing import Annotated, Any

from models_library.api_schemas_storage.storage_schemas import (
DEFAULT_NUMBER_OF_PATHS_PER_PAGE,
MAX_NUMBER_OF_PATHS_PER_PAGE,
)
from pydantic import BaseModel, Field

from ..api_schemas_rpc_async_jobs.async_jobs import (
Expand All @@ -14,6 +10,10 @@
AsyncJobStatus,
)
from ..api_schemas_storage.data_export_async_jobs import DataExportTaskStartInput
from ..api_schemas_storage.storage_schemas import (
DEFAULT_NUMBER_OF_PATHS_PER_PAGE,
MAX_NUMBER_OF_PATHS_PER_PAGE,
)
from ..progress_bar import ProgressReport
from ..projects_nodes_io import LocationID, StorageFileID
from ..rest_pagination import (
Expand All @@ -26,6 +26,10 @@ class StorageLocationPathParams(BaseModel):
location_id: LocationID


class StoragePathComputeSizeParams(StorageLocationPathParams):
    """Path parameters for the ``.../paths/{path}:size`` endpoint.

    Extends ``StorageLocationPathParams`` (which carries ``location_id``)
    with the path whose size is to be computed.
    """

    # path within the storage location whose size should be computed
    path: Path


class ListPathsQueryParams(InputSchema, CursorQueryParameters):
    """Query parameters for listing storage paths (cursor-paginated)."""

    # optional filter on the listed paths; None disables filtering
    # NOTE(review): exact matching semantics (prefix vs. exact) not visible
    # here — confirm against the handler that consumes this model
    file_filter: Path | None = None

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ async def abort(
*,
rpc_namespace: RPCNamespace,
job_id: AsyncJobId,
job_id_data: AsyncJobNameData
job_id_data: AsyncJobNameData,
) -> AsyncJobAbort:
result = await rabbitmq_rpc_client.request(
rpc_namespace,
Expand All @@ -41,7 +41,7 @@ async def get_status(
*,
rpc_namespace: RPCNamespace,
job_id: AsyncJobId,
job_id_data: AsyncJobNameData
job_id_data: AsyncJobNameData,
) -> AsyncJobStatus:
result = await rabbitmq_rpc_client.request(
rpc_namespace,
Expand All @@ -59,7 +59,7 @@ async def get_result(
*,
rpc_namespace: RPCNamespace,
job_id: AsyncJobId,
job_id_data: AsyncJobNameData
job_id_data: AsyncJobNameData,
) -> AsyncJobResult:
result = await rabbitmq_rpc_client.request(
rpc_namespace,
Expand All @@ -77,7 +77,7 @@ async def list_jobs(
*,
rpc_namespace: RPCNamespace,
filter_: str,
job_id_data: AsyncJobNameData
job_id_data: AsyncJobNameData,
) -> list[AsyncJobGet]:
result: list[AsyncJobGet] = await rabbitmq_rpc_client.request(
rpc_namespace,
Expand All @@ -95,7 +95,7 @@ async def submit_job(
rpc_namespace: RPCNamespace,
method_name: str,
job_id_data: AsyncJobNameData,
**kwargs
**kwargs,
) -> AsyncJobGet:
result = await rabbitmq_rpc_client.request(
rpc_namespace,
Expand All @@ -104,5 +104,5 @@ async def submit_job(
**kwargs,
timeout_s=_DEFAULT_TIMEOUT_S,
)
assert isinstance(result, AsyncJobGet)
assert isinstance(result, AsyncJobGet) # nosec
return result
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from pathlib import Path

from models_library.api_schemas_rpc_async_jobs.async_jobs import (
AsyncJobNameData,
)
from models_library.api_schemas_storage import STORAGE_RPC_NAMESPACE
from models_library.api_schemas_webserver.storage import StorageAsyncJobGet
from models_library.projects_nodes_io import LocationID
from models_library.rabbitmq_basic_types import RPCMethodName
from models_library.users import UserID

from ..._client_rpc import RabbitMQRPCClient
from ..async_jobs.async_jobs import submit_job


async def compute_path_size(
    client: RabbitMQRPCClient,
    *,
    user_id: UserID,
    product_name: str,
    location_id: LocationID,
    path: Path,
) -> tuple[StorageAsyncJobGet, AsyncJobNameData]:
    """Submit an async "compute_path_size" job to the storage service over RPC.

    Returns the job handle (web-server schema) together with the name data
    identifying the submitting user/product, which callers need to poll the
    job's status/result later.
    """
    job_name_data = AsyncJobNameData(user_id=user_id, product_name=product_name)
    rpc_job_get = await submit_job(
        rabbitmq_rpc_client=client,
        rpc_namespace=STORAGE_RPC_NAMESPACE,
        method_name=RPCMethodName("compute_path_size"),
        job_id_data=job_name_data,
        location_id=location_id,
        path=path,
    )
    # convert the RPC-layer job schema into the web-server facing one
    web_job_get = StorageAsyncJobGet.from_rpc_schema(rpc_job_get)
    return web_job_get, job_name_data
36 changes: 25 additions & 11 deletions services/storage/docker/boot.sh
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,32 @@ APP_LOG_LEVEL=${STORAGE_LOGLEVEL:-${LOG_LEVEL:-${LOGLEVEL:-INFO}}}
SERVER_LOG_LEVEL=$(echo "${APP_LOG_LEVEL}" | tr '[:upper:]' '[:lower:]')
echo "$INFO" "Log-level app/server: $APP_LOG_LEVEL/$SERVER_LOG_LEVEL"

if [ "${SC_BOOT_MODE}" = "debug" ]; then
reload_dir_packages=$(find /devel/packages -maxdepth 3 -type d -path "*/src/*" ! -path "*.*" -exec echo '--reload-dir {} \' \;)
if [ "${STORAGE_WORKER_MODE}" = "true" ]; then
if [ "${SC_BOOT_MODE}" = "debug" ]; then
exec watchmedo auto-restart \
--directory /devel/packages \
--directory services/storage \
--pattern "*.py" \
--recursive \
-- \
celery \
--app=simcore_service_storage.modules.celery.worker_main:app \
worker --pool=threads \
--loglevel="${SERVER_LOG_LEVEL}" \
--concurrency="${CELERY_CONCURRENCY}"
else
exec celery \
--app=simcore_service_storage.modules.celery.worker_main:app \
worker --pool=threads \
--loglevel="${SERVER_LOG_LEVEL}" \
--hostname="${HOSTNAME}" \
--concurrency="${CELERY_CONCURRENCY}"
fi
else
if [ "${SC_BOOT_MODE}" = "debug" ]; then
reload_dir_packages=$(find /devel/packages -maxdepth 3 -type d -path "*/src/*" ! -path "*.*" -exec echo '--reload-dir {} \' \;)

exec sh -c "
exec sh -c "
cd services/storage/src/simcore_service_storage && \
python -m debugpy --listen 0.0.0.0:${STORAGE_REMOTE_DEBUGGING_PORT} -m uvicorn main:app \
--host 0.0.0.0 \
Expand All @@ -55,14 +77,6 @@ if [ "${SC_BOOT_MODE}" = "debug" ]; then
--reload-dir . \
--log-level \"${SERVER_LOG_LEVEL}\"
"
else
if [ "${STORAGE_WORKER_MODE}" = "true" ]; then
exec celery \
--app=simcore_service_storage.modules.celery.worker_main:app \
worker --pool=threads \
--loglevel="${SERVER_LOG_LEVEL}" \
--hostname="${HOSTNAME}" \
--concurrency="${CELERY_CONCURRENCY}"
else
exec uvicorn simcore_service_storage.main:app \
--host 0.0.0.0 \
Expand Down
1 change: 1 addition & 0 deletions services/storage/requirements/_base.in
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,4 @@ pydantic[dotenv]
tenacity
typer
types-aiobotocore[s3] # s3 storage
watchdog
2 changes: 2 additions & 0 deletions services/storage/requirements/_base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -984,6 +984,8 @@ vine==5.1.0
# amqp
# celery
# kombu
watchdog==6.0.0
# via -r requirements/_base.in
watchfiles==1.0.4
# via uvicorn
wcwidth==0.2.13
Expand Down
1 change: 0 additions & 1 deletion services/storage/requirements/_test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,6 @@ setuptools==75.8.2
# via
# moto
# pytest-celery
# via moto
simcore-service-storage-sdk @ git+https://github.com/ITISFoundation/osparc-simcore.git@cfdf4f86d844ebb362f4f39e9c6571d561b72897#subdirectory=services/storage/client-sdk/python
# via -r requirements/_test.in
six==1.17.0
Expand Down
4 changes: 3 additions & 1 deletion services/storage/requirements/_tools.txt
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ typing-extensions==4.12.2
virtualenv==20.29.2
# via pre-commit
watchdog==6.0.0
# via -r requirements/_tools.in
# via
# -c requirements/_base.txt
# -r requirements/_tools.in
wheel==0.45.1
# via pip-tools
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import logging
from pathlib import Path

from celery import Task # type: ignore[import-untyped]
from models_library.projects_nodes_io import LocationID
from models_library.users import UserID
from pydantic import ByteSize
from servicelib.logging_utils import log_context

from ...dsm import get_dsm_provider
from ...modules.celery.utils import get_fastapi_app

_logger = logging.getLogger(__name__)


async def compute_path_size(
    task: Task, user_id: UserID, location_id: LocationID, path: Path
) -> ByteSize:
    """Worker-side task: compute the total size of *path* in *location_id*.

    Resolves the data-storage-manager for the given location from the
    FastAPI app attached to the celery app, and delegates the computation.
    """
    with log_context(
        _logger,
        logging.INFO,
        msg=f"computing path size {user_id=}, {location_id=}, {path=}",
    ):
        fastapi_app = get_fastapi_app(task.app)
        data_manager = get_dsm_provider(fastapi_app).get(location_id)
        # re-wrap in Path defensively: celery serialization may deliver a str
        return await data_manager.compute_path_size(user_id, path=Path(path))
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import logging

from celery import Celery # type: ignore[import-untyped]
from servicelib.logging_utils import log_context

from ...modules.celery._celery_types import register_celery_types
from ...modules.celery._task import define_task
from ...modules.celery.tasks import export_data
from ._paths import compute_path_size

_logger = logging.getLogger(__name__)


def setup_worker_tasks(app: Celery) -> None:
    """Register the storage celery worker tasks on *app*.

    Registers the custom (de)serializable types first, then declares each
    task function with the celery application.
    """
    register_celery_types()
    with log_context(
        _logger,
        logging.INFO,
        msg="Storage setup Worker Tasks",
    ):
        # registration order matches the original: export_data first
        for worker_task in (export_data, compute_path_size):
            define_task(app, worker_task)
35 changes: 35 additions & 0 deletions services/storage/src/simcore_service_storage/api/rpc/_paths.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from pathlib import Path

from fastapi import FastAPI
from models_library.api_schemas_rpc_async_jobs.async_jobs import (
AsyncJobGet,
AsyncJobNameData,
)
from models_library.projects_nodes_io import LocationID
from servicelib.rabbitmq import RPCRouter

from ...modules.celery import get_celery_client
from .._worker_tasks._paths import compute_path_size as remote_compute_path_size

router = RPCRouter()


@router.expose(reraise_if_error_type=None)
async def compute_path_size(
    app: FastAPI,
    job_id_data: AsyncJobNameData,
    location_id: LocationID,
    path: Path,
) -> AsyncJobGet:
    """RPC entrypoint: schedule an async computation of *path*'s size.

    Dispatches the worker task via the celery client and returns a job
    handle the caller can use to poll status/result through the async-jobs
    RPC interface.

    Fix: removed the leftover commented-out ``# user_id: UserID,`` parameter
    from the signature — the user id already travels inside ``job_id_data``.
    """
    assert app  # nosec

    task_uuid = await get_celery_client(app).send_task(
        # use the worker function's __name__ so the task name can never
        # drift from the registered worker task
        remote_compute_path_size.__name__,
        task_context=job_id_data.model_dump(),
        user_id=job_id_data.user_id,
        location_id=location_id,
        path=path,
    )

    return AsyncJobGet(job_id=task_uuid)
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,16 @@
from servicelib.rabbitmq import RPCRouter

from ...modules.rabbitmq import get_rabbitmq_rpc_server
from . import _async_jobs, _data_export
from . import _async_jobs, _data_export, _paths

_logger = logging.getLogger(__name__)


ROUTERS: list[RPCRouter] = [_data_export.router, _async_jobs.router]
# RPC routers exposed by the storage service (kept in alphabetical order)
ROUTERS: list[RPCRouter] = [
    _async_jobs.router,
    _data_export.router,
    _paths.router,
]


def setup_rpc_api_routes(app: FastAPI) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from ..dsm import setup_dsm
from ..dsm_cleaner import setup_dsm_cleaner
from ..exceptions.handlers import set_exception_handlers
from ..modules.celery import setup_celery_client
from ..modules.db import setup_db
from ..modules.long_running_tasks import setup_rest_api_long_running_tasks_for_uploads
from ..modules.rabbitmq import setup as setup_rabbitmq
Expand All @@ -53,7 +54,7 @@
_logger = logging.getLogger(__name__)


def create_app(settings: ApplicationSettings) -> FastAPI:
def create_app(settings: ApplicationSettings) -> FastAPI: # noqa: C901
# keep mostly quiet noisy loggers
quiet_level: int = max(
min(logging.root.level + _LOG_LEVEL_STEP, logging.CRITICAL), logging.WARNING
Expand Down Expand Up @@ -86,6 +87,7 @@ def create_app(settings: ApplicationSettings) -> FastAPI:
setup_rabbitmq(app)
if not settings.STORAGE_WORKER_MODE:
setup_rpc_api_routes(app)
setup_celery_client(app)
setup_rest_api_long_running_tasks_for_uploads(app)
setup_rest_api_routes(app, API_VTAG)
set_exception_handlers(app)
Expand Down
6 changes: 1 addition & 5 deletions services/storage/src/simcore_service_storage/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from servicelib.logging_utils import config_all_loggers
from simcore_service_storage.core.application import create_app
from simcore_service_storage.core.settings import ApplicationSettings
from simcore_service_storage.modules.celery import setup_celery

_settings = ApplicationSettings.create_from_envs()

Expand All @@ -20,7 +19,4 @@

_logger = logging.getLogger(__name__)

fastapi_app = create_app(_settings)
setup_celery(fastapi_app)

app = fastapi_app
app = create_app(_settings)
Loading
Loading