-
Notifications
You must be signed in to change notification settings - Fork 275
Rewrite manifests #1661
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Rewrite manifests #1661
Changes from all commits
23488e5
3915bf5
b434828
bf33716
76740df
d705c76
bf68848
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -112,11 +112,7 @@ | |
update_table_metadata, | ||
) | ||
from pyiceberg.table.update.schema import UpdateSchema | ||
from pyiceberg.table.update.snapshot import ( | ||
ManageSnapshots, | ||
UpdateSnapshot, | ||
_FastAppendFiles, | ||
) | ||
from pyiceberg.table.update.snapshot import ManageSnapshots, RewriteManifestsResult, UpdateSnapshot, _FastAppendFiles | ||
from pyiceberg.table.update.spec import UpdateSpec | ||
from pyiceberg.table.update.statistics import UpdateStatistics | ||
from pyiceberg.transforms import IdentityTransform | ||
|
@@ -223,6 +219,9 @@ class TableProperties: | |
MIN_SNAPSHOTS_TO_KEEP = "history.expire.min-snapshots-to-keep" | ||
MIN_SNAPSHOTS_TO_KEEP_DEFAULT = 1 | ||
|
||
SNAPSHOT_ID_INHERITANCE_ENABLED = "compatibility.snapshot-id-inheritance.enabled" | ||
SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT = False | ||
Comment on lines
+222
to
+223
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These seem unused, and I don't think we want to add this to PyIceberg. This was introduced in Java to test V2 features on a V1 table. |
||
|
||
|
||
class Transaction: | ||
_table: Table | ||
|
@@ -421,6 +420,13 @@ def update_snapshot(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> U | |
""" | ||
return UpdateSnapshot(self, io=self._table.io, snapshot_properties=snapshot_properties) | ||
|
||
def rewrite_manifests(self, spec_id: Optional[int] = None) -> RewriteManifestsResult: | ||
if self._table.current_snapshot() is None: | ||
return RewriteManifestsResult(rewritten_manifests=[], added_manifests=[]) | ||
with self.update_snapshot().rewrite() as rewrite: | ||
rewritten = rewrite.rewrite_manifests() | ||
return rewritten | ||
|
||
def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None: | ||
""" | ||
Shorthand API for appending a PyArrow table to a table transaction. | ||
|
@@ -1168,6 +1174,20 @@ def add_files( | |
file_paths=file_paths, snapshot_properties=snapshot_properties, check_duplicate_files=check_duplicate_files | ||
) | ||
|
||
def rewrite_manifests( | ||
self, | ||
spec_id: Optional[int] = None, | ||
) -> RewriteManifestsResult: | ||
""" | ||
Shorthand API for Rewriting manifests for the table. | ||
|
||
Args: | ||
spec_id: Spec id of the manifests to rewrite (defaults to current spec id) | ||
|
||
""" | ||
with self.transaction() as tx: | ||
return tx.rewrite_manifests(spec_id=spec_id) | ||
|
||
def update_spec(self, case_sensitive: bool = True) -> UpdateSpec: | ||
return UpdateSpec(Transaction(self, autocommit=True), case_sensitive=case_sensitive) | ||
|
||
|
Original file line number | Diff line number | Diff line change | ||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -22,8 +22,9 @@ | |||||||||||||||||
from abc import abstractmethod | ||||||||||||||||||
from collections import defaultdict | ||||||||||||||||||
from concurrent.futures import Future | ||||||||||||||||||
from dataclasses import dataclass | ||||||||||||||||||
from functools import cached_property | ||||||||||||||||||
from typing import TYPE_CHECKING, Callable, Dict, Generic, List, Optional, Set, Tuple | ||||||||||||||||||
from typing import TYPE_CHECKING, Any, Callable, Dict, Generic, List, Optional, Set, Tuple | ||||||||||||||||||
|
||||||||||||||||||
from sortedcontainers import SortedList | ||||||||||||||||||
|
||||||||||||||||||
|
@@ -81,7 +82,7 @@ | |||||||||||||||||
from pyiceberg.utils.properties import property_as_bool, property_as_int | ||||||||||||||||||
|
||||||||||||||||||
if TYPE_CHECKING: | ||||||||||||||||||
from pyiceberg.table import Transaction | ||||||||||||||||||
from pyiceberg.table import Table, Transaction | ||||||||||||||||||
|
||||||||||||||||||
|
||||||||||||||||||
def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str: | ||||||||||||||||||
|
@@ -477,6 +478,20 @@ def _deleted_entries(self) -> List[ManifestEntry]: | |||||||||||||||||
return [] | ||||||||||||||||||
|
||||||||||||||||||
|
||||||||||||||||||
@dataclass(init=False) | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What do you think of making this Using the
Suggested change
|
||||||||||||||||||
class RewriteManifestsResult: | ||||||||||||||||||
rewritten_manifests: List[ManifestFile] | ||||||||||||||||||
added_manifests: List[ManifestFile] | ||||||||||||||||||
Comment on lines
+483
to
+484
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about slapping on some default factories: https://docs.python.org/3/library/dataclasses.html#dataclasses.field
Suggested change
|
||||||||||||||||||
|
||||||||||||||||||
def __init__( | ||||||||||||||||||
self, | ||||||||||||||||||
rewritten_manifests: Optional[List[ManifestFile]], | ||||||||||||||||||
added_manifests: Optional[List[ManifestFile]], | ||||||||||||||||||
) -> None: | ||||||||||||||||||
self.rewritten_manifests = rewritten_manifests or [] | ||||||||||||||||||
self.added_manifests = added_manifests or [] | ||||||||||||||||||
|
||||||||||||||||||
|
||||||||||||||||||
class _MergeAppendFiles(_FastAppendFiles): | ||||||||||||||||||
_target_size_bytes: int | ||||||||||||||||||
_min_count_to_merge: int | ||||||||||||||||||
|
@@ -528,6 +543,138 @@ def _process_manifests(self, manifests: List[ManifestFile]) -> List[ManifestFile | |||||||||||||||||
return data_manifest_merge_manager.merge_manifests(unmerged_data_manifests) + unmerged_deletes_manifests | ||||||||||||||||||
|
||||||||||||||||||
|
||||||||||||||||||
class _RewriteManifests(_SnapshotProducer["_RewriteManifests"]): | ||||||||||||||||||
_target_size_bytes: int | ||||||||||||||||||
rewritten_manifests: List[ManifestFile] = [] | ||||||||||||||||||
added_manifests: List[ManifestFile] = [] | ||||||||||||||||||
kept_manifests: List[ManifestFile] = [] | ||||||||||||||||||
|
||||||||||||||||||
def __init__( | ||||||||||||||||||
self, | ||||||||||||||||||
table: Table, | ||||||||||||||||||
transaction: Transaction, | ||||||||||||||||||
io: FileIO, | ||||||||||||||||||
spec_id: Optional[int] = None, | ||||||||||||||||||
snapshot_properties: Dict[str, str] = EMPTY_DICT, | ||||||||||||||||||
): | ||||||||||||||||||
from pyiceberg.table import TableProperties | ||||||||||||||||||
|
||||||||||||||||||
_table: Table | ||||||||||||||||||
_spec: PartitionSpec | ||||||||||||||||||
Comment on lines
+562
to
+563
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think these are used. |
||||||||||||||||||
|
||||||||||||||||||
super().__init__(Operation.REPLACE, transaction, io, snapshot_properties=snapshot_properties) | ||||||||||||||||||
self._target_size_bytes = property_as_int( | ||||||||||||||||||
self._transaction.table_metadata.properties, | ||||||||||||||||||
TableProperties.MANIFEST_TARGET_SIZE_BYTES, | ||||||||||||||||||
TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT, | ||||||||||||||||||
) # type: ignore | ||||||||||||||||||
self._table = table | ||||||||||||||||||
self._spec_id = spec_id or table.spec().spec_id | ||||||||||||||||||
Comment on lines
+571
to
+572
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we also add these at the class level? Just below 546. |
||||||||||||||||||
|
||||||||||||||||||
def _summary(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> Summary: | ||||||||||||||||||
from pyiceberg.table import TableProperties | ||||||||||||||||||
|
||||||||||||||||||
ssc = SnapshotSummaryCollector() | ||||||||||||||||||
partition_summary_limit = int( | ||||||||||||||||||
self._transaction.table_metadata.properties.get( | ||||||||||||||||||
TableProperties.WRITE_PARTITION_SUMMARY_LIMIT, TableProperties.WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT | ||||||||||||||||||
) | ||||||||||||||||||
) | ||||||||||||||||||
Comment on lines
+578
to
+582
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It looks like we're cloning this logic, I've created an issue to resolve this in another PR: #1779 |
||||||||||||||||||
ssc.set_partition_summary_limit(partition_summary_limit) | ||||||||||||||||||
|
||||||||||||||||||
props = { | ||||||||||||||||||
"manifests-kept": str(len([])), | ||||||||||||||||||
"manifests-created": str(len(self.added_manifests)), | ||||||||||||||||||
"manifests-replaced": str(len(self.rewritten_manifests)), | ||||||||||||||||||
"entries-processed": str(len([])), | ||||||||||||||||||
Comment on lines
+586
to
+589
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||
} | ||||||||||||||||||
previous_snapshot = ( | ||||||||||||||||||
self._transaction.table_metadata.snapshot_by_id(self._parent_snapshot_id) | ||||||||||||||||||
if self._parent_snapshot_id is not None | ||||||||||||||||||
else None | ||||||||||||||||||
) | ||||||||||||||||||
|
||||||||||||||||||
return update_snapshot_summaries( | ||||||||||||||||||
summary=Summary(operation=self._operation, **ssc.build(), **props), | ||||||||||||||||||
previous_summary=previous_snapshot.summary if previous_snapshot is not None else None, | ||||||||||||||||||
truncate_full_table=False, | ||||||||||||||||||
) | ||||||||||||||||||
|
||||||||||||||||||
def rewrite_manifests(self) -> RewriteManifestsResult: | ||||||||||||||||||
data_result = self._find_matching_manifests(ManifestContent.DATA) | ||||||||||||||||||
|
||||||||||||||||||
self.rewritten_manifests.extend(data_result.rewritten_manifests) | ||||||||||||||||||
self.added_manifests.extend(data_result.added_manifests) | ||||||||||||||||||
|
||||||||||||||||||
deletes_result = self._find_matching_manifests(ManifestContent.DELETES) | ||||||||||||||||||
self.rewritten_manifests.extend(deletes_result.rewritten_manifests) | ||||||||||||||||||
self.added_manifests.extend(deletes_result.added_manifests) | ||||||||||||||||||
|
||||||||||||||||||
if not self.rewritten_manifests: | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit, just for clarity that it is a list:
Suggested change
|
||||||||||||||||||
return RewriteManifestsResult(rewritten_manifests=[], added_manifests=[]) | ||||||||||||||||||
|
||||||||||||||||||
return RewriteManifestsResult(rewritten_manifests=self.rewritten_manifests, added_manifests=self.added_manifests) | ||||||||||||||||||
|
||||||||||||||||||
def _find_matching_manifests(self, content: ManifestContent) -> RewriteManifestsResult: | ||||||||||||||||||
snapshot = self._table.current_snapshot() | ||||||||||||||||||
if self._spec_id and self._spec_id not in self._table.specs(): | ||||||||||||||||||
raise ValueError(f"Cannot find spec with id: {self._spec_id}") | ||||||||||||||||||
|
||||||||||||||||||
if not snapshot: | ||||||||||||||||||
raise ValueError("Cannot rewrite manifests without a current snapshot") | ||||||||||||||||||
Comment on lines
+619
to
+624
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about pushing this to thei |
||||||||||||||||||
|
||||||||||||||||||
manifests = [ | ||||||||||||||||||
manifest | ||||||||||||||||||
for manifest in snapshot.manifests(io=self._io) | ||||||||||||||||||
if manifest.partition_spec_id == self._spec_id and manifest.content == content | ||||||||||||||||||
] | ||||||||||||||||||
Comment on lines
+626
to
+630
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For a followup it might be worthwile to see if we can cache this result, since we're going over the manifests twice (once for data, once for delete). |
||||||||||||||||||
|
||||||||||||||||||
data_manifest_merge_manager = _ManifestMergeManager( | ||||||||||||||||||
target_size_bytes=self._target_size_bytes, | ||||||||||||||||||
min_count_to_merge=2, | ||||||||||||||||||
merge_enabled=True, | ||||||||||||||||||
Comment on lines
+634
to
+635
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think in Java we set the properties from the table properties: |
||||||||||||||||||
snapshot_producer=self, | ||||||||||||||||||
) | ||||||||||||||||||
new_manifests = data_manifest_merge_manager.merge_manifests(manifests=manifests) | ||||||||||||||||||
|
||||||||||||||||||
return RewriteManifestsResult(rewritten_manifests=manifests, added_manifests=new_manifests) | ||||||||||||||||||
|
||||||||||||||||||
def _copy_manifest_file(self, manifest_file: ManifestFile, snapshot_id: int) -> ManifestFile: | ||||||||||||||||||
return ManifestFile( | ||||||||||||||||||
manifest_path=manifest_file.manifest_path, | ||||||||||||||||||
manifest_length=manifest_file.manifest_length, | ||||||||||||||||||
partition_spec_id=manifest_file.partition_spec_id, | ||||||||||||||||||
content=manifest_file.content, | ||||||||||||||||||
sequence_number=manifest_file.sequence_number, | ||||||||||||||||||
min_sequence_number=manifest_file.min_sequence_number, | ||||||||||||||||||
added_snapshot_id=snapshot_id, | ||||||||||||||||||
added_files_count=manifest_file.added_files_count, | ||||||||||||||||||
existing_files_count=manifest_file.existing_files_count, | ||||||||||||||||||
deleted_files_count=manifest_file.deleted_files_count, | ||||||||||||||||||
added_rows_count=manifest_file.added_rows_count, | ||||||||||||||||||
existing_rows_count=manifest_file.existing_rows_count, | ||||||||||||||||||
deleted_rows_count=manifest_file.deleted_rows_count, | ||||||||||||||||||
partitions=manifest_file.partitions, | ||||||||||||||||||
key_metadata=manifest_file.key_metadata, | ||||||||||||||||||
) | ||||||||||||||||||
|
||||||||||||||||||
def __exit__(self, _: Any, value: Any, traceback: Any) -> None: | ||||||||||||||||||
"""Commit only if we have rewritten any manifests.""" | ||||||||||||||||||
if self.rewritten_manifests: | ||||||||||||||||||
self.commit() | ||||||||||||||||||
|
||||||||||||||||||
def _existing_manifests(self) -> List[ManifestFile]: | ||||||||||||||||||
"""Determine if there are any existing manifest files.""" | ||||||||||||||||||
return [self._copy_manifest_file(manifest, self.snapshot_id) for manifest in self.added_manifests] | ||||||||||||||||||
|
||||||||||||||||||
def _deleted_entries(self) -> List[ManifestEntry]: | ||||||||||||||||||
"""To determine if we need to record any deleted manifest entries. | ||||||||||||||||||
|
||||||||||||||||||
In case of an append, nothing is deleted. | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Copy paste :) |
||||||||||||||||||
""" | ||||||||||||||||||
return [] | ||||||||||||||||||
|
||||||||||||||||||
|
||||||||||||||||||
class _OverwriteFiles(_SnapshotProducer["_OverwriteFiles"]): | ||||||||||||||||||
"""Overwrites data from the table. This will produce an OVERWRITE snapshot. | ||||||||||||||||||
|
||||||||||||||||||
|
@@ -625,6 +772,14 @@ def merge_append(self) -> _MergeAppendFiles: | |||||||||||||||||
operation=Operation.APPEND, transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties | ||||||||||||||||||
) | ||||||||||||||||||
|
||||||||||||||||||
def rewrite(self) -> _RewriteManifests: | ||||||||||||||||||
return _RewriteManifests( | ||||||||||||||||||
table=self._transaction._table, | ||||||||||||||||||
transaction=self._transaction, | ||||||||||||||||||
io=self._io, | ||||||||||||||||||
snapshot_properties=self._snapshot_properties, | ||||||||||||||||||
) | ||||||||||||||||||
|
||||||||||||||||||
def overwrite(self, commit_uuid: Optional[uuid.UUID] = None) -> _OverwriteFiles: | ||||||||||||||||||
return _OverwriteFiles( | ||||||||||||||||||
commit_uuid=commit_uuid, | ||||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe we already do this here:
iceberg-python/pyiceberg/manifest.py
Line 823 in 7648803