Commit 387826f

✨Storage: new paths entrypoint with pagination (#7200)
1 parent 7dd1e0a commit 387826f

64 files changed: +3768 -1324 lines changed

api/specs/web-server/_storage.py

Lines changed: 20 additions & 4 deletions
@@ -4,10 +4,11 @@
 # pylint: disable=too-many-arguments


-from typing import TypeAlias
+from typing import Annotated, TypeAlias
 from uuid import UUID

-from fastapi import APIRouter, Query, status
+from fastapi import APIRouter, Depends, Query, status
+from fastapi_pagination.cursor import CursorPage
 from models_library.api_schemas_storage.storage_schemas import (
     FileLocation,
     FileMetaDataGet,
@@ -20,9 +21,11 @@
 )
 from models_library.api_schemas_webserver.storage import (
     DataExportPost,
+    ListPathsQueryParams,
     StorageAsyncJobGet,
     StorageAsyncJobResult,
     StorageAsyncJobStatus,
+    StorageLocationPathParams,
 )
 from models_library.generics import Envelope
 from models_library.projects_nodes_io import LocationID
@@ -53,12 +56,25 @@ async def list_storage_locations():
     """Returns the list of available storage locations"""


+@router.get(
+    "/storage/locations/{location_id}/paths",
+    response_model=CursorPage[FileMetaDataGet],
+)
+async def list_storage_paths(
+    _path: Annotated[StorageLocationPathParams, Depends()],
+    _query: Annotated[ListPathsQueryParams, Depends()],
+):
+    """Lists the files/directories in WorkingDirectory"""
+
+
 @router.get(
     "/storage/locations/{location_id}/datasets",
     response_model=Envelope[list[DatasetMetaData]],
     description="Get datasets metadata",
 )
-async def list_datasets_metadata(location_id: LocationID):
+async def list_datasets_metadata(
+    _path: Annotated[StorageLocationPathParams, Depends()],
+):
     """returns all the top level datasets a user has access to"""
@@ -68,7 +84,7 @@ async def list_datasets_metadata(location_id: LocationID):
     description="Get datasets metadata",
 )
 async def get_files_metadata(
-    location_id: LocationID,
+    _path: Annotated[StorageLocationPathParams, Depends()],
     uuid_filter: str = "",
     expand_dirs: bool = Query(
         True,
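
A minimal consumer sketch of the new paths entrypoint. The "/v0" prefix, the session cookie name, and the "cursor"/"size" query parameters plus "items"/"next_page" response fields (fastapi-pagination CursorPage defaults) are assumptions, not confirmed by this diff:

# Hypothetical client for GET /v0/storage/locations/{location_id}/paths.
import httpx

BASE_URL = "http://localhost:9081/v0"  # assumed web-server address
LOCATION_ID = 0  # assumed storage location


def iter_storage_paths(client: httpx.Client, location_id: int):
    cursor: str | None = None
    while True:
        params: dict[str, str | int] = {"size": 50}
        if cursor:
            params["cursor"] = cursor  # assumed fastapi-pagination cursor param
        response = client.get(
            f"{BASE_URL}/storage/locations/{location_id}/paths", params=params
        )
        response.raise_for_status()
        page = response.json()
        yield from page["items"]
        cursor = page.get("next_page")  # assumed CursorPage field
        if not cursor:
            break


# "osparc.WEBAPI_SESSION" is an assumed session cookie name for illustration
with httpx.Client(cookies={"osparc.WEBAPI_SESSION": "<session-id>"}) as client:
    for entry in iter_storage_paths(client, LOCATION_ID):
        print(entry)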

api/specs/web-server/requirements.txt

Lines changed: 1 addition & 0 deletions
@@ -3,6 +3,7 @@
 --constraint ../../../requirements/constraints.txt

 fastapi
+fastapi-pagination
 jsonref
 pydantic
 pydantic-extra-types

packages/aws-library/src/aws_library/s3/_client.py

Lines changed: 106 additions & 5 deletions
@@ -28,19 +28,28 @@
 from settings_library.s3 import S3Settings
 from types_aiobotocore_s3 import S3Client
 from types_aiobotocore_s3.literals import BucketLocationConstraintType
-from types_aiobotocore_s3.type_defs import ObjectIdentifierTypeDef
+from types_aiobotocore_s3.type_defs import (
+    ListObjectsV2RequestRequestTypeDef,
+    ObjectIdentifierTypeDef,
+)

-from ._constants import MULTIPART_COPY_THRESHOLD, MULTIPART_UPLOADS_MIN_TOTAL_SIZE
+from ._constants import (
+    MULTIPART_COPY_THRESHOLD,
+    MULTIPART_UPLOADS_MIN_TOTAL_SIZE,
+    S3_OBJECT_DELIMITER,
+)
 from ._error_handler import s3_exception_handler, s3_exception_handler_async_gen
 from ._errors import S3DestinationNotEmptyError, S3KeyNotFoundError
 from ._models import (
     MultiPartUploadLinks,
+    PathCursor,
     S3DirectoryMetaData,
     S3MetaData,
     S3ObjectKey,
+    S3ObjectPrefix,
     UploadID,
 )
-from ._utils import compute_num_file_chunks
+from ._utils import compute_num_file_chunks, create_final_prefix

 _logger = logging.getLogger(__name__)

@@ -167,7 +176,99 @@ async def get_directory_metadata(
         size = 0
         async for s3_object in self._list_all_objects(bucket=bucket, prefix=prefix):
             size += s3_object.size
-        return S3DirectoryMetaData(size=size)
+        return S3DirectoryMetaData(prefix=S3ObjectPrefix(prefix), size=ByteSize(size))
+
+    @s3_exception_handler(_logger)
+    async def count_objects(
+        self,
+        *,
+        bucket: S3BucketName,
+        prefix: S3ObjectPrefix | None,
+        start_after: S3ObjectKey | None,
+        is_partial_prefix: bool = False,
+        use_delimiter: bool = True,
+    ) -> int:
+        """returns the number of entries in the bucket, defined
+        by prefix and start_after same as list_objects
+        """
+        paginator = self._client.get_paginator("list_objects_v2")
+        total_count = 0
+        async for page in paginator.paginate(
+            Bucket=bucket,
+            Prefix=create_final_prefix(prefix, is_partial_prefix=is_partial_prefix),
+            StartAfter=start_after or "",
+            Delimiter=S3_OBJECT_DELIMITER if use_delimiter else "",
+        ):
+            total_count += page.get("KeyCount", 0)
+        return total_count
+
+    @s3_exception_handler(_logger)
+    async def list_objects(
+        self,
+        *,
+        bucket: S3BucketName,
+        prefix: S3ObjectPrefix | None,
+        start_after: S3ObjectKey | None,
+        limit: int = _MAX_ITEMS_PER_PAGE,
+        next_cursor: PathCursor | None = None,
+        is_partial_prefix: bool = False,
+    ) -> tuple[list[S3MetaData | S3DirectoryMetaData], PathCursor | None]:
+        """returns a number of entries in the bucket, defined by limit
+        the entries are sorted alphabetically by key. If a cursor is returned
+        then the client can call the function again with the cursor to get the
+        next entries.
+
+        the first entry is defined by start_after
+        if start_after is None, the first entry is the first one in the bucket
+        if prefix is not None, only entries with the given prefix are returned
+        if prefix is None, all entries in the bucket are returned
+        if next_cursor is set, then the call will return the next entries after the cursor
+        if is_partial_prefix is set then the prefix is not auto-delimited
+        (if False equivalent to `ls /home/user/`
+        if True equivalent to `ls /home/user*`)
+        limit must be >= 1 and <= _AWS_MAX_ITEMS_PER_PAGE
+
+        Raises:
+            ValueError: in case of invalid limit
+        """
+        if limit < 1:
+            msg = "num_objects must be >= 1"
+            raise ValueError(msg)
+        if limit > _AWS_MAX_ITEMS_PER_PAGE:
+            msg = f"num_objects must be <= {_AWS_MAX_ITEMS_PER_PAGE}"
+            raise ValueError(msg)
+
+        list_config: ListObjectsV2RequestRequestTypeDef = {
+            "Bucket": bucket,
+            "Prefix": create_final_prefix(prefix, is_partial_prefix=is_partial_prefix),
+            "MaxKeys": limit,
+            "Delimiter": S3_OBJECT_DELIMITER,
+        }
+        if start_after:
+            list_config["StartAfter"] = start_after
+        if next_cursor:
+            list_config["ContinuationToken"] = next_cursor
+        listed_objects = await self._client.list_objects_v2(**list_config)
+        found_objects: list[S3MetaData | S3DirectoryMetaData] = []
+        if "CommonPrefixes" in listed_objects:
+            # we have folders here
+            list_subfolders = listed_objects["CommonPrefixes"]
+            found_objects.extend(
+                S3DirectoryMetaData.model_construct(
+                    prefix=S3ObjectPrefix(subfolder["Prefix"]), size=None
+                )
+                for subfolder in list_subfolders
+                if "Prefix" in subfolder
+            )
+        if "Contents" in listed_objects:
+            found_objects.extend(
+                S3MetaData.from_botocore_list_objects(obj)
+                for obj in listed_objects["Contents"]
+            )
+        next_cursor = None
+        if listed_objects["IsTruncated"]:
+            next_cursor = listed_objects["NextContinuationToken"]
+        return found_objects, next_cursor

     @s3_exception_handler_async_gen(_logger)
     async def list_objects_paginated(
@@ -459,7 +560,7 @@ async def copy_objects_recursively(
         dst_metadata = await self.get_directory_metadata(
             bucket=bucket, prefix=dst_prefix
         )
-        if dst_metadata.size > 0:
+        if dst_metadata.size and dst_metadata.size > 0:
             raise S3DestinationNotEmptyError(dst_prefix=dst_prefix)
         await limited_gather(
             *[
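
A rough usage sketch of the new cursor-based `list_objects` added above, assuming `SimcoreS3API` is the client class exported by `aws_library.s3`; the bucket and prefix values are illustrative:

# Page through one "directory level" of a bucket with the new API.
from pathlib import Path

from aws_library.s3 import SimcoreS3API  # assumed public export


async def walk_level(s3_api: SimcoreS3API, bucket: str, prefix: Path) -> None:
    cursor = None
    while True:
        entries, cursor = await s3_api.list_objects(
            bucket=bucket,
            prefix=prefix,       # e.g. Path("some-project-id/some-node-id")
            start_after=None,
            next_cursor=cursor,  # None on the first call, then the returned cursor
        )
        for entry in entries:
            # entries mix S3MetaData (files) and S3DirectoryMetaData (folders)
            print(entry.as_path(), entry.size)
        if cursor is None:       # no more pages
            break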

packages/aws-library/src/aws_library/s3/_constants.py

Lines changed: 1 addition & 0 deletions
@@ -12,3 +12,4 @@

 PRESIGNED_LINK_MAX_SIZE: Final[ByteSize] = TypeAdapter(ByteSize).validate_python("5GiB")
 S3_MAX_FILE_SIZE: Final[ByteSize] = TypeAdapter(ByteSize).validate_python("5TiB")
+S3_OBJECT_DELIMITER: Final[str] = "/"

packages/aws-library/src/aws_library/s3/_models.py

Lines changed: 22 additions & 21 deletions
@@ -1,23 +1,24 @@
 import datetime
-from dataclasses import dataclass
-from typing import TypeAlias
+from pathlib import Path
+from typing import TypeAlias, cast

 from models_library.api_schemas_storage.storage_schemas import ETag
 from models_library.basic_types import SHA256Str
-from pydantic import AnyUrl, BaseModel, ByteSize
+from pydantic import AnyUrl, BaseModel, ByteSize, Field
 from types_aiobotocore_s3.type_defs import HeadObjectOutputTypeDef, ObjectTypeDef

 S3ObjectKey: TypeAlias = str
+S3ObjectPrefix: TypeAlias = Path
 UploadID: TypeAlias = str
+PathCursor: TypeAlias = str


-@dataclass(frozen=True, slots=True, kw_only=True)
-class S3MetaData:
+class S3MetaData(BaseModel, frozen=True):
     object_key: S3ObjectKey
     last_modified: datetime.datetime
     e_tag: ETag
     sha256_checksum: SHA256Str | None
-    size: int
+    size: ByteSize

     @staticmethod
     def from_botocore_head_object(
@@ -27,12 +28,8 @@ def from_botocore_head_object(
             object_key=object_key,
             last_modified=obj["LastModified"],
             e_tag=obj["ETag"].strip('"'),
-            sha256_checksum=(
-                SHA256Str(obj.get("ChecksumSHA256"))
-                if obj.get("ChecksumSHA256")
-                else None
-            ),
-            size=obj["ContentLength"],
+            sha256_checksum=obj.get("ChecksumSHA256"),
+            size=ByteSize(obj["ContentLength"]),
         )

     @staticmethod
@@ -47,18 +44,22 @@ def from_botocore_list_objects(
             object_key=obj["Key"],
             last_modified=obj["LastModified"],
             e_tag=obj["ETag"].strip('"'),
-            sha256_checksum=(
-                SHA256Str(obj.get("ChecksumSHA256"))
-                if obj.get("ChecksumSHA256")
-                else None
-            ),
-            size=obj["Size"],
+            sha256_checksum=cast(SHA256Str | None, obj.get("ChecksumSHA256")),
+            size=ByteSize(obj["Size"]),
         )

+    def as_path(self) -> Path:
+        return Path(self.object_key)

-@dataclass(frozen=True)
-class S3DirectoryMetaData:
-    size: int
+
+class S3DirectoryMetaData(BaseModel, frozen=True):
+    prefix: S3ObjectPrefix
+    size: ByteSize | None = Field(
+        ..., description="Size of the directory if computed, None if unknown"
+    )
+
+    def as_path(self) -> Path:
+        return self.prefix


 class MultiPartUploadLinks(BaseModel):
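
Because `list_objects` now returns a mixed list of `S3MetaData` (files) and `S3DirectoryMetaData` (folders), callers can separate them with an `isinstance` check; a small illustrative helper (the function name is mine, importing the models from the module shown above):

# Illustrative only: render a mixed list_objects result entry.
from aws_library.s3._models import S3DirectoryMetaData, S3MetaData


def format_entry(entry: S3MetaData | S3DirectoryMetaData) -> str:
    if isinstance(entry, S3DirectoryMetaData):
        # directory size stays None unless it was explicitly computed
        return f"{entry.as_path()}/ (directory, size={entry.size})"
    return f"{entry.as_path()} ({entry.size} bytes, e_tag={entry.e_tag})"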

packages/aws-library/src/aws_library/s3/_utils.py

Lines changed: 15 additions & 0 deletions
@@ -2,6 +2,9 @@

 from pydantic import ByteSize, TypeAdapter

+from ._constants import S3_OBJECT_DELIMITER
+from ._models import S3ObjectPrefix
+
 _MULTIPART_MAX_NUMBER_OF_PARTS: Final[int] = 10000

 # this is artifically defined, if possible we keep a maximum number of requests for parallel
@@ -34,3 +37,15 @@ def compute_num_file_chunks(file_size: ByteSize) -> tuple[int, ByteSize]:
     raise ValueError(
         msg,
     )
+
+
+def create_final_prefix(
+    prefix: S3ObjectPrefix | None, *, is_partial_prefix: bool
+) -> str:
+    final_prefix = f"{prefix}" if prefix else ""
+    if prefix and not is_partial_prefix:
+        final_prefix = (
+            f"{final_prefix.rstrip(S3_OBJECT_DELIMITER)}{S3_OBJECT_DELIMITER}"
+        )
+
+    return final_prefix
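
To make the delimiter handling concrete, a few illustrative calls (behavior read directly off `create_final_prefix` above; not part of the commit):

from pathlib import Path

from aws_library.s3._utils import create_final_prefix

# no prefix: list from the start of the bucket
assert create_final_prefix(None, is_partial_prefix=False) == ""
# full prefix: trailing delimiter enforced, like `ls /home/user/`
assert create_final_prefix(Path("home/user"), is_partial_prefix=False) == "home/user/"
# partial prefix: left as-is, like `ls /home/user*`
assert create_final_prefix(Path("home/user"), is_partial_prefix=True) == "home/user"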
