Skip to content

Commit 0bfbbb8

Browse files
sanderegg authored and mrnicegyu11 committed
✨Computation of path size: WebAPI + RPC to storage through Asyncjobs (ITISFoundation#7315)
1 parent 3d465ad commit 0bfbbb8

File tree

34 files changed

+773
-126
lines changed

34 files changed

+773
-126
lines changed

api/specs/web-server/_storage.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
StorageAsyncJobResult,
2727
StorageAsyncJobStatus,
2828
StorageLocationPathParams,
29+
StoragePathComputeSizeParams,
2930
)
3031
from models_library.generics import Envelope
3132
from models_library.projects_nodes_io import LocationID
@@ -68,6 +69,15 @@ async def list_storage_paths(
6869
"""Lists the files/directories in WorkingDirectory"""
6970

7071

72+
@router.post(
73+
"/storage/locations/{location_id}/paths/{path}:size",
74+
response_model=Envelope[StorageAsyncJobGet],
75+
status_code=status.HTTP_202_ACCEPTED,
76+
)
77+
async def compute_path_size(_path: Annotated[StoragePathComputeSizeParams, Depends()]):
78+
"""Compute the size of a path"""
79+
80+
7181
@router.get(
7282
"/storage/locations/{location_id}/datasets",
7383
response_model=Envelope[list[DatasetMetaData]],

packages/models-library/src/models_library/api_schemas_webserver/storage.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,6 @@
11
from pathlib import Path
22
from typing import Annotated, Any
33

4-
from models_library.api_schemas_storage.storage_schemas import (
5-
DEFAULT_NUMBER_OF_PATHS_PER_PAGE,
6-
MAX_NUMBER_OF_PATHS_PER_PAGE,
7-
)
84
from pydantic import BaseModel, Field
95

106
from ..api_schemas_rpc_async_jobs.async_jobs import (
@@ -14,6 +10,10 @@
1410
AsyncJobStatus,
1511
)
1612
from ..api_schemas_storage.data_export_async_jobs import DataExportTaskStartInput
13+
from ..api_schemas_storage.storage_schemas import (
14+
DEFAULT_NUMBER_OF_PATHS_PER_PAGE,
15+
MAX_NUMBER_OF_PATHS_PER_PAGE,
16+
)
1717
from ..progress_bar import ProgressReport
1818
from ..projects_nodes_io import LocationID, StorageFileID
1919
from ..rest_pagination import (
@@ -26,6 +26,10 @@ class StorageLocationPathParams(BaseModel):
2626
location_id: LocationID
2727

2828

29+
class StoragePathComputeSizeParams(StorageLocationPathParams):
30+
path: Path
31+
32+
2933
class ListPathsQueryParams(InputSchema, CursorQueryParameters):
3034
file_filter: Path | None = None
3135

packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/async_jobs/async_jobs.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ async def abort(
2323
*,
2424
rpc_namespace: RPCNamespace,
2525
job_id: AsyncJobId,
26-
job_id_data: AsyncJobNameData
26+
job_id_data: AsyncJobNameData,
2727
) -> AsyncJobAbort:
2828
result = await rabbitmq_rpc_client.request(
2929
rpc_namespace,
@@ -41,7 +41,7 @@ async def get_status(
4141
*,
4242
rpc_namespace: RPCNamespace,
4343
job_id: AsyncJobId,
44-
job_id_data: AsyncJobNameData
44+
job_id_data: AsyncJobNameData,
4545
) -> AsyncJobStatus:
4646
result = await rabbitmq_rpc_client.request(
4747
rpc_namespace,
@@ -59,7 +59,7 @@ async def get_result(
5959
*,
6060
rpc_namespace: RPCNamespace,
6161
job_id: AsyncJobId,
62-
job_id_data: AsyncJobNameData
62+
job_id_data: AsyncJobNameData,
6363
) -> AsyncJobResult:
6464
result = await rabbitmq_rpc_client.request(
6565
rpc_namespace,
@@ -77,7 +77,7 @@ async def list_jobs(
7777
*,
7878
rpc_namespace: RPCNamespace,
7979
filter_: str,
80-
job_id_data: AsyncJobNameData
80+
job_id_data: AsyncJobNameData,
8181
) -> list[AsyncJobGet]:
8282
result: list[AsyncJobGet] = await rabbitmq_rpc_client.request(
8383
rpc_namespace,
@@ -95,7 +95,7 @@ async def submit_job(
9595
rpc_namespace: RPCNamespace,
9696
method_name: str,
9797
job_id_data: AsyncJobNameData,
98-
**kwargs
98+
**kwargs,
9999
) -> AsyncJobGet:
100100
result = await rabbitmq_rpc_client.request(
101101
rpc_namespace,
@@ -104,5 +104,5 @@ async def submit_job(
104104
**kwargs,
105105
timeout_s=_DEFAULT_TIMEOUT_S,
106106
)
107-
assert isinstance(result, AsyncJobGet)
107+
assert isinstance(result, AsyncJobGet) # nosec
108108
return result

packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/storage/__init__.py

Whitespace-only changes.
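[New file inside packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/storage/ — its file-path header is missing from this capture; the diff below does NOT belong to __init__.py]

Lines changed: 33 additions & 0 deletions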
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
from pathlib import Path
2+
3+
from models_library.api_schemas_rpc_async_jobs.async_jobs import (
4+
AsyncJobNameData,
5+
)
6+
from models_library.api_schemas_storage import STORAGE_RPC_NAMESPACE
7+
from models_library.api_schemas_webserver.storage import StorageAsyncJobGet
8+
from models_library.projects_nodes_io import LocationID
9+
from models_library.rabbitmq_basic_types import RPCMethodName
10+
from models_library.users import UserID
11+
12+
from ..._client_rpc import RabbitMQRPCClient
13+
from ..async_jobs.async_jobs import submit_job
14+
15+
16+
async def compute_path_size(
17+
client: RabbitMQRPCClient,
18+
*,
19+
user_id: UserID,
20+
product_name: str,
21+
location_id: LocationID,
22+
path: Path,
23+
) -> tuple[StorageAsyncJobGet, AsyncJobNameData]:
24+
job_id_data = AsyncJobNameData(user_id=user_id, product_name=product_name)
25+
async_job_rpc_get = await submit_job(
26+
rabbitmq_rpc_client=client,
27+
rpc_namespace=STORAGE_RPC_NAMESPACE,
28+
method_name=RPCMethodName("compute_path_size"),
29+
job_id_data=job_id_data,
30+
location_id=location_id,
31+
path=path,
32+
)
33+
return StorageAsyncJobGet.from_rpc_schema(async_job_rpc_get), job_id_data

services/storage/docker/boot.sh

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,32 @@ APP_LOG_LEVEL=${STORAGE_LOGLEVEL:-${LOG_LEVEL:-${LOGLEVEL:-INFO}}}
4242
SERVER_LOG_LEVEL=$(echo "${APP_LOG_LEVEL}" | tr '[:upper:]' '[:lower:]')
4343
echo "$INFO" "Log-level app/server: $APP_LOG_LEVEL/$SERVER_LOG_LEVEL"
4444

45-
if [ "${SC_BOOT_MODE}" = "debug" ]; then
46-
reload_dir_packages=$(find /devel/packages -maxdepth 3 -type d -path "*/src/*" ! -path "*.*" -exec echo '--reload-dir {} \' \;)
45+
if [ "${STORAGE_WORKER_MODE}" = "true" ]; then
46+
if [ "${SC_BOOT_MODE}" = "debug" ]; then
47+
exec watchmedo auto-restart \
48+
--directory /devel/packages \
49+
--directory services/storage \
50+
--pattern "*.py" \
51+
--recursive \
52+
-- \
53+
celery \
54+
--app=simcore_service_storage.modules.celery.worker_main:app \
55+
worker --pool=threads \
56+
--loglevel="${SERVER_LOG_LEVEL}" \
57+
--concurrency="${CELERY_CONCURRENCY}"
58+
else
59+
exec celery \
60+
--app=simcore_service_storage.modules.celery.worker_main:app \
61+
worker --pool=threads \
62+
--loglevel="${SERVER_LOG_LEVEL}" \
63+
--hostname="${HOSTNAME}" \
64+
--concurrency="${CELERY_CONCURRENCY}"
65+
fi
66+
else
67+
if [ "${SC_BOOT_MODE}" = "debug" ]; then
68+
reload_dir_packages=$(find /devel/packages -maxdepth 3 -type d -path "*/src/*" ! -path "*.*" -exec echo '--reload-dir {} \' \;)
4769

48-
exec sh -c "
70+
exec sh -c "
4971
cd services/storage/src/simcore_service_storage && \
5072
python -m debugpy --listen 0.0.0.0:${STORAGE_REMOTE_DEBUGGING_PORT} -m uvicorn main:app \
5173
--host 0.0.0.0 \
@@ -55,14 +77,6 @@ if [ "${SC_BOOT_MODE}" = "debug" ]; then
5577
--reload-dir . \
5678
--log-level \"${SERVER_LOG_LEVEL}\"
5779
"
58-
else
59-
if [ "${STORAGE_WORKER_MODE}" = "true" ]; then
60-
exec celery \
61-
--app=simcore_service_storage.modules.celery.worker_main:app \
62-
worker --pool=threads \
63-
--loglevel="${SERVER_LOG_LEVEL}" \
64-
--hostname="${HOSTNAME}" \
65-
--concurrency="${CELERY_CONCURRENCY}"
6680
else
6781
exec uvicorn simcore_service_storage.main:app \
6882
--host 0.0.0.0 \

services/storage/requirements/_base.in

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,3 +29,4 @@ pydantic[dotenv]
2929
tenacity
3030
typer
3131
types-aiobotocore[s3] # s3 storage
32+
watchdog

services/storage/requirements/_base.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -984,6 +984,8 @@ vine==5.1.0
984984
# amqp
985985
# celery
986986
# kombu
987+
watchdog==6.0.0
988+
# via -r requirements/_base.in
987989
watchfiles==1.0.4
988990
# via uvicorn
989991
wcwidth==0.2.13

services/storage/requirements/_test.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -374,7 +374,6 @@ setuptools==75.8.2
374374
# via
375375
# moto
376376
# pytest-celery
377-
# via moto
378377
simcore-service-storage-sdk @ git+https://github.com/ITISFoundation/osparc-simcore.git@cfdf4f86d844ebb362f4f39e9c6571d561b72897#subdirectory=services/storage/client-sdk/python
379378
# via -r requirements/_test.in
380379
six==1.17.0

services/storage/requirements/_tools.txt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ typing-extensions==4.12.2
8787
virtualenv==20.29.2
8888
# via pre-commit
8989
watchdog==6.0.0
90-
# via -r requirements/_tools.in
90+
# via
91+
# -c requirements/_base.txt
92+
# -r requirements/_tools.in
9193
wheel==0.45.1
9294
# via pip-tools

services/storage/src/simcore_service_storage/api/_worker_tasks/__init__.py

Whitespace-only changes.
services/storage/src/simcore_service_storage/api/_worker_tasks/_paths.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
import logging
2+
from pathlib import Path
3+
4+
from celery import Task # type: ignore[import-untyped]
5+
from models_library.projects_nodes_io import LocationID
6+
from models_library.users import UserID
7+
from pydantic import ByteSize
8+
from servicelib.logging_utils import log_context
9+
10+
from ...dsm import get_dsm_provider
11+
from ...modules.celery.utils import get_fastapi_app
12+
13+
_logger = logging.getLogger(__name__)
14+
15+
16+
async def compute_path_size(
17+
task: Task, user_id: UserID, location_id: LocationID, path: Path
18+
) -> ByteSize:
19+
with log_context(
20+
_logger,
21+
logging.INFO,
22+
msg=f"computing path size {user_id=}, {location_id=}, {path=}",
23+
):
24+
dsm = get_dsm_provider(get_fastapi_app(task.app)).get(location_id)
25+
return await dsm.compute_path_size(user_id, path=Path(path))
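[New file inside services/storage/src/simcore_service_storage/api/_worker_tasks/ — its file-path header is missing from this capture]

Lines changed: 22 additions & 0 deletions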
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
import logging
2+
3+
from celery import Celery # type: ignore[import-untyped]
4+
from servicelib.logging_utils import log_context
5+
6+
from ...modules.celery._celery_types import register_celery_types
7+
from ...modules.celery._task import define_task
8+
from ...modules.celery.tasks import export_data
9+
from ._paths import compute_path_size
10+
11+
_logger = logging.getLogger(__name__)
12+
13+
14+
def setup_worker_tasks(app: Celery) -> None:
15+
register_celery_types()
16+
with log_context(
17+
_logger,
18+
logging.INFO,
19+
msg="Storage setup Worker Tasks",
20+
):
21+
define_task(app, export_data)
22+
define_task(app, compute_path_size)
services/storage/src/simcore_service_storage/api/rpc/_paths.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
from pathlib import Path
2+
3+
from fastapi import FastAPI
4+
from models_library.api_schemas_rpc_async_jobs.async_jobs import (
5+
AsyncJobGet,
6+
AsyncJobNameData,
7+
)
8+
from models_library.projects_nodes_io import LocationID
9+
from servicelib.rabbitmq import RPCRouter
10+
11+
from ...modules.celery import get_celery_client
12+
from .._worker_tasks._paths import compute_path_size as remote_compute_path_size
13+
14+
router = RPCRouter()
15+
16+
17+
@router.expose(reraise_if_error_type=None)
18+
async def compute_path_size(
19+
app: FastAPI,
20+
job_id_data: AsyncJobNameData,
21+
# user_id: UserID,
22+
location_id: LocationID,
23+
path: Path,
24+
) -> AsyncJobGet:
25+
assert app # nosec
26+
27+
task_uuid = await get_celery_client(app).send_task(
28+
remote_compute_path_size.__name__,
29+
task_context=job_id_data.model_dump(),
30+
user_id=job_id_data.user_id,
31+
location_id=location_id,
32+
path=path,
33+
)
34+
35+
return AsyncJobGet(job_id=task_uuid)

services/storage/src/simcore_service_storage/api/rpc/routes.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,16 @@
66
from servicelib.rabbitmq import RPCRouter
77

88
from ...modules.rabbitmq import get_rabbitmq_rpc_server
9-
from . import _async_jobs, _data_export
9+
from . import _async_jobs, _data_export, _paths
1010

1111
_logger = logging.getLogger(__name__)
1212

1313

14-
ROUTERS: list[RPCRouter] = [_data_export.router, _async_jobs.router]
14+
ROUTERS: list[RPCRouter] = [
15+
_async_jobs.router,
16+
_data_export.router,
17+
_paths.router,
18+
]
1519

1620

1721
def setup_rpc_api_routes(app: FastAPI) -> None:

services/storage/src/simcore_service_storage/core/application.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
from ..dsm import setup_dsm
3434
from ..dsm_cleaner import setup_dsm_cleaner
3535
from ..exceptions.handlers import set_exception_handlers
36+
from ..modules.celery import setup_celery_client
3637
from ..modules.db import setup_db
3738
from ..modules.long_running_tasks import setup_rest_api_long_running_tasks_for_uploads
3839
from ..modules.rabbitmq import setup as setup_rabbitmq
@@ -53,7 +54,7 @@
5354
_logger = logging.getLogger(__name__)
5455

5556

56-
def create_app(settings: ApplicationSettings) -> FastAPI:
57+
def create_app(settings: ApplicationSettings) -> FastAPI: # noqa: C901
5758
# keep mostly quiet noisy loggers
5859
quiet_level: int = max(
5960
min(logging.root.level + _LOG_LEVEL_STEP, logging.CRITICAL), logging.WARNING
@@ -86,6 +87,7 @@ def create_app(settings: ApplicationSettings) -> FastAPI:
8687
setup_rabbitmq(app)
8788
if not settings.STORAGE_WORKER_MODE:
8889
setup_rpc_api_routes(app)
90+
setup_celery_client(app)
8991
setup_rest_api_long_running_tasks_for_uploads(app)
9092
setup_rest_api_routes(app, API_VTAG)
9193
set_exception_handlers(app)

services/storage/src/simcore_service_storage/main.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
from servicelib.logging_utils import config_all_loggers
66
from simcore_service_storage.core.application import create_app
77
from simcore_service_storage.core.settings import ApplicationSettings
8-
from simcore_service_storage.modules.celery import setup_celery
98

109
_settings = ApplicationSettings.create_from_envs()
1110

@@ -20,7 +19,4 @@
2019

2120
_logger = logging.getLogger(__name__)
2221

23-
fastapi_app = create_app(_settings)
24-
setup_celery(fastapi_app)
25-
26-
app = fastapi_app
22+
app = create_app(_settings)

0 commit comments

Comments
 (0)