
feat: validate snapshot write compatibility #1772


Open · wants to merge 9 commits into main
41 changes: 41 additions & 0 deletions pyiceberg/table/update/snapshot.py
@@ -27,6 +27,7 @@

from sortedcontainers import SortedList

from pyiceberg.exceptions import CommitFailedException
from pyiceberg.expressions import (
    AlwaysFalse,
    BooleanExpression,
@@ -60,6 +61,7 @@
    Snapshot,
    SnapshotSummaryCollector,
    Summary,
    ancestors_of,
    update_snapshot_summaries,
)
from pyiceberg.table.update import (
@@ -251,6 +253,21 @@ def _commit(self) -> UpdatesAndRequirements:
        )
        location_provider = self._transaction._table.location_provider()
        manifest_list_file_path = location_provider.new_metadata_location(file_name)

        # Get the current snapshot and the starting snapshot, and validate that there are no conflicts
        from pyiceberg.table import StagedTable

        if not isinstance(self._transaction._table, StagedTable):
            starting_snapshot = self._transaction.table_metadata.current_snapshot()
            current_snapshot = self._transaction._table.refresh().metadata.current_snapshot()

            if starting_snapshot is not None and current_snapshot is not None:
                self._validate(starting_snapshot, current_snapshot)

            # If the current snapshot is not the same as the parent snapshot, update the parent snapshot id
            if current_snapshot is not None and current_snapshot.snapshot_id != self._parent_snapshot_id:
                self._parent_snapshot_id = current_snapshot.snapshot_id

        with write_manifest_list(
            format_version=self._transaction.table_metadata.format_version,
            output_file=self._io.new_output(manifest_list_file_path),
@@ -279,6 +296,30 @@ def _commit(self) -> UpdatesAndRequirements:
            (AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref="main"),),
        )

    def _validate(self, starting_snapshot: Snapshot, current_snapshot: Snapshot) -> None:
        # For each operation, the set of concurrent operations that do not conflict with it
        allowed_operations = {
            Operation.APPEND: {Operation.APPEND, Operation.REPLACE, Operation.OVERWRITE, Operation.DELETE},
            Operation.REPLACE: {Operation.APPEND},

Collaborator suggested change:
-            Operation.REPLACE: {Operation.APPEND},
+            Operation.REPLACE: {},

I think the spec may need a re-review: it's inaccurate to say that we only need to verify that the files we are trying to delete are still available when executing a REPLACE or DELETE operation.

In Spark, we also validate whether there have been conflicting appends when using the SERIALIZABLE isolation level:

https://github.com/apache/iceberg/blob/9fc49e187069c7ec2493ac0abf20f73175b3df89/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java#L368-L374

I think it would be helpful to introduce all three isolation levels (NONE, SERIALIZABLE, and SNAPSHOT) and verify whether conflicting appends or deletes have been introduced in the underlying partitions, to be aligned with the implementation in Spark (a sketch follows this file's diff).

@Fokko (Contributor) · Apr 18, 2025:

Thanks @sungwy for jumping in here and creating the issues 🙌

Indeed, depending on whether we do snapshot or serializable isolation, we should allow for new data (or not). Would you be willing to split out the different levels in a separate PR? It would be nice to get this in so we can start working independently on the subtasks that you created.

I think this one was mostly blocked on #1903

            Operation.OVERWRITE: set(),
            Operation.DELETE: set(),
        }

        # Walk the ancestors of the current snapshot back to the starting snapshot
        snapshots = ancestors_of(current_snapshot, self._transaction._table.metadata)

        for snapshot in snapshots:
            if snapshot.snapshot_id == starting_snapshot.snapshot_id:
                break

            snapshot_operation = snapshot.summary.operation if snapshot.summary is not None else None

            if snapshot_operation not in allowed_operations[self._operation]:
                raise CommitFailedException(
                    f"Operation {snapshot_operation} is not allowed when performing {self._operation}. "
                    "Check for overlaps or conflicts."
                )

    @property
    def snapshot_id(self) -> int:
        return self._snapshot_id
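
Following the isolation-level discussion in the review thread above, a minimal sketch of what level-aware validation could look like. The IsolationLevel enum and CONFLICTING_OPERATIONS table are hypothetical illustrations (neither exists in this PR or in pyiceberg), and the check is simplified to table-wide conflicts rather than Spark's per-partition validation:

from enum import Enum
from typing import List, Optional

from pyiceberg.exceptions import CommitFailedException
from pyiceberg.table.snapshots import Operation, Snapshot


class IsolationLevel(Enum):
    # Hypothetical enum -- not part of this PR or of pyiceberg
    NONE = "none"
    SNAPSHOT = "snapshot"
    SERIALIZABLE = "serializable"


# Hypothetical conflict table: which concurrent operations invalidate a commit at
# each isolation level. SERIALIZABLE also rejects concurrent appends; SNAPSHOT
# rejects only concurrent deletes, replaces, and overwrites.
CONFLICTING_OPERATIONS = {
    IsolationLevel.NONE: set(),
    IsolationLevel.SNAPSHOT: {Operation.DELETE, Operation.REPLACE, Operation.OVERWRITE},
    IsolationLevel.SERIALIZABLE: {Operation.APPEND, Operation.DELETE, Operation.REPLACE, Operation.OVERWRITE},
}


def validate_no_conflicts(intermediate_snapshots: List[Snapshot], isolation: IsolationLevel) -> None:
    """Raise if any snapshot committed since the operation started conflicts at this level."""
    for snapshot in intermediate_snapshots:
        operation: Optional[Operation] = snapshot.summary.operation if snapshot.summary is not None else None
        if operation in CONFLICTING_OPERATIONS[isolation]:
            raise CommitFailedException(f"Concurrent {operation} conflicts at isolation level {isolation.value}")
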
69 changes: 68 additions & 1 deletion tests/integration/test_add_files.py
@@ -28,7 +28,7 @@
from pytest_mock.plugin import MockerFixture

from pyiceberg.catalog import Catalog
from pyiceberg.exceptions import NoSuchTableError
from pyiceberg.exceptions import CommitFailedException, NoSuchTableError
from pyiceberg.io import FileIO
from pyiceberg.io.pyarrow import UnsupportedPyArrowTypeException, _pyarrow_schema_ensure_large_types
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionField, PartitionSpec
@@ -850,3 +850,70 @@ def test_add_files_that_referenced_by_current_snapshot_with_check_duplicate_file
    with pytest.raises(ValueError) as exc_info:
        tbl.add_files(file_paths=[existing_files_in_table], check_duplicate_files=True)
    assert f"Cannot add files that are already referenced by table, files: {existing_files_in_table}" in str(exc_info.value)


@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_conflict_delete_delete(
    spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
) -> None:
    identifier = "default.test_conflict"
    tbl1 = _create_table(session_catalog, identifier, format_version, schema=arrow_table_with_null.schema)
    tbl1.append(arrow_table_with_null)
    tbl2 = session_catalog.load_table(identifier)

    tbl1.delete("string == 'z'")

    with pytest.raises(
        CommitFailedException, match="Operation .* is not allowed when performing .*. Check for overlaps or conflicts."
    ):
        # tbl2 isn't aware of the commit by tbl1
        tbl2.delete("string == 'z'")


@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_conflict_delete_append(
    spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
) -> None:
    identifier = "default.test_conflict"
    tbl1 = _create_table(session_catalog, identifier, format_version, schema=arrow_table_with_null.schema)
    tbl1.append(arrow_table_with_null)
    tbl2 = session_catalog.load_table(identifier)

    # This is allowed
    tbl1.delete("string == 'z'")
    tbl2.append(arrow_table_with_null)

Collaborator: We should verify the content of the table here.
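
A sketch of such a check, assuming (as in this repo's fixtures) that arrow_table_with_null contains exactly one row where string == 'z':

    # Two appends minus the single row deleted by tbl1
    result = tbl2.refresh().scan().to_arrow()
    assert len(result) == 2 * len(arrow_table_with_null) - 1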



@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_conflict_append_delete(
    spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
) -> None:
    identifier = "default.test_conflict"
    tbl1 = _create_table(session_catalog, identifier, format_version, schema=arrow_table_with_null.schema)
    tbl1.append(arrow_table_with_null)
    tbl2 = session_catalog.load_table(identifier)

    tbl1.append(arrow_table_with_null)

    with pytest.raises(
        CommitFailedException, match="Operation .* is not allowed when performing .*. Check for overlaps or conflicts."
    ):
        # tbl2 isn't aware of the commit by tbl1
        tbl2.delete("string == 'z'")


@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_conflict_append_append(
    spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
) -> None:
    identifier = "default.test_conflict"
    tbl1 = _create_table(session_catalog, identifier, format_version, schema=arrow_table_with_null.schema)
    tbl1.append(arrow_table_with_null)
    tbl2 = session_catalog.load_table(identifier)

    tbl1.append(arrow_table_with_null)
    tbl2.append(arrow_table_with_null)

Collaborator:
Could we introduce an assertion here to verify that the content of the table is as we'd expect (with 3 * arrow_table_with_null data)?
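
A sketch of that assertion (three appends in total and no deletes, so the table should hold triple the fixture's rows):

    result = tbl2.refresh().scan().to_arrow()
    assert len(result) == 3 * len(arrow_table_with_null)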
