Skip to content

feat: validate snapshot write compatibility #1772

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
33 changes: 33 additions & 0 deletions pyiceberg/table/update/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
Snapshot,
SnapshotSummaryCollector,
Summary,
ancestors_of,
update_snapshot_summaries,
)
from pyiceberg.table.update import (
Expand All @@ -80,6 +81,7 @@
from pyiceberg.utils.bin_packing import ListPacker
from pyiceberg.utils.concurrent import ExecutorFactory
from pyiceberg.utils.properties import property_as_bool, property_as_int
from pyiceberg.utils.snapshot import ancestors_between

if TYPE_CHECKING:
from pyiceberg.table import Transaction
Expand Down Expand Up @@ -251,6 +253,13 @@ def _commit(self) -> UpdatesAndRequirements:
)
location_provider = self._transaction._table.location_provider()
manifest_list_file_path = location_provider.new_metadata_location(file_name)

# get current snapshot id and starting snapshot id, and validate that there are no conflicts
if self._transaction._table.__class__.__name__ != "StagedTable":
starting_snapshot = self._transaction.table_metadata.current_snapshot()
current_snapshot = self._transaction._table.refresh().metadata.current_snapshot()
self._validate(starting_snapshot, current_snapshot)

with write_manifest_list(
format_version=self._transaction.table_metadata.format_version,
output_file=self._io.new_output(manifest_list_file_path),
Expand Down Expand Up @@ -279,6 +288,30 @@ def _commit(self) -> UpdatesAndRequirements:
(AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref="main"),),
)

def _validate(self, starting_snapshot: Optional[Snapshot], current_snapshot: Optional[Snapshot]) -> None:
# Define allowed operations for each type of operation
allowed_operations = {
Operation.APPEND: {Operation.APPEND, Operation.REPLACE, Operation.OVERWRITE, Operation.DELETE},
Operation.REPLACE: {Operation.APPEND},
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Operation.REPLACE: {Operation.APPEND},
Operation.REPLACE: {},

I think the spec may need a re-review because I think it's inaccurate to say that we only need to verify that the files we are trying to delete are still available when we are executing a REPLACE or DELETE operation.

In Spark, we also validate whether there's been a conflicting appends when we use SERIALIZABLE isolation level:

https://github.com/apache/iceberg/blob/9fc49e187069c7ec2493ac0abf20f73175b3df89/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java#L368-L374

I think it would be helpful to introduce all three types of isolation levels NONE, SERIALIZABLE and SNAPSHOT, and verify if conflicting appends or deletes have been introduced in the underlying partitions to be aligned with the implementation in Spark

Copy link
Contributor

@Fokko Fokko Apr 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @sungwy for jumping in here, and creating the issues 🙌

Indeed, depending on whether we do snapshot or serializable isolation, we should allow for new data (or not). Would you be willing to split out the different levels in a separate PR? It would be nice to get this in so we can start working independently on the subtasks that you created.

I think this one was mostly blocked on #1903

Operation.OVERWRITE: set(),
Operation.DELETE: set(),
}

# get all the snapshots between the current snapshot id and the parent id
snapshots = ancestors_of(current_snapshot, self._transaction._table.metadata)

for snapshot in snapshots:
if snapshot.snapshot_id == starting_snapshot.snapshot_id:
break

snapshot_operation = snapshot.summary.operation

if snapshot_operation not in allowed_operations[self._operation]:
raise ValueError(
f"Operation {snapshot_operation} is not allowed when performing {self._operation}. "
"Check for overlaps or conflicts."
)

@property
def snapshot_id(self) -> int:
return self._snapshot_id
Expand Down
Loading