
1848 add permission rights to async jobs #7262


Merged
Changes from 90 commits
96 commits
156a19a
factor out tasks part in storage rpc api
bisgaard-itis Feb 14, 2025
64c137d
use 'async_jobs' terminology and refactor rpc client for reusability
bisgaard-itis Feb 14, 2025
f6e73e3
further refactoring
bisgaard-itis Feb 14, 2025
50fa599
make async job rpc client more reusable
bisgaard-itis Feb 14, 2025
1ce7158
tasks -> async jobs
bisgaard-itis Feb 15, 2025
1229e92
tasks -> async_job
bisgaard-itis Feb 15, 2025
ce064ce
start adding data-export endpoints in webserver
bisgaard-itis Feb 15, 2025
223631e
further renaming
bisgaard-itis Feb 16, 2025
0bd94d6
create openapi-specs make target for webserver to ensure github workf…
bisgaard-itis Feb 16, 2025
b43b22d
add webserver endpoint for triggering data export
bisgaard-itis Feb 16, 2025
9795838
add webserver endpoint for getting async job status
bisgaard-itis Feb 16, 2025
224dfe2
minor fix
bisgaard-itis Feb 17, 2025
3fb022e
add initial test
bisgaard-itis Feb 17, 2025
ac06db4
pass login and permission in test
bisgaard-itis Feb 17, 2025
249967e
add get status test
bisgaard-itis Feb 17, 2025
8b154af
add location id in webserver endpoint
bisgaard-itis Feb 17, 2025
ac20937
task_id -> job_id
bisgaard-itis Feb 17, 2025
527e7a6
cleanup
bisgaard-itis Feb 17, 2025
999907e
add test for abort endpoint
bisgaard-itis Feb 17, 2025
520c043
add test for endpoint of getting result
bisgaard-itis Feb 17, 2025
1b42a8f
update webserver openapi specs
bisgaard-itis Feb 17, 2025
a649c71
start adding exceptions and propagating them
bisgaard-itis Feb 17, 2025
4ad6aac
add exception handling to endpoint for triggering data export
bisgaard-itis Feb 18, 2025
33b5ccf
remove 'data_export' from async jobs model lib section
bisgaard-itis Feb 18, 2025
661b3b2
add exception handling for remaining endpoints
bisgaard-itis Feb 18, 2025
6972cf5
merge master into 7197-add-zipping-endpoints-in-storage
bisgaard-itis Feb 18, 2025
e19eea7
update openapi specs of webserver
bisgaard-itis Feb 18, 2025
311ba29
add user id to data export endpoint
bisgaard-itis Feb 18, 2025
d614e7e
fix typecheck
bisgaard-itis Feb 18, 2025
db55e79
fix openapi spec test
bisgaard-itis Feb 18, 2025
8bd254d
merge master into 7197-add-zipping-endpoints-in-storage
bisgaard-itis Feb 18, 2025
1718df8
@sanderegg fix absolute imports
bisgaard-itis Feb 18, 2025
7faf55f
@sanderegg @pcrespov remove 'rpc' from classnames
bisgaard-itis Feb 18, 2025
434602f
task_progress -> progress
bisgaard-itis Feb 18, 2025
fd777ab
rename storage schemas exposed via webserver
bisgaard-itis Feb 18, 2025
400a6c1
use ProgressReport
bisgaard-itis Feb 18, 2025
3383265
restructuring
bisgaard-itis Feb 18, 2025
b0f8a7f
services/webserver api version: 0.58.0 → 0.59.0
bisgaard-itis Feb 18, 2025
8acc41c
storage/_handlers.py -> storage/_rest.py
bisgaard-itis Feb 18, 2025
df04839
merge master into 7197-add-zipping-endpoints-in-storage
bisgaard-itis Feb 18, 2025
c3758bd
update opena api specs
bisgaard-itis Feb 18, 2025
ccff359
@pcrespov streamline to_rpc_schema methods
bisgaard-itis Feb 19, 2025
09a3650
merge master into 7197-add-zipping-endpoints-in-storage
bisgaard-itis Feb 19, 2025
b01e479
add rpc method for getting jobs associated with user
bisgaard-itis Feb 19, 2025
eec16d8
add webserver rest endpoint for getting user jobs
bisgaard-itis Feb 19, 2025
432dbb3
add test of rest endpoint for getting jobs associated with user
bisgaard-itis Feb 19, 2025
8e6162d
minor cleanup
bisgaard-itis Feb 19, 2025
4dfe2d0
merge master into 7197-add-zipping-endpoints-in-storage
bisgaard-itis Feb 19, 2025
f817a52
move thing around
bisgaard-itis Feb 19, 2025
6a19215
make pylint happy
bisgaard-itis Feb 19, 2025
8356158
update openapi specs
bisgaard-itis Feb 19, 2025
da0f6ea
fix webserver mocks
bisgaard-itis Feb 19, 2025
63bc45c
cleanup
bisgaard-itis Feb 19, 2025
8821e57
merge master into 7197-add-zipping-endpoints-in-storage
bisgaard-itis Feb 19, 2025
e5ff440
make pylint happy
bisgaard-itis Feb 19, 2025
f0848a4
export get async jobs method in webserver openapi specs
bisgaard-itis Feb 19, 2025
98da41b
Update api/specs/web-server/_storage.py
bisgaard-itis Feb 19, 2025
83a8941
Update api/specs/web-server/_storage.py
bisgaard-itis Feb 19, 2025
bc49fb0
Update api/specs/web-server/_storage.py
bisgaard-itis Feb 19, 2025
e1488a8
Update api/specs/web-server/_storage.py
bisgaard-itis Feb 19, 2025
247d64f
Update api/specs/web-server/_storage.py
bisgaard-itis Feb 19, 2025
61d0683
use AsyncJobId consistently @GitHK
bisgaard-itis Feb 19, 2025
6132f82
update openapi specs
bisgaard-itis Feb 19, 2025
e50c1aa
make list jobs endpoint generic
bisgaard-itis Feb 19, 2025
57e2576
propagate changes to webserver
bisgaard-itis Feb 19, 2025
25265cb
make pylint happy
bisgaard-itis Feb 19, 2025
dd3fdf9
make submit job endpoint generic @GitHK
bisgaard-itis Feb 20, 2025
fe8f33e
factor out RequestContext and forward product name to storage @matusd…
bisgaard-itis Feb 20, 2025
666ed11
merge master into 7197-add-zipping-endpoints-in-storage
bisgaard-itis Feb 20, 2025
38030ab
fix test in storage
bisgaard-itis Feb 20, 2025
2be7088
merge master into 7197-add-zipping-endpoints-in-storage
bisgaard-itis Feb 20, 2025
c942833
make AsyncJobId a string instead of a uuid
bisgaard-itis Feb 21, 2025
fa156e8
start adding permission check
bisgaard-itis Feb 21, 2025
3d7ae6b
start implementation
bisgaard-itis Feb 21, 2025
e217c90
merge master into 1848-add-permission-rights-to-async-jobs
bisgaard-itis Feb 21, 2025
72ed39c
alternative implementation for simcore
bisgaard-itis Feb 21, 2025
0bc1c16
add check for datcore
bisgaard-itis Feb 21, 2025
38d8385
improve implementation
bisgaard-itis Feb 21, 2025
5ac62fc
finish implement test
bisgaard-itis Feb 21, 2025
1e2a1e2
Merge branch 'master' into 1848-add-permission-rights-to-async-jobs
bisgaard-itis Feb 21, 2025
089d2bf
minor fix
bisgaard-itis Feb 21, 2025
0587b14
merge master into 1848-add-permission-rights-to-async-jobs
bisgaard-itis Feb 26, 2025
fd7298c
add test which also checks folder
bisgaard-itis Feb 26, 2025
42e4318
add separate method for hecking read access of file
bisgaard-itis Feb 26, 2025
97f85ef
merge master into 1848-add-permission-rights-to-async-jobs
bisgaard-itis Feb 26, 2025
a81f959
add job_id_data
bisgaard-itis Feb 26, 2025
0c61038
fix webserver tests
bisgaard-itis Feb 26, 2025
6403313
update openapi specs
bisgaard-itis Feb 26, 2025
f829c5f
merge master into 1848-add-permission-rights-to-async-jobs
bisgaard-itis Feb 26, 2025
0910137
Merge branch 'master' into 1848-add-permission-rights-to-async-jobs
giancarloromeo Feb 27, 2025
fcc1e32
fix pylint
bisgaard-itis Feb 27, 2025
f93e3d0
Merge branch '1848-add-permission-rights-to-async-jobs' of github.com…
bisgaard-itis Feb 27, 2025
af76e39
str -> uuid and make user_id and product_name optional
bisgaard-itis Feb 27, 2025
f85fb59
update openapi specs
bisgaard-itis Feb 27, 2025
f393419
dont allow userid and productname to be none
bisgaard-itis Feb 27, 2025
7bfc431
fix imports in _data_export.py
bisgaard-itis Feb 27, 2025
@@ -1,14 +1,13 @@
from datetime import datetime
from typing import Any, TypeAlias
from uuid import UUID

from models_library.users import UserID
from pydantic import BaseModel, model_validator
from typing_extensions import Self

from ..progress_bar import ProgressReport

AsyncJobId: TypeAlias = UUID
AsyncJobId: TypeAlias = str


class AsyncJobStatus(BaseModel):
@@ -36,16 +35,15 @@ class AsyncJobResult(BaseModel):

class AsyncJobGet(BaseModel):
job_id: AsyncJobId
job_name: str


class AsyncJobAbort(BaseModel):
result: bool
job_id: AsyncJobId


class AsyncJobAccessData(BaseModel):
class AsyncJobNameData(BaseModel):
"""Data for controlling access to an async job"""

user_id: UserID | None
user_id: UserID
product_name: str
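The rename from AsyncJobAccessData to AsyncJobNameData, with user_id made required again, can be sketched with stdlib dataclasses standing in for the pydantic models (the names mirror the diff, but the validation here is hand-rolled, not pydantic's):

```python
from dataclasses import dataclass
from uuid import uuid4

AsyncJobId = str  # this revision switches the alias from UUID to str


@dataclass(frozen=True)
class AsyncJobNameData:
    """Data for controlling access to an async job (was AsyncJobAccessData)."""

    user_id: int  # stand-in for models_library's UserID; no longer Optional
    product_name: str

    def __post_init__(self) -> None:
        # mirrors the "dont allow userid and productname to be none" commit
        if self.user_id is None or self.product_name is None:
            raise ValueError("user_id and product_name are required")


@dataclass(frozen=True)
class AsyncJobGet:
    job_id: AsyncJobId  # job_name was dropped from this schema


job_id_data = AsyncJobNameData(user_id=42, product_name="osparc")
job = AsyncJobGet(job_id=f"{uuid4()}")
```

In the real models, pydantic enforces the non-optional fields; the `__post_init__` check above only approximates that behaviour.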
@@ -1,33 +1,33 @@
# pylint: disable=R6301
from pathlib import Path

from common_library.errors_classes import OsparcErrorMixin
from models_library.projects_nodes_io import LocationID
from models_library.users import UserID
from models_library.projects_nodes_io import LocationID, StorageFileID
from pydantic import BaseModel, Field


class DataExportTaskStartInput(BaseModel):
user_id: UserID
product_name: str
location_id: LocationID
paths: list[Path] = Field(..., min_length=1)
file_and_folder_ids: list[StorageFileID] = Field(..., min_length=1)


### Exceptions


class StorageRpcError(OsparcErrorMixin, RuntimeError):
class StorageRpcBaseError(OsparcErrorMixin, RuntimeError):
pass


class InvalidFileIdentifierError(StorageRpcError):
class InvalidLocationIdError(StorageRpcBaseError):
msg_template: str = "Invalid location_id {location_id}"


class InvalidFileIdentifierError(StorageRpcBaseError):
msg_template: str = "Could not find the file {file_id}"


class AccessRightError(StorageRpcError):
msg_template: str = "User {user_id} does not have access to file {file_id}"
class AccessRightError(StorageRpcBaseError):
msg_template: str = "User {user_id} does not have access to file {file_id} with location {location_id}"


class DataExportError(StorageRpcError):
class DataExportError(StorageRpcBaseError):
msg_template: str = "Could not complete data export job with id {job_id}"
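The renamed error hierarchy relies on OsparcErrorMixin interpolating msg_template from keyword arguments; a minimal stdlib stand-in shows the mechanism (the mixin below is a simplified assumption, not the real common_library implementation):

```python
class _ErrorMixin:
    """Simplified stand-in for common_library's OsparcErrorMixin."""

    msg_template: str = ""

    def __init__(self, **ctx) -> None:
        self.ctx = ctx
        # format the template with the keyword context and pass it upward
        super().__init__(self.msg_template.format(**ctx))


class StorageRpcBaseError(_ErrorMixin, RuntimeError):
    pass


class InvalidLocationIdError(StorageRpcBaseError):
    msg_template = "Invalid location_id {location_id}"


class AccessRightError(StorageRpcBaseError):
    msg_template = (
        "User {user_id} does not have access to file {file_id} "
        "with location {location_id}"
    )


err = AccessRightError(user_id=1, file_id="api/abc", location_id=0)
```

Because every error derives from StorageRpcBaseError, RPC callers can catch the whole family with one except clause while still getting a fully interpolated message.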
@@ -12,9 +12,8 @@
)
from ..api_schemas_storage.data_export_async_jobs import DataExportTaskStartInput
from ..progress_bar import ProgressReport
from ..projects_nodes_io import LocationID
from ..projects_nodes_io import LocationID, StorageFileID
from ..rest_pagination import CursorQueryParameters
from ..users import UserID
from ._base import InputSchema, OutputSchema


@@ -27,15 +26,11 @@ class ListPathsQueryParams(InputSchema, CursorQueryParameters):


class DataExportPost(InputSchema):
paths: list[Path]
paths: list[StorageFileID]

def to_rpc_schema(
self, user_id: UserID, product_name: str, location_id: LocationID
) -> DataExportTaskStartInput:
def to_rpc_schema(self, location_id: LocationID) -> DataExportTaskStartInput:
return DataExportTaskStartInput(
paths=self.paths,
user_id=user_id,
product_name=product_name,
file_and_folder_ids=self.paths,
location_id=location_id,
)

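The slimmed-down to_rpc_schema now only needs the location: user_id and product_name travel separately as job_id_data. A dataclass sketch of the mapping (stand-ins for the pydantic schemas, with the min_length=1 constraint hand-rolled):

```python
from dataclasses import dataclass


@dataclass
class DataExportTaskStartInput:
    location_id: int  # stand-in for LocationID
    file_and_folder_ids: list[str]  # stand-in for list[StorageFileID]

    def __post_init__(self) -> None:
        # mirrors Field(..., min_length=1) on the real model
        if not self.file_and_folder_ids:
            raise ValueError("file_and_folder_ids must not be empty")


@dataclass
class DataExportPost:
    paths: list[str]

    def to_rpc_schema(self, location_id: int) -> DataExportTaskStartInput:
        # user_id/product_name no longer ride in this payload
        return DataExportTaskStartInput(
            location_id=location_id,
            file_and_folder_ids=self.paths,
        )


rpc_payload = DataExportPost(paths=["api/some-file"]).to_rpc_schema(location_id=0)
```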
@@ -2,9 +2,9 @@

from models_library.api_schemas_rpc_async_jobs.async_jobs import (
AsyncJobAbort,
AsyncJobAccessData,
AsyncJobGet,
AsyncJobId,
AsyncJobNameData,
AsyncJobResult,
AsyncJobStatus,
)
@@ -23,13 +23,13 @@ async def abort(
*,
rpc_namespace: RPCNamespace,
job_id: AsyncJobId,
access_data: AsyncJobAccessData | None
job_id_data: AsyncJobNameData
) -> AsyncJobAbort:
result = await rabbitmq_rpc_client.request(
rpc_namespace,
_RPC_METHOD_NAME_ADAPTER.validate_python("abort"),
job_id=job_id,
access_data=access_data,
job_id_data=job_id_data,
timeout_s=_DEFAULT_TIMEOUT_S,
)
assert isinstance(result, AsyncJobAbort)
@@ -41,13 +41,13 @@ async def get_status(
*,
rpc_namespace: RPCNamespace,
job_id: AsyncJobId,
access_data: AsyncJobAccessData | None
job_id_data: AsyncJobNameData
) -> AsyncJobStatus:
result = await rabbitmq_rpc_client.request(
rpc_namespace,
_RPC_METHOD_NAME_ADAPTER.validate_python("get_status"),
job_id=job_id,
access_data=access_data,
job_id_data=job_id_data,
timeout_s=_DEFAULT_TIMEOUT_S,
)
assert isinstance(result, AsyncJobStatus)
@@ -59,26 +59,31 @@ async def get_result(
*,
rpc_namespace: RPCNamespace,
job_id: AsyncJobId,
access_data: AsyncJobAccessData | None
job_id_data: AsyncJobNameData
) -> AsyncJobResult:
result = await rabbitmq_rpc_client.request(
rpc_namespace,
_RPC_METHOD_NAME_ADAPTER.validate_python("get_result"),
job_id=job_id,
access_data=access_data,
job_id_data=job_id_data,
timeout_s=_DEFAULT_TIMEOUT_S,
)
assert isinstance(result, AsyncJobResult)
return result


async def list_jobs(
rabbitmq_rpc_client: RabbitMQRPCClient, *, rpc_namespace: RPCNamespace, filter_: str
rabbitmq_rpc_client: RabbitMQRPCClient,
*,
rpc_namespace: RPCNamespace,
filter_: str,
job_id_data: AsyncJobNameData
) -> list[AsyncJobGet]:
result: list[AsyncJobGet] = await rabbitmq_rpc_client.request(
rpc_namespace,
_RPC_METHOD_NAME_ADAPTER.validate_python("list_jobs"),
filter_=filter_,
job_id_data=job_id_data,
timeout_s=_DEFAULT_TIMEOUT_S,
)
return result
@@ -88,12 +93,14 @@ async def submit_job(
rabbitmq_rpc_client: RabbitMQRPCClient,
*,
rpc_namespace: RPCNamespace,
job_name: str,
method_name: str,
job_id_data: AsyncJobNameData,
**kwargs
) -> AsyncJobGet:
result = await rabbitmq_rpc_client.request(
rpc_namespace,
_RPC_METHOD_NAME_ADAPTER.validate_python(job_name),
_RPC_METHOD_NAME_ADAPTER.validate_python(method_name),
job_id_data=job_id_data,
**kwargs,
timeout_s=_DEFAULT_TIMEOUT_S,
)
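The client now threads job_id_data through every request and validates the method name before dispatch. The pattern can be exercised with a dummy transport standing in for RabbitMQRPCClient (all names below are illustrative, not the project's real API):

```python
import asyncio
from dataclasses import dataclass


@dataclass
class JobIdData:
    user_id: int
    product_name: str


class DummyRpcClient:
    """Stand-in for RabbitMQRPCClient: records the request it would send."""

    async def request(self, namespace: str, method: str, **kwargs):
        return {"namespace": namespace, "method": method, "kwargs": kwargs}


async def submit_job(client, *, rpc_namespace: str, method_name: str,
                     job_id_data: JobIdData, **kwargs):
    # crude stand-in for the TypeAdapter(RPCMethodName) validation step
    if not method_name.isidentifier():
        raise ValueError(f"invalid RPC method name: {method_name}")
    return await client.request(
        rpc_namespace, method_name, job_id_data=job_id_data, **kwargs
    )


sent = asyncio.run(
    submit_job(
        DummyRpcClient(),
        rpc_namespace="storage",
        method_name="start_data_export",
        job_id_data=JobIdData(user_id=42, product_name="osparc"),
        paths=["api/some-file"],
    )
)
```

Note how job_id_data is always injected as a keyword argument next to the caller's own kwargs: this is what lets a single generic submit_job serve any job type while still carrying the identity needed for the permission checks.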
@@ -5,9 +5,9 @@
from fastapi import FastAPI
from models_library.api_schemas_rpc_async_jobs.async_jobs import (
AsyncJobAbort,
AsyncJobAccessData,
AsyncJobGet,
AsyncJobId,
AsyncJobNameData,
AsyncJobResult,
AsyncJobStatus,
)
@@ -23,17 +23,19 @@

@router.expose()
async def abort(
app: FastAPI, job_id: AsyncJobId, access_data: AsyncJobAccessData | None
app: FastAPI, job_id: AsyncJobId, job_id_data: AsyncJobNameData
) -> AsyncJobAbort:
assert app # nosec
assert job_id_data # nosec
return AsyncJobAbort(result=True, job_id=job_id)


@router.expose(reraise_if_error_type=(StatusError,))
async def get_status(
app: FastAPI, job_id: AsyncJobId, access_data: AsyncJobAccessData | None
app: FastAPI, job_id: AsyncJobId, job_id_data: AsyncJobNameData
) -> AsyncJobStatus:
assert app # nosec
assert job_id_data # nosec
progress_report = ProgressReport(actual_value=0.5, total=1.0, attempt=1)
return AsyncJobStatus(
job_id=job_id,
@@ -46,14 +48,17 @@ async def get_status(

@router.expose(reraise_if_error_type=(ResultError,))
async def get_result(
app: FastAPI, job_id: AsyncJobId, access_data: AsyncJobAccessData | None
app: FastAPI, job_id: AsyncJobId, job_id_data: AsyncJobNameData
) -> AsyncJobResult:
assert app # nosec
assert job_id # nosec
assert job_id_data # nosec
return AsyncJobResult(result="Here's your result.", error=None)


@router.expose()
async def list_jobs(app: FastAPI, filter_: str) -> list[AsyncJobGet]:
async def list_jobs(
app: FastAPI, filter_: str, job_id_data: AsyncJobNameData
) -> list[AsyncJobGet]:
assert app # nosec
return [AsyncJobGet(job_id=AsyncJobId(f"{uuid4()}"), job_name="myjob")]
return [AsyncJobGet(job_id=AsyncJobId(f"{uuid4()}"))]
@@ -1,7 +1,11 @@
from uuid import uuid4

from fastapi import FastAPI
from models_library.api_schemas_rpc_async_jobs.async_jobs import AsyncJobGet, AsyncJobId
from models_library.api_schemas_rpc_async_jobs.async_jobs import (
AsyncJobGet,
AsyncJobId,
AsyncJobNameData,
)
from models_library.api_schemas_storage.data_export_async_jobs import (
AccessRightError,
DataExportError,
@@ -10,6 +14,10 @@
)
from servicelib.rabbitmq import RPCRouter

from ...dsm import DatCoreDataManager, get_dsm_provider
from ...modules.datcore_adapter.datcore_adapter import DatcoreAdapterError
from ...simcore_s3_dsm import FileAccessRightError, SimcoreS3DataManager

router = RPCRouter()


@@ -21,10 +29,28 @@
)
)
async def start_data_export(
app: FastAPI, paths: DataExportTaskStartInput
app: FastAPI,
data_export_start: DataExportTaskStartInput,
job_id_data: AsyncJobNameData,
) -> AsyncJobGet:
assert app # nosec

dsm = get_dsm_provider(app).get(data_export_start.location_id)

try:
for _id in data_export_start.file_and_folder_ids:
if isinstance(dsm, DatCoreDataManager):
_ = await dsm.get_file(user_id=job_id_data.user_id, file_id=_id)
elif isinstance(dsm, SimcoreS3DataManager):
await dsm.can_read_file(user_id=job_id_data.user_id, file_id=_id)

except (FileAccessRightError, DatcoreAdapterError) as err:
raise AccessRightError(
user_id=job_id_data.user_id,
file_id=_id,
location_id=data_export_start.location_id,
) from err

return AsyncJobGet(
job_id=AsyncJobId(f"{uuid4()}"),
job_name=", ".join(str(p) for p in paths.paths),
)
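The core of the PR is the loop above: start_data_export verifies read access per file before creating the job, dispatching on the concrete data manager and converting low-level failures into AccessRightError. The control flow, with a toy manager in place of SimcoreS3DataManager/DatCoreDataManager (a synchronous sketch; the real checks are async):

```python
class FileAccessRightError(Exception):
    """Stand-in for simcore_s3_dsm.FileAccessRightError."""


class AccessRightError(Exception):
    """Stand-in for the RPC-facing AccessRightError."""


class SimcoreS3Manager:
    def __init__(self, readable: set[str]) -> None:
        self._readable = readable

    def can_read_file(self, *, user_id: int, file_id: str) -> None:
        if file_id not in self._readable:
            raise FileAccessRightError(file_id)


def check_export_access(dsm, *, user_id: int, file_ids: list[str], location_id: int):
    """Mirror of the permission loop in start_data_export."""
    for file_id in file_ids:
        try:
            dsm.can_read_file(user_id=user_id, file_id=file_id)
        except FileAccessRightError as err:
            # re-raise with the context the msg_template needs
            raise AccessRightError(
                f"User {user_id} does not have access to file {file_id} "
                f"with location {location_id}"
            ) from err
```

Failing fast on the first unreadable file, before any job is enqueued, is what turns the permission check into a cheap synchronous gate rather than a mid-export failure.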
@@ -355,6 +355,12 @@ async def get_file(self, user_id: UserID, file_id: StorageFileID) -> FileMetaDat
fmd = await self._update_database_from_storage(fmd)
return convert_db_to_model(fmd)

async def can_read_file(self, user_id: UserID, file_id: StorageFileID):
async with self.engine.connect() as conn:
can = await get_file_access_rights(conn, int(user_id), file_id)
if not can.read:
raise FileAccessRightError(access_right="read", file_id=file_id)

async def create_file_upload_links(
self,
user_id: UserID,
40 changes: 39 additions & 1 deletion services/storage/tests/conftest.py
@@ -61,12 +61,13 @@
from simcore_service_storage.core.application import create_app
from simcore_service_storage.core.settings import ApplicationSettings
from simcore_service_storage.dsm import get_dsm_provider
from simcore_service_storage.models import S3BucketName
from simcore_service_storage.models import FileMetaData, FileMetaDataAtDB, S3BucketName
from simcore_service_storage.modules.long_running_tasks import (
get_completed_upload_tasks,
)
from simcore_service_storage.modules.s3 import get_s3_client
from simcore_service_storage.simcore_s3_dsm import SimcoreS3DataManager
from sqlalchemy import literal_column
from sqlalchemy.ext.asyncio import AsyncEngine
from tenacity.asyncio import AsyncRetrying
from tenacity.retry import retry_if_exception_type
@@ -849,3 +850,40 @@ async def with_random_project_with_files(
faker: Faker,
) -> tuple[dict[str, Any], dict[NodeID, dict[SimcoreS3FileID, FileIDDict]],]:
return await random_project_with_files(project_params)


@pytest.fixture()
async def output_file(
user_id: UserID, project_id: str, sqlalchemy_async_engine: AsyncEngine, faker: Faker
) -> AsyncIterator[FileMetaData]:
node_id = "fd6f9737-1988-341b-b4ac-0614b646fa82"

# pylint: disable=no-value-for-parameter

file = FileMetaData.from_simcore_node(
user_id=user_id,
file_id=f"{project_id}/{node_id}/filename.txt",
bucket=TypeAdapter(S3BucketName).validate_python("master-simcore"),
location_id=SimcoreS3DataManager.get_location_id(),
location_name=SimcoreS3DataManager.get_location_name(),
sha256_checksum=faker.sha256(),
)
file.entity_tag = "df9d868b94e53d18009066ca5cd90e9f"
file.file_size = ByteSize(12)
file.user_id = user_id
async with sqlalchemy_async_engine.begin() as conn:
stmt = (
file_meta_data.insert()
.values(jsonable_encoder(FileMetaDataAtDB.model_validate(file)))
.returning(literal_column("*"))
)
result = await conn.execute(stmt)
row = result.one()
assert row

yield file

async with sqlalchemy_async_engine.begin() as conn:
result = await conn.execute(
file_meta_data.delete().where(file_meta_data.c.file_id == row.file_id)
)