
Commit 1ec5edd

Merge branch 'main' into manifest_compaction

2 parents 8510f71 + 94e8a98

File tree

8 files changed: +258 -188 lines

mkdocs/requirements.txt

+1 -1

@@ -23,6 +23,6 @@ mkdocstrings-python==1.10.3
 mkdocs-literate-nav==0.6.1
 mkdocs-autorefs==1.0.1
 mkdocs-gen-files==0.5.0
-mkdocs-material==9.5.25
+mkdocs-material==9.5.26
 mkdocs-material-extensions==1.3.1
 mkdocs-section-index==0.3.9

poetry.lock

+146 -147 (generated lockfile; diff not rendered by default)

pyiceberg/io/pyarrow.py

+16 -9
@@ -469,15 +469,18 @@ def __setstate__(self, state: Dict[str, Any]) -> None:
         self.fs_by_scheme = lru_cache(self._initialize_fs)


-def schema_to_pyarrow(schema: Union[Schema, IcebergType], metadata: Dict[bytes, bytes] = EMPTY_DICT) -> pa.schema:
-    return visit(schema, _ConvertToArrowSchema(metadata))
+def schema_to_pyarrow(
+    schema: Union[Schema, IcebergType], metadata: Dict[bytes, bytes] = EMPTY_DICT, include_field_ids: bool = True
+) -> pa.schema:
+    return visit(schema, _ConvertToArrowSchema(metadata, include_field_ids))


 class _ConvertToArrowSchema(SchemaVisitorPerPrimitiveType[pa.DataType]):
     _metadata: Dict[bytes, bytes]

-    def __init__(self, metadata: Dict[bytes, bytes] = EMPTY_DICT) -> None:
+    def __init__(self, metadata: Dict[bytes, bytes] = EMPTY_DICT, include_field_ids: bool = True) -> None:
         self._metadata = metadata
+        self._include_field_ids = include_field_ids

     def schema(self, _: Schema, struct_result: pa.StructType) -> pa.schema:
         return pa.schema(list(struct_result), metadata=self._metadata)
@@ -486,13 +489,17 @@ def struct(self, _: StructType, field_results: List[pa.DataType]) -> pa.DataType
         return pa.struct(field_results)

     def field(self, field: NestedField, field_result: pa.DataType) -> pa.Field:
+        metadata = {}
+        if field.doc:
+            metadata[PYARROW_FIELD_DOC_KEY] = field.doc
+        if self._include_field_ids:
+            metadata[PYARROW_PARQUET_FIELD_ID_KEY] = str(field.field_id)
+
         return pa.field(
             name=field.name,
             type=field_result,
             nullable=field.optional,
-            metadata={PYARROW_FIELD_DOC_KEY: field.doc, PYARROW_PARQUET_FIELD_ID_KEY: str(field.field_id)}
-            if field.doc
-            else {PYARROW_PARQUET_FIELD_ID_KEY: str(field.field_id)},
+            metadata=metadata,
         )

     def list(self, list_type: ListType, element_result: pa.DataType) -> pa.DataType:
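A minimal sketch, not part of this diff, of what the new include_field_ids flag controls, using a hypothetical one-column schema: by default the Parquet field id is attached as Arrow field metadata, and include_field_ids=False leaves it off.

from pyiceberg.io.pyarrow import schema_to_pyarrow
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType

# Illustrative one-column Iceberg schema (not from the repository).
schema = Schema(NestedField(field_id=1, name="foo", field_type=StringType(), required=False))

with_ids = schema_to_pyarrow(schema)  # include_field_ids defaults to True
print(with_ids.field("foo").metadata)  # {b'PARQUET:field_id': b'1'}

without_ids = schema_to_pyarrow(schema, include_field_ids=False)
print(without_ids.field("foo").metadata)  # no PARQUET:field_id entry attached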
@@ -1130,7 +1137,7 @@ def project_table(
     tables = [f.result() for f in completed_futures if f.result()]

     if len(tables) < 1:
-        return pa.Table.from_batches([], schema=schema_to_pyarrow(projected_schema))
+        return pa.Table.from_batches([], schema=schema_to_pyarrow(projected_schema, include_field_ids=False))

     result = pa.concat_tables(tables)
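With this change the empty-scan branch builds its schema without field ids, so an empty result's schema matches the schema of non-empty results. A hedged standalone sketch; the schema below is illustrative, not from the repository:

import pyarrow as pa

from pyiceberg.io.pyarrow import schema_to_pyarrow
from pyiceberg.schema import Schema
from pyiceberg.types import IntegerType, NestedField

projected_schema = Schema(NestedField(field_id=1, name="id", field_type=IntegerType(), required=False))

# Mirrors the len(tables) < 1 branch: an empty Arrow table with the projected schema.
empty = pa.Table.from_batches([], schema=schema_to_pyarrow(projected_schema, include_field_ids=False))
assert empty.num_rows == 0
assert empty.schema.names == ["id"]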

@@ -1161,7 +1168,7 @@ def __init__(self, file_schema: Schema):
     def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array:
         file_field = self.file_schema.find_field(field.field_id)
         if field.field_type.is_primitive and field.field_type != file_field.field_type:
-            return values.cast(schema_to_pyarrow(promote(file_field.field_type, field.field_type)))
+            return values.cast(schema_to_pyarrow(promote(file_field.field_type, field.field_type), include_field_ids=False))
         return values

     def _construct_field(self, field: NestedField, arrow_type: pa.DataType) -> pa.Field:
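For context, _cast_if_needed upcasts primitive columns when the table schema has promoted a type since the file was written. A rough sketch of the int-to-long case, under the same assumptions as the diff:

import pyarrow as pa

from pyiceberg.io.pyarrow import schema_to_pyarrow
from pyiceberg.schema import promote
from pyiceberg.types import IntegerType, LongType

values = pa.array([1, 2, 3], type=pa.int32())  # column as read from the data file
promoted = promote(IntegerType(), LongType())  # allowed int -> long promotion

# As in the diff: resolve the promoted Iceberg type to Arrow without field ids, then cast.
cast_values = values.cast(schema_to_pyarrow(promoted, include_field_ids=False))
assert cast_values.type == pa.int64()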
@@ -1188,7 +1195,7 @@ def struct(
             field_arrays.append(array)
             fields.append(self._construct_field(field, array.type))
         elif field.optional:
-            arrow_type = schema_to_pyarrow(field.field_type)
+            arrow_type = schema_to_pyarrow(field.field_type, include_field_ids=False)
             field_arrays.append(pa.nulls(len(struct_array), type=arrow_type))
             fields.append(self._construct_field(field, arrow_type))
         else:

pyiceberg/table/__init__.py

+12
@@ -1359,6 +1359,18 @@ def snapshot_by_name(self, name: str) -> Optional[Snapshot]:
             return self.snapshot_by_id(ref.snapshot_id)
         return None

+    def snapshot_as_of_timestamp(self, timestamp_ms: int, inclusive: bool = True) -> Optional[Snapshot]:
+        """Get the snapshot that was current as of or right before the given timestamp, or None if there is no matching snapshot.
+
+        Args:
+            timestamp_ms: Find a snapshot that was current at or before this timestamp.
+            inclusive: When True, include timestamp_ms itself in the search; when False, match only strictly earlier entries.
+        """
+        for log_entry in reversed(self.history()):
+            if (inclusive and log_entry.timestamp_ms <= timestamp_ms) or log_entry.timestamp_ms < timestamp_ms:
+                return self.snapshot_by_id(log_entry.snapshot_id)
+        return None
+
     def history(self) -> List[SnapshotLogEntry]:
         """Get the snapshot history of this table."""
         return self.metadata.snapshot_log
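A hedged usage sketch for the new time-travel lookup; tbl stands in for any loaded Table instance:

import time

# Most recent snapshot that was current at or before "now".
snapshot = tbl.snapshot_as_of_timestamp(int(time.time() * 1000))

# With inclusive=False, a snapshot logged at exactly that timestamp is skipped.
if snapshot is not None:
    older = tbl.snapshot_as_of_timestamp(snapshot.timestamp_ms, inclusive=False)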

pyiceberg/table/snapshots.py

+15 -1
@@ -14,17 +14,22 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
+from __future__ import annotations
+
 import time
 from collections import defaultdict
 from enum import Enum
-from typing import Any, DefaultDict, Dict, List, Mapping, Optional
+from typing import TYPE_CHECKING, Any, DefaultDict, Dict, Iterable, List, Mapping, Optional

 from pydantic import Field, PrivateAttr, model_serializer

 from pyiceberg.io import FileIO
 from pyiceberg.manifest import DataFile, DataFileContent, ManifestFile, read_manifest_list
 from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
 from pyiceberg.schema import Schema
+
+if TYPE_CHECKING:
+    from pyiceberg.table.metadata import TableMetadata
 from pyiceberg.typedef import IcebergBaseModel

 ADDED_DATA_FILES = "added-data-files"
@@ -412,3 +417,12 @@ def _update_totals(total_property: str, added_property: str, removed_property: s
 def set_when_positive(properties: Dict[str, str], num: int, property_name: str) -> None:
     if num > 0:
         properties[property_name] = str(num)
+
+
+def ancestors_of(current_snapshot: Optional[Snapshot], table_metadata: TableMetadata) -> Iterable[Snapshot]:
+    """Get the ancestors of and including the given snapshot."""
+    if current_snapshot:
+        yield current_snapshot
+        if current_snapshot.parent_snapshot_id is not None:
+            if parent := table_metadata.snapshot_by_id(current_snapshot.parent_snapshot_id):
+                yield from ancestors_of(parent, table_metadata)
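And a sketch of walking the lineage with the new generator (tbl is again a hypothetical loaded Table):

from pyiceberg.table.snapshots import ancestors_of

# Yields the current snapshot first, then each parent back to the root.
for snapshot in ancestors_of(tbl.current_snapshot(), tbl.metadata):
    print(snapshot.snapshot_id, "->", snapshot.parent_snapshot_id)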

pyproject.toml

+2 -2
@@ -60,7 +60,7 @@ zstandard = ">=0.13.0,<1.0.0"
 tenacity = ">=8.2.3,<9.0.0"
 pyarrow = { version = ">=9.0.0,<17.0.0", optional = true }
 pandas = { version = ">=1.0.0,<3.0.0", optional = true }
-duckdb = { version = ">=0.5.0,<1.0.0", optional = true }
+duckdb = { version = ">=0.5.0,<2.0.0", optional = true }
 ray = { version = ">=2.0.0,<2.10.0", optional = true }
 python-snappy = { version = ">=0.6.0,<1.0.0", optional = true }
 thrift = { version = ">=0.13.0,<1.0.0", optional = true }
@@ -82,7 +82,7 @@ fastavro = "1.9.4"
 coverage = { version = "^7.4.2", extras = ["toml"] }
 requests-mock = "1.12.1"
 moto = { version = "^5.0.2", extras = ["server"] }
-typing-extensions = "4.12.0"
+typing-extensions = "4.12.2"
 pytest-mock = "3.14.0"
 pyspark = "3.5.1"
 cython = "3.0.10"

tests/io/test_pyarrow.py

+29 -28
@@ -344,7 +344,7 @@ def test_deleting_hdfs_file_not_found() -> None:
     assert "Cannot delete file, does not exist:" in str(exc_info.value)


-def test_schema_to_pyarrow_schema(table_schema_nested: Schema) -> None:
+def test_schema_to_pyarrow_schema_include_field_ids(table_schema_nested: Schema) -> None:
     actual = schema_to_pyarrow(table_schema_nested)
     expected = """foo: string
 -- field metadata --
@@ -402,6 +402,30 @@ def test_schema_to_pyarrow_schema(table_schema_nested: Schema) -> None:
     assert repr(actual) == expected


+def test_schema_to_pyarrow_schema_exclude_field_ids(table_schema_nested: Schema) -> None:
+    actual = schema_to_pyarrow(table_schema_nested, include_field_ids=False)
+    expected = """foo: string
+bar: int32 not null
+baz: bool
+qux: list<element: string not null> not null
+  child 0, element: string not null
+quux: map<string, map<string, int32>> not null
+  child 0, entries: struct<key: string not null, value: map<string, int32> not null> not null
+      child 0, key: string not null
+      child 1, value: map<string, int32> not null
+          child 0, entries: struct<key: string not null, value: int32 not null> not null
+              child 0, key: string not null
+              child 1, value: int32 not null
+location: list<element: struct<latitude: float, longitude: float> not null> not null
+  child 0, element: struct<latitude: float, longitude: float> not null
+      child 0, latitude: float
+      child 1, longitude: float
+person: struct<name: string, age: int32 not null>
+  child 0, name: string
+  child 1, age: int32 not null"""
+    assert repr(actual) == expected
+
+
 def test_fixed_type_to_pyarrow() -> None:
     length = 22
     iceberg_type = FixedType(length)
@@ -945,23 +969,13 @@ def test_projection_add_column(file_int: str) -> None:
         == """id: int32
 list: list<element: int32>
   child 0, element: int32
-  -- field metadata --
-  PARQUET:field_id: '21'
 map: map<int32, string>
   child 0, entries: struct<key: int32 not null, value: string> not null
       child 0, key: int32 not null
-      -- field metadata --
-      PARQUET:field_id: '31'
       child 1, value: string
-      -- field metadata --
-      PARQUET:field_id: '32'
 location: struct<lat: double, lon: double>
   child 0, lat: double
-  -- field metadata --
-  PARQUET:field_id: '41'
-  child 1, lon: double
-  -- field metadata --
-  PARQUET:field_id: '42'"""
+  child 1, lon: double"""
     )
@@ -1014,11 +1028,7 @@ def test_projection_add_column_struct(schema_int: Schema, file_int: str) -> None
         == """id: map<int32, string>
   child 0, entries: struct<key: int32 not null, value: string> not null
       child 0, key: int32 not null
-      -- field metadata --
-      PARQUET:field_id: '3'
-      child 1, value: string
-      -- field metadata --
-      PARQUET:field_id: '4'"""
+      child 1, value: string"""
     )
@@ -1062,12 +1072,7 @@ def test_projection_concat_files(schema_int: Schema, file_int: str) -> None:
 def test_projection_filter(schema_int: Schema, file_int: str) -> None:
     result_table = project(schema_int, [file_int], GreaterThan("id", 4))
     assert len(result_table.columns[0]) == 0
-    assert (
-        repr(result_table.schema)
-        == """id: int32
--- field metadata --
-PARQUET:field_id: '1'"""
-    )
+    assert repr(result_table.schema) == """id: int32"""


 def test_projection_filter_renamed_column(file_int: str) -> None:
@@ -1304,11 +1309,7 @@ def test_projection_nested_struct_different_parent_id(file_struct: str) -> None
         repr(result_table.schema)
         == """location: struct<lat: double, long: double>
   child 0, lat: double
-  -- field metadata --
-  PARQUET:field_id: '41'
-  child 1, long: double
-  -- field metadata --
-  PARQUET:field_id: '42'"""
+  child 1, long: double"""
     )

tests/table/test_init.py

+37
@@ -76,6 +76,7 @@
     Snapshot,
     SnapshotLogEntry,
     Summary,
+    ancestors_of,
 )
 from pyiceberg.table.sorting import (
     NullOrder,
@@ -204,6 +205,42 @@ def test_snapshot_by_id(table_v2: Table) -> None:
     )


+def test_snapshot_by_timestamp(table_v2: Table) -> None:
+    assert table_v2.snapshot_as_of_timestamp(1515100955770) == Snapshot(
+        snapshot_id=3051729675574597004,
+        parent_snapshot_id=None,
+        sequence_number=0,
+        timestamp_ms=1515100955770,
+        manifest_list="s3://a/b/1.avro",
+        summary=Summary(Operation.APPEND),
+        schema_id=None,
+    )
+    assert table_v2.snapshot_as_of_timestamp(1515100955770, inclusive=False) is None
+
+
+def test_ancestors_of(table_v2: Table) -> None:
+    assert list(ancestors_of(table_v2.current_snapshot(), table_v2.metadata)) == [
+        Snapshot(
+            snapshot_id=3055729675574597004,
+            parent_snapshot_id=3051729675574597004,
+            sequence_number=1,
+            timestamp_ms=1555100955770,
+            manifest_list="s3://a/b/2.avro",
+            summary=Summary(Operation.APPEND),
+            schema_id=1,
+        ),
+        Snapshot(
+            snapshot_id=3051729675574597004,
+            parent_snapshot_id=None,
+            sequence_number=0,
+            timestamp_ms=1515100955770,
+            manifest_list="s3://a/b/1.avro",
+            summary=Summary(Operation.APPEND),
+            schema_id=None,
+        ),
+    ]
+
+
 def test_snapshot_by_id_does_not_exist(table_v2: Table) -> None:
     assert table_v2.snapshot_by_id(-1) is None
