Skip to content

Commit 068ee5d

Browse files
authored
Refactor Metadata in Transaction (#1903)
# Rationale for this change

Today we keep a copy of the `TableMetadata` on both the `Table` and the `Transaction`. This PR changes that logic to re-use the metadata on the table, applying the `Transaction`'s pending changes on top of it. This also allows us to stack changes — for example, first changing a schema and then immediately writing data with the new schema. It is also a prerequisite for #1772.

# Are these changes tested?

Yes — this PR includes a new test.

# Are there any user-facing changes?

<!-- In the case of user-facing changes, please add the changelog label. -->
1 parent 02eecb6 commit 068ee5d

File tree

4 files changed

+46
-8
lines changed

4 files changed

+46
-8
lines changed

pyiceberg/table/__init__.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,6 @@ class TableProperties:
244244

245245
class Transaction:
246246
_table: Table
247-
table_metadata: TableMetadata
248247
_autocommit: bool
249248
_updates: Tuple[TableUpdate, ...]
250249
_requirements: Tuple[TableRequirement, ...]
@@ -256,12 +255,15 @@ def __init__(self, table: Table, autocommit: bool = False):
256255
table: The table that will be altered.
257256
autocommit: Option to automatically commit the changes when they are staged.
258257
"""
259-
self.table_metadata = table.metadata
260258
self._table = table
261259
self._autocommit = autocommit
262260
self._updates = ()
263261
self._requirements = ()
264262

263+
@property
264+
def table_metadata(self) -> TableMetadata:
265+
return update_table_metadata(self._table.metadata, self._updates)
266+
265267
def __enter__(self) -> Transaction:
266268
"""Start a transaction to update the table."""
267269
return self
@@ -287,8 +289,6 @@ def _apply(self, updates: Tuple[TableUpdate, ...], requirements: Tuple[TableRequ
287289
if type(new_requirement) not in existing_requirements:
288290
self._requirements = self._requirements + (new_requirement,)
289291

290-
self.table_metadata = update_table_metadata(self.table_metadata, updates)
291-
292292
if self._autocommit:
293293
self.commit_transaction()
294294
self._updates = ()

pyiceberg/table/update/__init__.py

+7-1
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,8 @@ def _(update: SetCurrentSchemaUpdate, base_metadata: TableMetadata, context: _Ta
360360
@_apply_table_update.register(AddPartitionSpecUpdate)
361361
def _(update: AddPartitionSpecUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata:
362362
for spec in base_metadata.partition_specs:
363-
if spec.spec_id == update.spec.spec_id:
363+
# Only raise in case of a discrepancy
364+
if spec.spec_id == update.spec.spec_id and spec != update.spec:
364365
raise ValueError(f"Partition spec with id {spec.spec_id} already exists: {spec}")
365366

366367
metadata_updates: Dict[str, Any] = {
@@ -525,6 +526,11 @@ def _(update: RemoveSnapshotRefUpdate, base_metadata: TableMetadata, context: _T
525526

526527
@_apply_table_update.register(AddSortOrderUpdate)
527528
def _(update: AddSortOrderUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata:
529+
for sort in base_metadata.sort_orders:
530+
# Only raise in case of a discrepancy
531+
if sort.order_id == update.sort_order.order_id and sort != update.sort_order:
532+
raise ValueError(f"Sort-order with id {sort.order_id} already exists: {sort}")
533+
528534
context.add_update(update)
529535
return base_metadata.model_copy(
530536
update={

tests/integration/test_rest_schema.py

+1-3
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ def test_schema_evolution_via_transaction(catalog: Catalog) -> None:
154154
NestedField(field_id=4, name="col_integer", field_type=IntegerType(), required=False),
155155
)
156156

157-
with pytest.raises(CommitFailedException) as exc_info:
157+
with pytest.raises(CommitFailedException, match="Requirement failed: current schema id has changed: expected 2, found 3"):
158158
with tbl.transaction() as tx:
159159
# Start a new update
160160
schema_update = tx.update_schema()
@@ -165,8 +165,6 @@ def test_schema_evolution_via_transaction(catalog: Catalog) -> None:
165165
# stage another update in the transaction
166166
schema_update.add_column("col_double", DoubleType()).commit()
167167

168-
assert "Requirement failed: current schema changed: expected id 2 != 3" in str(exc_info.value)
169-
170168
assert tbl.schema() == Schema(
171169
NestedField(field_id=1, name="col_uuid", field_type=UUIDType(), required=False),
172170
NestedField(field_id=2, name="col_fixed", field_type=FixedType(25), required=False),

tests/integration/test_writes/test_writes.py

+34
Original file line numberDiff line numberDiff line change
@@ -1776,3 +1776,37 @@ def test_write_optional_list(session_catalog: Catalog) -> None:
17761776
session_catalog.load_table(identifier).append(df_2)
17771777

17781778
assert len(session_catalog.load_table(identifier).scan().to_arrow()) == 4
1779+
1780+
1781+
@pytest.mark.integration
1782+
@pytest.mark.parametrize("format_version", [1, 2])
1783+
def test_evolve_and_write(
1784+
spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
1785+
) -> None:
1786+
identifier = "default.test_evolve_and_write"
1787+
tbl = _create_table(session_catalog, identifier, properties={"format-version": format_version}, schema=Schema())
1788+
other_table = session_catalog.load_table(identifier)
1789+
1790+
numbers = pa.array([1, 2, 3, 4], type=pa.int32())
1791+
1792+
with tbl.update_schema() as upd:
1793+
# This is not known by other_table
1794+
upd.add_column("id", IntegerType())
1795+
1796+
with other_table.transaction() as tx:
1797+
# Refreshes the underlying metadata, and the schema
1798+
other_table.refresh()
1799+
tx.append(
1800+
pa.Table.from_arrays(
1801+
[
1802+
numbers,
1803+
],
1804+
schema=pa.schema(
1805+
[
1806+
pa.field("id", pa.int32(), nullable=True),
1807+
]
1808+
),
1809+
)
1810+
)
1811+
1812+
assert session_catalog.load_table(identifier).scan().to_arrow().column(0).combine_chunks() == numbers

0 commit comments

Comments (0)