-
Notifications
You must be signed in to change notification settings - Fork 297
feat: validate snapshot write compatibility #1772
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 2 commits
949e140
e631ddf
740db96
611b017
0923dc4
57e0f90
5122039
66849dd
0824c35
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -60,6 +60,7 @@ | |||||
Snapshot, | ||||||
SnapshotSummaryCollector, | ||||||
Summary, | ||||||
ancestors_of, | ||||||
update_snapshot_summaries, | ||||||
) | ||||||
from pyiceberg.table.update import ( | ||||||
|
@@ -80,6 +81,7 @@ | |||||
from pyiceberg.utils.bin_packing import ListPacker | ||||||
from pyiceberg.utils.concurrent import ExecutorFactory | ||||||
from pyiceberg.utils.properties import property_as_bool, property_as_int | ||||||
from pyiceberg.utils.snapshot import ancestors_between | ||||||
|
||||||
if TYPE_CHECKING: | ||||||
from pyiceberg.table import Transaction | ||||||
|
@@ -251,6 +253,13 @@ def _commit(self) -> UpdatesAndRequirements: | |||||
) | ||||||
location_provider = self._transaction._table.location_provider() | ||||||
manifest_list_file_path = location_provider.new_metadata_location(file_name) | ||||||
|
||||||
# get current snapshot id and starting snapshot id, and validate that there are no conflicts | ||||||
if self._transaction._table.__class__.__name__ != "StagedTable": | ||||||
starting_snapshot = self._transaction.table_metadata.current_snapshot() | ||||||
current_snapshot = self._transaction._table.refresh().metadata.current_snapshot() | ||||||
self._validate(starting_snapshot, current_snapshot) | ||||||
|
||||||
with write_manifest_list( | ||||||
format_version=self._transaction.table_metadata.format_version, | ||||||
output_file=self._io.new_output(manifest_list_file_path), | ||||||
|
@@ -279,6 +288,30 @@ def _commit(self) -> UpdatesAndRequirements: | |||||
(AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref="main"),), | ||||||
) | ||||||
|
||||||
def _validate(self, starting_snapshot: Optional[Snapshot], current_snapshot: Optional[Snapshot]) -> None: | ||||||
# Define allowed operations for each type of operation | ||||||
allowed_operations = { | ||||||
Operation.APPEND: {Operation.APPEND, Operation.REPLACE, Operation.OVERWRITE, Operation.DELETE}, | ||||||
Operation.REPLACE: {Operation.APPEND}, | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
I think the spec may need a re-review because I think it's inaccurate to say that we only need to verify that the files we are trying to delete are still available when we are executing a In Spark, we also validate whether there's been a conflicting appends when we use I think it would be helpful to introduce all three types of isolation levels There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks @sungwy for jumping in here, and creating the issues 🙌 Indeed, depending on whether we do snapshot or serializable isolation, we should allow for new data (or not). Would you be willing to split out the different levels in a separate PR? It would be nice to get this in so we can start working independently on the subtasks that you created. I think this one was mostly blocked on #1903 |
||||||
Operation.OVERWRITE: set(), | ||||||
Operation.DELETE: set(), | ||||||
} | ||||||
|
||||||
# get all the snapshots between the current snapshot id and the parent id | ||||||
snapshots = ancestors_of(current_snapshot, self._transaction._table.metadata) | ||||||
|
||||||
for snapshot in snapshots: | ||||||
if snapshot.snapshot_id == starting_snapshot.snapshot_id: | ||||||
break | ||||||
|
||||||
snapshot_operation = snapshot.summary.operation | ||||||
|
||||||
if snapshot_operation not in allowed_operations[self._operation]: | ||||||
raise ValueError( | ||||||
f"Operation {snapshot_operation} is not allowed when performing {self._operation}. " | ||||||
"Check for overlaps or conflicts." | ||||||
) | ||||||
|
||||||
@property | ||||||
def snapshot_id(self) -> int: | ||||||
return self._snapshot_id | ||||||
|
Uh oh!
There was an error while loading. Please reload this page.