Skip to content

feat: validate snapshot write compatibility #1772

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
28 changes: 28 additions & 0 deletions pyiceberg/table/update/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
from pyiceberg.utils.bin_packing import ListPacker
from pyiceberg.utils.concurrent import ExecutorFactory
from pyiceberg.utils.properties import property_as_bool, property_as_int
from pyiceberg.utils.snapshot import ancestors_between

if TYPE_CHECKING:
from pyiceberg.table import Transaction
Expand Down Expand Up @@ -251,6 +252,12 @@ def _commit(self) -> UpdatesAndRequirements:
)
location_provider = self._transaction._table.location_provider()
manifest_list_file_path = location_provider.new_metadata_location(file_name)

# get current snapshot id and starting snapshot id, and validate that there are no conflicts
starting_snapshot_id = self._parent_snapshot_id
current_snapshot_id = self._transaction._table.refresh().metadata.current_snapshot_id
self._validate(starting_snapshot_id, current_snapshot_id)

with write_manifest_list(
format_version=self._transaction.table_metadata.format_version,
output_file=self._io.new_output(manifest_list_file_path),
Expand Down Expand Up @@ -279,6 +286,27 @@ def _commit(self) -> UpdatesAndRequirements:
(AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref="main"),),
)

def _validate(self, starting_snapshot_id: Optional[int], current_snapshot_id: Optional[int]) -> None:
# get all the snapshots between the current snapshot id and the parent id
snapshots = ancestors_between(starting_snapshot_id, current_snapshot_id, self._transaction._table.metadata.snapshot_by_id)

# Define allowed operations for each type of operation
allowed_operations = {
Operation.APPEND: {Operation.APPEND, Operation.REPLACE, Operation.OVERWRITE, Operation.DELETE},
Operation.REPLACE: {Operation.APPEND},
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Operation.REPLACE: {Operation.APPEND},
Operation.REPLACE: {},

I think the spec may need a re-review because I think it's inaccurate to say that we only need to verify that the files we are trying to delete are still available when we are executing a REPLACE or DELETE operation.

In Spark, we also validate whether there have been conflicting appends when we use the SERIALIZABLE isolation level:

https://github.com/apache/iceberg/blob/9fc49e187069c7ec2493ac0abf20f73175b3df89/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java#L368-L374

I think it would be helpful to introduce all three isolation levels (NONE, SERIALIZABLE and SNAPSHOT) and to verify whether conflicting appends or deletes have been introduced in the underlying partitions, so that we are aligned with the implementation in Spark

Copy link
Contributor

@Fokko Fokko Apr 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @sungwy for jumping in here, and creating the issues 🙌

Indeed, depending on whether we do snapshot or serializable isolation, we should allow for new data (or not). Would you be willing to split out the different levels in a separate PR? It would be nice to get this in so we can start working independently on the subtasks that you created.

I think this one was mostly blocked on #1903

Operation.OVERWRITE: set(),
Operation.DELETE: set(),
}

for snapshot in snapshots:
snapshot_operation = snapshot.summary.operation

if snapshot_operation not in allowed_operations[self._operation]:
raise ValueError(
f"Operation {snapshot_operation} is not allowed when performing {self._operation}. "
"Check for overlaps or conflicts."
)

@property
def snapshot_id(self) -> int:
    """Return the snapshot ID stored on this instance as ``_snapshot_id``."""
    return self._snapshot_id
Expand Down
53 changes: 53 additions & 0 deletions pyiceberg/utils/snapshot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from typing import Callable, Iterable, Iterator, Optional
from pyiceberg.table.snapshots import Snapshot


def ancestors_of(snapshot_id: Optional[int], lookup_fn: Callable[[int], Optional[Snapshot]]) -> Iterable[Snapshot]:
    """Return the snapshot with ``snapshot_id`` followed by each of its ancestors.

    The chain is walked through ``parent_snapshot_id``, newest first. It stops
    when a snapshot has no parent or when ``lookup_fn`` returns ``None`` for a
    parent ID.

    Args:
        snapshot_id: ID of the snapshot to start from. ``None`` yields nothing.
        lookup_fn: Resolves a snapshot ID to a snapshot, or ``None`` if the ID
            is unknown.

    Returns:
        An iterator over the starting snapshot and its ancestors. It is empty
        when ``snapshot_id`` is ``None`` or cannot be resolved.
    """

    def _walk(start: Snapshot) -> Iterator[Snapshot]:
        current: Optional[Snapshot] = start
        while current is not None:
            yield current

            parent_id = current.parent_snapshot_id
            if parent_id is None:
                return

            # A missing parent resolves to None, which ends the loop.
            current = lookup_fn(parent_id)

    # Never hand None to lookup_fn: its contract only accepts int IDs, and
    # strict lookups (e.g. ``dict.__getitem__``) would raise.
    if snapshot_id is None:
        return iter([])

    # The first lookup stays eager, so an unknown ID is detected at call time.
    snapshot = lookup_fn(snapshot_id)
    if snapshot is None:
        return iter([])

    return _walk(snapshot)

def ancestors_between(
    starting_snapshot_id: Optional[int],
    current_snapshot_id: Optional[int],
    lookup_fn: Callable[[int], Optional[Snapshot]],
) -> Iterable[Snapshot]:
    """Return the snapshots from ``current_snapshot_id`` back to ``starting_snapshot_id``.

    The walk starts at ``current_snapshot_id`` and follows parents via
    ``ancestors_of``. The starting snapshot itself is excluded: once its ID is
    requested, the lookup reports "not found" and the walk ends.

    Args:
        starting_snapshot_id: ID at which the walk stops (exclusive).
        current_snapshot_id: ID at which the walk begins (inclusive).
        lookup_fn: Resolves a snapshot ID to a snapshot, or ``None``.

    Returns:
        An iterable of snapshots, newest first. It is empty when both IDs are
        equal.
    """
    if starting_snapshot_id == current_snapshot_id:
        return iter([])

    def _lookup_until_start(snapshot_id: int) -> Optional[Snapshot]:
        # Hide the starting snapshot so the ancestor walk terminates there.
        if snapshot_id != starting_snapshot_id:
            return lookup_fn(snapshot_id)
        return None

    return ancestors_of(current_snapshot_id, _lookup_until_start)

Loading