diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py
index 5a32a6330c..b1053c154d 100644
--- a/pyiceberg/manifest.py
+++ b/pyiceberg/manifest.py
@@ -636,6 +636,10 @@ def fetch_manifest_entry(self, io: FileIO, discard_deleted: bool = True) -> List
             if not discard_deleted or entry.status != ManifestEntryStatus.DELETED
         ]
 
+    def __hash__(self) -> int:
+        """Return the hash of the manifest path."""
+        return hash(self.manifest_path)
+
 
 @cached(cache=LRUCache(maxsize=128), key=lambda io, manifest_list: hashkey(manifest_list))
 def _manifests(io: FileIO, manifest_list: str) -> Tuple[ManifestFile, ...]:
@@ -861,6 +865,7 @@ def existing(self, entry: ManifestEntry) -> ManifestWriter:
                 entry.snapshot_id, entry.sequence_number, entry.file_sequence_number, entry.data_file
             )
         )
+        self._existing_files += 1
         return self
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index f857fb8cc0..ad3af67683 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -112,11 +112,7 @@
     update_table_metadata,
 )
 from pyiceberg.table.update.schema import UpdateSchema
-from pyiceberg.table.update.snapshot import (
-    ManageSnapshots,
-    UpdateSnapshot,
-    _FastAppendFiles,
-)
+from pyiceberg.table.update.snapshot import ManageSnapshots, RewriteManifestsResult, UpdateSnapshot, _FastAppendFiles
 from pyiceberg.table.update.spec import UpdateSpec
 from pyiceberg.table.update.statistics import UpdateStatistics
 from pyiceberg.transforms import IdentityTransform
@@ -223,6 +219,9 @@ class TableProperties:
     MIN_SNAPSHOTS_TO_KEEP = "history.expire.min-snapshots-to-keep"
     MIN_SNAPSHOTS_TO_KEEP_DEFAULT = 1
 
+    SNAPSHOT_ID_INHERITANCE_ENABLED = "compatibility.snapshot-id-inheritance.enabled"
+    SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT = False
+
 
 class Transaction:
     _table: Table
@@ -421,6 +420,13 @@ def update_snapshot(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> U
         """
         return UpdateSnapshot(self, io=self._table.io, snapshot_properties=snapshot_properties)
 
+    def rewrite_manifests(self, spec_id: Optional[int] = None) -> RewriteManifestsResult:
+        if self._table.current_snapshot() is None:
+            return RewriteManifestsResult(rewritten_manifests=[], added_manifests=[])
+        with self.update_snapshot().rewrite(spec_id=spec_id) as rewrite:
+            rewritten = rewrite.rewrite_manifests()
+        return rewritten
+
     def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
         """
         Shorthand API for appending a PyArrow table to a table transaction.
@@ -1168,6 +1174,20 @@ def add_files(
             file_paths=file_paths, snapshot_properties=snapshot_properties, check_duplicate_files=check_duplicate_files
         )
 
+    def rewrite_manifests(
+        self,
+        spec_id: Optional[int] = None,
+    ) -> RewriteManifestsResult:
+        """
+        Shorthand API for rewriting manifests of the table.
+
+        Args:
+            spec_id: Spec id of the manifests to rewrite (defaults to the current spec id).
+
+        Returns:
+            The rewritten manifests and the newly added manifests.
+        """
+        with self.transaction() as tx:
+            return tx.rewrite_manifests(spec_id=spec_id)
+
     def update_spec(self, case_sensitive: bool = True) -> UpdateSpec:
         return UpdateSpec(Transaction(self, autocommit=True), case_sensitive=case_sensitive)
diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py
index a5515f12b0..020a452e85 100644
--- a/pyiceberg/table/snapshots.py
+++ b/pyiceberg/table/snapshots.py
@@ -352,7 +352,7 @@ def get_prop(prop: str) -> int:
 def update_snapshot_summaries(
     summary: Summary, previous_summary: Optional[Mapping[str, str]] = None, truncate_full_table: bool = False
 ) -> Summary:
-    if summary.operation not in {Operation.APPEND, Operation.OVERWRITE, Operation.DELETE}:
+    if summary.operation not in {Operation.APPEND, Operation.OVERWRITE, Operation.DELETE, Operation.REPLACE}:
         raise ValueError(f"Operation not implemented: {summary.operation}")
 
     if truncate_full_table and summary.operation == Operation.OVERWRITE and previous_summary is not None:
diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py
index c0d0056e7c..ba0a1dcdc2 100644
--- a/pyiceberg/table/update/snapshot.py
+++ b/pyiceberg/table/update/snapshot.py
@@ -22,8 +22,9 @@
 from abc import abstractmethod
 from collections import defaultdict
 from concurrent.futures import Future
+from dataclasses import dataclass
 from functools import cached_property
-from typing import TYPE_CHECKING, Callable, Dict, Generic, List, Optional, Set, Tuple
+from typing import TYPE_CHECKING, Any, Callable, Dict, Generic, List, Optional, Set, Tuple
 
 from sortedcontainers import SortedList
 
@@ -81,7 +82,7 @@
 from pyiceberg.utils.properties import property_as_bool, property_as_int
 
 if TYPE_CHECKING:
-    from pyiceberg.table import Transaction
+    from pyiceberg.table import Table, Transaction
 
 
 def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str:
@@ -477,6 +478,20 @@ def _deleted_entries(self) -> List[ManifestEntry]:
         return []
 
 
+@dataclass(init=False)
+class RewriteManifestsResult:
+    rewritten_manifests: List[ManifestFile]
+    added_manifests: List[ManifestFile]
+
+    def __init__(
+        self,
+        rewritten_manifests: Optional[List[ManifestFile]],
+        added_manifests: Optional[List[ManifestFile]],
+    ) -> None:
+        self.rewritten_manifests = rewritten_manifests or []
+        self.added_manifests = added_manifests or []
+
+
 class _MergeAppendFiles(_FastAppendFiles):
     _target_size_bytes: int
     _min_count_to_merge: int
@@ -528,6 +543,138 @@ def _process_manifests(self, manifests: List[ManifestFile]) -> List[ManifestFile
         return data_manifest_merge_manager.merge_manifests(unmerged_data_manifests) + unmerged_deletes_manifests
 
 
+class _RewriteManifests(_SnapshotProducer["_RewriteManifests"]):
+    _table: Table
+    _spec_id: int
+    _target_size_bytes: int
+    rewritten_manifests: List[ManifestFile]
+    added_manifests: List[ManifestFile]
+    kept_manifests: List[ManifestFile]
+
+    def __init__(
+        self,
+        table: Table,
+        transaction: Transaction,
+        io: FileIO,
+        spec_id: Optional[int] = None,
+        snapshot_properties: Dict[str, str] = EMPTY_DICT,
+    ):
+        from pyiceberg.table import TableProperties
+
+        super().__init__(Operation.REPLACE, transaction, io, snapshot_properties=snapshot_properties)
+        self._target_size_bytes = property_as_int(
+            self._transaction.table_metadata.properties,
+            TableProperties.MANIFEST_TARGET_SIZE_BYTES,
+            TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT,
+        )  # type: ignore
+        self._table = table
+        self._spec_id = spec_id or table.spec().spec_id
+        # Per-instance state; initialized here so instances do not share mutable class-level defaults.
+        self.rewritten_manifests = []
+        self.added_manifests = []
+        self.kept_manifests = []
+
+    def _summary(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> Summary:
+        from pyiceberg.table import TableProperties
+
+        ssc = SnapshotSummaryCollector()
+        partition_summary_limit = int(
+            self._transaction.table_metadata.properties.get(
+                TableProperties.WRITE_PARTITION_SUMMARY_LIMIT, TableProperties.WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT
+            )
+        )
+        ssc.set_partition_summary_limit(partition_summary_limit)
+
+        props = {
+            "manifests-kept": str(len(self.kept_manifests)),
+            "manifests-created": str(len(self.added_manifests)),
+            "manifests-replaced": str(len(self.rewritten_manifests)),
+            "entries-processed": "0",
+        }
+        previous_snapshot = (
+            self._transaction.table_metadata.snapshot_by_id(self._parent_snapshot_id)
+            if self._parent_snapshot_id is not None
+            else None
+        )
+
+        return update_snapshot_summaries(
+            summary=Summary(operation=self._operation, **ssc.build(), **props),
+            previous_summary=previous_snapshot.summary if previous_snapshot is not None else None,
+            truncate_full_table=False,
+        )
+
+    def rewrite_manifests(self) -> RewriteManifestsResult:
+        data_result = self._find_matching_manifests(ManifestContent.DATA)
+        self.rewritten_manifests.extend(data_result.rewritten_manifests)
+        self.added_manifests.extend(data_result.added_manifests)
+
+        deletes_result = self._find_matching_manifests(ManifestContent.DELETES)
+        self.rewritten_manifests.extend(deletes_result.rewritten_manifests)
+        self.added_manifests.extend(deletes_result.added_manifests)
+
+        if not self.rewritten_manifests:
+            return RewriteManifestsResult(rewritten_manifests=[], added_manifests=[])
+
+        return RewriteManifestsResult(rewritten_manifests=self.rewritten_manifests, added_manifests=self.added_manifests)
+
+    def _find_matching_manifests(self, content: ManifestContent) -> RewriteManifestsResult:
+        snapshot = self._table.current_snapshot()
+        if self._spec_id and self._spec_id not in self._table.specs():
+            raise ValueError(f"Cannot find spec with id: {self._spec_id}")
+
+        if not snapshot:
+            raise ValueError("Cannot rewrite manifests without a current snapshot")
+
+        manifests = [
+            manifest
+            for manifest in snapshot.manifests(io=self._io)
+            if manifest.partition_spec_id == self._spec_id and manifest.content == content
+        ]
+
+        data_manifest_merge_manager = _ManifestMergeManager(
+            target_size_bytes=self._target_size_bytes,
+            min_count_to_merge=2,
+            merge_enabled=True,
+            snapshot_producer=self,
+        )
+        new_manifests = data_manifest_merge_manager.merge_manifests(manifests=manifests)
+
+        return RewriteManifestsResult(rewritten_manifests=manifests, added_manifests=new_manifests)
+
+    def _copy_manifest_file(self, manifest_file: ManifestFile, snapshot_id: int) -> ManifestFile:
+        return ManifestFile(
+            manifest_path=manifest_file.manifest_path,
+            manifest_length=manifest_file.manifest_length,
+            partition_spec_id=manifest_file.partition_spec_id,
+            content=manifest_file.content,
+            sequence_number=manifest_file.sequence_number,
+            min_sequence_number=manifest_file.min_sequence_number,
+            added_snapshot_id=snapshot_id,
+            added_files_count=manifest_file.added_files_count,
+            existing_files_count=manifest_file.existing_files_count,
+            deleted_files_count=manifest_file.deleted_files_count,
+            added_rows_count=manifest_file.added_rows_count,
+            existing_rows_count=manifest_file.existing_rows_count,
+            deleted_rows_count=manifest_file.deleted_rows_count,
+            partitions=manifest_file.partitions,
+            key_metadata=manifest_file.key_metadata,
+        )
+
+    def __exit__(self, _: Any, value: Any, traceback: Any) -> None:
+        """Commit only if we have rewritten any manifests."""
+        if self.rewritten_manifests:
+            self.commit()
+
+    def _existing_manifests(self) -> List[ManifestFile]:
+        """Return the newly merged manifests, copied with this snapshot's id."""
+        return [self._copy_manifest_file(manifest, self.snapshot_id) for manifest in self.added_manifests]
+
+    def _deleted_entries(self) -> List[ManifestEntry]:
+        """To determine if we need to record any deleted manifest entries.
+
+        In case of a manifest rewrite, nothing is deleted.
+        """
+        return []
+
+
 class _OverwriteFiles(_SnapshotProducer["_OverwriteFiles"]):
     """Overwrites data from the table. This will produce an OVERWRITE snapshot.
@@ -625,6 +772,14 @@ def merge_append(self) -> _MergeAppendFiles:
             operation=Operation.APPEND, transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties
         )
 
+    def rewrite(self, spec_id: Optional[int] = None) -> _RewriteManifests:
+        return _RewriteManifests(
+            table=self._transaction._table,
+            transaction=self._transaction,
+            io=self._io,
+            spec_id=spec_id,
+            snapshot_properties=self._snapshot_properties,
+        )
+
     def overwrite(self, commit_uuid: Optional[uuid.UUID] = None) -> _OverwriteFiles:
         return _OverwriteFiles(
             commit_uuid=commit_uuid,
diff --git a/tests/integration/test_writes/test_rewrite_manifests.py b/tests/integration/test_writes/test_rewrite_manifests.py
new file mode 100644
index 0000000000..922a29d30f
--- /dev/null
+++ b/tests/integration/test_writes/test_rewrite_manifests.py
@@ -0,0 +1,249 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# pylint:disable=redefined-outer-name
+from typing import List
+
+import pyarrow as pa
+import pytest
+
+from pyiceberg.catalog import Catalog
+from pyiceberg.manifest import ManifestFile
+from pyiceberg.table import TableProperties
+from utils import _create_table
+
+
+@pytest.fixture(scope="session", autouse=True)
+def table_v1_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
+    identifier = "default.arrow_table_v1_with_null"
+    tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, [arrow_table_with_null])
+    assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}"
+
+
+@pytest.fixture(scope="session", autouse=True)
+def table_v1_without_data(session_catalog: Catalog, arrow_table_without_data: pa.Table) -> None:
+    identifier = "default.arrow_table_v1_without_data"
+    tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, [arrow_table_without_data])
+    assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}"
+
+
+@pytest.fixture(scope="session", autouse=True)
+def table_v1_with_only_nulls(session_catalog: Catalog, arrow_table_with_only_nulls: pa.Table) -> None:
+    identifier = "default.arrow_table_v1_with_only_nulls"
+    tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, [arrow_table_with_only_nulls])
+    assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}"
+
+
+@pytest.fixture(scope="session", autouse=True)
+def table_v1_appended_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
+    identifier = "default.arrow_table_v1_appended_with_null"
+    tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, 2 * [arrow_table_with_null])
+    assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}"
+
+
+@pytest.fixture(scope="session", autouse=True)
+def table_v2_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
+    identifier = "default.arrow_table_v2_with_null"
+    tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, [arrow_table_with_null])
+    assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}"
+
+
+@pytest.fixture(scope="session", autouse=True)
+def table_v2_without_data(session_catalog: Catalog, arrow_table_without_data: pa.Table) -> None:
+    identifier = "default.arrow_table_v2_without_data"
+    tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, [arrow_table_without_data])
+    assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}"
+
+
+@pytest.fixture(scope="session", autouse=True)
+def table_v2_with_only_nulls(session_catalog: Catalog, arrow_table_with_only_nulls: pa.Table) -> None:
+    identifier = "default.arrow_table_v2_with_only_nulls"
+    tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, [arrow_table_with_only_nulls])
+    assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}"
+
+
+@pytest.fixture(scope="session", autouse=True)
+def table_v2_appended_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
+    identifier = "default.arrow_table_v2_appended_with_null"
+    tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, 2 * [arrow_table_with_null])
+    assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}"
+
+
+@pytest.fixture(scope="session", autouse=True)
+def table_v1_v2_appended_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
+    identifier = "default.arrow_table_v1_v2_appended_with_null"
+    tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, [arrow_table_with_null])
+    assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}"
+
+    with tbl.transaction() as tx:
+        tx.upgrade_table_version(format_version=2)
+
+    tbl.append(arrow_table_with_null)
+
+    assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}"
+
+
+# @pytest.mark.integration
+# def test_summaries(spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
+#     identifier = "default.arrow_table_summaries"
+#     tbl = _create_table(session_catalog, identifier, {"format-version": "2"})
+#     tbl.append(arrow_table_with_null)
+#     tbl.append(arrow_table_with_null)
+#
+#     # tbl.rewrite_manifests()
+#
+#     # records1 = [ThreeColumnRecord(1, None, "AAAA")]
+#     # write_records(spark, table_location, records1)
+#     before_pandas = tbl.scan().to_pandas()
+#     before_count = before_pandas.shape[0]
+#     tbl.refresh()
+#     manifests = tbl.inspect.manifests().to_pylist()
+#     assert len(manifests) == 2, "Should have 2 manifests before rewrite"
+#
+#     tbl.rewrite_manifests()
+#     tbl.refresh()
+#
+#     after_pandas = tbl.scan().to_pandas()
+#     after_count = after_pandas.shape[0]
+#     manifests = tbl.inspect.manifests().to_pylist()
+#     assert len(manifests) == 1, "Should have 1 manifest after rewrite"
+#
+#     snaps = tbl.inspect.snapshots().to_pandas()
+#     print(snaps)
+
+
+@pytest.mark.integration
+def test_rewrite_manifests_empty_table(session_catalog: Catalog) -> None:
+    # Create an unpartitioned table
+    identifier = "default.test_rewrite_manifests_empty_table"
+    tbl = _create_table(session_catalog, identifier, {"format-version": "2"})
+
+    assert tbl.current_snapshot() is None, "Table must be empty"
+
+    # Execute rewrite manifests action
+    tbl.rewrite_manifests()
+
+    tbl.refresh()
+    assert tbl.current_snapshot() is None, "Table must stay empty"
+
+
+@pytest.mark.integration
+def test_rewrite_small_manifests_non_partitioned_table(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
+    identifier = "default.test_rewrite_small_manifests_non_partitioned_table"
+    tbl = _create_table(session_catalog, identifier, {"format-version": "2"})
+    tbl.append(arrow_table_with_null)
+    tbl.append(arrow_table_with_null)
+    tbl.refresh()
+
+    manifests = tbl.inspect.manifests()
+    assert len(manifests) == 2, "Should have 2 manifests before rewrite"
+
+    result = tbl.rewrite_manifests()
+
+    assert len(result.rewritten_manifests) == 2, "Action should rewrite 2 manifests"
+    assert len(result.added_manifests) == 1, "Action should add 1 manifest"
+
+    tbl.refresh()
+
+    current_snapshot = tbl.current_snapshot()
+    if not current_snapshot:
+        raise AssertionError
+    new_manifests = current_snapshot.manifests(tbl.io)
+    assert len(new_manifests) == 1, "Should have 1 manifest after rewrite"
+    assert new_manifests[0].existing_files_count == 2, "Should have 2 existing files in the new manifest"
+    assert new_manifests[0].added_files_count == 0, "Should have no added files in the new manifest"
+    assert new_manifests[0].deleted_files_count == 0, "Should have no deleted files in the new manifest"
+
+    # Validate the records
+    expected_records_count = arrow_table_with_null.shape[0] * 2
+    result_df = tbl.scan().to_pandas()
+    actual_records_count = result_df.shape[0]
+    assert expected_records_count == actual_records_count, "Rows must match"
+
+
+def compute_manifest_entry_size_bytes(manifests: List[ManifestFile]) -> float:
+    total_size = 0
+    num_entries = 0
+
+    for manifest in manifests:
+        total_size += manifest.manifest_length
+        num_entries += manifest.added_files_count + manifest.existing_files_count + manifest.deleted_files_count
+
+    return total_size / num_entries if num_entries > 0 else 0
+
+
+@pytest.mark.integration
+def test_rewrite_small_manifests_partitioned_table(session_catalog: Catalog) -> None:
+    records1 = pa.Table.from_pydict({"c1": [1, 1], "c2": [None, "BBBBBBBBBB"], "c3": ["AAAA", "BBBB"]})
+    records2 = pa.Table.from_pydict({"c1": [2, 2], "c2": ["CCCCCCCCCC", "DDDDDDDDDD"], "c3": ["CCCC", "DDDD"]})
+    records3 = pa.Table.from_pydict({"c1": [3, 3], "c2": ["EEEEEEEEEE", "FFFFFFFFFF"], "c3": ["EEEE", "FFFF"]})
+    records4 = pa.Table.from_pydict({"c1": [4, 4], "c2": ["GGGGGGGGGG", "HHHHHHHHHG"], "c3": ["GGGG", "HHHH"]})
+
+    schema = pa.schema(
+        [
+            ("c1", pa.int64()),
+            ("c2", pa.string()),
+            ("c3", pa.string()),
+        ]
+    )
+
+    identifier = "default.test_rewrite_small_manifests_partitioned_table"
+    tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, schema=schema)
+
+    tbl.append(records1)
+    tbl.append(records2)
+    tbl.append(records3)
+    tbl.append(records4)
+    tbl.refresh()
+
+    manifests = tbl.current_snapshot().manifests(tbl.io)
+    assert len(manifests) == 4, "Should have 4 manifests before rewrite"
+
+    # manifest_entry_size_bytes = compute_manifest_entry_size_bytes(manifests)
+    target_manifest_size_bytes = 5200 * 2 + 100
+    tbl = (
+        tbl.transaction()
+        .set_properties({TableProperties.MANIFEST_TARGET_SIZE_BYTES: str(target_manifest_size_bytes)})
+        .commit_transaction()
+    )
+
+    result = tbl.rewrite_manifests()
+
+    tbl.refresh()
+    assert len(result.rewritten_manifests) == 4, "Action should rewrite 4 manifests"
+    assert len(result.added_manifests) == 2, "Action should add 2 manifests"
+
+    new_manifests = tbl.current_snapshot().manifests(tbl.io)
+    assert len(new_manifests) == 2, "Should have 2 manifests after rewrite"
+
+    assert new_manifests[0].existing_files_count == 4
+    assert new_manifests[0].added_files_count == 0
+    assert new_manifests[0].deleted_files_count == 0
+
+    assert new_manifests[1].existing_files_count == 4
+    assert new_manifests[1].added_files_count == 0
+    assert new_manifests[1].deleted_files_count == 0
+
+    sorted_df = tbl.scan().to_pandas().sort_values(["c1", "c2"], ascending=[False, False])
+    expected_records = (
+        pa.concat_tables([records1, records2, records3, records4]).to_pandas().sort_values(["c1", "c2"], ascending=[False, False])
+    )
+    from pandas.testing import assert_frame_equal
+
+    assert_frame_equal(sorted_df.reset_index(drop=True), expected_records.reset_index(drop=True))
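
A minimal usage sketch of the rewrite_manifests API introduced above. The catalog name and table identifier are hypothetical placeholders; it assumes a catalog reachable through pyiceberg's load_catalog, configured the same way the integration tests configure session_catalog.

    from pyiceberg.catalog import load_catalog

    # Hypothetical catalog and table names, for illustration only.
    catalog = load_catalog("default")
    tbl = catalog.load_table("default.events")

    # Merge small manifests for the table's current partition spec.
    # Returns an empty RewriteManifestsResult when the table has no current snapshot.
    result = tbl.rewrite_manifests()
    print(f"rewritten={len(result.rewritten_manifests)} added={len(result.added_manifests)}")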