✨Storage: new paths entrypoint with pagination #7200

Merged: 28 commits (Feb 26, 2025)
24 changes: 20 additions & 4 deletions api/specs/web-server/_storage.py
@@ -4,10 +4,11 @@
# pylint: disable=too-many-arguments


from typing import TypeAlias
from typing import Annotated, TypeAlias
from uuid import UUID

from fastapi import APIRouter, Query, status
from fastapi import APIRouter, Depends, Query, status
from fastapi_pagination.cursor import CursorPage
from models_library.api_schemas_storage.storage_schemas import (
FileLocation,
FileMetaDataGet,
@@ -20,9 +21,11 @@
)
from models_library.api_schemas_webserver.storage import (
DataExportPost,
ListPathsQueryParams,
StorageAsyncJobGet,
StorageAsyncJobResult,
StorageAsyncJobStatus,
StorageLocationPathParams,
)
from models_library.generics import Envelope
from models_library.projects_nodes_io import LocationID
@@ -53,12 +56,25 @@ async def list_storage_locations():
"""Returns the list of available storage locations"""


@router.get(
"/storage/locations/{location_id}/paths",
response_model=CursorPage[FileMetaDataGet],
)
async def list_storage_paths(
_path: Annotated[StorageLocationPathParams, Depends()],
_query: Annotated[ListPathsQueryParams, Depends()],
):
"""Lists the files/directories in WorkingDirectory"""


@router.get(
"/storage/locations/{location_id}/datasets",
response_model=Envelope[list[DatasetMetaData]],
description="Get datasets metadata",
)
async def list_datasets_metadata(location_id: LocationID):
async def list_datasets_metadata(
_path: Annotated[StorageLocationPathParams, Depends()],
):
"""returns all the top level datasets a user has access to"""


@@ -68,7 +84,7 @@ async def list_datasets_metadata(location_id: LocationID):
description="Get datasets metadata",
)
async def get_files_metadata(
location_id: LocationID,
_path: Annotated[StorageLocationPathParams, Depends()],
uuid_filter: str = "",
expand_dirs: bool = Query(
True,
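The new `list_storage_paths` entrypoint is cursor-paginated via fastapi-pagination. Below is a minimal client-side sketch of how such a page could be walked; it is not part of this PR, and the `/v0` base path, the `size`/`cursor` query parameters and the `items`/`next_page` response fields are assumptions derived from fastapi-pagination's `CursorPage` defaults.

```python
import httpx


def iter_storage_paths(client: httpx.Client, location_id: int, size: int = 50):
    """Yield every path of a storage location, following the cursor page by page.

    `client` is assumed to be created with base_url pointing at the web-server
    and an authenticated session cookie.
    """
    cursor: str | None = None
    while True:
        params: dict[str, int | str] = {"size": size}
        if cursor:
            params["cursor"] = cursor
        response = client.get(
            f"/v0/storage/locations/{location_id}/paths", params=params
        )
        response.raise_for_status()
        page = response.json()
        yield from page["items"]
        cursor = page.get("next_page")
        if not cursor:  # last page: no cursor to continue from
            break
```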
1 change: 1 addition & 0 deletions api/specs/web-server/requirements.txt
@@ -3,6 +3,7 @@
--constraint ../../../requirements/constraints.txt

fastapi
fastapi-pagination
jsonref
pydantic
pydantic-extra-types
111 changes: 106 additions & 5 deletions packages/aws-library/src/aws_library/s3/_client.py
@@ -28,19 +28,28 @@
from settings_library.s3 import S3Settings
from types_aiobotocore_s3 import S3Client
from types_aiobotocore_s3.literals import BucketLocationConstraintType
from types_aiobotocore_s3.type_defs import ObjectIdentifierTypeDef
from types_aiobotocore_s3.type_defs import (
ListObjectsV2RequestRequestTypeDef,
ObjectIdentifierTypeDef,
)

from ._constants import MULTIPART_COPY_THRESHOLD, MULTIPART_UPLOADS_MIN_TOTAL_SIZE
from ._constants import (
MULTIPART_COPY_THRESHOLD,
MULTIPART_UPLOADS_MIN_TOTAL_SIZE,
S3_OBJECT_DELIMITER,
)
from ._error_handler import s3_exception_handler, s3_exception_handler_async_gen
from ._errors import S3DestinationNotEmptyError, S3KeyNotFoundError
from ._models import (
MultiPartUploadLinks,
PathCursor,
S3DirectoryMetaData,
S3MetaData,
S3ObjectKey,
S3ObjectPrefix,
UploadID,
)
from ._utils import compute_num_file_chunks
from ._utils import compute_num_file_chunks, create_final_prefix

_logger = logging.getLogger(__name__)

@@ -167,7 +176,99 @@ async def get_directory_metadata(
size = 0
async for s3_object in self._list_all_objects(bucket=bucket, prefix=prefix):
size += s3_object.size
return S3DirectoryMetaData(size=size)
return S3DirectoryMetaData(prefix=S3ObjectPrefix(prefix), size=ByteSize(size))

@s3_exception_handler(_logger)
async def count_objects(
self,
*,
bucket: S3BucketName,
prefix: S3ObjectPrefix | None,
start_after: S3ObjectKey | None,
is_partial_prefix: bool = False,
use_delimiter: bool = True,
) -> int:
"""returns the number of entries in the bucket, defined
by prefix and start_after same as list_objects
"""
paginator = self._client.get_paginator("list_objects_v2")
total_count = 0
async for page in paginator.paginate(
Bucket=bucket,
Prefix=create_final_prefix(prefix, is_partial_prefix=is_partial_prefix),
StartAfter=start_after or "",
Delimiter=S3_OBJECT_DELIMITER if use_delimiter else "",
):
total_count += page.get("KeyCount", 0)
return total_count

@s3_exception_handler(_logger)
async def list_objects(
self,
*,
bucket: S3BucketName,
prefix: S3ObjectPrefix | None,
start_after: S3ObjectKey | None,
limit: int = _MAX_ITEMS_PER_PAGE,
next_cursor: PathCursor | None = None,
is_partial_prefix: bool = False,
) -> tuple[list[S3MetaData | S3DirectoryMetaData], PathCursor | None]:
"""returns a number of entries in the bucket, defined by limit
the entries are sorted alphabetically by key. If a cursor is returned
then the client can call the function again with the cursor to get the
next entries.

the first entry is defined by start_after
if start_after is None, the first entry is the first one in the bucket
if prefix is not None, only entries with the given prefix are returned
if prefix is None, all entries in the bucket are returned
if next_cursor is set, then the call will return the next entries after the cursor
if is_partial_prefix is set then the prefix is not auto-delimited
(if False equivalent to `ls /home/user/`
if True equivalent to `ls /home/user*`)
limit must be >= 1 and <= _AWS_MAX_ITEMS_PER_PAGE

Raises:
ValueError: in case of invalid limit
"""
if limit < 1:
msg = "num_objects must be >= 1"
raise ValueError(msg)
if limit > _AWS_MAX_ITEMS_PER_PAGE:
msg = f"num_objects must be <= {_AWS_MAX_ITEMS_PER_PAGE}"
raise ValueError(msg)

list_config: ListObjectsV2RequestRequestTypeDef = {
"Bucket": bucket,
"Prefix": create_final_prefix(prefix, is_partial_prefix=is_partial_prefix),
"MaxKeys": limit,
"Delimiter": S3_OBJECT_DELIMITER,
}
if start_after:
list_config["StartAfter"] = start_after
if next_cursor:
list_config["ContinuationToken"] = next_cursor
listed_objects = await self._client.list_objects_v2(**list_config)
found_objects: list[S3MetaData | S3DirectoryMetaData] = []
if "CommonPrefixes" in listed_objects:
# we have folders here
list_subfolders = listed_objects["CommonPrefixes"]
found_objects.extend(
S3DirectoryMetaData.model_construct(
prefix=S3ObjectPrefix(subfolder["Prefix"]), size=None
)
for subfolder in list_subfolders
if "Prefix" in subfolder
)
if "Contents" in listed_objects:
found_objects.extend(
S3MetaData.from_botocore_list_objects(obj)
for obj in listed_objects["Contents"]
)
next_cursor = None
if listed_objects["IsTruncated"]:
next_cursor = listed_objects["NextContinuationToken"]
return found_objects, next_cursor

@s3_exception_handler_async_gen(_logger)
async def list_objects_paginated(
@@ -459,7 +560,7 @@ async def copy_objects_recursively(
dst_metadata = await self.get_directory_metadata(
bucket=bucket, prefix=dst_prefix
)
if dst_metadata.size > 0:
if dst_metadata.size and dst_metadata.size > 0:
raise S3DestinationNotEmptyError(dst_prefix=dst_prefix)
await limited_gather(
*[
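The new `list_objects`/`count_objects` pair replaces unbounded listings with cursor-based pages backed by S3's `ContinuationToken`. A usage sketch under assumptions follows: the client object exposing these methods (here called `s3_api`) and its construction are not shown in this diff, so its name is hypothetical; only the method signatures above are relied upon.

```python
from aws_library.s3._models import (
    PathCursor,
    S3DirectoryMetaData,
    S3MetaData,
    S3ObjectPrefix,
)


async def collect_paths(s3_api, *, bucket, prefix: S3ObjectPrefix | None):
    """Gather all files/sub-folders under `prefix`, one page at a time."""
    entries: list[S3MetaData | S3DirectoryMetaData] = []
    cursor: PathCursor | None = None
    while True:
        page, cursor = await s3_api.list_objects(
            bucket=bucket,
            prefix=prefix,
            start_after=None,
            next_cursor=cursor,
        )
        entries.extend(page)
        if cursor is None:  # no NextContinuationToken -> last page reached
            break
    # count_objects gives the same kind of total without materializing the entries
    total = await s3_api.count_objects(bucket=bucket, prefix=prefix, start_after=None)
    return entries, total
```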
1 change: 1 addition & 0 deletions packages/aws-library/src/aws_library/s3/_constants.py
@@ -12,3 +12,4 @@

PRESIGNED_LINK_MAX_SIZE: Final[ByteSize] = TypeAdapter(ByteSize).validate_python("5GiB")
S3_MAX_FILE_SIZE: Final[ByteSize] = TypeAdapter(ByteSize).validate_python("5TiB")
S3_OBJECT_DELIMITER: Final[str] = "/"
43 changes: 22 additions & 21 deletions packages/aws-library/src/aws_library/s3/_models.py
@@ -1,23 +1,24 @@
import datetime
from dataclasses import dataclass
from typing import TypeAlias
from pathlib import Path
from typing import TypeAlias, cast

from models_library.api_schemas_storage.storage_schemas import ETag
from models_library.basic_types import SHA256Str
from pydantic import AnyUrl, BaseModel, ByteSize
from pydantic import AnyUrl, BaseModel, ByteSize, Field
from types_aiobotocore_s3.type_defs import HeadObjectOutputTypeDef, ObjectTypeDef

S3ObjectKey: TypeAlias = str
S3ObjectPrefix: TypeAlias = Path
UploadID: TypeAlias = str
PathCursor: TypeAlias = str


@dataclass(frozen=True, slots=True, kw_only=True)
class S3MetaData:
class S3MetaData(BaseModel, frozen=True):
object_key: S3ObjectKey
last_modified: datetime.datetime
e_tag: ETag
sha256_checksum: SHA256Str | None
size: int
size: ByteSize

@staticmethod
def from_botocore_head_object(
@@ -27,12 +28,8 @@ def from_botocore_head_object(
object_key=object_key,
last_modified=obj["LastModified"],
e_tag=obj["ETag"].strip('"'),
sha256_checksum=(
SHA256Str(obj.get("ChecksumSHA256"))
if obj.get("ChecksumSHA256")
else None
),
size=obj["ContentLength"],
sha256_checksum=obj.get("ChecksumSHA256"),
size=ByteSize(obj["ContentLength"]),
)

@staticmethod
@@ -47,18 +44,22 @@ def from_botocore_list_objects(
object_key=obj["Key"],
last_modified=obj["LastModified"],
e_tag=obj["ETag"].strip('"'),
sha256_checksum=(
SHA256Str(obj.get("ChecksumSHA256"))
if obj.get("ChecksumSHA256")
else None
),
size=obj["Size"],
sha256_checksum=cast(SHA256Str | None, obj.get("ChecksumSHA256")),
size=ByteSize(obj["Size"]),
)

def as_path(self) -> Path:
return Path(self.object_key)

@dataclass(frozen=True)
class S3DirectoryMetaData:
size: int

class S3DirectoryMetaData(BaseModel, frozen=True):
prefix: S3ObjectPrefix
size: ByteSize | None = Field(
..., description="Size of the directory if computed, None if unknown"
)

def as_path(self) -> Path:
return self.prefix


class MultiPartUploadLinks(BaseModel):
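A short illustration of the reworked pydantic models may help: `S3DirectoryMetaData.size` is now optional (None meaning "not computed") and both models expose `as_path()`. The values below are made up; only the fields visible in the diff are used.

```python
import datetime
from pathlib import Path

from pydantic import ByteSize

from aws_library.s3._models import S3DirectoryMetaData, S3MetaData, S3ObjectPrefix

# a folder coming from CommonPrefixes: its size was not computed, hence None
folder = S3DirectoryMetaData(prefix=S3ObjectPrefix("studies/1234/"), size=None)
assert folder.as_path() == Path("studies/1234")

# a file entry: size is now a ByteSize, so it can be rendered human-readable
file_meta = S3MetaData(
    object_key="studies/1234/data.h5",
    last_modified=datetime.datetime.now(datetime.timezone.utc),
    e_tag="d41d8cd98f00b204e9800998ecf8427e",
    sha256_checksum=None,
    size=ByteSize(5 * 1024**2),
)
assert file_meta.as_path().name == "data.h5"
print(file_meta.size.human_readable())  # e.g. "5.0MiB"
```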
15 changes: 15 additions & 0 deletions packages/aws-library/src/aws_library/s3/_utils.py
@@ -2,6 +2,9 @@

from pydantic import ByteSize, TypeAdapter

from ._constants import S3_OBJECT_DELIMITER
from ._models import S3ObjectPrefix

_MULTIPART_MAX_NUMBER_OF_PARTS: Final[int] = 10000

# this is artificially defined, if possible we keep a maximum number of requests for parallel
@@ -34,3 +37,15 @@ def compute_num_file_chunks(file_size: ByteSize) -> tuple[int, ByteSize]:
raise ValueError(
msg,
)


def create_final_prefix(
prefix: S3ObjectPrefix | None, *, is_partial_prefix: bool
) -> str:
final_prefix = f"{prefix}" if prefix else ""
if prefix and not is_partial_prefix:
final_prefix = (
f"{final_prefix.rstrip(S3_OBJECT_DELIMITER)}{S3_OBJECT_DELIMITER}"
)

return final_prefix
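To make the two listing modes concrete, here is how the helper behaves on a POSIX system (the import path matches this file's location; everything else is plain stdlib):

```python
from pathlib import Path

from aws_library.s3._utils import create_final_prefix

prefix = Path("home/user")
# folder-like listing, i.e. `ls /home/user/`: a trailing delimiter is enforced
assert create_final_prefix(prefix, is_partial_prefix=False) == "home/user/"
# partial-prefix listing, i.e. `ls /home/user*`: the prefix is left as given
assert create_final_prefix(prefix, is_partial_prefix=True) == "home/user"
# no prefix at all lists from the bucket root
assert create_final_prefix(None, is_partial_prefix=False) == ""
```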