Commit 9b15c86

Table statistics

1 parent 384e229 commit 9b15c86

File tree

7 files changed: +215 -45 lines changed

dev/provision.py (+24)

@@ -399,3 +399,27 @@
     )
     spark.sql(f"ALTER TABLE {catalog_name}.default.test_empty_scan_ordered_str WRITE ORDERED BY id")
     spark.sql(f"INSERT INTO {catalog_name}.default.test_empty_scan_ordered_str VALUES 'a', 'c'")
+
+    spark.sql(
+        f"""
+        CREATE OR REPLACE TABLE {catalog_name}.default.test_table_statistics_operations (
+            number integer
+        )
+        USING iceberg
+        TBLPROPERTIES (
+            'format-version'='2'
+        );
+    """
+    )
+    spark.sql(
+        f"""
+        INSERT INTO {catalog_name}.default.test_table_statistics_operations
+        VALUES (1)
+    """
+    )
+    spark.sql(
+        f"""
+        INSERT INTO {catalog_name}.default.test_table_statistics_operations
+        VALUES (2)
+    """
+    )
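
The two single-row INSERTs are intentional: each one creates its own snapshot, and the integration test in this commit attaches a statistics file to each of those snapshots. A quick sanity check of the provisioned table could look like the sketch below; the catalog name `default` and its configuration are assumptions, not part of this commit.

```python
from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")  # assumed catalog name/configuration
tbl = catalog.load_table("default.test_table_statistics_operations")

# The two INSERTs above each produce a snapshot for the test to target.
assert len(tbl.history()) >= 2
```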

mkdocs/docs/api.md (+22)

@@ -1129,6 +1129,28 @@ with table.manage_snapshots() as ms:
     ms.create_branch(snapshot_id1, "Branch_A").create_tag(snapshot_id2, "tag789")
 ```

+## Table Statistics Management
+
+Manage table statistics with operations through the `Table` API:
+
+```python
+# To run a specific operation
+table.update_statistics().set_statistics(snapshot_id, statistics_file).commit()
+# To run multiple operations
+table.update_statistics()
+    .set_statistics(snapshot_id1, statistics_file1)
+    .remove_statistics(snapshot_id2)
+# Operations are applied on commit.
+```
+
+You can also use context managers to make more changes:
+
+```python
+with table.update_statistics() as update:
+    update.set_statistics(1, statistics_file)
+    update.remove_statistics(2)
+```
+
 ## Query the data

 To query a table, a table scan is needed. A table scan accepts a filter, columns, optionally a limit and a snapshot ID:
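
As the trailing comment in the docs snippet says, chained operations only take effect once `commit()` is called. Spelled out as a single valid expression, the multi-operation form could be written as below; `snapshot_id1`, `snapshot_id2`, and `statistics_file1` are assumed to already exist.

```python
(
    table.update_statistics()
    .set_statistics(snapshot_id1, statistics_file1)
    .remove_statistics(snapshot_id2)
    .commit()
)
```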

pyiceberg/table/__init__.py (+18 -39)

@@ -84,7 +84,6 @@
     SnapshotLogEntry,
 )
 from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
-from pyiceberg.table.statistics import StatisticsFile
 from pyiceberg.table.update import (
     AddPartitionSpecUpdate,
     AddSchemaUpdate,
@@ -95,14 +94,12 @@
     AssertTableUUID,
     AssignUUIDUpdate,
     RemovePropertiesUpdate,
-    RemoveStatisticsUpdate,
     SetCurrentSchemaUpdate,
     SetDefaultSortOrderUpdate,
     SetDefaultSpecUpdate,
     SetLocationUpdate,
     SetPropertiesUpdate,
     SetSnapshotRefUpdate,
-    SetStatisticsUpdate,
     TableRequirement,
     TableUpdate,
     UpdatesAndRequirements,
@@ -119,6 +116,7 @@
     _OverwriteFiles,
 )
 from pyiceberg.table.update.spec import UpdateSpec
+from pyiceberg.table.update.statistics import UpdateStatistics
 from pyiceberg.typedef import (
     EMPTY_DICT,
     IcebergBaseModel,
@@ -666,42 +664,6 @@ def update_location(self, location: str) -> Transaction:
         """
         raise NotImplementedError("Not yet implemented")

-    def set_statistics(self, snapshot_id: int, statistics_file: StatisticsFile) -> Transaction:
-        """Set the statistics for a snapshot.
-
-        Args:
-            snapshot_id: The snapshot ID to set the statistics for.
-            statistics_file: The statistics file to set.
-
-        Returns:
-            The alter table builder.
-        """
-        updates = (
-            SetStatisticsUpdate(
-                snapshot_id=snapshot_id,
-                statistics=statistics_file,
-            ),
-        )
-
-        return self._apply(updates, ())
-
-    def remove_statistics(self, snapshot_id: int) -> Transaction:
-        """Remove the statistics for a snapshot.
-
-        Args:
-            snapshot_id: The snapshot ID to remove the statistics for.
-
-        Returns:
-            The alter table builder.
-        """
-        updates = (
-            RemoveStatisticsUpdate(
-                snapshot_id=snapshot_id,
-            ),
-        )
-
-        return self._apply(updates, ())
-
     def commit_transaction(self) -> Table:
         """Commit the changes to the catalog.

@@ -1021,6 +983,23 @@ def manage_snapshots(self) -> ManageSnapshots:
         """
         return ManageSnapshots(transaction=Transaction(self, autocommit=True))

+    def update_statistics(self) -> UpdateStatistics:
+        """
+        Shorthand to run statistics management operations like add statistics and remove statistics.
+
+        Use table.update_statistics().<operation>().commit() to run a specific operation.
+        Use table.update_statistics().<operation-one>().<operation-two>().commit() to run multiple operations.
+
+        Pending changes are applied on commit.
+
+        We can also use context managers to make more changes. For example:
+
+        with table.update_statistics() as update:
+            update.set_statistics(snapshot_id=1, statistics_file=statistics_file)
+            update.remove_statistics(snapshot_id=2)
+        """
+        return UpdateStatistics(transaction=Transaction(self, autocommit=True))
+
     def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive: bool = True) -> UpdateSchema:
         """Create a new UpdateSchema to alter the columns of this table.

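The net effect in this file is that statistics changes move off `Transaction` and onto the builder returned by `Table.update_statistics()`. A before/after sketch, where `tbl`, `snapshot_id`, and `statistics_file` are placeholders assumed to exist:

```python
# Removed by this commit: statistics were set directly on a Transaction.
# tbl.transaction().set_statistics(snapshot_id, statistics_file).commit_transaction()

# Added by this commit: an autocommitting UpdateStatistics builder.
tbl.update_statistics().set_statistics(snapshot_id, statistics_file).commit()
```
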
pyiceberg/table/statistics.py (+8)

@@ -38,4 +38,12 @@ class StatisticsFile(IcebergBaseModel):
     statistics_path: str = Field(alias="statistics-path")
     file_size_in_bytes: int = Field(alias="file-size-in-bytes")
     file_footer_size_in_bytes: int = Field(alias="file-footer-size-in-bytes")
+    key_metadata: Optional[str] = Field(alias="key-metadata", default=None)
     blob_metadata: List[BlobMetadata] = Field(alias="blob-metadata")
+
+
+def reject_statistics(
+    statistics: List[StatisticsFile],
+    reject_snapshot_id: int,
+) -> List[StatisticsFile]:
+    return [stat for stat in statistics if stat.snapshot_id != reject_snapshot_id]
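
The new `reject_statistics` helper simply filters a list of statistics files by snapshot id. A small, self-contained illustration; the paths and sizes are made-up placeholder values:

```python
from pyiceberg.table.statistics import StatisticsFile, reject_statistics

stats = [
    StatisticsFile(
        snapshot_id=sid,
        statistics_path=f"s3://bucket/warehouse/stats-{sid}.puffin",  # placeholder path
        file_size_in_bytes=100,
        file_footer_size_in_bytes=10,
        key_metadata=None,
        blob_metadata=[],
    )
    for sid in (1, 2)
]

# Dropping snapshot 1 leaves only the entry for snapshot 2.
assert [s.snapshot_id for s in reject_statistics(stats, reject_snapshot_id=1)] == [2]
```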

pyiceberg/table/update/__init__.py (+5 -6)

@@ -37,7 +37,7 @@
     SnapshotLogEntry,
 )
 from pyiceberg.table.sorting import SortOrder
-from pyiceberg.table.statistics import StatisticsFile
+from pyiceberg.table.statistics import StatisticsFile, reject_statistics
 from pyiceberg.typedef import (
     IcebergBaseModel,
     Properties,
@@ -496,19 +496,18 @@ def _(update: SetStatisticsUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata:
     if update.snapshot_id != update.statistics.snapshot_id:
         raise ValueError("Snapshot id in statistics does not match the snapshot id in the update")

-    rest_statistics = [stat for stat in base_metadata.statistics if stat.snapshot_id != update.snapshot_id]
-
+    statistics = reject_statistics(base_metadata.statistics, update.snapshot_id)
     context.add_update(update)
-    return base_metadata.model_copy(update={"statistics": rest_statistics + [update.statistics]})
+
+    return base_metadata.model_copy(update={"statistics": statistics + [update.statistics]})


 @_apply_table_update.register(RemoveStatisticsUpdate)
 def _(update: RemoveStatisticsUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata:
     if not any(stat.snapshot_id == update.snapshot_id for stat in base_metadata.statistics):
         raise ValueError(f"Statistics with snapshot id {update.snapshot_id} does not exist")

-    statistics = [stat for stat in base_metadata.statistics if stat.snapshot_id != update.snapshot_id]
-
+    statistics = reject_statistics(base_metadata.statistics, update.snapshot_id)
     context.add_update(update)

     return base_metadata.model_copy(update={"statistics": statistics})
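
With the helper in place, both handlers share the same shape: `SetStatisticsUpdate` behaves like an upsert (any existing entry for that snapshot is dropped before the new file is appended), while `RemoveStatisticsUpdate` requires the entry to exist. A semantics-only sketch of the upsert step, not the dispatch code itself:

```python
from typing import List

from pyiceberg.table.statistics import StatisticsFile, reject_statistics


def upsert_statistics(existing: List[StatisticsFile], new_file: StatisticsFile) -> List[StatisticsFile]:
    # Mirrors the SetStatisticsUpdate handler: reject any old entry, then append.
    return reject_statistics(existing, new_file.snapshot_id) + [new_file]
```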

pyiceberg/table/update/statistics.py (new file, +75)

@@ -0,0 +1,75 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import TYPE_CHECKING, Tuple
+
+from pyiceberg.table.statistics import StatisticsFile
+from pyiceberg.table.update import (
+    RemoveStatisticsUpdate,
+    SetStatisticsUpdate,
+    TableUpdate,
+    UpdatesAndRequirements,
+    UpdateTableMetadata,
+)
+
+if TYPE_CHECKING:
+    from pyiceberg.table import Transaction
+
+
+class UpdateStatistics(UpdateTableMetadata["UpdateStatistics"]):
+    """
+    Run statistics management operations using APIs.
+
+    APIs include set_statistics and remove statistics operations.
+
+    Use table.update_statistics().<operation>().commit() to run a specific operation.
+    Use table.update_statistics().<operation-one>().<operation-two>().commit() to run multiple operations.
+
+    Pending changes are applied on commit.
+
+    We can also use context managers to make more changes. For example:
+
+    with table.update_statistics() as update:
+        update.set_statistics(snapshot_id=1, statistics_file=statistics_file)
+        update.remove_statistics(snapshot_id=2)
+    """
+
+    _updates: Tuple[TableUpdate, ...] = ()
+
+    def __init__(self, transaction: "Transaction") -> None:
+        super().__init__(transaction)
+
+    def set_statistics(self, snapshot_id: int, statistics_file: StatisticsFile) -> "UpdateStatistics":
+        self._updates += (
+            SetStatisticsUpdate(
+                snapshot_id=snapshot_id,
+                statistics=statistics_file,
+            ),
+        )
+
+        return self
+
+    def remove_statistics(self, snapshot_id: int) -> "UpdateStatistics":
+        self._updates += (
+            RemoveStatisticsUpdate(
+                snapshot_id=snapshot_id,
+            ),
+        )
+
+        return self
+
+    def _commit(self) -> UpdatesAndRequirements:
+        return self._updates, ()
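
Both entry points into the builder land here. A usage sketch, assuming `tbl` is an already-loaded `Table` and `snapshot_id`, `old_snapshot_id`, and `stats_file` are placeholders:

```python
# Chained form: each call stages a TableUpdate; commit() applies them because
# the builder wraps an autocommitting Transaction.
tbl.update_statistics().set_statistics(snapshot_id, stats_file).commit()

# Context-manager form: the staged updates are committed when the block exits.
with tbl.update_statistics() as update:
    update.set_statistics(snapshot_id, stats_file)
    update.remove_statistics(old_snapshot_id)
```
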
New integration test file (path not shown in this view) (new file, +63)

@@ -0,0 +1,63 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import pytest
+
+from pyiceberg.catalog import Catalog
+from pyiceberg.table.statistics import BlobMetadata, StatisticsFile
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
+def test_manage_statistics(catalog: Catalog) -> None:
+    identifier = "default.test_table_statistics_operations"
+    tbl = catalog.load_table(identifier)
+
+    add_snapshot_id_1 = tbl.history()[0].snapshot_id
+    add_snapshot_id_2 = tbl.history()[1].snapshot_id
+
+    def create_statistics_file(snapshot_id: int) -> StatisticsFile:
+        blob_metadata = BlobMetadata(
+            type="boring-type",
+            snapshot_id=snapshot_id,
+            sequence_number=2,
+            fields=[1],
+            properties={"prop-key": "prop-value"},
+        )
+
+        statistics_file = StatisticsFile(
+            snapshot_id=snapshot_id,
+            statistics_path="s3://bucket/warehouse/stats.puffin",
+            file_size_in_bytes=124,
+            file_footer_size_in_bytes=27,
+            blob_metadata=[blob_metadata],
+        )
+
+        return statistics_file
+
+    statistics_file_snap_1 = create_statistics_file(add_snapshot_id_1)
+    statistics_file_snap_2 = create_statistics_file(add_snapshot_id_2)
+
+    with tbl.update_statistics() as update:
+        update.set_statistics(add_snapshot_id_1, statistics_file_snap_1)
+        update.set_statistics(add_snapshot_id_2, statistics_file_snap_2)
+
+    assert len(tbl.metadata.statistics) == 2
+
+    with tbl.update_statistics() as update:
+        update.remove_statistics(add_snapshot_id_1)
+
+    assert len(tbl.metadata.statistics) == 1
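
The test leans on the two snapshots created by the provisioning change above. A hypothetical follow-up check in the same style (not part of the committed test) would confirm the upsert behaviour: re-setting statistics for a snapshot that already has an entry keeps a single entry rather than adding a duplicate.

```python
# Hypothetical extension of test_manage_statistics, reusing its local helper.
with tbl.update_statistics() as update:
    update.set_statistics(add_snapshot_id_2, create_statistics_file(add_snapshot_id_2))

# Snapshot 2 already had statistics, so the count stays at one entry.
assert len(tbl.metadata.statistics) == 1
```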
