From beff92b38eb370bb33bc782126e332c6e197e0bf Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Fri, 18 Apr 2025 15:53:45 -0400 Subject: [PATCH 01/24] feat: validation history --- pyiceberg/table/snapshots.py | 20 +++++++++++- pyiceberg/table/update/validate.py | 50 ++++++++++++++++++++++++++++++ 2 files changed, 69 insertions(+), 1 deletion(-) create mode 100644 pyiceberg/table/update/validate.py diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index a5515f12b0..d6b8a5a7db 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -25,7 +25,7 @@ from pydantic import Field, PrivateAttr, model_serializer from pyiceberg.io import FileIO -from pyiceberg.manifest import DataFile, DataFileContent, ManifestFile, _manifests +from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestFile, _manifests from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec from pyiceberg.schema import Schema @@ -255,6 +255,14 @@ def manifests(self, io: FileIO) -> List[ManifestFile]: """Return the manifests for the given snapshot.""" return list(_manifests(io, self.manifest_list)) + def data_manifests(self, io: FileIO) -> List[ManifestFile]: + """Return the data manifests for the given snapshot.""" + return [manifest for manifest in self.manifests(io) if manifest.content == ManifestContent.DATA] + + def delete_manifests(self, io: FileIO) -> List[ManifestFile]: + """Return the delete manifests for the given snapshot.""" + return [manifest for manifest in self.manifests(io) if manifest.content == ManifestContent.DELETES] + class MetadataLogEntry(IcebergBaseModel): metadata_file: str = Field(alias="metadata-file") @@ -429,3 +437,13 @@ def ancestors_of(current_snapshot: Optional[Snapshot], table_metadata: TableMeta if snapshot.parent_snapshot_id is None: break snapshot = table_metadata.snapshot_by_id(snapshot.parent_snapshot_id) + + +def ancestors_between( + current_snapshot: Optional[Snapshot], oldest_snapshot: Optional[Snapshot], table_metadata: TableMetadata +) -> Iterable[Snapshot]: + """Get the ancestors of and including the given snapshot between the latest and oldest snapshot.""" + for snapshot in ancestors_of(current_snapshot, table_metadata): + if snapshot.snapshot_id == oldest_snapshot.snapshot_id: + break + yield snapshot diff --git a/pyiceberg/table/update/validate.py b/pyiceberg/table/update/validate.py new file mode 100644 index 0000000000..61343831b5 --- /dev/null +++ b/pyiceberg/table/update/validate.py @@ -0,0 +1,50 @@ +class ValidationException(Exception): + """Raised when validation fails.""" + + + +from pyiceberg.table import Table +from pyiceberg.table.snapshots import Snapshot, Operation, ancestors_between +from pyiceberg.manifest import ManifestFile, ManifestContent + + +def validation_history( + table: Table, + starting_snapshot_id: int, + matching_operations: set[Operation], + manifest_content: ManifestContent, + parent: Snapshot, +) -> tuple[list[ManifestFile], set[Snapshot]]: + """Return newly added manifests and snapshot IDs between the starting snapshot ID and parent snapshot + + Args: + table: Table to get the history from + starting_snapshot_id: ID of the starting snapshot + matching_operations: Operations to match on + manifest_content: Manifest content type to filter + parent: Parent snapshot to get the history from + + Raises: + ValidationException: If no matching snapshot is found or only one snapshot is found + + Returns: + List of manifest files and set of snapshots matching conditions + """ + manifests_files: list[ManifestFile] = [] + snapshots: set[Snapshot] = set() + + last_snapshot = None + for snapshot in ancestors_between(starting_snapshot_id, parent.snapshot_id, table.metadata): + last_snapshot = snapshot + if snapshot.operation in matching_operations: + snapshots.add(snapshot) + if manifest_content == ManifestContent.DATA: + manifests_files.extend([manifest for manifest in snapshot.data_manifests(table.io) if manifest.added_snapshot_id == snapshot.snapshot_id]) + else: + manifests_files.extend([manifest for manifest in snapshot.delete_manifests(table.io) if manifest.added_snapshot_id == snapshot.snapshot_id]) + + if last_snapshot is None or last_snapshot.snapshot_id == starting_snapshot_id: + raise ValidationException("No matching snapshot found.") + + return manifests_files, snapshots + From c36972063071f085f8e6fd3326e1120864a28a9f Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Fri, 18 Apr 2025 16:00:44 -0400 Subject: [PATCH 02/24] format --- pyiceberg/table/snapshots.py | 2 +- pyiceberg/table/update/validate.py | 58 +++++++++++++++++++++--------- 2 files changed, 43 insertions(+), 17 deletions(-) diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index d6b8a5a7db..59753b284a 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -440,7 +440,7 @@ def ancestors_of(current_snapshot: Optional[Snapshot], table_metadata: TableMeta def ancestors_between( - current_snapshot: Optional[Snapshot], oldest_snapshot: Optional[Snapshot], table_metadata: TableMetadata + current_snapshot: Optional[Snapshot], oldest_snapshot: Snapshot, table_metadata: TableMetadata ) -> Iterable[Snapshot]: """Get the ancestors of and including the given snapshot between the latest and oldest snapshot.""" for snapshot in ancestors_of(current_snapshot, table_metadata): diff --git a/pyiceberg/table/update/validate.py b/pyiceberg/table/update/validate.py index 61343831b5..ae342ea492 100644 --- a/pyiceberg/table/update/validate.py +++ b/pyiceberg/table/update/validate.py @@ -1,28 +1,43 @@ -class ValidationException(Exception): - """Raised when validation fails.""" - +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from pyiceberg.manifest import ManifestContent, ManifestFile +from pyiceberg.table import Table +from pyiceberg.table.snapshots import Operation, Snapshot, ancestors_between -from pyiceberg.table import Table -from pyiceberg.table.snapshots import Snapshot, Operation, ancestors_between -from pyiceberg.manifest import ManifestFile, ManifestContent +class ValidationException(Exception): + """Raised when validation fails.""" def validation_history( table: Table, - starting_snapshot_id: int, + starting_snapshot: Snapshot, matching_operations: set[Operation], manifest_content: ManifestContent, - parent: Snapshot, + parent_snapshot: Snapshot, ) -> tuple[list[ManifestFile], set[Snapshot]]: - """Return newly added manifests and snapshot IDs between the starting snapshot ID and parent snapshot + """Return newly added manifests and snapshot IDs between the starting snapshot and parent snapshot. Args: table: Table to get the history from - starting_snapshot_id: ID of the starting snapshot + starting_snapshot: Starting snapshot matching_operations: Operations to match on manifest_content: Manifest content type to filter - parent: Parent snapshot to get the history from + parent_snapshot: Parent snapshot to get the history from Raises: ValidationException: If no matching snapshot is found or only one snapshot is found @@ -34,17 +49,28 @@ def validation_history( snapshots: set[Snapshot] = set() last_snapshot = None - for snapshot in ancestors_between(starting_snapshot_id, parent.snapshot_id, table.metadata): + for snapshot in ancestors_between(starting_snapshot, parent_snapshot, table.metadata): last_snapshot = snapshot if snapshot.operation in matching_operations: snapshots.add(snapshot) if manifest_content == ManifestContent.DATA: - manifests_files.extend([manifest for manifest in snapshot.data_manifests(table.io) if manifest.added_snapshot_id == snapshot.snapshot_id]) + manifests_files.extend( + [ + manifest + for manifest in snapshot.data_manifests(table.io) + if manifest.added_snapshot_id == snapshot.snapshot_id + ] + ) else: - manifests_files.extend([manifest for manifest in snapshot.delete_manifests(table.io) if manifest.added_snapshot_id == snapshot.snapshot_id]) + manifests_files.extend( + [ + manifest + for manifest in snapshot.delete_manifests(table.io) + if manifest.added_snapshot_id == snapshot.snapshot_id + ] + ) - if last_snapshot is None or last_snapshot.snapshot_id == starting_snapshot_id: + if last_snapshot is None or last_snapshot.snapshot_id == starting_snapshot.snapshot_id: raise ValidationException("No matching snapshot found.") return manifests_files, snapshots - From 41bb8a475d55d58b822f1ac46cb6bf41d40888f5 Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Fri, 18 Apr 2025 16:32:32 -0400 Subject: [PATCH 03/24] almost a working test --- pyiceberg/table/update/validate.py | 5 ++- tests/table/test_init.py | 39 ------------------ tests/table/test_snapshots.py | 65 +++++++++++++++++++++++++++++- tests/table/test_validate.py | 33 +++++++++++++++ 4 files changed, 101 insertions(+), 41 deletions(-) create mode 100644 tests/table/test_validate.py diff --git a/pyiceberg/table/update/validate.py b/pyiceberg/table/update/validate.py index ae342ea492..2919885d33 100644 --- a/pyiceberg/table/update/validate.py +++ b/pyiceberg/table/update/validate.py @@ -51,7 +51,10 @@ def validation_history( last_snapshot = None for snapshot in ancestors_between(starting_snapshot, parent_snapshot, table.metadata): last_snapshot = snapshot - if snapshot.operation in matching_operations: + summary = snapshot.summary + if summary is None: + continue + if summary.operation in matching_operations: snapshots.add(snapshot) if manifest_content == ManifestContent.DATA: manifests_files.extend( diff --git a/tests/table/test_init.py b/tests/table/test_init.py index 69bbab527e..c5ccd56ce2 100644 --- a/tests/table/test_init.py +++ b/tests/table/test_init.py @@ -57,7 +57,6 @@ Snapshot, SnapshotLogEntry, Summary, - ancestors_of, ) from pyiceberg.table.sorting import ( NullOrder, @@ -225,44 +224,6 @@ def test_snapshot_by_timestamp(table_v2: Table) -> None: assert table_v2.snapshot_as_of_timestamp(1515100955770, inclusive=False) is None -def test_ancestors_of(table_v2: Table) -> None: - assert list(ancestors_of(table_v2.current_snapshot(), table_v2.metadata)) == [ - Snapshot( - snapshot_id=3055729675574597004, - parent_snapshot_id=3051729675574597004, - sequence_number=1, - timestamp_ms=1555100955770, - manifest_list="s3://a/b/2.avro", - summary=Summary(Operation.APPEND), - schema_id=1, - ), - Snapshot( - snapshot_id=3051729675574597004, - parent_snapshot_id=None, - sequence_number=0, - timestamp_ms=1515100955770, - manifest_list="s3://a/b/1.avro", - summary=Summary(Operation.APPEND), - schema_id=None, - ), - ] - - -def test_ancestors_of_recursive_error(table_v2_with_extensive_snapshots: Table) -> None: - # Test RecursionError: maximum recursion depth exceeded - assert ( - len( - list( - ancestors_of( - table_v2_with_extensive_snapshots.current_snapshot(), - table_v2_with_extensive_snapshots.metadata, - ) - ) - ) - == 2000 - ) - - def test_snapshot_by_id_does_not_exist(table_v2: Table) -> None: assert table_v2.snapshot_by_id(-1) is None diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py index b4dde217d4..f5ebfe0659 100644 --- a/tests/table/test_snapshots.py +++ b/tests/table/test_snapshots.py @@ -20,7 +20,16 @@ from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestFile from pyiceberg.partitioning import PartitionField, PartitionSpec from pyiceberg.schema import Schema -from pyiceberg.table.snapshots import Operation, Snapshot, SnapshotSummaryCollector, Summary, update_snapshot_summaries +from pyiceberg.table import Table +from pyiceberg.table.snapshots import ( + Operation, + Snapshot, + SnapshotSummaryCollector, + Summary, + ancestors_between, + ancestors_of, + update_snapshot_summaries, +) from pyiceberg.transforms import IdentityTransform from pyiceberg.typedef import Record from pyiceberg.types import ( @@ -341,3 +350,57 @@ def test_invalid_type() -> None: ) assert "Could not parse summary property total-data-files to an int: abc" in str(e.value) + + +def test_ancestors_of(table_v2: Table) -> None: + assert list(ancestors_of(table_v2.current_snapshot(), table_v2.metadata)) == [ + Snapshot( + snapshot_id=3055729675574597004, + parent_snapshot_id=3051729675574597004, + sequence_number=1, + timestamp_ms=1555100955770, + manifest_list="s3://a/b/2.avro", + summary=Summary(Operation.APPEND), + schema_id=1, + ), + Snapshot( + snapshot_id=3051729675574597004, + parent_snapshot_id=None, + sequence_number=0, + timestamp_ms=1515100955770, + manifest_list="s3://a/b/1.avro", + summary=Summary(Operation.APPEND), + schema_id=None, + ), + ] + + +def test_ancestors_of_recursive_error(table_v2_with_extensive_snapshots: Table) -> None: + # Test RecursionError: maximum recursion depth exceeded + assert ( + len( + list( + ancestors_of( + table_v2_with_extensive_snapshots.current_snapshot(), + table_v2_with_extensive_snapshots.metadata, + ) + ) + ) + == 2000 + ) + + +def test_ancestors_between(table_v2_with_extensive_snapshots: Table) -> None: + oldest_snapshot = table_v2_with_extensive_snapshots.snapshots()[0] + assert ( + len( + list( + ancestors_between( + table_v2_with_extensive_snapshots.current_snapshot(), + oldest_snapshot, + table_v2_with_extensive_snapshots.metadata, + ) + ) + ) + == 1999 + ) diff --git a/tests/table/test_validate.py b/tests/table/test_validate.py new file mode 100644 index 0000000000..9394696ea2 --- /dev/null +++ b/tests/table/test_validate.py @@ -0,0 +1,33 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# pylint:disable=redefined-outer-name,eval-used +from typing import cast + +from pyiceberg.manifest import ManifestContent +from pyiceberg.table import Table +from pyiceberg.table.snapshots import Operation, Snapshot +from pyiceberg.table.update.validate import validation_history + + +def test_validation_history(table_v2_with_extensive_snapshots: Table) -> None: + """Test the validation history function.""" + oldest_snapshot = table_v2_with_extensive_snapshots.snapshots()[0] + newest_snapshot = cast(Snapshot, table_v2_with_extensive_snapshots.current_snapshot()) + manifests, snapshots = validation_history( + table_v2_with_extensive_snapshots, newest_snapshot, {Operation.APPEND}, ManifestContent.DATA, oldest_snapshot + ) + assert len(snapshots) == 2 From 763e9f4f10e1ffb38fe75ace0b028c83f2d6faa3 Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Fri, 18 Apr 2025 16:55:11 -0400 Subject: [PATCH 04/24] allow content_filter in snapshot.manifests --- pyiceberg/table/snapshots.py | 23 ++++++++++++----------- pyiceberg/table/update/validate.py | 27 +++++++++------------------ 2 files changed, 21 insertions(+), 29 deletions(-) diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index 59753b284a..dd7be3650b 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -251,17 +251,18 @@ def __str__(self) -> str: result_str = f"{operation}id={self.snapshot_id}{parent_id}{schema_id}" return result_str - def manifests(self, io: FileIO) -> List[ManifestFile]: - """Return the manifests for the given snapshot.""" - return list(_manifests(io, self.manifest_list)) - - def data_manifests(self, io: FileIO) -> List[ManifestFile]: - """Return the data manifests for the given snapshot.""" - return [manifest for manifest in self.manifests(io) if manifest.content == ManifestContent.DATA] - - def delete_manifests(self, io: FileIO) -> List[ManifestFile]: - """Return the delete manifests for the given snapshot.""" - return [manifest for manifest in self.manifests(io) if manifest.content == ManifestContent.DELETES] + def manifests(self, io: FileIO, content_filter: Optional[ManifestContent] = None) -> List[ManifestFile]: + """Return the manifests for the given snapshot. + + Args: + io: The IO instance to read the manifest list. + content_filter: The content filter to apply to the manifests. One of ManifestContent.DATA or ManifestContent.DELETES. + """ + all_manifests = list(_manifests(io, self.manifest_list)) + if content_filter is not None: + all_manifests = [manifest for manifest in all_manifests if manifest.content == content_filter] + + return all_manifests class MetadataLogEntry(IcebergBaseModel): diff --git a/pyiceberg/table/update/validate.py b/pyiceberg/table/update/validate.py index 2919885d33..fc366ad81a 100644 --- a/pyiceberg/table/update/validate.py +++ b/pyiceberg/table/update/validate.py @@ -27,7 +27,7 @@ def validation_history( table: Table, starting_snapshot: Snapshot, matching_operations: set[Operation], - manifest_content: ManifestContent, + manifest_content_filter: ManifestContent, parent_snapshot: Snapshot, ) -> tuple[list[ManifestFile], set[Snapshot]]: """Return newly added manifests and snapshot IDs between the starting snapshot and parent snapshot. @@ -36,7 +36,7 @@ def validation_history( table: Table to get the history from starting_snapshot: Starting snapshot matching_operations: Operations to match on - manifest_content: Manifest content type to filter + manifest_content_filter: Manifest content type to filter parent_snapshot: Parent snapshot to get the history from Raises: @@ -56,22 +56,13 @@ def validation_history( continue if summary.operation in matching_operations: snapshots.add(snapshot) - if manifest_content == ManifestContent.DATA: - manifests_files.extend( - [ - manifest - for manifest in snapshot.data_manifests(table.io) - if manifest.added_snapshot_id == snapshot.snapshot_id - ] - ) - else: - manifests_files.extend( - [ - manifest - for manifest in snapshot.delete_manifests(table.io) - if manifest.added_snapshot_id == snapshot.snapshot_id - ] - ) + manifests_files.extend( + [ + manifest + for manifest in snapshot.manifests(table.io, manifest_content_filter) + if manifest.added_snapshot_id == snapshot.snapshot_id + ] + ) if last_snapshot is None or last_snapshot.snapshot_id == starting_snapshot.snapshot_id: raise ValidationException("No matching snapshot found.") From f200beb00b47a57a1c1f04c67f067f5156613471 Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Fri, 18 Apr 2025 16:59:03 -0400 Subject: [PATCH 05/24] simplify order of arguments to validation_history --- pyiceberg/table/update/validate.py | 4 ++-- tests/table/test_validate.py | 6 +++++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/pyiceberg/table/update/validate.py b/pyiceberg/table/update/validate.py index fc366ad81a..0f8798f80e 100644 --- a/pyiceberg/table/update/validate.py +++ b/pyiceberg/table/update/validate.py @@ -26,18 +26,18 @@ class ValidationException(Exception): def validation_history( table: Table, starting_snapshot: Snapshot, + parent_snapshot: Snapshot, matching_operations: set[Operation], manifest_content_filter: ManifestContent, - parent_snapshot: Snapshot, ) -> tuple[list[ManifestFile], set[Snapshot]]: """Return newly added manifests and snapshot IDs between the starting snapshot and parent snapshot. Args: table: Table to get the history from starting_snapshot: Starting snapshot + parent_snapshot: Parent snapshot to get the history from matching_operations: Operations to match on manifest_content_filter: Manifest content type to filter - parent_snapshot: Parent snapshot to get the history from Raises: ValidationException: If no matching snapshot is found or only one snapshot is found diff --git a/tests/table/test_validate.py b/tests/table/test_validate.py index 9394696ea2..e2f052b304 100644 --- a/tests/table/test_validate.py +++ b/tests/table/test_validate.py @@ -28,6 +28,10 @@ def test_validation_history(table_v2_with_extensive_snapshots: Table) -> None: oldest_snapshot = table_v2_with_extensive_snapshots.snapshots()[0] newest_snapshot = cast(Snapshot, table_v2_with_extensive_snapshots.current_snapshot()) manifests, snapshots = validation_history( - table_v2_with_extensive_snapshots, newest_snapshot, {Operation.APPEND}, ManifestContent.DATA, oldest_snapshot + table_v2_with_extensive_snapshots, + newest_snapshot, + oldest_snapshot, + {Operation.APPEND}, + ManifestContent.DATA, ) assert len(snapshots) == 2 From 7f6bf9db90a2e8479524f8b00e6d8d5c6feaf47c Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Fri, 18 Apr 2025 17:07:02 -0400 Subject: [PATCH 06/24] simplify return in snapshot.manifests --- pyiceberg/table/snapshots.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index dd7be3650b..83fe5895c2 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -258,11 +258,11 @@ def manifests(self, io: FileIO, content_filter: Optional[ManifestContent] = None io: The IO instance to read the manifest list. content_filter: The content filter to apply to the manifests. One of ManifestContent.DATA or ManifestContent.DELETES. """ - all_manifests = list(_manifests(io, self.manifest_list)) - if content_filter is not None: - all_manifests = [manifest for manifest in all_manifests if manifest.content == content_filter] - - return all_manifests + return [ + manifest + for manifest in _manifests(io, self.manifest_list) + if content_filter is None or manifest.content == content_filter + ] class MetadataLogEntry(IcebergBaseModel): From c63cc5540f0d7e4bc1b9a469ab254d6f2dba335c Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Fri, 18 Apr 2025 21:17:46 -0400 Subject: [PATCH 07/24] tests passing --- pyiceberg/table/snapshots.py | 17 +++-------- pyiceberg/table/update/validate.py | 4 +-- tests/table/test_validate.py | 48 ++++++++++++++++++++++++------ 3 files changed, 45 insertions(+), 24 deletions(-) diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index 83fe5895c2..fa3d09ea7c 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -25,7 +25,7 @@ from pydantic import Field, PrivateAttr, model_serializer from pyiceberg.io import FileIO -from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestFile, _manifests +from pyiceberg.manifest import DataFile, DataFileContent, ManifestFile, _manifests from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec from pyiceberg.schema import Schema @@ -251,18 +251,9 @@ def __str__(self) -> str: result_str = f"{operation}id={self.snapshot_id}{parent_id}{schema_id}" return result_str - def manifests(self, io: FileIO, content_filter: Optional[ManifestContent] = None) -> List[ManifestFile]: - """Return the manifests for the given snapshot. - - Args: - io: The IO instance to read the manifest list. - content_filter: The content filter to apply to the manifests. One of ManifestContent.DATA or ManifestContent.DELETES. - """ - return [ - manifest - for manifest in _manifests(io, self.manifest_list) - if content_filter is None or manifest.content == content_filter - ] + def manifests(self, io: FileIO) -> List[ManifestFile]: + """Return the manifests for the given snapshot.""" + return list(_manifests(io, self.manifest_list)) class MetadataLogEntry(IcebergBaseModel): diff --git a/pyiceberg/table/update/validate.py b/pyiceberg/table/update/validate.py index 0f8798f80e..38c880bfae 100644 --- a/pyiceberg/table/update/validate.py +++ b/pyiceberg/table/update/validate.py @@ -59,8 +59,8 @@ def validation_history( manifests_files.extend( [ manifest - for manifest in snapshot.manifests(table.io, manifest_content_filter) - if manifest.added_snapshot_id == snapshot.snapshot_id + for manifest in snapshot.manifests(table.io) + if manifest.added_snapshot_id == snapshot.snapshot_id and manifest.content == manifest_content_filter ] ) diff --git a/tests/table/test_validate.py b/tests/table/test_validate.py index e2f052b304..9bb4e98cfa 100644 --- a/tests/table/test_validate.py +++ b/tests/table/test_validate.py @@ -16,8 +16,10 @@ # under the License. # pylint:disable=redefined-outer-name,eval-used from typing import cast +from unittest.mock import patch -from pyiceberg.manifest import ManifestContent +from pyiceberg.io import FileIO +from pyiceberg.manifest import ManifestContent, ManifestFile from pyiceberg.table import Table from pyiceberg.table.snapshots import Operation, Snapshot from pyiceberg.table.update.validate import validation_history @@ -25,13 +27,41 @@ def test_validation_history(table_v2_with_extensive_snapshots: Table) -> None: """Test the validation history function.""" + mock_manifests = {} + + for i, snapshot in enumerate(table_v2_with_extensive_snapshots.snapshots()): + mock_manifest = ManifestFile( + manifest_path=f"foo/bar/{i}", + manifest_length=1, + partition_spec_id=1, + content=ManifestContent.DATA if i % 2 == 0 else ManifestContent.DELETES, + sequence_number=1, + min_sequence_number=1, + added_snapshot_id=snapshot.snapshot_id, + ) + + # Store the manifest for this specific snapshot + mock_manifests[snapshot.snapshot_id] = [mock_manifest] + + expected_manifest_data_counts = len([m for m in mock_manifests.values() if m[0].content == ManifestContent.DATA]) - 1 + oldest_snapshot = table_v2_with_extensive_snapshots.snapshots()[0] newest_snapshot = cast(Snapshot, table_v2_with_extensive_snapshots.current_snapshot()) - manifests, snapshots = validation_history( - table_v2_with_extensive_snapshots, - newest_snapshot, - oldest_snapshot, - {Operation.APPEND}, - ManifestContent.DATA, - ) - assert len(snapshots) == 2 + + def mock_read_manifest_side_effect(self: Snapshot, io: FileIO) -> list[ManifestFile]: + """Mock the manifests method to use the snapshot_id for lookup.""" + snapshot_id = self.snapshot_id + if snapshot_id in mock_manifests: + return mock_manifests[snapshot_id] + return [] + + with patch("pyiceberg.table.snapshots.Snapshot.manifests", new=mock_read_manifest_side_effect): + manifests, snapshots = validation_history( + table_v2_with_extensive_snapshots, + newest_snapshot, + oldest_snapshot, + {Operation.APPEND}, + ManifestContent.DATA, + ) + + assert len(manifests) == expected_manifest_data_counts From f2f3a885ce20a22d08d77f122b125f522ac0f915 Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Sat, 19 Apr 2025 10:25:25 -0400 Subject: [PATCH 08/24] correct ancestors_between --- pyiceberg/table/snapshots.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index fa3d09ea7c..283f140957 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -432,10 +432,14 @@ def ancestors_of(current_snapshot: Optional[Snapshot], table_metadata: TableMeta def ancestors_between( - current_snapshot: Optional[Snapshot], oldest_snapshot: Snapshot, table_metadata: TableMetadata + current_snapshot: Snapshot, oldest_snapshot: Optional[Snapshot], table_metadata: TableMetadata ) -> Iterable[Snapshot]: """Get the ancestors of and including the given snapshot between the latest and oldest snapshot.""" - for snapshot in ancestors_of(current_snapshot, table_metadata): - if snapshot.snapshot_id == oldest_snapshot.snapshot_id: - break - yield snapshot + if oldest_snapshot is not None: + for snapshot in ancestors_of(current_snapshot, table_metadata): + if snapshot.snapshot_id == oldest_snapshot.snapshot_id: + break + yield snapshot + else: + for snapshot in ancestors_of(current_snapshot, table_metadata): + yield snapshot From 74d5569bf73f4a8399aafc2a9fa0c25412040bc5 Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Sat, 19 Apr 2025 10:50:44 -0400 Subject: [PATCH 09/24] fix to/from logic and allow optional `to_snapshot` arg in `validation_history` --- pyiceberg/table/snapshots.py | 11 +++++------ pyiceberg/table/update/validate.py | 14 ++++++++------ tests/table/test_snapshots.py | 5 ++++- 3 files changed, 17 insertions(+), 13 deletions(-) diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index 283f140957..472d421bec 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -432,14 +432,13 @@ def ancestors_of(current_snapshot: Optional[Snapshot], table_metadata: TableMeta def ancestors_between( - current_snapshot: Snapshot, oldest_snapshot: Optional[Snapshot], table_metadata: TableMetadata + to_snapshot: Snapshot, from_snapshot: Optional[Snapshot], table_metadata: TableMetadata ) -> Iterable[Snapshot]: """Get the ancestors of and including the given snapshot between the latest and oldest snapshot.""" - if oldest_snapshot is not None: - for snapshot in ancestors_of(current_snapshot, table_metadata): - if snapshot.snapshot_id == oldest_snapshot.snapshot_id: + if from_snapshot is not None: + for snapshot in ancestors_of(to_snapshot, table_metadata): + if snapshot == from_snapshot: break yield snapshot else: - for snapshot in ancestors_of(current_snapshot, table_metadata): - yield snapshot + yield from ancestors_of(to_snapshot, table_metadata) diff --git a/pyiceberg/table/update/validate.py b/pyiceberg/table/update/validate.py index 38c880bfae..ad7da9d20c 100644 --- a/pyiceberg/table/update/validate.py +++ b/pyiceberg/table/update/validate.py @@ -14,6 +14,8 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +from typing import Optional + from pyiceberg.manifest import ManifestContent, ManifestFile from pyiceberg.table import Table from pyiceberg.table.snapshots import Operation, Snapshot, ancestors_between @@ -25,8 +27,8 @@ class ValidationException(Exception): def validation_history( table: Table, - starting_snapshot: Snapshot, - parent_snapshot: Snapshot, + from_snapshot: Snapshot, + to_snapshot: Optional[Snapshot], matching_operations: set[Operation], manifest_content_filter: ManifestContent, ) -> tuple[list[ManifestFile], set[Snapshot]]: @@ -34,8 +36,8 @@ def validation_history( Args: table: Table to get the history from - starting_snapshot: Starting snapshot - parent_snapshot: Parent snapshot to get the history from + from_snapshot: Parent snapshot to get the history from + to_snapshot: Starting snapshot matching_operations: Operations to match on manifest_content_filter: Manifest content type to filter @@ -49,7 +51,7 @@ def validation_history( snapshots: set[Snapshot] = set() last_snapshot = None - for snapshot in ancestors_between(starting_snapshot, parent_snapshot, table.metadata): + for snapshot in ancestors_between(from_snapshot, to_snapshot, table.metadata): last_snapshot = snapshot summary = snapshot.summary if summary is None: @@ -64,7 +66,7 @@ def validation_history( ] ) - if last_snapshot is None or last_snapshot.snapshot_id == starting_snapshot.snapshot_id: + if last_snapshot is None or last_snapshot.snapshot_id == from_snapshot.snapshot_id: raise ValidationException("No matching snapshot found.") return manifests_files, snapshots diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py index f5ebfe0659..cb2c317c7a 100644 --- a/tests/table/test_snapshots.py +++ b/tests/table/test_snapshots.py @@ -15,6 +15,8 @@ # specific language governing permissions and limitations # under the License. # pylint:disable=redefined-outer-name,eval-used +from typing import cast + import pytest from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestFile @@ -392,11 +394,12 @@ def test_ancestors_of_recursive_error(table_v2_with_extensive_snapshots: Table) def test_ancestors_between(table_v2_with_extensive_snapshots: Table) -> None: oldest_snapshot = table_v2_with_extensive_snapshots.snapshots()[0] + current_snapshot = cast(Snapshot, table_v2_with_extensive_snapshots.current_snapshot()) assert ( len( list( ancestors_between( - table_v2_with_extensive_snapshots.current_snapshot(), + current_snapshot, oldest_snapshot, table_v2_with_extensive_snapshots.metadata, ) From 167f9e44fdb4f50d1dddfd59384b090624521afa Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Sat, 19 Apr 2025 10:57:56 -0400 Subject: [PATCH 10/24] remove a level of nesting with smarter clause --- pyiceberg/table/update/validate.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/pyiceberg/table/update/validate.py b/pyiceberg/table/update/validate.py index ad7da9d20c..3b4c504232 100644 --- a/pyiceberg/table/update/validate.py +++ b/pyiceberg/table/update/validate.py @@ -54,17 +54,17 @@ def validation_history( for snapshot in ancestors_between(from_snapshot, to_snapshot, table.metadata): last_snapshot = snapshot summary = snapshot.summary - if summary is None: + if summary is None or summary.matching_operations not in matching_operations: continue - if summary.operation in matching_operations: - snapshots.add(snapshot) - manifests_files.extend( - [ - manifest - for manifest in snapshot.manifests(table.io) - if manifest.added_snapshot_id == snapshot.snapshot_id and manifest.content == manifest_content_filter - ] - ) + + snapshots.add(snapshot) + manifests_files.extend( + [ + manifest + for manifest in snapshot.manifests(table.io) + if manifest.added_snapshot_id == snapshot.snapshot_id and manifest.content == manifest_content_filter + ] + ) if last_snapshot is None or last_snapshot.snapshot_id == from_snapshot.snapshot_id: raise ValidationException("No matching snapshot found.") From efe50b48c4a64024fd79ff1e7468d768d104d307 Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Sat, 19 Apr 2025 10:58:44 -0400 Subject: [PATCH 11/24] fix bad accessor --- pyiceberg/table/update/validate.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyiceberg/table/update/validate.py b/pyiceberg/table/update/validate.py index 3b4c504232..2606c2b03e 100644 --- a/pyiceberg/table/update/validate.py +++ b/pyiceberg/table/update/validate.py @@ -54,7 +54,7 @@ def validation_history( for snapshot in ancestors_between(from_snapshot, to_snapshot, table.metadata): last_snapshot = snapshot summary = snapshot.summary - if summary is None or summary.matching_operations not in matching_operations: + if summary is None or summary.operation not in matching_operations: continue snapshots.add(snapshot) From 0793713359b9659d0c140fe0b847673d29f21ea7 Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Sat, 19 Apr 2025 10:59:34 -0400 Subject: [PATCH 12/24] fix docstring --- pyiceberg/table/snapshots.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index 472d421bec..afcf8c8b43 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -434,7 +434,7 @@ def ancestors_of(current_snapshot: Optional[Snapshot], table_metadata: TableMeta def ancestors_between( to_snapshot: Snapshot, from_snapshot: Optional[Snapshot], table_metadata: TableMetadata ) -> Iterable[Snapshot]: - """Get the ancestors of and including the given snapshot between the latest and oldest snapshot.""" + """Get the ancestors of and including the given snapshot between the to and from snapshots.""" if from_snapshot is not None: for snapshot in ancestors_of(to_snapshot, table_metadata): if snapshot == from_snapshot: From 89120aacc00699b8ebfa7eb3c498477f3a39e393 Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Sat, 19 Apr 2025 15:39:13 -0400 Subject: [PATCH 13/24] [wip] feat: `validate_deleted_data_files` --- pyiceberg/table/update/validate.py | 82 +++++++++++++++++++++++++++++- 1 file changed, 80 insertions(+), 2 deletions(-) diff --git a/pyiceberg/table/update/validate.py b/pyiceberg/table/update/validate.py index 2606c2b03e..0764f6c2b3 100644 --- a/pyiceberg/table/update/validate.py +++ b/pyiceberg/table/update/validate.py @@ -14,11 +14,16 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -from typing import Optional +from typing import Iterator, Optional -from pyiceberg.manifest import ManifestContent, ManifestFile +from pyiceberg.expressions import BooleanExpression +from pyiceberg.expressions.visitors import _StrictMetricsEvaluator +from pyiceberg.manifest import ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile from pyiceberg.table import Table from pyiceberg.table.snapshots import Operation, Snapshot, ancestors_between +from pyiceberg.typedef import Record + +VALIDATE_DATA_FILES_EXIST_OPERATIONS = {Operation.OVERWRITE, Operation.REPLACE, Operation.DELETE} class ValidationException(Exception): @@ -70,3 +75,76 @@ def validation_history( raise ValidationException("No matching snapshot found.") return manifests_files, snapshots + + +def deleted_data_files( + table: Table, + starting_snapshot: Snapshot, + data_filter: Optional[BooleanExpression], + parent_snapshot: Optional[Snapshot], + partition_set: Optional[set[Record]], +) -> Iterator[ManifestEntry]: + """Find deleted data files matching a filter since a starting snapshot. + + Args: + table: Table to validate + starting_snapshot: Snapshot current at the start of the operation + data_filter: Expression used to find deleted data files + partition_set: a set of partitions to find deleted data files + parent_snapshot: Ending snapshot on the branch being validated + + Returns: + List of deleted data files matching the filter + """ + # if there is no current table state, no files have been deleted + if parent_snapshot is None: + return + + manifests, new_snapshots = validation_history( + table, + starting_snapshot, + parent_snapshot, + VALIDATE_DATA_FILES_EXIST_OPERATIONS, + ManifestContent.DATA, + ) + + if data_filter is not None: + evaluator = _StrictMetricsEvaluator(table.schema(), data_filter).eval + + new_snapshot_ids = {s.snapshot_id for s in new_snapshots} + + for manifest in manifests: + for entry in manifest.fetch_manifest_entry(table.io, discard_deleted=False): + if entry.snapshot_id not in new_snapshot_ids: + continue + + if entry.status != ManifestEntryStatus.DELETED: + continue + + if data_filter is not None and not evaluator(entry.data_file): + continue + + if partition_set is not None and (entry.data_file.spec_id, entry.data_file.partition) not in partition_set: + continue + + yield entry + + +def validate_deleted_data_files( + table: Table, + starting_snapshot: Snapshot, + data_filter: Optional[BooleanExpression], + parent_snapshot: Snapshot, +) -> None: + """Validate that no files matching a filter have been deleted from the table since a starting snapshot. + + Args: + table: Table to validate + starting_snapshot: Snapshot current at the start of the operation + data_filter: Expression used to find deleted data files + parent_snapshot: Ending snapshot on the branch being validated + + """ + conflicting_entries = deleted_data_files(table, starting_snapshot, data_filter, None, parent_snapshot) + if any(conflicting_entries): + raise ValidationException("Deleted data files were found matching the filter.") From a39abd2d8ce6c817bf575770ad9b547b665d7b58 Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Sat, 19 Apr 2025 16:28:26 -0400 Subject: [PATCH 14/24] first dummy test case --- tests/table/test_validate.py | 46 +++++++++++++++++++++++++++++++++++- 1 file changed, 45 insertions(+), 1 deletion(-) diff --git a/tests/table/test_validate.py b/tests/table/test_validate.py index 9bb4e98cfa..89b9ab103a 100644 --- a/tests/table/test_validate.py +++ b/tests/table/test_validate.py @@ -22,7 +22,7 @@ from pyiceberg.manifest import ManifestContent, ManifestFile from pyiceberg.table import Table from pyiceberg.table.snapshots import Operation, Snapshot -from pyiceberg.table.update.validate import validation_history +from pyiceberg.table.update.validate import deleted_data_files, validation_history def test_validation_history(table_v2_with_extensive_snapshots: Table) -> None: @@ -65,3 +65,47 @@ def mock_read_manifest_side_effect(self: Snapshot, io: FileIO) -> list[ManifestF ) assert len(manifests) == expected_manifest_data_counts + + +def test_deleted_data_files( + table_v2_with_extensive_snapshots: Table, +) -> None: + mock_manifests = {} + + for i, snapshot in enumerate(table_v2_with_extensive_snapshots.snapshots()): + mock_manifest = ManifestFile( + manifest_path=f"foo/bar/{i}", + manifest_length=1, + partition_spec_id=1, + content=ManifestContent.DATA if i % 2 == 0 else ManifestContent.DELETES, + sequence_number=1, + min_sequence_number=1, + added_snapshot_id=snapshot.snapshot_id, + ) + + # Store the manifest for this specific snapshot + mock_manifests[snapshot.snapshot_id] = [mock_manifest] + + oldest_snapshot = table_v2_with_extensive_snapshots.snapshots()[0] + newest_snapshot = cast(Snapshot, table_v2_with_extensive_snapshots.current_snapshot()) + + def mock_read_manifest_side_effect(self: Snapshot, io: FileIO) -> list[ManifestFile]: + """Mock the manifests method to use the snapshot_id for lookup.""" + snapshot_id = self.snapshot_id + if snapshot_id in mock_manifests: + return mock_manifests[snapshot_id] + return [] + + # every snapshot is an append, so we should get nothing! + with patch("pyiceberg.table.snapshots.Snapshot.manifests", new=mock_read_manifest_side_effect): + result = list( + deleted_data_files( + table=table_v2_with_extensive_snapshots, + starting_snapshot=newest_snapshot, + data_filter=None, + parent_snapshot=oldest_snapshot, + partition_set=None, + ) + ) + + assert result == [] From cf5b06131752607a6a632a188ec2124893206add Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Sat, 19 Apr 2025 16:48:03 -0400 Subject: [PATCH 15/24] first working tests (needs cleanup) --- tests/table/test_validate.py | 35 +++++++++++++++++++++++++++++++++-- 1 file changed, 33 insertions(+), 2 deletions(-) diff --git a/tests/table/test_validate.py b/tests/table/test_validate.py index 89b9ab103a..2ea94c2119 100644 --- a/tests/table/test_validate.py +++ b/tests/table/test_validate.py @@ -19,9 +19,9 @@ from unittest.mock import patch from pyiceberg.io import FileIO -from pyiceberg.manifest import ManifestContent, ManifestFile +from pyiceberg.manifest import ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile from pyiceberg.table import Table -from pyiceberg.table.snapshots import Operation, Snapshot +from pyiceberg.table.snapshots import Operation, Snapshot, Summary from pyiceberg.table.update.validate import deleted_data_files, validation_history @@ -109,3 +109,34 @@ def mock_read_manifest_side_effect(self: Snapshot, io: FileIO) -> list[ManifestF ) assert result == [] + + # modify second to last snapshot to be a delete + snapshots = table_v2_with_extensive_snapshots.snapshots() + altered_snapshot = snapshots[-2] + altered_snapshot = altered_snapshot.model_copy(update={"summary": Summary(operation=Operation.DELETE)}) + snapshots[-2] = altered_snapshot + + table_v2_with_extensive_snapshots.metadata = table_v2_with_extensive_snapshots.metadata.model_copy( + update={"snapshots": snapshots}, + ) + + my_entry = ManifestEntry( + status=ManifestEntryStatus.DELETED, + snapshot_id=altered_snapshot.snapshot_id, + ) + + with ( + patch("pyiceberg.table.snapshots.Snapshot.manifests", new=mock_read_manifest_side_effect), + patch("pyiceberg.manifest.ManifestFile.fetch_manifest_entry", return_value=[my_entry]), + ): + result = list( + deleted_data_files( + table=table_v2_with_extensive_snapshots, + starting_snapshot=newest_snapshot, + data_filter=None, + parent_snapshot=oldest_snapshot, + partition_set=None, + ) + ) + + assert result == [my_entry] From caf7e36b0cc011db684e1828159373aee47bf457 Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Thu, 1 May 2025 09:11:37 -0400 Subject: [PATCH 16/24] bring back all code from silly merge --- pyiceberg/table/update/validate.py | 79 +++++++++++++++++++++++++++++- 1 file changed, 78 insertions(+), 1 deletion(-) diff --git a/pyiceberg/table/update/validate.py b/pyiceberg/table/update/validate.py index 7caaf1d521..9e6099e509 100644 --- a/pyiceberg/table/update/validate.py +++ b/pyiceberg/table/update/validate.py @@ -14,11 +14,17 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +from typing import Iterator, Optional from pyiceberg.exceptions import ValidationException -from pyiceberg.manifest import ManifestContent, ManifestFile +from pyiceberg.expressions import BooleanExpression +from pyiceberg.expressions.visitors import _StrictMetricsEvaluator +from pyiceberg.manifest import ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile from pyiceberg.table import Table from pyiceberg.table.snapshots import Operation, Snapshot, ancestors_between +from pyiceberg.typedef import Record + +VALIDATE_DATA_FILES_EXIST_OPERATIONS = {Operation.OVERWRITE, Operation.REPLACE, Operation.DELETE} def validation_history( @@ -69,3 +75,74 @@ def validation_history( raise ValidationException("No matching snapshot found.") return manifests_files, snapshots + + +def deleted_data_files( + table: Table, + starting_snapshot: Snapshot, + data_filter: Optional[BooleanExpression], + parent_snapshot: Optional[Snapshot], + partition_set: Optional[set[Record]], +) -> Iterator[ManifestEntry]: + """Find deleted data files matching a filter since a starting snapshot. + + Args: + table: Table to validate + starting_snapshot: Snapshot current at the start of the operation + data_filter: Expression used to find deleted data files + partition_set: a set of partitions to find deleted data files + parent_snapshot: Ending snapshot on the branch being validated + + Returns: + List of deleted data files matching the filter + """ + # if there is no current table state, no files have been deleted + if parent_snapshot is None: + return + + manifests, snapshot_ids = validation_history( + table, + starting_snapshot, + parent_snapshot, + VALIDATE_DATA_FILES_EXIST_OPERATIONS, + ManifestContent.DATA, + ) + + if data_filter is not None: + evaluator = _StrictMetricsEvaluator(table.schema(), data_filter).eval + + for manifest in manifests: + for entry in manifest.fetch_manifest_entry(table.io, discard_deleted=False): + if entry.snapshot_id not in snapshot_ids: + continue + + if entry.status != ManifestEntryStatus.DELETED: + continue + + if data_filter is not None and not evaluator(entry.data_file): + continue + + if partition_set is not None and (entry.data_file.spec_id, entry.data_file.partition) not in partition_set: + continue + + yield entry + + +def validate_deleted_data_files( + table: Table, + starting_snapshot: Snapshot, + data_filter: Optional[BooleanExpression], + parent_snapshot: Snapshot, +) -> None: + """Validate that no files matching a filter have been deleted from the table since a starting snapshot. + + Args: + table: Table to validate + starting_snapshot: Snapshot current at the start of the operation + data_filter: Expression used to find deleted data files + parent_snapshot: Ending snapshot on the branch being validated + + """ + conflicting_entries = deleted_data_files(table, starting_snapshot, data_filter, None, parent_snapshot) + if any(conflicting_entries): + raise ValidationException("Deleted data files were found matching the filter.") From a52c422d042c97eb3d840ee8d6be14dff28c8cfe Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Thu, 1 May 2025 09:15:13 -0400 Subject: [PATCH 17/24] last tweaks --- tests/table/test_validate.py | 67 ++++++++++++++++++++++++++++++++++-- 1 file changed, 64 insertions(+), 3 deletions(-) diff --git a/tests/table/test_validate.py b/tests/table/test_validate.py index eac3733f2d..ddbf566c79 100644 --- a/tests/table/test_validate.py +++ b/tests/table/test_validate.py @@ -22,10 +22,10 @@ from pyiceberg.exceptions import ValidationException from pyiceberg.io import FileIO -from pyiceberg.manifest import ManifestContent, ManifestFile +from pyiceberg.manifest import ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile from pyiceberg.table import Table -from pyiceberg.table.snapshots import Operation, Snapshot -from pyiceberg.table.update.validate import validation_history +from pyiceberg.table.snapshots import Operation, Snapshot, Summary +from pyiceberg.table.update.validate import deleted_data_files, validation_history @pytest.fixture @@ -136,3 +136,64 @@ def mock_read_manifest_side_effect(self: Snapshot, io: FileIO) -> list[ManifestF {Operation.APPEND}, ManifestContent.DATA, ) + + +def test_deleted_data_files( + table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]], +) -> None: + table, mock_manifests = table_v2_with_extensive_snapshots_and_manifests + + oldest_snapshot = table.snapshots()[0] + newest_snapshot = cast(Snapshot, table.current_snapshot()) + + def mock_read_manifest_side_effect(self: Snapshot, io: FileIO) -> list[ManifestFile]: + """Mock the manifests method to use the snapshot_id for lookup.""" + snapshot_id = self.snapshot_id + if snapshot_id in mock_manifests: + return mock_manifests[snapshot_id] + return [] + + # every snapshot is an append, so we should get nothing! + with patch("pyiceberg.table.snapshots.Snapshot.manifests", new=mock_read_manifest_side_effect): + result = list( + deleted_data_files( + table=table, + starting_snapshot=newest_snapshot, + data_filter=None, + parent_snapshot=oldest_snapshot, + partition_set=None, + ) + ) + + assert result == [] + + # modify second to last snapshot to be a delete + snapshots = table.snapshots() + altered_snapshot = snapshots[-2] + altered_snapshot = altered_snapshot.model_copy(update={"summary": Summary(operation=Operation.DELETE)}) + snapshots[-2] = altered_snapshot + + table.metadata = table.metadata.model_copy( + update={"snapshots": snapshots}, + ) + + my_entry = ManifestEntry.from_args( + status=ManifestEntryStatus.DELETED, + snapshot_id=altered_snapshot.snapshot_id, + ) + + with ( + patch("pyiceberg.table.snapshots.Snapshot.manifests", new=mock_read_manifest_side_effect), + patch("pyiceberg.manifest.ManifestFile.fetch_manifest_entry", return_value=[my_entry]), + ): + result = list( + deleted_data_files( + table=table, + starting_snapshot=newest_snapshot, + data_filter=None, + parent_snapshot=oldest_snapshot, + partition_set=None, + ) + ) + + assert result == [my_entry] From 9eca29f71f5281f88f336c31ddf62abd8369b3c6 Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Sat, 10 May 2025 11:18:39 -0600 Subject: [PATCH 18/24] fix order of args in deleted_data_files --- pyiceberg/table/update/validate.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyiceberg/table/update/validate.py b/pyiceberg/table/update/validate.py index a15f093f89..9dc47e7bec 100644 --- a/pyiceberg/table/update/validate.py +++ b/pyiceberg/table/update/validate.py @@ -81,8 +81,8 @@ def deleted_data_files( table: Table, starting_snapshot: Snapshot, data_filter: Optional[BooleanExpression], - parent_snapshot: Optional[Snapshot], partition_set: Optional[set[Record]], + parent_snapshot: Optional[Snapshot], ) -> Iterator[ManifestEntry]: """Find deleted data files matching a filter since a starting snapshot. From fc83d42e1a768f4b0bae9ea822878740e17838e0 Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Sat, 10 May 2025 11:23:55 -0600 Subject: [PATCH 19/24] make `deleted_data_files` private --- pyiceberg/table/update/validate.py | 4 ++-- tests/table/test_validate.py | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pyiceberg/table/update/validate.py b/pyiceberg/table/update/validate.py index 9dc47e7bec..f2a694065c 100644 --- a/pyiceberg/table/update/validate.py +++ b/pyiceberg/table/update/validate.py @@ -77,7 +77,7 @@ def validation_history( return manifests_files, snapshots -def deleted_data_files( +def _deleted_data_files( table: Table, starting_snapshot: Snapshot, data_filter: Optional[BooleanExpression], @@ -143,6 +143,6 @@ def validate_deleted_data_files( parent_snapshot: Ending snapshot on the branch being validated """ - conflicting_entries = deleted_data_files(table, starting_snapshot, data_filter, None, parent_snapshot) + conflicting_entries = _deleted_data_files(table, starting_snapshot, data_filter, None, parent_snapshot) if any(conflicting_entries): raise ValidationException("Deleted data files were found matching the filter.") diff --git a/tests/table/test_validate.py b/tests/table/test_validate.py index 7956fab0f1..23bcc34c16 100644 --- a/tests/table/test_validate.py +++ b/tests/table/test_validate.py @@ -25,7 +25,7 @@ from pyiceberg.manifest import ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile from pyiceberg.table import Table from pyiceberg.table.snapshots import Operation, Snapshot, Summary -from pyiceberg.table.update.validate import deleted_data_files, validation_history +from pyiceberg.table.update.validate import _deleted_data_files, validation_history @pytest.fixture @@ -156,7 +156,7 @@ def mock_read_manifest_side_effect(self: Snapshot, io: FileIO) -> list[ManifestF # every snapshot is an append, so we should get nothing! with patch("pyiceberg.table.snapshots.Snapshot.manifests", new=mock_read_manifest_side_effect): result = list( - deleted_data_files( + _deleted_data_files( table=table, starting_snapshot=newest_snapshot, data_filter=None, @@ -187,7 +187,7 @@ def mock_read_manifest_side_effect(self: Snapshot, io: FileIO) -> list[ManifestF patch("pyiceberg.manifest.ManifestFile.fetch_manifest_entry", return_value=[my_entry]), ): result = list( - deleted_data_files( + _deleted_data_files( table=table, starting_snapshot=newest_snapshot, data_filter=None, From ee3959ac65f46942c77333c1958c94422c78c55d Mon Sep 17 00:00:00 2001 From: Jayce Slesar <47452474+jayceslesar@users.noreply.github.com> Date: Sat, 10 May 2025 11:24:16 -0600 Subject: [PATCH 20/24] Update pyiceberg/table/update/validate.py Co-authored-by: Fokko Driesprong --- pyiceberg/table/update/validate.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyiceberg/table/update/validate.py b/pyiceberg/table/update/validate.py index f2a694065c..7ae44b6090 100644 --- a/pyiceberg/table/update/validate.py +++ b/pyiceberg/table/update/validate.py @@ -94,7 +94,7 @@ def _deleted_data_files( parent_snapshot: Ending snapshot on the branch being validated Returns: - List of deleted data files matching the filter + List of conflicting manifest-entries """ # if there is no current table state, no files have been deleted if parent_snapshot is None: From a51c2da3cbcc1fd9d295f7f748c713889ff82963 Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Sat, 10 May 2025 11:28:56 -0600 Subject: [PATCH 21/24] ise inclusive metrics evaluator and add rows cannot match check --- pyiceberg/table/update/validate.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pyiceberg/table/update/validate.py b/pyiceberg/table/update/validate.py index 7ae44b6090..9efb3570a3 100644 --- a/pyiceberg/table/update/validate.py +++ b/pyiceberg/table/update/validate.py @@ -18,7 +18,7 @@ from pyiceberg.exceptions import ValidationException from pyiceberg.expressions import BooleanExpression -from pyiceberg.expressions.visitors import _StrictMetricsEvaluator +from pyiceberg.expressions.visitors import ROWS_CANNOT_MATCH, _InclusiveMetricsEvaluator from pyiceberg.manifest import ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile from pyiceberg.table import Table from pyiceberg.table.snapshots import Operation, Snapshot, ancestors_between @@ -109,7 +109,7 @@ def _deleted_data_files( ) if data_filter is not None: - evaluator = _StrictMetricsEvaluator(table.schema(), data_filter).eval + evaluator = _InclusiveMetricsEvaluator(table.schema(), data_filter).eval for manifest in manifests: for entry in manifest.fetch_manifest_entry(table.io, discard_deleted=False): @@ -119,7 +119,7 @@ def _deleted_data_files( if entry.status != ManifestEntryStatus.DELETED: continue - if data_filter is not None and not evaluator(entry.data_file): + if data_filter is not None and evaluator(entry.data_file) is ROWS_CANNOT_MATCH: continue if partition_set is not None and (entry.data_file.spec_id, entry.data_file.partition) not in partition_set: From ec866953c89fe1b53fe6a9aef15d1e1545478d7e Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Sat, 10 May 2025 11:29:58 -0600 Subject: [PATCH 22/24] show what snapshot IDs conflict --- pyiceberg/table/update/validate.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pyiceberg/table/update/validate.py b/pyiceberg/table/update/validate.py index 9efb3570a3..d6072a3b43 100644 --- a/pyiceberg/table/update/validate.py +++ b/pyiceberg/table/update/validate.py @@ -145,4 +145,5 @@ def validate_deleted_data_files( """ conflicting_entries = _deleted_data_files(table, starting_snapshot, data_filter, None, parent_snapshot) if any(conflicting_entries): - raise ValidationException("Deleted data files were found matching the filter.") + conflicting_snapshots = {entry.snapshot_id for entry in conflicting_entries} + raise ValidationException(f"Deleted data files were found matching the filter for snapshots {conflicting_snapshots}!") From 0a6b781da7f57e011e2e5353abea57c8b4992544 Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Sat, 10 May 2025 11:47:59 -0600 Subject: [PATCH 23/24] maybe correct partition_spec impl? --- pyiceberg/table/update/validate.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/pyiceberg/table/update/validate.py b/pyiceberg/table/update/validate.py index d6072a3b43..42a52e928c 100644 --- a/pyiceberg/table/update/validate.py +++ b/pyiceberg/table/update/validate.py @@ -81,7 +81,7 @@ def _deleted_data_files( table: Table, starting_snapshot: Snapshot, data_filter: Optional[BooleanExpression], - partition_set: Optional[set[Record]], + partition_set: Optional[dict[int, set[Record]]], parent_snapshot: Optional[Snapshot], ) -> Iterator[ManifestEntry]: """Find deleted data files matching a filter since a starting snapshot. @@ -90,7 +90,7 @@ def _deleted_data_files( table: Table to validate starting_snapshot: Snapshot current at the start of the operation data_filter: Expression used to find deleted data files - partition_set: a set of partitions to find deleted data files + partition_set: dict of {spec_id: set[partition]} to filter on parent_snapshot: Ending snapshot on the branch being validated Returns: @@ -122,8 +122,11 @@ def _deleted_data_files( if data_filter is not None and evaluator(entry.data_file) is ROWS_CANNOT_MATCH: continue - if partition_set is not None and (entry.data_file.spec_id, entry.data_file.partition) not in partition_set: - continue + if partition_set is not None: + spec_id = entry.data_file.spec_id + partition = entry.data_file.partition + if spec_id not in partition_set or partition not in partition_set[spec_id]: + continue yield entry From a96da01dae922d75bbaacb55e10e1ff8644db69c Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Sat, 10 May 2025 11:51:06 -0600 Subject: [PATCH 24/24] fix ordering to match --- pyiceberg/table/update/validate.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyiceberg/table/update/validate.py b/pyiceberg/table/update/validate.py index 42a52e928c..94d92999f7 100644 --- a/pyiceberg/table/update/validate.py +++ b/pyiceberg/table/update/validate.py @@ -102,8 +102,8 @@ def _deleted_data_files( manifests, snapshot_ids = validation_history( table, - starting_snapshot, parent_snapshot, + starting_snapshot, VALIDATE_DATA_FILES_EXIST_OPERATIONS, ManifestContent.DATA, )