Skip to content

Rewrite manifests #1661

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pyiceberg/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,10 @@ def fetch_manifest_entry(self, io: FileIO, discard_deleted: bool = True) -> List
if not discard_deleted or entry.status != ManifestEntryStatus.DELETED
]

def __hash__(self) -> int:
    """Hash the manifest by its file path.

    Returns:
        The hash of ``manifest_path``.
    """
    path = self.manifest_path
    return hash(path)


@cached(cache=LRUCache(maxsize=128), key=lambda io, manifest_list: hashkey(manifest_list))
def _manifests(io: FileIO, manifest_list: str) -> Tuple[ManifestFile, ...]:
Expand Down Expand Up @@ -861,6 +865,7 @@ def existing(self, entry: ManifestEntry) -> ManifestWriter:
entry.snapshot_id, entry.sequence_number, entry.file_sequence_number, entry.data_file
)
)
self._existing_files += 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe we already do this here:

self._existing_files += 1

return self


Expand Down
30 changes: 25 additions & 5 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,7 @@
update_table_metadata,
)
from pyiceberg.table.update.schema import UpdateSchema
from pyiceberg.table.update.snapshot import (
ManageSnapshots,
UpdateSnapshot,
_FastAppendFiles,
)
from pyiceberg.table.update.snapshot import ManageSnapshots, RewriteManifestsResult, UpdateSnapshot, _FastAppendFiles
from pyiceberg.table.update.spec import UpdateSpec
from pyiceberg.table.update.statistics import UpdateStatistics
from pyiceberg.transforms import IdentityTransform
Expand Down Expand Up @@ -223,6 +219,9 @@ class TableProperties:
MIN_SNAPSHOTS_TO_KEEP = "history.expire.min-snapshots-to-keep"
MIN_SNAPSHOTS_TO_KEEP_DEFAULT = 1

SNAPSHOT_ID_INHERITANCE_ENABLED = "compatibility.snapshot-id-inheritance.enabled"
SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT = False
Comment on lines +222 to +223
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These seem unused, and I don't think we want to add this to PyIceberg. This was introduced in Java to test V2 features on a V1 table.



class Transaction:
_table: Table
Expand Down Expand Up @@ -421,6 +420,13 @@ def update_snapshot(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> U
"""
return UpdateSnapshot(self, io=self._table.io, snapshot_properties=snapshot_properties)

def rewrite_manifests(self, spec_id: Optional[int] = None) -> RewriteManifestsResult:
    """Rewrite the manifests of the current snapshot as part of this transaction.

    Args:
        spec_id: Spec id of the manifests to rewrite. It is handed to the
            rewrite producer, which picks the table's spec when no id is given.

    Returns:
        The manifests that were rewritten and the manifests that replaced them.
        Both lists are empty when the table has no current snapshot.
    """
    # Imported locally: only this method needs the producer class.
    from pyiceberg.table.update.snapshot import _RewriteManifests

    if self._table.current_snapshot() is None:
        return RewriteManifestsResult(rewritten_manifests=[], added_manifests=[])

    # Build the producer directly so that spec_id actually reaches it;
    # going through update_snapshot().rewrite() would silently drop it.
    producer = _RewriteManifests(
        table=self._table,
        transaction=self,
        io=self._table.io,
        spec_id=spec_id,
    )
    with producer as rewrite:
        result = rewrite.rewrite_manifests()
    return result

def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
"""
Shorthand API for appending a PyArrow table to a table transaction.
Expand Down Expand Up @@ -1168,6 +1174,20 @@ def add_files(
file_paths=file_paths, snapshot_properties=snapshot_properties, check_duplicate_files=check_duplicate_files
)

def rewrite_manifests(
    self,
    spec_id: Optional[int] = None,
) -> RewriteManifestsResult:
    """
    Shorthand API for rewriting the manifests of the table.

    Opens a transaction on the table and delegates to its
    ``rewrite_manifests``.

    Args:
        spec_id: Spec id of the manifests to rewrite (defaults to current spec id)

    Returns:
        Whatever the transaction's ``rewrite_manifests`` returns.
    """
    with self.transaction() as txn:
        outcome = txn.rewrite_manifests(spec_id=spec_id)
    return outcome

def update_spec(self, case_sensitive: bool = True) -> UpdateSpec:
return UpdateSpec(Transaction(self, autocommit=True), case_sensitive=case_sensitive)

Expand Down
2 changes: 1 addition & 1 deletion pyiceberg/table/snapshots.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ def get_prop(prop: str) -> int:
def update_snapshot_summaries(
summary: Summary, previous_summary: Optional[Mapping[str, str]] = None, truncate_full_table: bool = False
) -> Summary:
if summary.operation not in {Operation.APPEND, Operation.OVERWRITE, Operation.DELETE}:
if summary.operation not in {Operation.APPEND, Operation.OVERWRITE, Operation.DELETE, Operation.REPLACE}:
raise ValueError(f"Operation not implemented: {summary.operation}")

if truncate_full_table and summary.operation == Operation.OVERWRITE and previous_summary is not None:
Expand Down
159 changes: 157 additions & 2 deletions pyiceberg/table/update/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@
from abc import abstractmethod
from collections import defaultdict
from concurrent.futures import Future
from dataclasses import dataclass
from functools import cached_property
from typing import TYPE_CHECKING, Callable, Dict, Generic, List, Optional, Set, Tuple
from typing import TYPE_CHECKING, Any, Callable, Dict, Generic, List, Optional, Set, Tuple

from sortedcontainers import SortedList

Expand Down Expand Up @@ -81,7 +82,7 @@
from pyiceberg.utils.properties import property_as_bool, property_as_int

if TYPE_CHECKING:
from pyiceberg.table import Transaction
from pyiceberg.table import Table, Transaction


def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str:
Expand Down Expand Up @@ -477,6 +478,20 @@ def _deleted_entries(self) -> List[ManifestEntry]:
return []


@dataclass(init=False)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think of making this Frozen? This gives some nice benefits like being hashable: https://docs.python.org/3/library/dataclasses.html#frozen-instances

Using the default_factory's, we can also drop the init.

Suggested change
@dataclass(init=False)
@dataclass(frozen=True)

class RewriteManifestsResult:
rewritten_manifests: List[ManifestFile]
added_manifests: List[ManifestFile]
Comment on lines +483 to +484
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about slapping on some default factories: https://docs.python.org/3/library/dataclasses.html#dataclasses.field

Suggested change
rewritten_manifests: List[ManifestFile]
added_manifests: List[ManifestFile]
rewritten_manifests: List[ManifestFile] = field(default_factory=list)
added_manifests: List[ManifestFile] = field(default_factory=list)


def __init__(
    self,
    rewritten_manifests: Optional[List[ManifestFile]],
    added_manifests: Optional[List[ManifestFile]],
) -> None:
    """Store both manifest lists, replacing any falsy argument with a new empty list.

    Args:
        rewritten_manifests: Manifests that were rewritten, or None.
        added_manifests: Manifests that were added, or None.
    """
    self.rewritten_manifests = rewritten_manifests if rewritten_manifests else []
    self.added_manifests = added_manifests if added_manifests else []


class _MergeAppendFiles(_FastAppendFiles):
_target_size_bytes: int
_min_count_to_merge: int
Expand Down Expand Up @@ -528,6 +543,138 @@ def _process_manifests(self, manifests: List[ManifestFile]) -> List[ManifestFile
return data_manifest_merge_manager.merge_manifests(unmerged_data_manifests) + unmerged_deletes_manifests


class _RewriteManifests(_SnapshotProducer["_RewriteManifests"]):
_target_size_bytes: int
rewritten_manifests: List[ManifestFile] = []
added_manifests: List[ManifestFile] = []
kept_manifests: List[ManifestFile] = []

def __init__(
self,
table: Table,
transaction: Transaction,
io: FileIO,
spec_id: Optional[int] = None,
snapshot_properties: Dict[str, str] = EMPTY_DICT,
):
from pyiceberg.table import TableProperties

_table: Table
_spec: PartitionSpec
Comment on lines +562 to +563
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think these are used.


super().__init__(Operation.REPLACE, transaction, io, snapshot_properties=snapshot_properties)
self._target_size_bytes = property_as_int(
self._transaction.table_metadata.properties,
TableProperties.MANIFEST_TARGET_SIZE_BYTES,
TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT,
) # type: ignore
self._table = table
self._spec_id = spec_id or table.spec().spec_id
Comment on lines +571 to +572
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also add these at the class level? Just below 546.


def _summary(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> Summary:
from pyiceberg.table import TableProperties

ssc = SnapshotSummaryCollector()
partition_summary_limit = int(
self._transaction.table_metadata.properties.get(
TableProperties.WRITE_PARTITION_SUMMARY_LIMIT, TableProperties.WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT
)
)
Comment on lines +578 to +582
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like we're cloning this logic, I've created an issue to resolve this in another PR: #1779

ssc.set_partition_summary_limit(partition_summary_limit)

props = {
"manifests-kept": str(len([])),
"manifests-created": str(len(self.added_manifests)),
"manifests-replaced": str(len(self.rewritten_manifests)),
"entries-processed": str(len([])),
Comment on lines +586 to +589
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"manifests-kept": str(len([])),
"manifests-created": str(len(self.added_manifests)),
"manifests-replaced": str(len(self.rewritten_manifests)),
"entries-processed": str(len([])),
"manifests-kept": "0",
"manifests-created": str(len(self.added_manifests)),
"manifests-replaced": str(len(self.rewritten_manifests)),
"entries-processed": "0"

}
previous_snapshot = (
self._transaction.table_metadata.snapshot_by_id(self._parent_snapshot_id)
if self._parent_snapshot_id is not None
else None
)

return update_snapshot_summaries(
summary=Summary(operation=self._operation, **ssc.build(), **props),
previous_summary=previous_snapshot.summary if previous_snapshot is not None else None,
truncate_full_table=False,
)

def rewrite_manifests(self) -> RewriteManifestsResult:
data_result = self._find_matching_manifests(ManifestContent.DATA)

self.rewritten_manifests.extend(data_result.rewritten_manifests)
self.added_manifests.extend(data_result.added_manifests)

deletes_result = self._find_matching_manifests(ManifestContent.DELETES)
self.rewritten_manifests.extend(deletes_result.rewritten_manifests)
self.added_manifests.extend(deletes_result.added_manifests)

if not self.rewritten_manifests:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit, just for clarity that it is a list:

Suggested change
if not self.rewritten_manifests:
if len(self.rewritten_manifests) == 0:

return RewriteManifestsResult(rewritten_manifests=[], added_manifests=[])

return RewriteManifestsResult(rewritten_manifests=self.rewritten_manifests, added_manifests=self.added_manifests)

def _find_matching_manifests(self, content: ManifestContent) -> RewriteManifestsResult:
snapshot = self._table.current_snapshot()
if self._spec_id and self._spec_id not in self._table.specs():
raise ValueError(f"Cannot find spec with id: {self._spec_id}")

if not snapshot:
raise ValueError("Cannot rewrite manifests without a current snapshot")
Comment on lines +619 to +624
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about pushing this to the `__init__` so we don't have to do this every time?


manifests = [
manifest
for manifest in snapshot.manifests(io=self._io)
if manifest.partition_spec_id == self._spec_id and manifest.content == content
]
Comment on lines +626 to +630
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For a follow-up, it might be worthwhile to see if we can cache this result, since we're going over the manifests twice (once for data, once for deletes).


data_manifest_merge_manager = _ManifestMergeManager(
target_size_bytes=self._target_size_bytes,
min_count_to_merge=2,
merge_enabled=True,
Comment on lines +634 to +635
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

snapshot_producer=self,
)
new_manifests = data_manifest_merge_manager.merge_manifests(manifests=manifests)

return RewriteManifestsResult(rewritten_manifests=manifests, added_manifests=new_manifests)

def _copy_manifest_file(self, manifest_file: ManifestFile, snapshot_id: int) -> ManifestFile:
    """Build a new ManifestFile from an existing one, overriding only ``added_snapshot_id``.

    Args:
        manifest_file: The manifest whose fields are copied.
        snapshot_id: Value stored as ``added_snapshot_id`` on the copy.

    Returns:
        A new ManifestFile carrying the copied fields.
    """
    # Fields copied verbatim, split around added_snapshot_id so the keyword
    # order matches the field order used when constructing a ManifestFile here.
    before_snapshot_id = (
        "manifest_path",
        "manifest_length",
        "partition_spec_id",
        "content",
        "sequence_number",
        "min_sequence_number",
    )
    after_snapshot_id = (
        "added_files_count",
        "existing_files_count",
        "deleted_files_count",
        "added_rows_count",
        "existing_rows_count",
        "deleted_rows_count",
        "partitions",
        "key_metadata",
    )

    fields = {name: getattr(manifest_file, name) for name in before_snapshot_id}
    fields["added_snapshot_id"] = snapshot_id
    for name in after_snapshot_id:
        fields[name] = getattr(manifest_file, name)
    return ManifestFile(**fields)

def __exit__(self, _: Any, value: Any, traceback: Any) -> None:
    """Commit on leaving the ``with`` block, but only when manifests were rewritten."""
    if not self.rewritten_manifests:
        # Nothing was rewritten, so there is nothing to commit.
        return
    self.commit()

def _existing_manifests(self) -> List[ManifestFile]:
    """Return copies of the added manifests, each stamped with this producer's snapshot id."""
    # NOTE(review): only self.added_manifests is returned here; confirm that
    # manifests which were not rewritten are carried over somewhere else.
    copies: List[ManifestFile] = []
    for manifest in self.added_manifests:
        copies.append(self._copy_manifest_file(manifest, self.snapshot_id))
    return copies

def _deleted_entries(self) -> List[ManifestEntry]:
"""To determine if we need to record any deleted manifest entries.

In case of an append, nothing is deleted.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy paste :)

"""
return []


class _OverwriteFiles(_SnapshotProducer["_OverwriteFiles"]):
"""Overwrites data from the table. This will produce an OVERWRITE snapshot.

Expand Down Expand Up @@ -625,6 +772,14 @@ def merge_append(self) -> _MergeAppendFiles:
operation=Operation.APPEND, transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties
)

def rewrite(self, spec_id: Optional[int] = None) -> _RewriteManifests:
    """Create a producer that rewrites manifests for this transaction's table.

    Args:
        spec_id: Spec id of the manifests to rewrite. Passed to the producer
            unchanged; omitting it behaves as before this parameter existed.

    Returns:
        A ``_RewriteManifests`` bound to this update's transaction, IO and
        snapshot properties.
    """
    return _RewriteManifests(
        table=self._transaction._table,
        transaction=self._transaction,
        io=self._io,
        spec_id=spec_id,
        snapshot_properties=self._snapshot_properties,
    )

def overwrite(self, commit_uuid: Optional[uuid.UUID] = None) -> _OverwriteFiles:
return _OverwriteFiles(
commit_uuid=commit_uuid,
Expand Down
Loading
Loading