feat: validate_deleted_data_files
#1938
base: main
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
|
@@ -14,11 +14,17 @@ | |||||||||
# KIND, either express or implied. See the License for the | ||||||||||
# specific language governing permissions and limitations | ||||||||||
# under the License. | ||||||||||
from typing import Iterator, Optional | ||||||||||
|
||||||||||
from pyiceberg.exceptions import ValidationException | ||||||||||
from pyiceberg.manifest import ManifestContent, ManifestFile | ||||||||||
from pyiceberg.expressions import BooleanExpression | ||||||||||
from pyiceberg.expressions.visitors import _StrictMetricsEvaluator | ||||||||||
from pyiceberg.manifest import ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile | ||||||||||
from pyiceberg.table import Table | ||||||||||
from pyiceberg.table.snapshots import Operation, Snapshot, ancestors_between | ||||||||||
from pyiceberg.typedef import Record | ||||||||||
|
||||||||||
VALIDATE_DATA_FILES_EXIST_OPERATIONS = {Operation.OVERWRITE, Operation.REPLACE, Operation.DELETE} | ||||||||||
|
||||||||||
|
||||||||||
def validation_history( | ||||||||||
|
@@ -69,3 +75,74 @@ def validation_history( | |||||||||
raise ValidationException("No matching snapshot found.") | ||||||||||
|
||||||||||
return manifests_files, snapshots | ||||||||||
|
||||||||||
|
||||||||||
def deleted_data_files( | ||||||||||
table: Table, | ||||||||||
starting_snapshot: Snapshot, | ||||||||||
data_filter: Optional[BooleanExpression], | ||||||||||
parent_snapshot: Optional[Snapshot], | ||||||||||
partition_set: Optional[set[Record]], | ||||||||||
Comment on lines +84 to +85:
This looks out of order:
Suggested change
It looks like the function call in
I think this needs some more work. Next to @sungwy's comment, we in the code below:
(entry.data_file.spec_id, entry.data_file.partition) not in partition_set
Where it checks if a tuple is in a
This triggered me, because if you do:
ALTER TABLE prod.db.taxis REPLACE PARTITION FIELD pickup_timestamp WITH day(pickup_timestamp);
-- and then
ALTER TABLE prod.db.taxis REPLACE PARTITION FIELD pickup_timestamp WITH day(dropoff_timestamp);
Both of the partitioning strategies will produce a |
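To illustrate the membership check discussed in this thread: a minimal sketch, using a hypothetical `FakeRecord` stand-in rather than pyiceberg's `Record`, of why testing a `(spec_id, partition)` tuple against a set of bare records cannot succeed, and how keying the set by `(spec_id, partition)` would keep two partition specs apart if their partition values happen to compare equal. The sample value and the keyed-set layout are assumptions for illustration, not the PR's code.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FakeRecord:
    """Hypothetical stand-in for a partition record; only equality and hashing matter here."""

    values: tuple


partition = FakeRecord(values=(19000,))  # illustrative value, e.g. a day ordinal
partition_set = {partition}  # shaped like the set[Record] in the signature

spec_id = 0
# A (spec_id, partition) tuple never equals a bare record, so this is always True
tuple_missing = (spec_id, partition) not in partition_set

# Keying the set by (spec_id, partition) separates specs even when values collide
keyed_set = {(0, partition)}
same_spec_found = (0, partition) in keyed_set
other_spec_found = (1, partition) in keyed_set
```

With the set typed as records, the `not in` test skips every entry; with tuple keys, only the matching spec is found.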
||||||||||
) -> Iterator[ManifestEntry]: | ||||||||||
"""Find deleted data files matching a filter since a starting snapshot. | ||||||||||
|
||||||||||
Args: | ||||||||||
table: Table to validate | ||||||||||
starting_snapshot: Snapshot current at the start of the operation | ||||||||||
data_filter: Expression used to find deleted data files | ||||||||||
partition_set: a set of partitions to find deleted data files | ||||||||||
parent_snapshot: Ending snapshot on the branch being validated | ||||||||||
|
||||||||||
Returns: | ||||||||||
List of deleted data files matching the filter | ||||||||||
Suggested change
|
||||||||||
""" | ||||||||||
# if there is no current table state, no files have been deleted | ||||||||||
if parent_snapshot is None: | ||||||||||
return | ||||||||||
|
||||||||||
manifests, snapshot_ids = validation_history( | ||||||||||
table, | ||||||||||
starting_snapshot, | ||||||||||
parent_snapshot, | ||||||||||
Comment on lines +105 to +106:
This looks out of order too, because
Suggested change
Do you think it would be better to update
|
||||||||||
VALIDATE_DATA_FILES_EXIST_OPERATIONS, | ||||||||||
ManifestContent.DATA, | ||||||||||
) | ||||||||||
|
||||||||||
if data_filter is not None: | ||||||||||
evaluator = _StrictMetricsEvaluator(table.schema(), data_filter).eval | ||||||||||
I'm not too sure if this is correct, because
Should we be using
Summoning @Fokko for a second review
Yes, I agree that this should be inclusive projection, since we want to know if there are any matches. Inclusive projection returns |
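For readers following the strict-versus-inclusive question, here is a minimal sketch of the general difference on per-file min/max statistics. It does not use pyiceberg's evaluators; `ColumnBounds` and both functions are hypothetical names, and the predicate is fixed to `col > literal` to keep the example small.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ColumnBounds:
    """Hypothetical per-file min/max statistics for a single integer column."""

    lower: int
    upper: int


def strict_greater_than(bounds: ColumnBounds, literal: int) -> bool:
    # Strict: True only when every row in the file must satisfy `col > literal`
    return bounds.lower > literal


def inclusive_greater_than(bounds: ColumnBounds, literal: int) -> bool:
    # Inclusive: True when at least one row in the file might satisfy `col > literal`
    return bounds.upper > literal


deleted_file = ColumnBounds(lower=5, upper=15)
strict_match = strict_greater_than(deleted_file, 10)  # some rows are <= 10
inclusive_match = inclusive_greater_than(deleted_file, 10)  # some rows may be > 10
```

Under these assumptions, a strict check would skip this deleted file even though it may contain matching rows, which is the kind of conflict an inclusive check is meant to catch.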
||||||||||
|
||||||||||
for manifest in manifests: | ||||||||||
for entry in manifest.fetch_manifest_entry(table.io, discard_deleted=False): | ||||||||||
if entry.snapshot_id not in snapshot_ids: | ||||||||||
continue | ||||||||||
|
||||||||||
if entry.status != ManifestEntryStatus.DELETED: | ||||||||||
continue | ||||||||||
|
||||||||||
if data_filter is not None and not evaluator(entry.data_file): | ||||||||||
Maybe better to make it explicit:
Suggested change
|
||||||||||
continue | ||||||||||
|
||||||||||
if partition_set is not None and (entry.data_file.spec_id, entry.data_file.partition) not in partition_set: | ||||||||||
continue | ||||||||||
|
||||||||||
yield entry | ||||||||||
|
||||||||||
|
||||||||||
def validate_deleted_data_files( | ||||||||||
table: Table, | ||||||||||
starting_snapshot: Snapshot, | ||||||||||
data_filter: Optional[BooleanExpression], | ||||||||||
parent_snapshot: Snapshot, | ||||||||||
) -> None: | ||||||||||
"""Validate that no files matching a filter have been deleted from the table since a starting snapshot. | ||||||||||
|
||||||||||
Args: | ||||||||||
table: Table to validate | ||||||||||
starting_snapshot: Snapshot current at the start of the operation | ||||||||||
data_filter: Expression used to find deleted data files | ||||||||||
parent_snapshot: Ending snapshot on the branch being validated | ||||||||||
|
||||||||||
""" | ||||||||||
conflicting_entries = deleted_data_files(table, starting_snapshot, data_filter, None, parent_snapshot) | ||||||||||
if any(conflicting_entries): | ||||||||||
raise ValidationException("Deleted data files were found matching the filter.") | ||||||||||
It would be nice to also include the conflicting snapshot ID(s) in the exception message. |
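One possible shape for that suggestion, as a sketch only: `FakeEntry` and `raise_if_conflicting` are hypothetical names, the exception class is a local stand-in for `pyiceberg.exceptions.ValidationException`, and the message wording is an assumption.

```python
from dataclasses import dataclass
from typing import Iterable


class ValidationException(Exception):
    """Local stand-in for pyiceberg.exceptions.ValidationException."""


@dataclass(frozen=True)
class FakeEntry:
    """Hypothetical stand-in for a manifest entry; only snapshot_id is modelled."""

    snapshot_id: int


def raise_if_conflicting(conflicting_entries: Iterable[FakeEntry]) -> None:
    # Consume the whole iterator so every conflicting snapshot ID can be reported
    snapshot_ids = sorted({entry.snapshot_id for entry in conflicting_entries})
    if snapshot_ids:
        raise ValidationException(
            f"Deleted data files were found matching the filter for snapshot IDs: {snapshot_ids}"
        )
```

The trade-off is that this reads every conflicting entry, whereas `any(...)` stops at the first one; collecting only the first few IDs would be a middle ground.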
Can we mark these as private? I don't think we want to expose this directly to the user, and marking it private makes it easier to change the signature later on (since we don't have to go through the deprecation cycle).