Skip to content

feat: validate_deleted_data_files #1938

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 18 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 78 additions & 1 deletion pyiceberg/table/update/validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,17 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import Iterator, Optional

from pyiceberg.exceptions import ValidationException
from pyiceberg.manifest import ManifestContent, ManifestFile
from pyiceberg.expressions import BooleanExpression
from pyiceberg.expressions.visitors import _StrictMetricsEvaluator
from pyiceberg.manifest import ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile
from pyiceberg.table import Table
from pyiceberg.table.snapshots import Operation, Snapshot, ancestors_between
from pyiceberg.typedef import Record

VALIDATE_DATA_FILES_EXIST_OPERATIONS = {Operation.OVERWRITE, Operation.REPLACE, Operation.DELETE}


def validation_history(
Expand Down Expand Up @@ -69,3 +75,74 @@ def validation_history(
raise ValidationException("No matching snapshot found.")

return manifests_files, snapshots


def deleted_data_files(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we mark these as private? I don't think we want to expose this directly to the user, and marking it private makes it easier to change the signature later on (since we don't have to go through the deprecation cycle).

Suggested change
def deleted_data_files(
def _deleted_data_files(

table: Table,
starting_snapshot: Snapshot,
data_filter: Optional[BooleanExpression],
parent_snapshot: Optional[Snapshot],
partition_set: Optional[set[Record]],
Comment on lines +84 to +85
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks out of order:

Suggested change
parent_snapshot: Optional[Snapshot],
partition_set: Optional[set[Record]],
partition_set: Optional[set[Record]],
parent_snapshot: Optional[Snapshot],

It looks like the function call in validate_deleted_data_files is assuming that parent_snapshot is the last argument

    conflicting_entries = deleted_data_files(table, starting_snapshot, data_filter, None, parent_snapshot)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this needs some more work. In addition to @sungwy's comment, we have this check in the code below:

(entry.data_file.spec_id, entry.data_file.partition) not in partition_set

This checks whether a (spec_id, partition) tuple is in partition_set, but according to the signature partition_set only contains Record objects.

This triggered me, because if you do:

ALTER TABLE prod.db.taxis REPLACE PARTITION FIELD pickup_timestamp WITH day(pickup_timestamp);

-- and then
ALTER TABLE prod.db.taxis REPLACE PARTITION FIELD pickup_timestamp WITH day(dropoff_timestamp);

Both of the partitioning strategies will produce a Record[int] because it will contain the number of days since epoch. But the meaning is completely different.

) -> Iterator[ManifestEntry]:
"""Find deleted data files matching a filter since a starting snapshot.

Args:
table: Table to validate
starting_snapshot: Snapshot current at the start of the operation
data_filter: Expression used to find deleted data files
partition_set: a set of partitions to find deleted data files
parent_snapshot: Ending snapshot on the branch being validated

Returns:
List of deleted data files matching the filter
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
List of deleted data files matching the filter
List of conflicting manifest-entries

"""
# if there is no current table state, no files have been deleted
if parent_snapshot is None:
return

manifests, snapshot_ids = validation_history(
table,
starting_snapshot,
parent_snapshot,
Comment on lines +105 to +106
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks out of order too, because validation_history has to_snapshot as the second argument and from_snapshot as the third argument.

Suggested change
starting_snapshot,
parent_snapshot,
starting_snapshot,
parent_snapshot,

Do you think it would be better to update the validation_history function to use the following signature instead? I think it's much more intuitive to have from_snapshot first, followed by to_snapshot.

def validation_history(
    table: Table,
    from_snapshot: Snapshot,
    to_snapshot: Snapshot,
    matching_operations: set[Operation],
    manifest_content_filter: ManifestContent,
)

VALIDATE_DATA_FILES_EXIST_OPERATIONS,
ManifestContent.DATA,
)

if data_filter is not None:
evaluator = _StrictMetricsEvaluator(table.schema(), data_filter).eval
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not too sure if this is correct, because ManifestGroup.entries seems to be using inclusive projection.

Should we be using inclusive_projection here instead?

Summoning @Fokko for a second review

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I agree that this should be inclusive projection, since we want to know if there are any matches. Inclusive projection returns rows_might_match and rows_cannot_match. If they cannot be matched, then we can skip it :)


for manifest in manifests:
for entry in manifest.fetch_manifest_entry(table.io, discard_deleted=False):
if entry.snapshot_id not in snapshot_ids:
continue

if entry.status != ManifestEntryStatus.DELETED:
continue

if data_filter is not None and not evaluator(entry.data_file):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe better to make it explicit:

Suggested change
if data_filter is not None and not evaluator(entry.data_file):
if data_filter is not None and not evaluator(entry.data_file) is ROWS_CANNOT_MATCH:

continue

if partition_set is not None and (entry.data_file.spec_id, entry.data_file.partition) not in partition_set:
continue

yield entry


def validate_deleted_data_files(
    table: Table,
    starting_snapshot: Snapshot,
    data_filter: Optional[BooleanExpression],
    parent_snapshot: Snapshot,
) -> None:
    """Validate that no files matching a filter have been deleted from the table since a starting snapshot.

    Args:
        table: Table to validate
        starting_snapshot: Snapshot current at the start of the operation
        data_filter: Expression used to find deleted data files
        parent_snapshot: Ending snapshot on the branch being validated

    Raises:
        ValidationException: If at least one deleted data file matching the filter is found.
    """
    # Pass every argument by keyword. deleted_data_files declares
    # parent_snapshot before partition_set, so a positional call of the form
    # (..., data_filter, None, parent_snapshot) binds None to parent_snapshot;
    # the helper then returns immediately and this validation can never fail.
    conflicting_entries = deleted_data_files(
        table=table,
        starting_snapshot=starting_snapshot,
        data_filter=data_filter,
        parent_snapshot=parent_snapshot,
        partition_set=None,
    )
    if any(conflicting_entries):
        raise ValidationException("Deleted data files were found matching the filter.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might also be nice to include the conflicting snapshot ID(s) in the exception message.

67 changes: 64 additions & 3 deletions tests/table/test_validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@

from pyiceberg.exceptions import ValidationException
from pyiceberg.io import FileIO
from pyiceberg.manifest import ManifestContent, ManifestFile
from pyiceberg.manifest import ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile
from pyiceberg.table import Table
from pyiceberg.table.snapshots import Operation, Snapshot
from pyiceberg.table.update.validate import validation_history
from pyiceberg.table.snapshots import Operation, Snapshot, Summary
from pyiceberg.table.update.validate import deleted_data_files, validation_history


@pytest.fixture
Expand Down Expand Up @@ -136,3 +136,64 @@ def mock_read_manifest_side_effect(self: Snapshot, io: FileIO) -> list[ManifestF
{Operation.APPEND},
ManifestContent.DATA,
)


def test_deleted_data_files(
    table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]],
) -> None:
    """Exercise deleted_data_files before and after one snapshot is turned into a delete."""
    table, manifests_by_snapshot = table_v2_with_extensive_snapshots_and_manifests

    # Both snapshots are captured up front and reused for both calls below.
    first_snapshot = table.snapshots()[0]
    latest_snapshot = cast(Snapshot, table.current_snapshot())

    def fake_manifests(self: Snapshot, io: FileIO) -> list[ManifestFile]:
        """Stand in for Snapshot.manifests, looking manifests up by snapshot id."""
        return manifests_by_snapshot.get(self.snapshot_id, [])

    def collect_deleted() -> list[ManifestEntry]:
        """Materialize deleted_data_files with this test's fixed arguments."""
        return list(
            deleted_data_files(
                table=table,
                starting_snapshot=latest_snapshot,
                data_filter=None,
                parent_snapshot=first_snapshot,
                partition_set=None,
            )
        )

    manifests_target = "pyiceberg.table.snapshots.Snapshot.manifests"

    # Every snapshot is an append at this point, so nothing should be reported.
    with patch(manifests_target, new=fake_manifests):
        found = collect_deleted()

    assert found == []

    # Turn the second-to-last snapshot into a delete operation.
    all_snapshots = table.snapshots()
    delete_snapshot = all_snapshots[-2].model_copy(update={"summary": Summary(operation=Operation.DELETE)})
    all_snapshots[-2] = delete_snapshot
    table.metadata = table.metadata.model_copy(update={"snapshots": all_snapshots})

    deleted_entry = ManifestEntry.from_args(
        status=ManifestEntryStatus.DELETED,
        snapshot_id=delete_snapshot.snapshot_id,
    )

    # Every manifest now reports the single DELETED entry tied to the altered snapshot.
    with patch(manifests_target, new=fake_manifests), patch(
        "pyiceberg.manifest.ManifestFile.fetch_manifest_entry", return_value=[deleted_entry]
    ):
        found = collect_deleted()

    assert found == [deleted_entry]