
Commit 0e381fa

Add history inspect table (#828)

1 parent 4049971 commit 0e381fa

File tree

mkdocs/docs/api.md
pyiceberg/table/__init__.py
tests/integration/test_inspect_table.py

3 files changed: +118 −0 lines changed

mkdocs/docs/api.md (+21)

@@ -679,6 +679,27 @@ latest_schema_id: [[null,0,0,0]]
 latest_sequence_number: [[null,0,0,0]]
 ```
 
+### History
+
+To show a table's history:
+
+```python
+table.inspect.history()
+```
+
+```
+pyarrow.Table
+made_current_at: timestamp[ms] not null
+snapshot_id: int64 not null
+parent_id: int64
+is_current_ancestor: bool not null
+----
+made_current_at: [[2024-06-18 16:17:48.768,2024-06-18 16:17:49.240,2024-06-18 16:17:49.343,2024-06-18 16:17:49.511]]
+snapshot_id: [[4358109269873137077,3380769165026943338,4358109269873137077,3089420140651211776]]
+parent_id: [[null,4358109269873137077,null,4358109269873137077]]
+is_current_ancestor: [[true,false,true,true]]
+```
+
 ## Add Files
 
 Expert Iceberg users may choose to commit existing parquet files to the Iceberg table as data files, without rewriting them.
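Since `inspect.history()` returns a plain `pyarrow.Table`, it composes with standard Arrow tooling. A minimal consumer sketch, assuming an already-loaded `table` and a pyarrow version whose `Table.filter` accepts compute expressions (the variable names are illustrative):

```python
import pyarrow.compute as pc

history = table.inspect.history()

# Keep only entries on the current snapshot's lineage, i.e. drop
# snapshots that were orphaned by a rollback.
current_lineage = history.filter(pc.field("is_current_ancestor"))

# Or hand the result to pandas for ad-hoc inspection.
print(history.to_pandas().sort_values("made_current_at"))
```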

pyiceberg/table/__init__.py (+28)

@@ -113,6 +113,7 @@
     SnapshotLogEntry,
     SnapshotSummaryCollector,
     Summary,
+    ancestors_of,
     update_snapshot_summaries,
 )
 from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder

@@ -3879,6 +3880,33 @@ def metadata_log_entry_to_row(metadata_entry: MetadataLogEntry) -> Dict[str, Any
             schema=table_schema,
         )
 
+    def history(self) -> "pa.Table":
+        import pyarrow as pa
+
+        history_schema = pa.schema([
+            pa.field("made_current_at", pa.timestamp(unit="ms"), nullable=False),
+            pa.field("snapshot_id", pa.int64(), nullable=False),
+            pa.field("parent_id", pa.int64(), nullable=True),
+            pa.field("is_current_ancestor", pa.bool_(), nullable=False),
+        ])
+
+        ancestors_ids = {snapshot.snapshot_id for snapshot in ancestors_of(self.tbl.current_snapshot(), self.tbl.metadata)}
+
+        history = []
+        metadata = self.tbl.metadata
+
+        for snapshot_entry in metadata.snapshot_log:
+            snapshot = metadata.snapshot_by_id(snapshot_entry.snapshot_id)
+
+            history.append({
+                "made_current_at": datetime.utcfromtimestamp(snapshot_entry.timestamp_ms / 1000.0),
+                "snapshot_id": snapshot_entry.snapshot_id,
+                "parent_id": snapshot.parent_snapshot_id if snapshot else None,
+                "is_current_ancestor": snapshot_entry.snapshot_id in ancestors_ids,
+            })
+
+        return pa.Table.from_pylist(history, schema=history_schema)
+
 
 @dataclass(frozen=True)
 class TablePartition:
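The `is_current_ancestor` flag above depends on `ancestors_of`, imported in the first hunk. As used here, it yields the current snapshot plus every snapshot reachable from it through `parent_snapshot_id` links. A minimal sketch of those assumed semantics, not the actual pyiceberg implementation (`ancestors_of_sketch` is a hypothetical stand-in):

```python
from typing import Iterator, Optional

def ancestors_of_sketch(snapshot, metadata) -> Iterator:
    # Walk the parent chain from the given snapshot back to the root;
    # a None starting snapshot (empty table) yields nothing.
    while snapshot is not None:
        yield snapshot
        parent_id: Optional[int] = snapshot.parent_snapshot_id
        snapshot = metadata.snapshot_by_id(parent_id) if parent_id is not None else None
```

Any `snapshot_log` entry whose id is missing from that ancestor set, such as one abandoned by a rollback, is then reported with `is_current_ancestor=False`.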

tests/integration/test_inspect_table.py (+69)

@@ -568,3 +568,72 @@ def test_inspect_metadata_log_entries(
             if column == "timestamp":
                 continue
             assert left == right, f"Difference in column {column}: {left} != {right}"
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_inspect_history(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
+    identifier = "default.table_history"
+
+    try:
+        session_catalog.drop_table(identifier=identifier)
+    except NoSuchTableError:
+        pass
+
+    spark.sql(
+        f"""
+        CREATE TABLE {identifier} (
+            id int,
+            data string
+        )
+        PARTITIONED BY (data)
+        """
+    )
+
+    spark.sql(
+        f"""
+        INSERT INTO {identifier} VALUES (1, "a")
+        """
+    )
+
+    table = session_catalog.load_table(identifier)
+    first_snapshot = table.current_snapshot()
+    snapshot_id = None if not first_snapshot else first_snapshot.snapshot_id
+
+    spark.sql(
+        f"""
+        INSERT INTO {identifier} VALUES (2, "b")
+        """
+    )
+
+    spark.sql(
+        f"""
+        CALL integration.system.rollback_to_snapshot('{identifier}', {snapshot_id})
+        """
+    )
+
+    spark.sql(
+        f"""
+        INSERT INTO {identifier} VALUES (3, "c")
+        """
+    )
+
+    table.refresh()
+
+    df = table.inspect.history()
+
+    assert df.column_names == [
+        "made_current_at",
+        "snapshot_id",
+        "parent_id",
+        "is_current_ancestor",
+    ]
+
+    lhs = spark.table(f"{identifier}.history").toPandas()
+    rhs = df.to_pandas()
+    for column in df.column_names:
+        for left, right in zip(lhs[column].to_list(), rhs[column].to_list()):
+            if isinstance(left, float) and math.isnan(left) and isinstance(right, float) and math.isnan(right):
+                # NaN != NaN in Python
+                continue
+            assert left == right, f"Difference in column {column}: {left} != {right}"
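For reference, this scenario should produce a four-entry history matching the shape of the docs example above. An annotated sketch of the expected rows, where A, B, C stand in for real snapshot ids:

```python
# made_current_at  snapshot_id  parent_id  is_current_ancestor
# t1               A            null       true    <- INSERT (1, "a")
# t2               B            A          false   <- INSERT (2, "b"), orphaned by the rollback
# t3               A            null       true    <- rollback_to_snapshot(A)
# t4               C            A          true    <- INSERT (3, "c"), the current snapshot
```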
