From 23488e5417b4ba09b6e998f77e33f7dc9c34b02b Mon Sep 17 00:00:00 2001 From: amitgilad Date: Mon, 10 Feb 2025 19:52:51 +0200 Subject: [PATCH 1/7] first attempt at init_manifests --- pyiceberg/manifest.py | 22 +- pyiceberg/table/__init__.py | 77 +++++- pyiceberg/table/update/snapshot.py | 315 +++++++++++++++++++----- tests/integration/test_inspect_table.py | 45 ++++ 4 files changed, 395 insertions(+), 64 deletions(-) diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index 5a32a6330c..92247c98f8 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -612,6 +612,24 @@ def has_added_files(self) -> bool: def has_existing_files(self) -> bool: return self.existing_files_count is None or self.existing_files_count > 0 + def copy_with_snapshot_id(self, snapshot_id: int) -> ManifestFile: + return ManifestFile( + manifest_path=self.manifest_path, + manifest_length=self.manifest_length, + partition_spec_id=self.partition_spec_id, + content=self.content, + sequence_number=self.sequence_number, + min_sequence_number=self.min_sequence_number, + added_snapshot_id=snapshot_id, + added_files_count=self.added_files_count, + existing_files_count=self.existing_files_count, + deleted_files_count=self.deleted_files_count, + added_rows_count=self.added_rows_count, + existing_rows_count=self.existing_rows_count, + deleted_rows_count=self.deleted_rows_count, + partitions=self.partitions, + key_metadata=self.key_metadata, + ) def fetch_manifest_entry(self, io: FileIO, discard_deleted: bool = True) -> List[ManifestEntry]: """ Read the manifest entries from the manifest file. @@ -635,7 +653,9 @@ def fetch_manifest_entry(self, io: FileIO, discard_deleted: bool = True) -> List for entry in reader if not discard_deleted or entry.status != ManifestEntryStatus.DELETED ] - + def __hash__(self) -> int: + """Return the hash of the file path.""" + return hash(self.manifest_path) @cached(cache=LRUCache(maxsize=128), key=lambda io, manifest_list: hashkey(manifest_list)) def _manifests(io: FileIO, manifest_list: str) -> Tuple[ManifestFile, ...]: diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index f857fb8cc0..d3d9e12109 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -116,6 +116,7 @@ ManageSnapshots, UpdateSnapshot, _FastAppendFiles, + _ManifestMergeManager, ) from pyiceberg.table.update.spec import UpdateSpec from pyiceberg.table.update.statistics import UpdateStatistics @@ -135,7 +136,7 @@ ) from pyiceberg.utils.concurrent import ExecutorFactory from pyiceberg.utils.config import Config -from pyiceberg.utils.properties import property_as_bool +from pyiceberg.utils.properties import property_as_bool, property_as_int if TYPE_CHECKING: import daft @@ -420,7 +421,29 @@ def update_snapshot(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> U A new UpdateSnapshot """ return UpdateSnapshot(self, io=self._table.io, snapshot_properties=snapshot_properties) - + def rewrite_manifests(self) -> None: + + # snapshot = self._table.current_snapshot() + # manifests = [] + # for manifest in snapshot.manifests(self.io): + # if manifest.content == ManifestContent.DATA: + # manifests.append(manifest) + + with self.update_snapshot().rewrite() as rewrite: + rewrite.commit() + + # data_manifest_merge_manager = _ManifestMergeManager( + # target_size_bytes=property_as_int( + # self.properties, + # TableProperties.MANIFEST_TARGET_SIZE_BYTES, + # TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT, + # ), + # min_count_to_merge=2, + # merge_enabled=True, + # snapshot_producer=self, + # ) + + # data_manifest_merge_manager.merge_manifests(manifests) def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None: """ Shorthand API for appending a PyArrow table to a table transaction. @@ -1084,6 +1107,9 @@ def name_mapping(self) -> Optional[NameMapping]: """Return the table's field-id NameMapping.""" return self.metadata.name_mapping() + def rewrite_manifests(self) -> None: + with self.transaction() as tx: + tx.rewrite_manifests() def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None: """ Shorthand API for appending a PyArrow table to the table. @@ -1168,6 +1194,53 @@ def add_files( file_paths=file_paths, snapshot_properties=snapshot_properties, check_duplicate_files=check_duplicate_files ) + def rewrite_manifests( + self, + spec_id: Optional[int] = None, + rewrite_all: bool = False, + max_manifest_size: Optional[int] = None, + ) -> "Table": + + with self.transaction() as tx: + tx.rewrite_manifests() + ... + """Rewrite manifests in the table. + + Args: + spec_id: Spec ID to be used for the rewritten manifests + rewrite_all: If True, rewrite all manifests. If False, only rewrite small manifests + max_manifest_size: Target size for manifests in bytes + + Returns: + An updated version of the table with rewritten manifests + #""" + # return RewriteManifests( + # self, + # spec_id=spec_id, + # rewrite_all=rewrite_all, + # max_manifest_size=max_manifest_size, + # ).commit() + + # snapshot = self.current_snapshot() + # manifests = [] + # for manifest in snapshot.manifests(self.io): + # if manifest.content == ManifestContent.DATA: + # manifests.append(manifest) + # + # data_manifest_merge_manager = _ManifestMergeManager( + # target_size_bytes=property_as_int( + # self.properties, + # TableProperties.MANIFEST_TARGET_SIZE_BYTES, + # TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT, + # ), + # min_count_to_merge=2, + # merge_enabled=True, + # snapshot_producer=self, + # ) + # + # data_manifest_merge_manager.merge_manifests(manifests) + # entries = self.inspect.entries().filter("status < 2").selectExpr("input_file_name() as manifest") + def update_spec(self, case_sensitive: bool = True) -> UpdateSpec: return UpdateSpec(Transaction(self, autocommit=True), case_sensitive=case_sensitive) diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index c0d0056e7c..0cafd4f2fa 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -105,12 +105,12 @@ class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]): _deleted_data_files: Set[DataFile] def __init__( - self, - operation: Operation, - transaction: Transaction, - io: FileIO, - commit_uuid: Optional[uuid.UUID] = None, - snapshot_properties: Dict[str, str] = EMPTY_DICT, + self, + operation: Operation, + transaction: Transaction, + io: FileIO, + commit_uuid: Optional[uuid.UUID] = None, + snapshot_properties: Dict[str, str] = EMPTY_DICT, ) -> None: super().__init__(transaction) self.commit_uuid = commit_uuid or uuid.uuid4() @@ -135,10 +135,12 @@ def delete_data_file(self, data_file: DataFile) -> _SnapshotProducer[U]: return self @abstractmethod - def _deleted_entries(self) -> List[ManifestEntry]: ... + def _deleted_entries(self) -> List[ManifestEntry]: + ... @abstractmethod - def _existing_manifests(self) -> List[ManifestFile]: ... + def _existing_manifests(self) -> List[ManifestFile]: + ... def _process_manifests(self, manifests: List[ManifestFile]) -> List[ManifestFile]: """To perform any post-processing on the manifests before writing them to the new snapshot.""" @@ -148,11 +150,11 @@ def _manifests(self) -> List[ManifestFile]: def _write_added_manifest() -> List[ManifestFile]: if self._added_data_files: with write_manifest( - format_version=self._transaction.table_metadata.format_version, - spec=self._transaction.table_metadata.spec(), - schema=self._transaction.table_metadata.schema(), - output_file=self.new_manifest_output(), - snapshot_id=self._snapshot_id, + format_version=self._transaction.table_metadata.format_version, + spec=self._transaction.table_metadata.spec(), + schema=self._transaction.table_metadata.schema(), + output_file=self.new_manifest_output(), + snapshot_id=self._snapshot_id, ) as writer: for data_file in self._added_data_files: writer.add( @@ -178,11 +180,11 @@ def _write_delete_manifest() -> List[ManifestFile]: partition_groups[deleted_entry.data_file.spec_id].append(deleted_entry) for spec_id, entries in partition_groups.items(): with write_manifest( - format_version=self._transaction.table_metadata.format_version, - spec=self._transaction.table_metadata.specs()[spec_id], - schema=self._transaction.table_metadata.schema(), - output_file=self.new_manifest_output(), - snapshot_id=self._snapshot_id, + format_version=self._transaction.table_metadata.format_version, + spec=self._transaction.table_metadata.specs()[spec_id], + schema=self._transaction.table_metadata.schema(), + output_file=self.new_manifest_output(), + snapshot_id=self._snapshot_id, ) as writer: for entry in entries: writer.add_entry(entry) @@ -197,7 +199,8 @@ def _write_delete_manifest() -> List[ManifestFile]: delete_manifests = executor.submit(_write_delete_manifest) existing_manifests = executor.submit(self._existing_manifests) - return self._process_manifests(added_manifests.result() + delete_manifests.result() + existing_manifests.result()) + return self._process_manifests( + added_manifests.result() + delete_manifests.result() + existing_manifests.result()) def _summary(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> Summary: from pyiceberg.table import TableProperties @@ -251,11 +254,11 @@ def _commit(self) -> UpdatesAndRequirements: commit_uuid=self.commit_uuid, ) with write_manifest_list( - format_version=self._transaction.table_metadata.format_version, - output_file=self._io.new_output(manifest_list_file_path), - snapshot_id=self._snapshot_id, - parent_snapshot_id=self._parent_snapshot_id, - sequence_number=next_sequence_number, + format_version=self._transaction.table_metadata.format_version, + output_file=self._io.new_output(manifest_list_file_path), + snapshot_id=self._snapshot_id, + parent_snapshot_id=self._parent_snapshot_id, + sequence_number=next_sequence_number, ) as writer: writer.add_manifests(new_manifests) @@ -272,7 +275,8 @@ def _commit(self) -> UpdatesAndRequirements: ( AddSnapshotUpdate(snapshot=snapshot), SetSnapshotRefUpdate( - snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, ref_name="main", type="branch" + snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, ref_name="main", + type="branch" ), ), (AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref="main"),), @@ -321,12 +325,12 @@ class _DeleteFiles(_SnapshotProducer["_DeleteFiles"]): _case_sensitive: bool def __init__( - self, - operation: Operation, - transaction: Transaction, - io: FileIO, - commit_uuid: Optional[uuid.UUID] = None, - snapshot_properties: Dict[str, str] = EMPTY_DICT, + self, + operation: Operation, + transaction: Transaction, + io: FileIO, + commit_uuid: Optional[uuid.UUID] = None, + snapshot_properties: Dict[str, str] = EMPTY_DICT, ): super().__init__(operation, transaction, io, commit_uuid, snapshot_properties) self._predicate = AlwaysFalse() @@ -379,7 +383,8 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> ) manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator) - strict_metrics_evaluator = _StrictMetricsEvaluator(schema, self._predicate, case_sensitive=self._case_sensitive).eval + strict_metrics_evaluator = _StrictMetricsEvaluator(schema, self._predicate, + case_sensitive=self._case_sensitive).eval inclusive_metrics_evaluator = _InclusiveMetricsEvaluator( schema, self._predicate, case_sensitive=self._case_sensitive ).eval @@ -415,11 +420,11 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> # Rewrite the manifest if len(existing_entries) > 0: with write_manifest( - format_version=self._transaction.table_metadata.format_version, - spec=self._transaction.table_metadata.specs()[manifest_file.partition_spec_id], - schema=self._transaction.table_metadata.schema(), - output_file=self.new_manifest_output(), - snapshot_id=self._snapshot_id, + format_version=self._transaction.table_metadata.format_version, + spec=self._transaction.table_metadata.specs()[manifest_file.partition_spec_id], + schema=self._transaction.table_metadata.schema(), + output_file=self.new_manifest_output(), + snapshot_id=self._snapshot_id, ) as writer: for existing_entry in existing_entries: writer.add_entry(existing_entry) @@ -448,6 +453,182 @@ def files_affected(self) -> bool: return len(self._deleted_entries()) > 0 +class _RewriteManifests(_SnapshotProducer["_RewriteManifests"]): + """Rewrite manifest files based on the predicate.""" + + KEPT_MANIFESTS_COUNT = "manifests-kept" + CREATED_MANIFESTS_COUNT = "manifests-created" + REPLACED_MANIFESTS_COUNT = "manifests-replaced" + PROCESSED_ENTRY_COUNT = "entries-processed" + + def __init__( + self, + transaction: Transaction, + io: FileIO, + commit_uuid: Optional[uuid.UUID] = None, + snapshot_properties: Dict[str, str] = EMPTY_DICT, + ): + from pyiceberg.table import TableProperties + + super().__init__(Operation.REPLACE, transaction, io, commit_uuid, snapshot_properties) + + self.specs_by_id = self._transaction.table_metadata.spec().spec_id + self.manifest_target_size_bytes = property_as_int( + self._transaction.table_metadata.properties, + TableProperties.MANIFEST_TARGET_SIZE_BYTES, + TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT, + ) + self.deleted_manifests: Set[ManifestFile] = set() + self.added_manifests: List[ManifestFile] = [] + self.rewritten_added_manifests: List[ManifestFile] = [] + self.kept_manifests: Set[ManifestFile] = set() + self.new_manifests: Set[ManifestFile] = set() + self.rewritten_manifests: Set[ManifestFile] = set() + + def copy_manifest(self, manifest: ManifestFile) -> ManifestFile: + return ManifestFile( + manifest_path=manifest.manifest_path, + manifest_length=manifest.manifest_length, + partition_spec_id=manifest.partition_spec_id, + content=manifest.content, + sequence_number=manifest.sequence_number, + min_sequence_number=manifest.min_sequence_number, + added_snapshot_id=manifest.added_snapshot_id, + added_files_count=manifest.added_files_count, + existing_files_count=manifest.existing_files_count, + deleted_files_count=manifest.deleted_files_count, + added_rows_count=manifest.added_rows_count, + existing_rows_count=manifest.existing_rows_count, + deleted_rows_count=manifest.deleted_rows_count, + partitions=manifest.partitions, + key_metadata=manifest.key_metadata, + ) + + def _summary(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> Summary: + summary = { + self.CREATED_MANIFESTS_COUNT: str( + len(self.new_manifests) + len(self.added_manifests) + len(self.rewritten_added_manifests) + ), + self.KEPT_MANIFESTS_COUNT: str(len(self.kept_manifests)), + self.REPLACED_MANIFESTS_COUNT: str(len(self.rewritten_manifests) + len(self.deleted_manifests)), + self.PROCESSED_ENTRY_COUNT: str(self.entry_count), + } + return super()._summary(summary) + + def delete_manifest(self, manifest): + self.deleted_manifests.add(manifest) + return self + + def requires_rewrite(self, current_manifests): + + if not self.rewritten_manifests: + # nothing yet processed so perform a full rewrite + return True + + # if any processed manifest is not in the current manifest list, perform a full rewrite + return any(manifest not in current_manifests for manifest in self.rewritten_manifests) + + def keep_active_manifests(self, current_manifests: List[ManifestFile]) -> None: + # keep any existing manifests as-is that were not processed + self.kept_manifests.clear() + for manifest in current_manifests: + if manifest not in self.rewritten_manifests and manifest not in self.deleted_manifests: + self.kept_manifests.add(manifest) + + def active_files_count(self, manifests): + active_files_count = 0 + + for manifest in manifests: + if manifest.added_files_count is None: + raise ValueError("Missing file counts in {}".format(manifest.path())) + if manifest.existing_files_count is None: + raise ValueError("Missing file counts in {}".format(manifest.path())) + active_files_count += manifest.added_files_count + active_files_count += manifest.existing_files_count + + return active_files_count + + def validate_files_counts(self): + created_manifests = itertools.chain(self.new_manifests, self.added_manifests, self.rewritten_added_manifests) + created_manifests_files_count = self.active_files_count(created_manifests) + + replaced_manifests = itertools.chain(self.rewritten_manifests, self.deleted_manifests) + replaced_manifests_files_count = self.active_files_count(replaced_manifests) + + if created_manifests_files_count != replaced_manifests_files_count: + raise ValueError( + "Replaced and created manifests must have the same number of active files: {} (new), {} (old)".format( + created_manifests_files_count, replaced_manifests_files_count + ) + ) + + def _existing_manifests(self) -> List[ManifestFile]: + """Returns the list of manifests to include in the next snapshot.""" + return self.apply() + + def _deleted_entries(self) -> List[ManifestEntry]: + """No entries are deleted during manifest rewriting.""" + return [] + def apply(self) -> List[ManifestFile]: + snapshot = self._transaction.table_metadata.current_snapshot() + current_manifests = snapshot.manifests(io=self._io) + current_manifest_set = set(current_manifests) + data_manifest_merge_manager = _ManifestMergeManager( + target_size_bytes=self.manifest_target_size_bytes, + min_count_to_merge=2, + merge_enabled=True, + snapshot_producer=self, + ) + # self.validate_deleted_manifests(current_manifest_set, base.current_snapshot().snapshot_id()) + + if self.requires_rewrite(current_manifest_set): + new_manifests = data_manifest_merge_manager.merge_manifests(manifests=current_manifests) + # self.rewritten_manifests.add(new_manifests) + self.rewritten_manifests.update(new_manifests) + else: + self.keep_active_manifests(current_manifests) + + self.validate_files_counts() + + new_manifests_with_metadata = [ + manifest.copy_with_snapshot_id(self.snapshot_id) + for manifest in itertools.chain(self.new_manifests, self.added_manifests, self.rewritten_added_manifests) + ] + + # put new manifests at the beginning + applyi = list(new_manifests_with_metadata) + applyi.extend(self.kept_manifests) + + return applyi + + # def execute(self, manifests: List[ManifestFile]) -> List[ManifestFile]: + # data_manifest_merge_manager = _ManifestMergeManager( + # target_size_bytes=self.manifest_target_size_bytes, + # min_count_to_merge=2, + # merge_enabled=True, + # snapshot_producer=self, + # ) + # return data_manifest_merge_manager.merge_manifests(manifests=manifests) + + def add_manifest(self, manifest): + if manifest.has_added_files(): + raise ValueError("Cannot add manifest with added files") + if manifest.has_deleted_files(): + raise ValueError("Cannot add manifest with deleted files") + if manifest.snapshot_id() is not None and manifest.snapshot_id() != -1: + raise ValueError("Snapshot id must be assigned during commit") + if manifest.sequence_number() != -1: + raise ValueError("Sequence must be assigned during commit") + + if manifest.snapshot_id() is None: + self.added_manifests.append(manifest) + else: + copied_manifest = self.copy_manifest(manifest) + self.rewritten_added_manifests.append(copied_manifest) + + return self + + class _FastAppendFiles(_SnapshotProducer["_FastAppendFiles"]): def _existing_manifests(self) -> List[ManifestFile]: """To determine if there are any existing manifest files. @@ -483,12 +664,12 @@ class _MergeAppendFiles(_FastAppendFiles): _merge_enabled: bool def __init__( - self, - operation: Operation, - transaction: Transaction, - io: FileIO, - commit_uuid: Optional[uuid.UUID] = None, - snapshot_properties: Dict[str, str] = EMPTY_DICT, + self, + operation: Operation, + transaction: Transaction, + io: FileIO, + commit_uuid: Optional[uuid.UUID] = None, + snapshot_properties: Dict[str, str] = EMPTY_DICT, ) -> None: from pyiceberg.table import TableProperties @@ -541,7 +722,8 @@ def _existing_manifests(self) -> List[ManifestFile]: if snapshot := self._transaction.table_metadata.current_snapshot(): for manifest_file in snapshot.manifests(io=self._io): entries = manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True) - found_deleted_data_files = [entry.data_file for entry in entries if entry.data_file in self._deleted_data_files] + found_deleted_data_files = [entry.data_file for entry in entries if + entry.data_file in self._deleted_data_files] if len(found_deleted_data_files) == 0: existing_files.append(manifest_file) @@ -549,11 +731,11 @@ def _existing_manifests(self) -> List[ManifestFile]: # We have to rewrite the manifest file without the deleted data files if any(entry.data_file not in found_deleted_data_files for entry in entries): with write_manifest( - format_version=self._transaction.table_metadata.format_version, - spec=self._transaction.table_metadata.specs()[manifest_file.partition_spec_id], - schema=self._transaction.table_metadata.schema(), - output_file=self.new_manifest_output(), - snapshot_id=self._snapshot_id, + format_version=self._transaction.table_metadata.format_version, + spec=self._transaction.table_metadata.specs()[manifest_file.partition_spec_id], + schema=self._transaction.table_metadata.schema(), + output_file=self.new_manifest_output(), + snapshot_id=self._snapshot_id, ) as writer: [ writer.add_entry( @@ -617,12 +799,19 @@ def __init__(self, transaction: Transaction, io: FileIO, snapshot_properties: Di def fast_append(self) -> _FastAppendFiles: return _FastAppendFiles( - operation=Operation.APPEND, transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties + operation=Operation.APPEND, transaction=self._transaction, io=self._io, + snapshot_properties=self._snapshot_properties ) def merge_append(self) -> _MergeAppendFiles: return _MergeAppendFiles( - operation=Operation.APPEND, transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties + operation=Operation.APPEND, transaction=self._transaction, io=self._io, + snapshot_properties=self._snapshot_properties + ) + + def rewrite(self) -> _RewriteManifests: + return _RewriteManifests( + transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties ) def overwrite(self, commit_uuid: Optional[uuid.UUID] = None) -> _OverwriteFiles: @@ -652,7 +841,8 @@ class _ManifestMergeManager(Generic[U]): _snapshot_producer: _SnapshotProducer[U] def __init__( - self, target_size_bytes: int, min_count_to_merge: int, merge_enabled: bool, snapshot_producer: _SnapshotProducer[U] + self, target_size_bytes: int, min_count_to_merge: int, merge_enabled: bool, + snapshot_producer: _SnapshotProducer[U] ) -> None: self._target_size_bytes = target_size_bytes self._min_count_to_merge = min_count_to_merge @@ -681,8 +871,10 @@ def _create_manifest(self, spec_id: int, manifest_bin: List[ManifestFile]) -> Ma return writer.to_manifest_file() - def _merge_group(self, first_manifest: ManifestFile, spec_id: int, manifests: List[ManifestFile]) -> List[ManifestFile]: - packer: ListPacker[ManifestFile] = ListPacker(target_weight=self._target_size_bytes, lookback=1, largest_bin_first=False) + def _merge_group(self, first_manifest: ManifestFile, spec_id: int, manifests: List[ManifestFile]) -> List[ + ManifestFile]: + packer: ListPacker[ManifestFile] = ListPacker(target_weight=self._target_size_bytes, lookback=1, + largest_bin_first=False) bins: List[List[ManifestFile]] = packer.pack_end(manifests, lambda m: m.manifest_length) def merge_bin(manifest_bin: List[ManifestFile]) -> List[ManifestFile]: @@ -704,7 +896,8 @@ def merge_bin(manifest_bin: List[ManifestFile]) -> List[ManifestFile]: # for consistent ordering, we need to maintain future order futures_index = {f: i for i, f in enumerate(futures)} - completed_futures: SortedList[Future[List[ManifestFile]]] = SortedList(iterable=[], key=lambda f: futures_index[f]) + completed_futures: SortedList[Future[List[ManifestFile]]] = SortedList(iterable=[], + key=lambda f: futures_index[f]) for future in concurrent.futures.as_completed(futures): completed_futures.add(future) @@ -772,12 +965,12 @@ def create_tag(self, snapshot_id: int, tag_name: str, max_ref_age_ms: Optional[i return self def create_branch( - self, - snapshot_id: int, - branch_name: str, - max_ref_age_ms: Optional[int] = None, - max_snapshot_age_ms: Optional[int] = None, - min_snapshots_to_keep: Optional[int] = None, + self, + snapshot_id: int, + branch_name: str, + max_ref_age_ms: Optional[int] = None, + max_snapshot_age_ms: Optional[int] = None, + min_snapshots_to_keep: Optional[int] = None, ) -> ManageSnapshots: """ Create a new branch pointing to the given snapshot id. diff --git a/tests/integration/test_inspect_table.py b/tests/integration/test_inspect_table.py index 75fe92a69a..b9a1b73345 100644 --- a/tests/integration/test_inspect_table.py +++ b/tests/integration/test_inspect_table.py @@ -938,3 +938,48 @@ def test_inspect_all_manifests(spark: SparkSession, session_catalog: Catalog, fo lhs = spark.table(f"{identifier}.all_manifests").toPandas() rhs = df.to_pandas() assert_frame_equal(lhs, rhs, check_dtype=False) + + +@pytest.mark.integration +@pytest.mark.parametrize("format_version", [1, 2]) +def test_inspect_all_example(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None: + from pandas.testing import assert_frame_equal + + identifier = "default.table_metadata_all_manifests" + try: + session_catalog.drop_table(identifier=identifier) + except NoSuchTableError: + pass + + spark.sql( + f""" + CREATE TABLE {identifier} ( + id int, + data string + ) + PARTITIONED BY (data) + TBLPROPERTIES ('write.update.mode'='merge-on-read', + 'write.delete.mode'='merge-on-read') + """ + ) + tbl = session_catalog.load_table(identifier) + # check all_manifests when there are no snapshots + lhs = tbl.inspect.all_manifests().to_pandas() + rhs = spark.table(f"{identifier}.all_manifests").toPandas() + assert_frame_equal(lhs, rhs, check_dtype=False) + + spark.sql(f"INSERT INTO {identifier} VALUES (1, 'a')") + + spark.sql(f"INSERT INTO {identifier} VALUES (2, 'b')") + + spark.sql(f"UPDATE {identifier} SET data = 'c' WHERE id = 1") + + spark.sql(f"DELETE FROM {identifier} WHERE id = 2") + + spark.sql(f"INSERT OVERWRITE {identifier} VALUES (1, 'a')") + + tbl.refresh() + + + tbl.rewrite_manifests() + print("efd") \ No newline at end of file From 3915bf52ce5243aae0d5e63beb5730854afa8208 Mon Sep 17 00:00:00 2001 From: amitgilad Date: Thu, 13 Feb 2025 22:19:36 +0200 Subject: [PATCH 2/7] initial implementation of rewrite_manifests --- pyiceberg/manifest.py | 3 + pyiceberg/table/__init__.py | 84 +--- pyiceberg/table/snapshots.py | 2 +- pyiceberg/table/update/snapshot.py | 409 +++++++----------- tests/integration/test_inspect_table.py | 45 -- .../test_writes/test_rewrite_manifests.py | 116 +++++ 6 files changed, 291 insertions(+), 368 deletions(-) create mode 100644 tests/integration/test_writes/test_rewrite_manifests.py diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index 92247c98f8..3a39ac4289 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -630,6 +630,7 @@ def copy_with_snapshot_id(self, snapshot_id: int) -> ManifestFile: partitions=self.partitions, key_metadata=self.key_metadata, ) + def fetch_manifest_entry(self, io: FileIO, discard_deleted: bool = True) -> List[ManifestEntry]: """ Read the manifest entries from the manifest file. @@ -653,10 +654,12 @@ def fetch_manifest_entry(self, io: FileIO, discard_deleted: bool = True) -> List for entry in reader if not discard_deleted or entry.status != ManifestEntryStatus.DELETED ] + def __hash__(self) -> int: """Return the hash of the file path.""" return hash(self.manifest_path) + @cached(cache=LRUCache(maxsize=128), key=lambda io, manifest_list: hashkey(manifest_list)) def _manifests(io: FileIO, manifest_list: str) -> Tuple[ManifestFile, ...]: """Read and cache manifests from the given manifest list, returning a tuple to prevent modification.""" diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index d3d9e12109..7af0f8afe2 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -112,12 +112,7 @@ update_table_metadata, ) from pyiceberg.table.update.schema import UpdateSchema -from pyiceberg.table.update.snapshot import ( - ManageSnapshots, - UpdateSnapshot, - _FastAppendFiles, - _ManifestMergeManager, -) +from pyiceberg.table.update.snapshot import ManageSnapshots, UpdateSnapshot, _FastAppendFiles, RewriteManifestsResult from pyiceberg.table.update.spec import UpdateSpec from pyiceberg.table.update.statistics import UpdateStatistics from pyiceberg.transforms import IdentityTransform @@ -136,7 +131,7 @@ ) from pyiceberg.utils.concurrent import ExecutorFactory from pyiceberg.utils.config import Config -from pyiceberg.utils.properties import property_as_bool, property_as_int +from pyiceberg.utils.properties import property_as_bool if TYPE_CHECKING: import daft @@ -421,29 +416,11 @@ def update_snapshot(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> U A new UpdateSnapshot """ return UpdateSnapshot(self, io=self._table.io, snapshot_properties=snapshot_properties) - def rewrite_manifests(self) -> None: - - # snapshot = self._table.current_snapshot() - # manifests = [] - # for manifest in snapshot.manifests(self.io): - # if manifest.content == ManifestContent.DATA: - # manifests.append(manifest) + def rewrite_manifests(self, spec_id: Optional[int] = None) -> None: with self.update_snapshot().rewrite() as rewrite: - rewrite.commit() - - # data_manifest_merge_manager = _ManifestMergeManager( - # target_size_bytes=property_as_int( - # self.properties, - # TableProperties.MANIFEST_TARGET_SIZE_BYTES, - # TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT, - # ), - # min_count_to_merge=2, - # merge_enabled=True, - # snapshot_producer=self, - # ) - - # data_manifest_merge_manager.merge_manifests(manifests) + rewrite.rewrite_manifests() + def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None: """ Shorthand API for appending a PyArrow table to a table transaction. @@ -1107,9 +1084,6 @@ def name_mapping(self) -> Optional[NameMapping]: """Return the table's field-id NameMapping.""" return self.metadata.name_mapping() - def rewrite_manifests(self) -> None: - with self.transaction() as tx: - tx.rewrite_manifests() def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None: """ Shorthand API for appending a PyArrow table to the table. @@ -1197,49 +1171,17 @@ def add_files( def rewrite_manifests( self, spec_id: Optional[int] = None, - rewrite_all: bool = False, - max_manifest_size: Optional[int] = None, - ) -> "Table": - - with self.transaction() as tx: - tx.rewrite_manifests() - ... - """Rewrite manifests in the table. + ) -> RewriteManifestsResult: + """ + Shorthand API for Rewriting manifests for the table. Args: - spec_id: Spec ID to be used for the rewritten manifests - rewrite_all: If True, rewrite all manifests. If False, only rewrite small manifests - max_manifest_size: Target size for manifests in bytes + spec_id: Spec id of the manifests to rewrite (defaults to current spec id) - Returns: - An updated version of the table with rewritten manifests - #""" - # return RewriteManifests( - # self, - # spec_id=spec_id, - # rewrite_all=rewrite_all, - # max_manifest_size=max_manifest_size, - # ).commit() - - # snapshot = self.current_snapshot() - # manifests = [] - # for manifest in snapshot.manifests(self.io): - # if manifest.content == ManifestContent.DATA: - # manifests.append(manifest) - # - # data_manifest_merge_manager = _ManifestMergeManager( - # target_size_bytes=property_as_int( - # self.properties, - # TableProperties.MANIFEST_TARGET_SIZE_BYTES, - # TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT, - # ), - # min_count_to_merge=2, - # merge_enabled=True, - # snapshot_producer=self, - # ) - # - # data_manifest_merge_manager.merge_manifests(manifests) - # entries = self.inspect.entries().filter("status < 2").selectExpr("input_file_name() as manifest") + """ + with self.transaction() as tx: + rewrite_results = tx.rewrite_manifests(spec_id=spec_id) + return rewrite_results def update_spec(self, case_sensitive: bool = True) -> UpdateSpec: return UpdateSpec(Transaction(self, autocommit=True), case_sensitive=case_sensitive) diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index a5515f12b0..020a452e85 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -352,7 +352,7 @@ def get_prop(prop: str) -> int: def update_snapshot_summaries( summary: Summary, previous_summary: Optional[Mapping[str, str]] = None, truncate_full_table: bool = False ) -> Summary: - if summary.operation not in {Operation.APPEND, Operation.OVERWRITE, Operation.DELETE}: + if summary.operation not in {Operation.APPEND, Operation.OVERWRITE, Operation.DELETE, Operation.REPLACE}: raise ValueError(f"Operation not implemented: {summary.operation}") if truncate_full_table and summary.operation == Operation.OVERWRITE and previous_summary is not None: diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index 0cafd4f2fa..568643ab3e 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -22,6 +22,7 @@ from abc import abstractmethod from collections import defaultdict from concurrent.futures import Future +from dataclasses import dataclass from functools import cached_property from typing import TYPE_CHECKING, Callable, Dict, Generic, List, Optional, Set, Tuple @@ -81,7 +82,7 @@ from pyiceberg.utils.properties import property_as_bool, property_as_int if TYPE_CHECKING: - from pyiceberg.table import Transaction + from pyiceberg.table import Table, Transaction def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str: @@ -105,12 +106,12 @@ class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]): _deleted_data_files: Set[DataFile] def __init__( - self, - operation: Operation, - transaction: Transaction, - io: FileIO, - commit_uuid: Optional[uuid.UUID] = None, - snapshot_properties: Dict[str, str] = EMPTY_DICT, + self, + operation: Operation, + transaction: Transaction, + io: FileIO, + commit_uuid: Optional[uuid.UUID] = None, + snapshot_properties: Dict[str, str] = EMPTY_DICT, ) -> None: super().__init__(transaction) self.commit_uuid = commit_uuid or uuid.uuid4() @@ -135,12 +136,10 @@ def delete_data_file(self, data_file: DataFile) -> _SnapshotProducer[U]: return self @abstractmethod - def _deleted_entries(self) -> List[ManifestEntry]: - ... + def _deleted_entries(self) -> List[ManifestEntry]: ... @abstractmethod - def _existing_manifests(self) -> List[ManifestFile]: - ... + def _existing_manifests(self) -> List[ManifestFile]: ... def _process_manifests(self, manifests: List[ManifestFile]) -> List[ManifestFile]: """To perform any post-processing on the manifests before writing them to the new snapshot.""" @@ -150,11 +149,11 @@ def _manifests(self) -> List[ManifestFile]: def _write_added_manifest() -> List[ManifestFile]: if self._added_data_files: with write_manifest( - format_version=self._transaction.table_metadata.format_version, - spec=self._transaction.table_metadata.spec(), - schema=self._transaction.table_metadata.schema(), - output_file=self.new_manifest_output(), - snapshot_id=self._snapshot_id, + format_version=self._transaction.table_metadata.format_version, + spec=self._transaction.table_metadata.spec(), + schema=self._transaction.table_metadata.schema(), + output_file=self.new_manifest_output(), + snapshot_id=self._snapshot_id, ) as writer: for data_file in self._added_data_files: writer.add( @@ -180,11 +179,11 @@ def _write_delete_manifest() -> List[ManifestFile]: partition_groups[deleted_entry.data_file.spec_id].append(deleted_entry) for spec_id, entries in partition_groups.items(): with write_manifest( - format_version=self._transaction.table_metadata.format_version, - spec=self._transaction.table_metadata.specs()[spec_id], - schema=self._transaction.table_metadata.schema(), - output_file=self.new_manifest_output(), - snapshot_id=self._snapshot_id, + format_version=self._transaction.table_metadata.format_version, + spec=self._transaction.table_metadata.specs()[spec_id], + schema=self._transaction.table_metadata.schema(), + output_file=self.new_manifest_output(), + snapshot_id=self._snapshot_id, ) as writer: for entry in entries: writer.add_entry(entry) @@ -199,8 +198,7 @@ def _write_delete_manifest() -> List[ManifestFile]: delete_manifests = executor.submit(_write_delete_manifest) existing_manifests = executor.submit(self._existing_manifests) - return self._process_manifests( - added_manifests.result() + delete_manifests.result() + existing_manifests.result()) + return self._process_manifests(added_manifests.result() + delete_manifests.result() + existing_manifests.result()) def _summary(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> Summary: from pyiceberg.table import TableProperties @@ -254,11 +252,11 @@ def _commit(self) -> UpdatesAndRequirements: commit_uuid=self.commit_uuid, ) with write_manifest_list( - format_version=self._transaction.table_metadata.format_version, - output_file=self._io.new_output(manifest_list_file_path), - snapshot_id=self._snapshot_id, - parent_snapshot_id=self._parent_snapshot_id, - sequence_number=next_sequence_number, + format_version=self._transaction.table_metadata.format_version, + output_file=self._io.new_output(manifest_list_file_path), + snapshot_id=self._snapshot_id, + parent_snapshot_id=self._parent_snapshot_id, + sequence_number=next_sequence_number, ) as writer: writer.add_manifests(new_manifests) @@ -275,8 +273,7 @@ def _commit(self) -> UpdatesAndRequirements: ( AddSnapshotUpdate(snapshot=snapshot), SetSnapshotRefUpdate( - snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, ref_name="main", - type="branch" + snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, ref_name="main", type="branch" ), ), (AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref="main"),), @@ -325,12 +322,12 @@ class _DeleteFiles(_SnapshotProducer["_DeleteFiles"]): _case_sensitive: bool def __init__( - self, - operation: Operation, - transaction: Transaction, - io: FileIO, - commit_uuid: Optional[uuid.UUID] = None, - snapshot_properties: Dict[str, str] = EMPTY_DICT, + self, + operation: Operation, + transaction: Transaction, + io: FileIO, + commit_uuid: Optional[uuid.UUID] = None, + snapshot_properties: Dict[str, str] = EMPTY_DICT, ): super().__init__(operation, transaction, io, commit_uuid, snapshot_properties) self._predicate = AlwaysFalse() @@ -383,8 +380,7 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> ) manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator) - strict_metrics_evaluator = _StrictMetricsEvaluator(schema, self._predicate, - case_sensitive=self._case_sensitive).eval + strict_metrics_evaluator = _StrictMetricsEvaluator(schema, self._predicate, case_sensitive=self._case_sensitive).eval inclusive_metrics_evaluator = _InclusiveMetricsEvaluator( schema, self._predicate, case_sensitive=self._case_sensitive ).eval @@ -420,11 +416,11 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> # Rewrite the manifest if len(existing_entries) > 0: with write_manifest( - format_version=self._transaction.table_metadata.format_version, - spec=self._transaction.table_metadata.specs()[manifest_file.partition_spec_id], - schema=self._transaction.table_metadata.schema(), - output_file=self.new_manifest_output(), - snapshot_id=self._snapshot_id, + format_version=self._transaction.table_metadata.format_version, + spec=self._transaction.table_metadata.specs()[manifest_file.partition_spec_id], + schema=self._transaction.table_metadata.schema(), + output_file=self.new_manifest_output(), + snapshot_id=self._snapshot_id, ) as writer: for existing_entry in existing_entries: writer.add_entry(existing_entry) @@ -453,182 +449,6 @@ def files_affected(self) -> bool: return len(self._deleted_entries()) > 0 -class _RewriteManifests(_SnapshotProducer["_RewriteManifests"]): - """Rewrite manifest files based on the predicate.""" - - KEPT_MANIFESTS_COUNT = "manifests-kept" - CREATED_MANIFESTS_COUNT = "manifests-created" - REPLACED_MANIFESTS_COUNT = "manifests-replaced" - PROCESSED_ENTRY_COUNT = "entries-processed" - - def __init__( - self, - transaction: Transaction, - io: FileIO, - commit_uuid: Optional[uuid.UUID] = None, - snapshot_properties: Dict[str, str] = EMPTY_DICT, - ): - from pyiceberg.table import TableProperties - - super().__init__(Operation.REPLACE, transaction, io, commit_uuid, snapshot_properties) - - self.specs_by_id = self._transaction.table_metadata.spec().spec_id - self.manifest_target_size_bytes = property_as_int( - self._transaction.table_metadata.properties, - TableProperties.MANIFEST_TARGET_SIZE_BYTES, - TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT, - ) - self.deleted_manifests: Set[ManifestFile] = set() - self.added_manifests: List[ManifestFile] = [] - self.rewritten_added_manifests: List[ManifestFile] = [] - self.kept_manifests: Set[ManifestFile] = set() - self.new_manifests: Set[ManifestFile] = set() - self.rewritten_manifests: Set[ManifestFile] = set() - - def copy_manifest(self, manifest: ManifestFile) -> ManifestFile: - return ManifestFile( - manifest_path=manifest.manifest_path, - manifest_length=manifest.manifest_length, - partition_spec_id=manifest.partition_spec_id, - content=manifest.content, - sequence_number=manifest.sequence_number, - min_sequence_number=manifest.min_sequence_number, - added_snapshot_id=manifest.added_snapshot_id, - added_files_count=manifest.added_files_count, - existing_files_count=manifest.existing_files_count, - deleted_files_count=manifest.deleted_files_count, - added_rows_count=manifest.added_rows_count, - existing_rows_count=manifest.existing_rows_count, - deleted_rows_count=manifest.deleted_rows_count, - partitions=manifest.partitions, - key_metadata=manifest.key_metadata, - ) - - def _summary(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> Summary: - summary = { - self.CREATED_MANIFESTS_COUNT: str( - len(self.new_manifests) + len(self.added_manifests) + len(self.rewritten_added_manifests) - ), - self.KEPT_MANIFESTS_COUNT: str(len(self.kept_manifests)), - self.REPLACED_MANIFESTS_COUNT: str(len(self.rewritten_manifests) + len(self.deleted_manifests)), - self.PROCESSED_ENTRY_COUNT: str(self.entry_count), - } - return super()._summary(summary) - - def delete_manifest(self, manifest): - self.deleted_manifests.add(manifest) - return self - - def requires_rewrite(self, current_manifests): - - if not self.rewritten_manifests: - # nothing yet processed so perform a full rewrite - return True - - # if any processed manifest is not in the current manifest list, perform a full rewrite - return any(manifest not in current_manifests for manifest in self.rewritten_manifests) - - def keep_active_manifests(self, current_manifests: List[ManifestFile]) -> None: - # keep any existing manifests as-is that were not processed - self.kept_manifests.clear() - for manifest in current_manifests: - if manifest not in self.rewritten_manifests and manifest not in self.deleted_manifests: - self.kept_manifests.add(manifest) - - def active_files_count(self, manifests): - active_files_count = 0 - - for manifest in manifests: - if manifest.added_files_count is None: - raise ValueError("Missing file counts in {}".format(manifest.path())) - if manifest.existing_files_count is None: - raise ValueError("Missing file counts in {}".format(manifest.path())) - active_files_count += manifest.added_files_count - active_files_count += manifest.existing_files_count - - return active_files_count - - def validate_files_counts(self): - created_manifests = itertools.chain(self.new_manifests, self.added_manifests, self.rewritten_added_manifests) - created_manifests_files_count = self.active_files_count(created_manifests) - - replaced_manifests = itertools.chain(self.rewritten_manifests, self.deleted_manifests) - replaced_manifests_files_count = self.active_files_count(replaced_manifests) - - if created_manifests_files_count != replaced_manifests_files_count: - raise ValueError( - "Replaced and created manifests must have the same number of active files: {} (new), {} (old)".format( - created_manifests_files_count, replaced_manifests_files_count - ) - ) - - def _existing_manifests(self) -> List[ManifestFile]: - """Returns the list of manifests to include in the next snapshot.""" - return self.apply() - - def _deleted_entries(self) -> List[ManifestEntry]: - """No entries are deleted during manifest rewriting.""" - return [] - def apply(self) -> List[ManifestFile]: - snapshot = self._transaction.table_metadata.current_snapshot() - current_manifests = snapshot.manifests(io=self._io) - current_manifest_set = set(current_manifests) - data_manifest_merge_manager = _ManifestMergeManager( - target_size_bytes=self.manifest_target_size_bytes, - min_count_to_merge=2, - merge_enabled=True, - snapshot_producer=self, - ) - # self.validate_deleted_manifests(current_manifest_set, base.current_snapshot().snapshot_id()) - - if self.requires_rewrite(current_manifest_set): - new_manifests = data_manifest_merge_manager.merge_manifests(manifests=current_manifests) - # self.rewritten_manifests.add(new_manifests) - self.rewritten_manifests.update(new_manifests) - else: - self.keep_active_manifests(current_manifests) - - self.validate_files_counts() - - new_manifests_with_metadata = [ - manifest.copy_with_snapshot_id(self.snapshot_id) - for manifest in itertools.chain(self.new_manifests, self.added_manifests, self.rewritten_added_manifests) - ] - - # put new manifests at the beginning - applyi = list(new_manifests_with_metadata) - applyi.extend(self.kept_manifests) - - return applyi - - # def execute(self, manifests: List[ManifestFile]) -> List[ManifestFile]: - # data_manifest_merge_manager = _ManifestMergeManager( - # target_size_bytes=self.manifest_target_size_bytes, - # min_count_to_merge=2, - # merge_enabled=True, - # snapshot_producer=self, - # ) - # return data_manifest_merge_manager.merge_manifests(manifests=manifests) - - def add_manifest(self, manifest): - if manifest.has_added_files(): - raise ValueError("Cannot add manifest with added files") - if manifest.has_deleted_files(): - raise ValueError("Cannot add manifest with deleted files") - if manifest.snapshot_id() is not None and manifest.snapshot_id() != -1: - raise ValueError("Snapshot id must be assigned during commit") - if manifest.sequence_number() != -1: - raise ValueError("Sequence must be assigned during commit") - - if manifest.snapshot_id() is None: - self.added_manifests.append(manifest) - else: - copied_manifest = self.copy_manifest(manifest) - self.rewritten_added_manifests.append(copied_manifest) - - return self - - class _FastAppendFiles(_SnapshotProducer["_FastAppendFiles"]): def _existing_manifests(self) -> List[ManifestFile]: """To determine if there are any existing manifest files. @@ -658,18 +478,32 @@ def _deleted_entries(self) -> List[ManifestEntry]: return [] +@dataclass(init=False) +class RewriteManifestsResult: + rewritten_manifests: List[ManifestFile] + added_manifests: List[ManifestFile] + + def __init__( + self, + rewritten_manifests: Optional[List[ManifestFile]], + added_manifests: Optional[List[ManifestFile]], + ) -> None: + self.rewritten_manifests = rewritten_manifests or [] + self.added_manifests = added_manifests or [] + + class _MergeAppendFiles(_FastAppendFiles): _target_size_bytes: int _min_count_to_merge: int _merge_enabled: bool def __init__( - self, - operation: Operation, - transaction: Transaction, - io: FileIO, - commit_uuid: Optional[uuid.UUID] = None, - snapshot_properties: Dict[str, str] = EMPTY_DICT, + self, + operation: Operation, + transaction: Transaction, + io: FileIO, + commit_uuid: Optional[uuid.UUID] = None, + snapshot_properties: Dict[str, str] = EMPTY_DICT, ) -> None: from pyiceberg.table import TableProperties @@ -709,6 +543,83 @@ def _process_manifests(self, manifests: List[ManifestFile]) -> List[ManifestFile return data_manifest_merge_manager.merge_manifests(unmerged_data_manifests) + unmerged_deletes_manifests +class _RewriteManifests(_SnapshotProducer["_RewriteManifests"]): + _target_size_bytes: int + rewritten_manifests: List[ManifestFile] = [] + added_manifests: List[ManifestFile] = [] + + def __init__( + self, + table: Table, + transaction: Transaction, + io: FileIO, + spec_id: Optional[int] = None, + snapshot_properties: Dict[str, str] = EMPTY_DICT, + ): + from pyiceberg.table import TableProperties + + _table: Table + _spec: PartitionSpec + + super().__init__(Operation.REPLACE, transaction, io, snapshot_properties=snapshot_properties) + self._target_size_bytes = property_as_int( + self._transaction.table_metadata.properties, + TableProperties.MANIFEST_TARGET_SIZE_BYTES, + TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT, + ) # type: ignore + self._table = table + self._spec_id = spec_id + + def rewrite_manifests(self) -> RewriteManifestsResult: + data_result = self._find_matching_manifests(ManifestContent.DATA) + self.rewritten_manifests.extend(data_result.rewritten_manifests) + self.added_manifests.extend(data_result.added_manifests) + + deletes_result = self._find_matching_manifests(ManifestContent.DELETES) + self.rewritten_manifests.extend(deletes_result.rewritten_manifests) + self.added_manifests.extend(deletes_result.added_manifests) + + if not self.rewritten_manifests: + return RewriteManifestsResult(rewritten_manifests=[], added_manifests=[]) + + return RewriteManifestsResult(rewritten_manifests=self.rewritten_manifests, added_manifests=self.added_manifests) + + def _find_matching_manifests(self, content: ManifestContent) -> RewriteManifestsResult: + snapshot = self._table.current_snapshot() + if self._spec_id and self._spec_id not in self._table.specs(): + raise ValueError(f"Cannot find spec with id: {self._spec_id}") + + if not snapshot: + raise ValueError("Cannot rewrite manifests without a current snapshot") + + manifests = [ + manifest + for manifest in snapshot.manifests(io=self._io) + if manifest.partition_spec_id == self._spec_id and manifest.content == content + ] + + data_manifest_merge_manager = _ManifestMergeManager( + target_size_bytes=self._target_size_bytes, + min_count_to_merge=2, + merge_enabled=True, + snapshot_producer=self, + ) + new_manifests = data_manifest_merge_manager.merge_manifests(manifests=manifests) + + return RewriteManifestsResult(rewritten_manifests=manifests, added_manifests=new_manifests) + + def _existing_manifests(self) -> List[ManifestFile]: + """Determine if there are any existing manifest files.""" + return [] + + def _deleted_entries(self) -> List[ManifestEntry]: + """To determine if we need to record any deleted manifest entries. + + In case of an append, nothing is deleted. + """ + return [] + + class _OverwriteFiles(_SnapshotProducer["_OverwriteFiles"]): """Overwrites data from the table. This will produce an OVERWRITE snapshot. @@ -722,8 +633,7 @@ def _existing_manifests(self) -> List[ManifestFile]: if snapshot := self._transaction.table_metadata.current_snapshot(): for manifest_file in snapshot.manifests(io=self._io): entries = manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True) - found_deleted_data_files = [entry.data_file for entry in entries if - entry.data_file in self._deleted_data_files] + found_deleted_data_files = [entry.data_file for entry in entries if entry.data_file in self._deleted_data_files] if len(found_deleted_data_files) == 0: existing_files.append(manifest_file) @@ -731,11 +641,11 @@ def _existing_manifests(self) -> List[ManifestFile]: # We have to rewrite the manifest file without the deleted data files if any(entry.data_file not in found_deleted_data_files for entry in entries): with write_manifest( - format_version=self._transaction.table_metadata.format_version, - spec=self._transaction.table_metadata.specs()[manifest_file.partition_spec_id], - schema=self._transaction.table_metadata.schema(), - output_file=self.new_manifest_output(), - snapshot_id=self._snapshot_id, + format_version=self._transaction.table_metadata.format_version, + spec=self._transaction.table_metadata.specs()[manifest_file.partition_spec_id], + schema=self._transaction.table_metadata.schema(), + output_file=self.new_manifest_output(), + snapshot_id=self._snapshot_id, ) as writer: [ writer.add_entry( @@ -799,19 +709,20 @@ def __init__(self, transaction: Transaction, io: FileIO, snapshot_properties: Di def fast_append(self) -> _FastAppendFiles: return _FastAppendFiles( - operation=Operation.APPEND, transaction=self._transaction, io=self._io, - snapshot_properties=self._snapshot_properties + operation=Operation.APPEND, transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties ) def merge_append(self) -> _MergeAppendFiles: return _MergeAppendFiles( - operation=Operation.APPEND, transaction=self._transaction, io=self._io, - snapshot_properties=self._snapshot_properties + operation=Operation.APPEND, transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties ) def rewrite(self) -> _RewriteManifests: return _RewriteManifests( - transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties + table=self._transaction._table, + transaction=self._transaction, + io=self._io, + snapshot_properties=self._snapshot_properties, ) def overwrite(self, commit_uuid: Optional[uuid.UUID] = None) -> _OverwriteFiles: @@ -841,8 +752,7 @@ class _ManifestMergeManager(Generic[U]): _snapshot_producer: _SnapshotProducer[U] def __init__( - self, target_size_bytes: int, min_count_to_merge: int, merge_enabled: bool, - snapshot_producer: _SnapshotProducer[U] + self, target_size_bytes: int, min_count_to_merge: int, merge_enabled: bool, snapshot_producer: _SnapshotProducer[U] ) -> None: self._target_size_bytes = target_size_bytes self._min_count_to_merge = min_count_to_merge @@ -871,10 +781,8 @@ def _create_manifest(self, spec_id: int, manifest_bin: List[ManifestFile]) -> Ma return writer.to_manifest_file() - def _merge_group(self, first_manifest: ManifestFile, spec_id: int, manifests: List[ManifestFile]) -> List[ - ManifestFile]: - packer: ListPacker[ManifestFile] = ListPacker(target_weight=self._target_size_bytes, lookback=1, - largest_bin_first=False) + def _merge_group(self, first_manifest: ManifestFile, spec_id: int, manifests: List[ManifestFile]) -> List[ManifestFile]: + packer: ListPacker[ManifestFile] = ListPacker(target_weight=self._target_size_bytes, lookback=1, largest_bin_first=False) bins: List[List[ManifestFile]] = packer.pack_end(manifests, lambda m: m.manifest_length) def merge_bin(manifest_bin: List[ManifestFile]) -> List[ManifestFile]: @@ -896,8 +804,7 @@ def merge_bin(manifest_bin: List[ManifestFile]) -> List[ManifestFile]: # for consistent ordering, we need to maintain future order futures_index = {f: i for i, f in enumerate(futures)} - completed_futures: SortedList[Future[List[ManifestFile]]] = SortedList(iterable=[], - key=lambda f: futures_index[f]) + completed_futures: SortedList[Future[List[ManifestFile]]] = SortedList(iterable=[], key=lambda f: futures_index[f]) for future in concurrent.futures.as_completed(futures): completed_futures.add(future) @@ -965,12 +872,12 @@ def create_tag(self, snapshot_id: int, tag_name: str, max_ref_age_ms: Optional[i return self def create_branch( - self, - snapshot_id: int, - branch_name: str, - max_ref_age_ms: Optional[int] = None, - max_snapshot_age_ms: Optional[int] = None, - min_snapshots_to_keep: Optional[int] = None, + self, + snapshot_id: int, + branch_name: str, + max_ref_age_ms: Optional[int] = None, + max_snapshot_age_ms: Optional[int] = None, + min_snapshots_to_keep: Optional[int] = None, ) -> ManageSnapshots: """ Create a new branch pointing to the given snapshot id. diff --git a/tests/integration/test_inspect_table.py b/tests/integration/test_inspect_table.py index b9a1b73345..75fe92a69a 100644 --- a/tests/integration/test_inspect_table.py +++ b/tests/integration/test_inspect_table.py @@ -938,48 +938,3 @@ def test_inspect_all_manifests(spark: SparkSession, session_catalog: Catalog, fo lhs = spark.table(f"{identifier}.all_manifests").toPandas() rhs = df.to_pandas() assert_frame_equal(lhs, rhs, check_dtype=False) - - -@pytest.mark.integration -@pytest.mark.parametrize("format_version", [1, 2]) -def test_inspect_all_example(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None: - from pandas.testing import assert_frame_equal - - identifier = "default.table_metadata_all_manifests" - try: - session_catalog.drop_table(identifier=identifier) - except NoSuchTableError: - pass - - spark.sql( - f""" - CREATE TABLE {identifier} ( - id int, - data string - ) - PARTITIONED BY (data) - TBLPROPERTIES ('write.update.mode'='merge-on-read', - 'write.delete.mode'='merge-on-read') - """ - ) - tbl = session_catalog.load_table(identifier) - # check all_manifests when there are no snapshots - lhs = tbl.inspect.all_manifests().to_pandas() - rhs = spark.table(f"{identifier}.all_manifests").toPandas() - assert_frame_equal(lhs, rhs, check_dtype=False) - - spark.sql(f"INSERT INTO {identifier} VALUES (1, 'a')") - - spark.sql(f"INSERT INTO {identifier} VALUES (2, 'b')") - - spark.sql(f"UPDATE {identifier} SET data = 'c' WHERE id = 1") - - spark.sql(f"DELETE FROM {identifier} WHERE id = 2") - - spark.sql(f"INSERT OVERWRITE {identifier} VALUES (1, 'a')") - - tbl.refresh() - - - tbl.rewrite_manifests() - print("efd") \ No newline at end of file diff --git a/tests/integration/test_writes/test_rewrite_manifests.py b/tests/integration/test_writes/test_rewrite_manifests.py new file mode 100644 index 0000000000..6d0d753227 --- /dev/null +++ b/tests/integration/test_writes/test_rewrite_manifests.py @@ -0,0 +1,116 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# pylint:disable=redefined-outer-name + +import pyarrow as pa +import pytest +from pyspark.sql import SparkSession + +from pyiceberg.catalog import Catalog +from utils import _create_table + + +@pytest.fixture(scope="session", autouse=True) +def table_v1_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: + identifier = "default.arrow_table_v1_with_null" + tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, [arrow_table_with_null]) + assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}" + + +@pytest.fixture(scope="session", autouse=True) +def table_v1_without_data(session_catalog: Catalog, arrow_table_without_data: pa.Table) -> None: + identifier = "default.arrow_table_v1_without_data" + tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, [arrow_table_without_data]) + assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}" + + +@pytest.fixture(scope="session", autouse=True) +def table_v1_with_only_nulls(session_catalog: Catalog, arrow_table_with_only_nulls: pa.Table) -> None: + identifier = "default.arrow_table_v1_with_only_nulls" + tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, [arrow_table_with_only_nulls]) + assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}" + + +@pytest.fixture(scope="session", autouse=True) +def table_v1_appended_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: + identifier = "default.arrow_table_v1_appended_with_null" + tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, 2 * [arrow_table_with_null]) + assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}" + + +@pytest.fixture(scope="session", autouse=True) +def table_v2_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: + identifier = "default.arrow_table_v2_with_null" + tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, [arrow_table_with_null]) + assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}" + + +@pytest.fixture(scope="session", autouse=True) +def table_v2_without_data(session_catalog: Catalog, arrow_table_without_data: pa.Table) -> None: + identifier = "default.arrow_table_v2_without_data" + tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, [arrow_table_without_data]) + assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}" + + +@pytest.fixture(scope="session", autouse=True) +def table_v2_with_only_nulls(session_catalog: Catalog, arrow_table_with_only_nulls: pa.Table) -> None: + identifier = "default.arrow_table_v2_with_only_nulls" + tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, [arrow_table_with_only_nulls]) + assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}" + + +@pytest.fixture(scope="session", autouse=True) +def table_v2_appended_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: + identifier = "default.arrow_table_v2_appended_with_null" + tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, 2 * [arrow_table_with_null]) + assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}" + + +@pytest.fixture(scope="session", autouse=True) +def table_v1_v2_appended_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: + identifier = "default.arrow_table_v1_v2_appended_with_null" + tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, [arrow_table_with_null]) + assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}" + + with tbl.transaction() as tx: + tx.upgrade_table_version(format_version=2) + + tbl.append(arrow_table_with_null) + + assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}" + + +@pytest.mark.integration +def test_summaries(spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: + identifier = "default.arrow_table_summaries" + tbl = _create_table(session_catalog, identifier, {"format-version": "2"}) + tbl.append(arrow_table_with_null) + tbl.append(arrow_table_with_null) + + # tbl.rewrite_manifests() + + # records1 = [ThreeColumnRecord(1, None, "AAAA")] + # write_records(spark, table_location, records1) + + tbl.refresh() + manifests = tbl.inspect.all_manifests().to_pylist() + assert len(manifests) == 3, "Should have 3 manifests before rewrite" + + result = tbl.rewrite_manifests() + tbl.refresh() + manifests = tbl.inspect.all_manifests().to_pylist() + assert len(manifests) == 1, "Should have 1 manifests before rewrite" From b434828abed724ee3994fe551f0b3819f403f4f3 Mon Sep 17 00:00:00 2001 From: amitgilad Date: Thu, 13 Feb 2025 22:21:07 +0200 Subject: [PATCH 3/7] fix lint --- pyiceberg/table/__init__.py | 7 +++---- tests/integration/test_writes/test_rewrite_manifests.py | 2 +- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 7af0f8afe2..bf780cfeca 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -112,7 +112,7 @@ update_table_metadata, ) from pyiceberg.table.update.schema import UpdateSchema -from pyiceberg.table.update.snapshot import ManageSnapshots, UpdateSnapshot, _FastAppendFiles, RewriteManifestsResult +from pyiceberg.table.update.snapshot import ManageSnapshots, UpdateSnapshot, _FastAppendFiles from pyiceberg.table.update.spec import UpdateSpec from pyiceberg.table.update.statistics import UpdateStatistics from pyiceberg.transforms import IdentityTransform @@ -1171,7 +1171,7 @@ def add_files( def rewrite_manifests( self, spec_id: Optional[int] = None, - ) -> RewriteManifestsResult: + ) -> None: """ Shorthand API for Rewriting manifests for the table. @@ -1180,8 +1180,7 @@ def rewrite_manifests( """ with self.transaction() as tx: - rewrite_results = tx.rewrite_manifests(spec_id=spec_id) - return rewrite_results + tx.rewrite_manifests(spec_id=spec_id) def update_spec(self, case_sensitive: bool = True) -> UpdateSpec: return UpdateSpec(Transaction(self, autocommit=True), case_sensitive=case_sensitive) diff --git a/tests/integration/test_writes/test_rewrite_manifests.py b/tests/integration/test_writes/test_rewrite_manifests.py index 6d0d753227..1946ef542d 100644 --- a/tests/integration/test_writes/test_rewrite_manifests.py +++ b/tests/integration/test_writes/test_rewrite_manifests.py @@ -110,7 +110,7 @@ def test_summaries(spark: SparkSession, session_catalog: Catalog, arrow_table_wi manifests = tbl.inspect.all_manifests().to_pylist() assert len(manifests) == 3, "Should have 3 manifests before rewrite" - result = tbl.rewrite_manifests() + tbl.rewrite_manifests() tbl.refresh() manifests = tbl.inspect.all_manifests().to_pylist() assert len(manifests) == 1, "Should have 1 manifests before rewrite" From bf337160cf285feb4bc1d8b866fdc88128b096d4 Mon Sep 17 00:00:00 2001 From: amitgilad Date: Thu, 13 Feb 2025 22:42:52 +0200 Subject: [PATCH 4/7] remove unused code --- pyiceberg/manifest.py | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index 3a39ac4289..a3387148b4 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -612,25 +612,6 @@ def has_added_files(self) -> bool: def has_existing_files(self) -> bool: return self.existing_files_count is None or self.existing_files_count > 0 - def copy_with_snapshot_id(self, snapshot_id: int) -> ManifestFile: - return ManifestFile( - manifest_path=self.manifest_path, - manifest_length=self.manifest_length, - partition_spec_id=self.partition_spec_id, - content=self.content, - sequence_number=self.sequence_number, - min_sequence_number=self.min_sequence_number, - added_snapshot_id=snapshot_id, - added_files_count=self.added_files_count, - existing_files_count=self.existing_files_count, - deleted_files_count=self.deleted_files_count, - added_rows_count=self.added_rows_count, - existing_rows_count=self.existing_rows_count, - deleted_rows_count=self.deleted_rows_count, - partitions=self.partitions, - key_metadata=self.key_metadata, - ) - def fetch_manifest_entry(self, io: FileIO, discard_deleted: bool = True) -> List[ManifestEntry]: """ Read the manifest entries from the manifest file. From 76740df7443e429cd852f0bbdc5ee60c73ded195 Mon Sep 17 00:00:00 2001 From: amitgilad Date: Mon, 17 Feb 2025 19:34:19 +0200 Subject: [PATCH 5/7] commit --- pyiceberg/table/__init__.py | 3 + pyiceberg/table/update/snapshot.py | 299 ++++++++++++++++++++++++++++- 2 files changed, 300 insertions(+), 2 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index bf780cfeca..c15621dd16 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -219,6 +219,9 @@ class TableProperties: MIN_SNAPSHOTS_TO_KEEP = "history.expire.min-snapshots-to-keep" MIN_SNAPSHOTS_TO_KEEP_DEFAULT = 1 + SNAPSHOT_ID_INHERITANCE_ENABLED = "compatibility.snapshot-id-inheritance.enabled" + SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT = False + class Transaction: _table: Table diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index 568643ab3e..896d064eb1 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -568,7 +568,7 @@ def __init__( TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT, ) # type: ignore self._table = table - self._spec_id = spec_id + self._spec_id = spec_id or table.spec().spec_id def rewrite_manifests(self) -> RewriteManifestsResult: data_result = self._find_matching_manifests(ManifestContent.DATA) @@ -608,9 +608,33 @@ def _find_matching_manifests(self, content: ManifestContent) -> RewriteManifests return RewriteManifestsResult(rewritten_manifests=manifests, added_manifests=new_manifests) + def _copy_manifest_file(self, manifest_file:ManifestFile, snapshot_id:int) -> ManifestFile: + return ManifestFile( + manifest_path=manifest_file.manifest_path, + manifest_length=manifest_file.manifest_length, + partition_spec_id=manifest_file.partition_spec_id, + content=manifest_file.content, + sequence_number=manifest_file.sequence_number, + min_sequence_number=manifest_file.min_sequence_number, + added_snapshot_id=snapshot_id, + added_files_count=manifest_file.added_files_count, + existing_files_count=manifest_file.existing_files_count, + deleted_files_count=manifest_file.deleted_files_count, + added_rows_count=manifest_file.added_rows_count, + existing_rows_count=manifest_file.existing_rows_count, + deleted_rows_count=manifest_file.deleted_rows_count, + partitions=manifest_file.partitions, + key_metadata=manifest_file.key_metadata, + ) + def _existing_manifests(self) -> List[ManifestFile]: """Determine if there are any existing manifest files.""" - return [] + new_manifests = [ + self._copy_manifest_file(manifest, self.snapshot_id) + for manifest in self.added_manifests + self.rewritten_manifests + ] + return new_manifests + # return [] def _deleted_entries(self) -> List[ManifestEntry]: """To determine if we need to record any deleted manifest entries. @@ -620,6 +644,277 @@ def _deleted_entries(self) -> List[ManifestEntry]: return [] +# class _RewriteManifests(_SnapshotProducer["_RewriteManifests"]): +# KEPT_MANIFESTS_COUNT = "manifests-kept" +# CREATED_MANIFESTS_COUNT = "manifests-created" +# REPLACED_MANIFESTS_COUNT = "manifests-replaced" +# PROCESSED_ENTRY_COUNT = "entries-processed" +# _target_size_bytes: int +# _min_count_to_merge: int +# +# def __init__( +# self, +# table: Table, +# transaction: Transaction, +# io: FileIO, +# commit_uuid: Optional[uuid.UUID] = None, +# snapshot_properties: Dict[str, str] = EMPTY_DICT, +# ) -> None: +# from pyiceberg.table import TableProperties +# +# super().__init__(Operation.REPLACE, transaction, io, commit_uuid, snapshot_properties) +# self._table = table +# self.specs_by_id = self._table.spec() # ops.current().specs_by_id() +# self.manifest_target_size_bytes = 8388608 # Default value +# self.deleted_manifests: Set[ManifestFile] = set() +# self.added_manifests: List[ManifestFile] = [] +# self.rewritten_added_manifests: List[ManifestFile] = [] +# self.kept_manifests: List[ManifestFile] = [] +# self.new_manifests: List[ManifestFile] = [] +# self.rewritten_manifests: Set[ManifestFile] = set() +# self.snapshot_Id_Inheritance_enabled = property_as_bool( +# self._transaction.table_metadata.properties, +# TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, +# TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT, +# ) # type: ignore +# self.can_inherit_snapshot_id = table.metadata.format_version > 1 or self.snapshot_Id_Inheritance_enabled +# # self.writers: Dict[Tuple[Any, int], 'WriterWrapper'] = {} +# self.entry_count = 0 +# self.cluster_by_func: Optional[Callable[[DataFile], Any]] = None +# self.predicate: Optional[Callable[[ManifestFile], bool]] = None +# # self.summary_builder = SnapshotSummary.Builder() +# # self.lock = threading.Lock() +# self._target_size_bytes = property_as_int( +# self._transaction.table_metadata.properties, +# TableProperties.MANIFEST_TARGET_SIZE_BYTES, +# TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT, +# ) # type: ignore +# self._min_count_to_merge = property_as_int( +# self._transaction.table_metadata.properties, +# TableProperties.MANIFEST_MIN_MERGE_COUNT, +# TableProperties.MANIFEST_MIN_MERGE_COUNT_DEFAULT, +# ) # type: ignore +# +# # def cluster_by(self, func: Callable[[DataFile], Any]]) -> RewriteManifests: +# # self.cluster_by_func = func +# # return self +# +# # def rewrite_if(self, predicate: Callable[[ManifestFile], bool]]) -> RewriteManifests: +# # self.predicate = predicate +# # return self +# +# # def delete_manifest(self, manifest: ManifestFile) -> "_RewriteManifests": +# # self.deleted_manifests.add(manifest) +# # return self +# # +# # def add_manifest(self, manifest: ManifestFile) -> "_RewriteManifests": +# # if self.can_inherit_snapshot_id and manifest.added_snapshot_id is None: +# # self.added_manifests.append(manifest) +# # else: +# # copied_manifest = self.copy_manifest(manifest) +# # self.rewritten_added_manifests.append(copied_manifest) +# # return self +# +# # def copy_manifest(self, manifest: ManifestFile) -> ManifestFile: +# # """ +# # Copies a manifest file to a new location, updating its metadata (e.g., snapshot ID). +# # +# # Args: +# # manifest: The manifest file to copy. +# # +# # Returns: +# # A new ManifestFile object representing the copied manifest. +# # """ +# # # Get the current table metadata +# # current_metadata = self._table.metadata +# # +# # # Create an input file for the manifest to be copied +# # input_file = self.ops.io().new_input_file(manifest.path) +# # +# # # Create an output file for the new manifest +# # output_file = self.new_manifest_output_file() +# # +# # # Copy the manifest file, updating its metadata +# # new_manifest = ManifestFile( +# # format_version=current_metadata.format_version, +# # spec_id=manifest.partition_spec_id, +# # input_file=input_file, +# # specs_by_id=self.specs_by_id, +# # output_file=output_file, +# # snapshot_id=self.snapshot_id(), +# # summary_builder=self.summary_builder, +# # ) +# # +# # return new_manifest +# +# def _existing_manifests(self) -> List[ManifestFile]: +# """To determine if there are any existing manifest files. +# +# A fast append will add another ManifestFile to the ManifestList. +# All the existing manifest files are considered existing. +# """ +# existing_manifests = [] +# +# if self._parent_snapshot_id is not None: +# previous_snapshot = self._transaction.table_metadata.snapshot_by_id(self._parent_snapshot_id) +# +# if previous_snapshot is None: +# raise ValueError(f"Snapshot could not be found: {self._parent_snapshot_id}") +# +# for manifest in previous_snapshot.manifests(io=self._io): +# if manifest.has_added_files() or manifest.has_existing_files() or manifest.added_snapshot_id == self._snapshot_id: +# existing_manifests.append(manifest) +# +# return existing_manifests +# def _deleted_entries(self) -> List[ManifestEntry]: +# """To determine if we need to record any deleted manifest entries. +# +# In case of an append, nothing is deleted. +# """ +# return [] +# +# +# def rewrite_manifests(self) -> List[ManifestFile]: +# snapshot = self._table.current_snapshot() +# if not snapshot: +# raise ValueError("Cannot rewrite manifests without a current snapshot") +# current_manifests = snapshot.manifests(self._io) +# current_manifest_set = set(current_manifests) +# self.validate_deleted_manifests(current_manifest_set, snapshot.snapshot_id) +# +# if self.requires_rewrite(current_manifest_set): +# self.perform_rewrite(current_manifests) +# else: +# self.keep_active_manifests(current_manifests) +# +# self.validate_files_counts() +# +# new_manifests = [ +# self.with_snapshot_id(manifest) +# for manifest in list(self.new_manifests) + self.added_manifests + self.rewritten_added_manifests +# ] +# return new_manifests + list(self.kept_manifests) +# +# def perform_rewrite(self, current_manifests: List[ManifestFile]) -> None: +# # self.reset() +# remaining_manifests = [m for m in current_manifests if m not in self.deleted_manifests] +# data_manifest_merge_manager = _ManifestMergeManager( +# target_size_bytes=self._target_size_bytes, +# min_count_to_merge=self._min_count_to_merge, +# merge_enabled=True, +# snapshot_producer=self, +# ) +# +# new_manifests = data_manifest_merge_manager.merge_manifests(remaining_manifests) +# self.new_manifests.extend(new_manifests) +# +# # def process_manifest(self, manifest: ManifestFile): +# # if not self.contains_deletes(manifest) and self.matches_predicate(manifest): +# # self.rewritten_manifests.add(manifest) +# # try: +# # reader = ManifestReader(manifest, self.ops.io(), self.ops.current().specs_by_id()) +# # for entry in reader.live_entries(): +# # self.append_entry(entry, self.cluster_by_func(entry.file), manifest.partition_spec_id) +# # reader.close() +# # except IOError as e: +# # raise RuntimeIOException from e +# # else: +# # self.kept_manifests.put(manifest) +# +# # def append_entry(self, entry: ManifestEntry, key: Any, partition_spec_id: int): +# # with self.lock: +# # writer = self.get_writer(key, partition_spec_id) +# # writer.add_entry(entry) +# # self.entry_count += 1 +# +# # def get_writer(self, key: Any, partition_spec_id: int) -> 'WriterWrapper': +# # return self.writers.setdefault((key, partition_spec_id), WriterWrapper(self, self.specs_by_id[partition_spec_id])) +# +# def validate_deleted_manifests(self, current_manifests: Set[ManifestFile], current_snapshot_id: int) -> None: +# for manifest in self.deleted_manifests: +# if manifest not in current_manifests: +# raise ValueError( +# f"Deleted manifest {manifest.manifest_path} could not be found in the latest snapshot {current_snapshot_id}" +# ) +# +# def requires_rewrite(self, current_manifests: Set[ManifestFile]) -> bool: +# # if self.cluster_by_func is None: +# # return False +# return len(self.rewritten_manifests) == 0 or any( +# manifest not in current_manifests for manifest in self.rewritten_manifests +# ) +# +# def keep_active_manifests(self, current_manifests: List[ManifestFile]) -> None: +# for manifest in current_manifests: +# if manifest not in self.rewritten_manifests and manifest not in self.deleted_manifests: +# self.kept_manifests.append(manifest) +# +# def validate_files_counts(self) -> None: +# created_manifests = list(self.new_manifests) + self.added_manifests + self.rewritten_added_manifests +# created_files_count = self.active_files_count(created_manifests) +# replaced_manifests = list(self.rewritten_manifests) + list(self.deleted_manifests) +# replaced_files_count = self.active_files_count(replaced_manifests) +# +# if created_files_count != replaced_files_count: +# raise ValueError( +# f"Replaced and created manifests must have the same number of active files: {created_files_count} (new), {replaced_files_count} (old)" +# ) +# +# def active_files_count(self, manifests: List[ManifestFile]) -> int: +# count = 0 +# for manifest in manifests: +# count += manifest.existing_files_count + manifest.added_files_count +# return count +# +# # def reset(self) -> None: +# # self.clean_uncommitted(self.new_manifests, set()) +# # self.entry_count = 0 +# # self.kept_manifests.clear() +# # self.rewritten_manifests.clear() +# # self.new_manifests.clear() +# # +# # def clean_uncommitted(self, manifests: List[ManifestFile], committed: Set[ManifestFile]) -> None: +# # for manifest in manifests: +# # if manifest not in committed: +# # self.delete_file(manifest.manifest_path) +# # +# # def delete_file(self, path: str): +# # # Mock implementation +# # if os.path.exists(path): +# # os.remove(path) +# +# def with_snapshot_id(self, manifest: ManifestFile) -> ManifestFile: +# # Mock implementation +# return ManifestFile(manifest.manifest_path, snapshot_id=0) +# +# def new_manifest_writer(self, spec: PartitionSpec) -> ManifestWriter: +# # Mock implementation +# return ManifestWriter(spec) +# +# # class WriterWrapper: +# # def __init__(self, outer: 'BaseRewriteManifests', spec: PartitionSpec): +# # self.outer = outer +# # self.spec = spec +# # self.writer: Optional[ManifestWriter] = None +# # self.lock = threading.Lock() +# # +# # def add_entry(self, entry: ManifestEntry): +# # with self.lock: +# # if self.writer is None or self.writer.length() >= self.outer.manifest_target_size_bytes: +# # self._close_writer() +# # self.writer = self.outer.new_manifest_writer(self.spec) +# # self.writer.existing(entry) +# # +# # def _close_writer(self): +# # if self.writer: +# # self.writer.close() +# # self.outer.new_manifests.put(self.writer.to_manifest_file()) +# # +# # def close(self): +# # with self.lock: +# # self._close_writer() + + class _OverwriteFiles(_SnapshotProducer["_OverwriteFiles"]): """Overwrites data from the table. This will produce an OVERWRITE snapshot. From d705c76c399bed3edefe21c5ad37421e12a694ee Mon Sep 17 00:00:00 2001 From: amitgilad Date: Sun, 23 Feb 2025 09:05:46 +0200 Subject: [PATCH 6/7] 1. support returning rewrite results 2. write tests for rewrite manifests --- pyiceberg/table/__init__.py | 13 +- pyiceberg/table/update/snapshot.py | 320 +++--------------- .../test_writes/test_rewrite_manifests.py | 125 ++++++- 3 files changed, 162 insertions(+), 296 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index c15621dd16..ad3af67683 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -112,7 +112,7 @@ update_table_metadata, ) from pyiceberg.table.update.schema import UpdateSchema -from pyiceberg.table.update.snapshot import ManageSnapshots, UpdateSnapshot, _FastAppendFiles +from pyiceberg.table.update.snapshot import ManageSnapshots, RewriteManifestsResult, UpdateSnapshot, _FastAppendFiles from pyiceberg.table.update.spec import UpdateSpec from pyiceberg.table.update.statistics import UpdateStatistics from pyiceberg.transforms import IdentityTransform @@ -420,9 +420,12 @@ def update_snapshot(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> U """ return UpdateSnapshot(self, io=self._table.io, snapshot_properties=snapshot_properties) - def rewrite_manifests(self, spec_id: Optional[int] = None) -> None: + def rewrite_manifests(self, spec_id: Optional[int] = None) -> RewriteManifestsResult: + if self._table.current_snapshot() is None: + return RewriteManifestsResult(rewritten_manifests=[], added_manifests=[]) with self.update_snapshot().rewrite() as rewrite: - rewrite.rewrite_manifests() + rewritten = rewrite.rewrite_manifests() + return rewritten def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None: """ @@ -1174,7 +1177,7 @@ def add_files( def rewrite_manifests( self, spec_id: Optional[int] = None, - ) -> None: + ) -> RewriteManifestsResult: """ Shorthand API for Rewriting manifests for the table. @@ -1183,7 +1186,7 @@ def rewrite_manifests( """ with self.transaction() as tx: - tx.rewrite_manifests(spec_id=spec_id) + return tx.rewrite_manifests(spec_id=spec_id) def update_spec(self, case_sensitive: bool = True) -> UpdateSpec: return UpdateSpec(Transaction(self, autocommit=True), case_sensitive=case_sensitive) diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index 896d064eb1..ba0a1dcdc2 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -24,7 +24,7 @@ from concurrent.futures import Future from dataclasses import dataclass from functools import cached_property -from typing import TYPE_CHECKING, Callable, Dict, Generic, List, Optional, Set, Tuple +from typing import TYPE_CHECKING, Any, Callable, Dict, Generic, List, Optional, Set, Tuple from sortedcontainers import SortedList @@ -547,6 +547,7 @@ class _RewriteManifests(_SnapshotProducer["_RewriteManifests"]): _target_size_bytes: int rewritten_manifests: List[ManifestFile] = [] added_manifests: List[ManifestFile] = [] + kept_manifests: List[ManifestFile] = [] def __init__( self, @@ -570,8 +571,38 @@ def __init__( self._table = table self._spec_id = spec_id or table.spec().spec_id + def _summary(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> Summary: + from pyiceberg.table import TableProperties + + ssc = SnapshotSummaryCollector() + partition_summary_limit = int( + self._transaction.table_metadata.properties.get( + TableProperties.WRITE_PARTITION_SUMMARY_LIMIT, TableProperties.WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT + ) + ) + ssc.set_partition_summary_limit(partition_summary_limit) + + props = { + "manifests-kept": str(len([])), + "manifests-created": str(len(self.added_manifests)), + "manifests-replaced": str(len(self.rewritten_manifests)), + "entries-processed": str(len([])), + } + previous_snapshot = ( + self._transaction.table_metadata.snapshot_by_id(self._parent_snapshot_id) + if self._parent_snapshot_id is not None + else None + ) + + return update_snapshot_summaries( + summary=Summary(operation=self._operation, **ssc.build(), **props), + previous_summary=previous_snapshot.summary if previous_snapshot is not None else None, + truncate_full_table=False, + ) + def rewrite_manifests(self) -> RewriteManifestsResult: data_result = self._find_matching_manifests(ManifestContent.DATA) + self.rewritten_manifests.extend(data_result.rewritten_manifests) self.added_manifests.extend(data_result.added_manifests) @@ -608,8 +639,8 @@ def _find_matching_manifests(self, content: ManifestContent) -> RewriteManifests return RewriteManifestsResult(rewritten_manifests=manifests, added_manifests=new_manifests) - def _copy_manifest_file(self, manifest_file:ManifestFile, snapshot_id:int) -> ManifestFile: - return ManifestFile( + def _copy_manifest_file(self, manifest_file: ManifestFile, snapshot_id: int) -> ManifestFile: + return ManifestFile( manifest_path=manifest_file.manifest_path, manifest_length=manifest_file.manifest_length, partition_spec_id=manifest_file.partition_spec_id, @@ -627,14 +658,14 @@ def _copy_manifest_file(self, manifest_file:ManifestFile, snapshot_id:int) -> Ma key_metadata=manifest_file.key_metadata, ) + def __exit__(self, _: Any, value: Any, traceback: Any) -> None: + """Commit only if we have rewritten any manifests.""" + if self.rewritten_manifests: + self.commit() + def _existing_manifests(self) -> List[ManifestFile]: """Determine if there are any existing manifest files.""" - new_manifests = [ - self._copy_manifest_file(manifest, self.snapshot_id) - for manifest in self.added_manifests + self.rewritten_manifests - ] - return new_manifests - # return [] + return [self._copy_manifest_file(manifest, self.snapshot_id) for manifest in self.added_manifests] def _deleted_entries(self) -> List[ManifestEntry]: """To determine if we need to record any deleted manifest entries. @@ -644,277 +675,6 @@ def _deleted_entries(self) -> List[ManifestEntry]: return [] -# class _RewriteManifests(_SnapshotProducer["_RewriteManifests"]): -# KEPT_MANIFESTS_COUNT = "manifests-kept" -# CREATED_MANIFESTS_COUNT = "manifests-created" -# REPLACED_MANIFESTS_COUNT = "manifests-replaced" -# PROCESSED_ENTRY_COUNT = "entries-processed" -# _target_size_bytes: int -# _min_count_to_merge: int -# -# def __init__( -# self, -# table: Table, -# transaction: Transaction, -# io: FileIO, -# commit_uuid: Optional[uuid.UUID] = None, -# snapshot_properties: Dict[str, str] = EMPTY_DICT, -# ) -> None: -# from pyiceberg.table import TableProperties -# -# super().__init__(Operation.REPLACE, transaction, io, commit_uuid, snapshot_properties) -# self._table = table -# self.specs_by_id = self._table.spec() # ops.current().specs_by_id() -# self.manifest_target_size_bytes = 8388608 # Default value -# self.deleted_manifests: Set[ManifestFile] = set() -# self.added_manifests: List[ManifestFile] = [] -# self.rewritten_added_manifests: List[ManifestFile] = [] -# self.kept_manifests: List[ManifestFile] = [] -# self.new_manifests: List[ManifestFile] = [] -# self.rewritten_manifests: Set[ManifestFile] = set() -# self.snapshot_Id_Inheritance_enabled = property_as_bool( -# self._transaction.table_metadata.properties, -# TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, -# TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT, -# ) # type: ignore -# self.can_inherit_snapshot_id = table.metadata.format_version > 1 or self.snapshot_Id_Inheritance_enabled -# # self.writers: Dict[Tuple[Any, int], 'WriterWrapper'] = {} -# self.entry_count = 0 -# self.cluster_by_func: Optional[Callable[[DataFile], Any]] = None -# self.predicate: Optional[Callable[[ManifestFile], bool]] = None -# # self.summary_builder = SnapshotSummary.Builder() -# # self.lock = threading.Lock() -# self._target_size_bytes = property_as_int( -# self._transaction.table_metadata.properties, -# TableProperties.MANIFEST_TARGET_SIZE_BYTES, -# TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT, -# ) # type: ignore -# self._min_count_to_merge = property_as_int( -# self._transaction.table_metadata.properties, -# TableProperties.MANIFEST_MIN_MERGE_COUNT, -# TableProperties.MANIFEST_MIN_MERGE_COUNT_DEFAULT, -# ) # type: ignore -# -# # def cluster_by(self, func: Callable[[DataFile], Any]]) -> RewriteManifests: -# # self.cluster_by_func = func -# # return self -# -# # def rewrite_if(self, predicate: Callable[[ManifestFile], bool]]) -> RewriteManifests: -# # self.predicate = predicate -# # return self -# -# # def delete_manifest(self, manifest: ManifestFile) -> "_RewriteManifests": -# # self.deleted_manifests.add(manifest) -# # return self -# # -# # def add_manifest(self, manifest: ManifestFile) -> "_RewriteManifests": -# # if self.can_inherit_snapshot_id and manifest.added_snapshot_id is None: -# # self.added_manifests.append(manifest) -# # else: -# # copied_manifest = self.copy_manifest(manifest) -# # self.rewritten_added_manifests.append(copied_manifest) -# # return self -# -# # def copy_manifest(self, manifest: ManifestFile) -> ManifestFile: -# # """ -# # Copies a manifest file to a new location, updating its metadata (e.g., snapshot ID). -# # -# # Args: -# # manifest: The manifest file to copy. -# # -# # Returns: -# # A new ManifestFile object representing the copied manifest. -# # """ -# # # Get the current table metadata -# # current_metadata = self._table.metadata -# # -# # # Create an input file for the manifest to be copied -# # input_file = self.ops.io().new_input_file(manifest.path) -# # -# # # Create an output file for the new manifest -# # output_file = self.new_manifest_output_file() -# # -# # # Copy the manifest file, updating its metadata -# # new_manifest = ManifestFile( -# # format_version=current_metadata.format_version, -# # spec_id=manifest.partition_spec_id, -# # input_file=input_file, -# # specs_by_id=self.specs_by_id, -# # output_file=output_file, -# # snapshot_id=self.snapshot_id(), -# # summary_builder=self.summary_builder, -# # ) -# # -# # return new_manifest -# -# def _existing_manifests(self) -> List[ManifestFile]: -# """To determine if there are any existing manifest files. -# -# A fast append will add another ManifestFile to the ManifestList. -# All the existing manifest files are considered existing. -# """ -# existing_manifests = [] -# -# if self._parent_snapshot_id is not None: -# previous_snapshot = self._transaction.table_metadata.snapshot_by_id(self._parent_snapshot_id) -# -# if previous_snapshot is None: -# raise ValueError(f"Snapshot could not be found: {self._parent_snapshot_id}") -# -# for manifest in previous_snapshot.manifests(io=self._io): -# if manifest.has_added_files() or manifest.has_existing_files() or manifest.added_snapshot_id == self._snapshot_id: -# existing_manifests.append(manifest) -# -# return existing_manifests -# def _deleted_entries(self) -> List[ManifestEntry]: -# """To determine if we need to record any deleted manifest entries. -# -# In case of an append, nothing is deleted. -# """ -# return [] -# -# -# def rewrite_manifests(self) -> List[ManifestFile]: -# snapshot = self._table.current_snapshot() -# if not snapshot: -# raise ValueError("Cannot rewrite manifests without a current snapshot") -# current_manifests = snapshot.manifests(self._io) -# current_manifest_set = set(current_manifests) -# self.validate_deleted_manifests(current_manifest_set, snapshot.snapshot_id) -# -# if self.requires_rewrite(current_manifest_set): -# self.perform_rewrite(current_manifests) -# else: -# self.keep_active_manifests(current_manifests) -# -# self.validate_files_counts() -# -# new_manifests = [ -# self.with_snapshot_id(manifest) -# for manifest in list(self.new_manifests) + self.added_manifests + self.rewritten_added_manifests -# ] -# return new_manifests + list(self.kept_manifests) -# -# def perform_rewrite(self, current_manifests: List[ManifestFile]) -> None: -# # self.reset() -# remaining_manifests = [m for m in current_manifests if m not in self.deleted_manifests] -# data_manifest_merge_manager = _ManifestMergeManager( -# target_size_bytes=self._target_size_bytes, -# min_count_to_merge=self._min_count_to_merge, -# merge_enabled=True, -# snapshot_producer=self, -# ) -# -# new_manifests = data_manifest_merge_manager.merge_manifests(remaining_manifests) -# self.new_manifests.extend(new_manifests) -# -# # def process_manifest(self, manifest: ManifestFile): -# # if not self.contains_deletes(manifest) and self.matches_predicate(manifest): -# # self.rewritten_manifests.add(manifest) -# # try: -# # reader = ManifestReader(manifest, self.ops.io(), self.ops.current().specs_by_id()) -# # for entry in reader.live_entries(): -# # self.append_entry(entry, self.cluster_by_func(entry.file), manifest.partition_spec_id) -# # reader.close() -# # except IOError as e: -# # raise RuntimeIOException from e -# # else: -# # self.kept_manifests.put(manifest) -# -# # def append_entry(self, entry: ManifestEntry, key: Any, partition_spec_id: int): -# # with self.lock: -# # writer = self.get_writer(key, partition_spec_id) -# # writer.add_entry(entry) -# # self.entry_count += 1 -# -# # def get_writer(self, key: Any, partition_spec_id: int) -> 'WriterWrapper': -# # return self.writers.setdefault((key, partition_spec_id), WriterWrapper(self, self.specs_by_id[partition_spec_id])) -# -# def validate_deleted_manifests(self, current_manifests: Set[ManifestFile], current_snapshot_id: int) -> None: -# for manifest in self.deleted_manifests: -# if manifest not in current_manifests: -# raise ValueError( -# f"Deleted manifest {manifest.manifest_path} could not be found in the latest snapshot {current_snapshot_id}" -# ) -# -# def requires_rewrite(self, current_manifests: Set[ManifestFile]) -> bool: -# # if self.cluster_by_func is None: -# # return False -# return len(self.rewritten_manifests) == 0 or any( -# manifest not in current_manifests for manifest in self.rewritten_manifests -# ) -# -# def keep_active_manifests(self, current_manifests: List[ManifestFile]) -> None: -# for manifest in current_manifests: -# if manifest not in self.rewritten_manifests and manifest not in self.deleted_manifests: -# self.kept_manifests.append(manifest) -# -# def validate_files_counts(self) -> None: -# created_manifests = list(self.new_manifests) + self.added_manifests + self.rewritten_added_manifests -# created_files_count = self.active_files_count(created_manifests) -# replaced_manifests = list(self.rewritten_manifests) + list(self.deleted_manifests) -# replaced_files_count = self.active_files_count(replaced_manifests) -# -# if created_files_count != replaced_files_count: -# raise ValueError( -# f"Replaced and created manifests must have the same number of active files: {created_files_count} (new), {replaced_files_count} (old)" -# ) -# -# def active_files_count(self, manifests: List[ManifestFile]) -> int: -# count = 0 -# for manifest in manifests: -# count += manifest.existing_files_count + manifest.added_files_count -# return count -# -# # def reset(self) -> None: -# # self.clean_uncommitted(self.new_manifests, set()) -# # self.entry_count = 0 -# # self.kept_manifests.clear() -# # self.rewritten_manifests.clear() -# # self.new_manifests.clear() -# # -# # def clean_uncommitted(self, manifests: List[ManifestFile], committed: Set[ManifestFile]) -> None: -# # for manifest in manifests: -# # if manifest not in committed: -# # self.delete_file(manifest.manifest_path) -# # -# # def delete_file(self, path: str): -# # # Mock implementation -# # if os.path.exists(path): -# # os.remove(path) -# -# def with_snapshot_id(self, manifest: ManifestFile) -> ManifestFile: -# # Mock implementation -# return ManifestFile(manifest.manifest_path, snapshot_id=0) -# -# def new_manifest_writer(self, spec: PartitionSpec) -> ManifestWriter: -# # Mock implementation -# return ManifestWriter(spec) -# -# # class WriterWrapper: -# # def __init__(self, outer: 'BaseRewriteManifests', spec: PartitionSpec): -# # self.outer = outer -# # self.spec = spec -# # self.writer: Optional[ManifestWriter] = None -# # self.lock = threading.Lock() -# # -# # def add_entry(self, entry: ManifestEntry): -# # with self.lock: -# # if self.writer is None or self.writer.length() >= self.outer.manifest_target_size_bytes: -# # self._close_writer() -# # self.writer = self.outer.new_manifest_writer(self.spec) -# # self.writer.existing(entry) -# # -# # def _close_writer(self): -# # if self.writer: -# # self.writer.close() -# # self.outer.new_manifests.put(self.writer.to_manifest_file()) -# # -# # def close(self): -# # with self.lock: -# # self._close_writer() - - class _OverwriteFiles(_SnapshotProducer["_OverwriteFiles"]): """Overwrites data from the table. This will produce an OVERWRITE snapshot. diff --git a/tests/integration/test_writes/test_rewrite_manifests.py b/tests/integration/test_writes/test_rewrite_manifests.py index 1946ef542d..faf1285058 100644 --- a/tests/integration/test_writes/test_rewrite_manifests.py +++ b/tests/integration/test_writes/test_rewrite_manifests.py @@ -18,7 +18,6 @@ import pyarrow as pa import pytest -from pyspark.sql import SparkSession from pyiceberg.catalog import Catalog from utils import _create_table @@ -94,23 +93,127 @@ def table_v1_v2_appended_with_null(session_catalog: Catalog, arrow_table_with_nu assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}" +# @pytest.mark.integration +# def test_summaries(spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: +# identifier = "default.arrow_table_summaries" +# tbl = _create_table(session_catalog, identifier, {"format-version": "2"}) +# tbl.append(arrow_table_with_null) +# tbl.append(arrow_table_with_null) +# +# # tbl.rewrite_manifests() +# +# # records1 = [ThreeColumnRecord(1, None, "AAAA")] +# # write_records(spark, table_location, records1) +# before_pandas = tbl.scan().to_pandas() +# before_count = before_pandas.shape[0] +# tbl.refresh() +# manifests = tbl.inspect.manifests().to_pylist() +# assert len(manifests) == 2, "Should have 2 manifests before rewrite" +# +# tbl.rewrite_manifests() +# tbl.refresh() +# +# after_pandas = tbl.scan().to_pandas() +# after_count = before_pandas.shape[0] +# manifests = tbl.inspect.manifests().to_pylist() +# assert len(manifests) == 1, "Should have 1 manifests before rewrite" +# +# snaps = tbl.inspect.snapshots().to_pandas() +# print(snaps) + + @pytest.mark.integration -def test_summaries(spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: - identifier = "default.arrow_table_summaries" +def test_rewrite_manifests_empty_table(session_catalog: Catalog) -> None: + # Create an unpartitioned table + identifier = "default.test_rewrite_manifests_empty_table" + tbl = _create_table(session_catalog, identifier, {"format-version": "2"}) + + assert tbl.current_snapshot() is None, "Table must be empty" + + # Execute rewrite manifests action + tbl.rewrite_manifests() + + tbl.refresh() + assert tbl.current_snapshot() is None, "Table must stay empty" + + +@pytest.mark.integration +def test_rewrite_small_manifests_non_partitioned_table(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: + identifier = "default.test_rewrite_small_manifests_non_partitioned_table" tbl = _create_table(session_catalog, identifier, {"format-version": "2"}) tbl.append(arrow_table_with_null) tbl.append(arrow_table_with_null) + tbl.refresh() - # tbl.rewrite_manifests() + manifests = tbl.inspect.manifests() + assert len(manifests) == 2, "Should have 2 manifests before rewrite" - # records1 = [ThreeColumnRecord(1, None, "AAAA")] - # write_records(spark, table_location, records1) + result = tbl.rewrite_manifests() + + assert len(result.rewritten_manifests) == 2, "Action should rewrite 2 manifests" + assert len(result.added_manifests) == 1, "Action should add 1 manifest" tbl.refresh() - manifests = tbl.inspect.all_manifests().to_pylist() - assert len(manifests) == 3, "Should have 3 manifests before rewrite" - tbl.rewrite_manifests() + current_snapshot = tbl.current_snapshot() + if not current_snapshot: + raise AssertionError + new_manifests = current_snapshot.manifests(tbl.io) + assert len(new_manifests) == 1, "Should have 1 manifest after rewrite" + assert new_manifests[0].existing_files_count == 2, "Should have 4 files in the new manifest" + assert new_manifests[0].added_files_count == 0, "Should have no added files in the new manifest" + assert new_manifests[0].deleted_files_count == 0, "Should have no deleted files in the new manifest" + + # Validate the records + expected_records_count = arrow_table_with_null.shape[0] * 2 + result_df = tbl.scan().to_pandas() + actual_records_count = result_df.shape[0] + assert expected_records_count == actual_records_count, "Rows must match" + + +def test_rewrite_small_manifests_partitioned_table(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: + # Create and append data files + # records1 = [ThreeColumnRecord(1, None, "AAAA"), ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB")] + # records2 = [ThreeColumnRecord(2, "CCCCCCCCCC", "CCCC"), ThreeColumnRecord(2, "DDDDDDDDDD", "DDDD")] + # records3 = [ThreeColumnRecord(3, "EEEEEEEEEE", "EEEE"), ThreeColumnRecord(3, "FFFFFFFFFF", "FFFF")] + # records4 = [ThreeColumnRecord(4, "GGGGGGGGGG", "GGGG"), ThreeColumnRecord(4, "HHHHHHHHHH", "HHHH")] + # self.table.newFastAppend().appendFile(DataFile.from_records(records1)).commit() + # self.table.newFastAppend().appendFile(DataFile.from_records(records2)).commit() + # self.table.newFastAppend().appendFile(DataFile.from_records(records3)).commit() + # self.table.newFastAppend().appendFile(DataFile.from_records(records4)).commit() + + identifier = "default.test_rewrite_small_manifests_non_partitioned_table" + tbl = _create_table(session_catalog, identifier, {"format-version": "2"}) + tbl.append(arrow_table_with_null) + tbl.append(arrow_table_with_null) + tbl.append(arrow_table_with_null) + tbl.append(arrow_table_with_null) + + tbl.refresh() + manifests = tbl.current_snapshot().manifests(tbl.io) + assert len(manifests) == 4, "Should have 4 manifests before rewrite" + + # Perform the rewrite manifests action + # actions = SparkActions.get() + result = tbl.rewrite_manifests() + + assert len(result.rewritten_manifests) == 4, "Action should rewrite 4 manifests" + assert len(result.added_manifests) == 2, "Action should add 2 manifests" + tbl.refresh() - manifests = tbl.inspect.all_manifests().to_pylist() - assert len(manifests) == 1, "Should have 1 manifests before rewrite" + new_manifests = tbl.current_snapshot().manifests(tbl.io) + assert len(new_manifests) == 2, "Should have 2 manifests after rewrite" + + assert new_manifests[0].existing_files_count == 4 + assert new_manifests[0].added_files_count == 0 + assert new_manifests[0].deleted_files_count == 0 + # + # assertnew_manifests[1].existingFilesCount(), 4) + # self.assertFalse(new_manifests[1].hasAddedFiles()) + # self.assertFalse(new_manifests[1].hasDeletedFiles()) + # + # # Validate the records + # expected_records = records1 + records2 + records3 + records4 + # result_df = tbl.read() + # actual_records = result_df.collect() + # self.assertEqual(actual_records, expected_records, "Rows must match") From bf688482021c8edc71910bf561c51c809a92c13e Mon Sep 17 00:00:00 2001 From: amitgilad Date: Sun, 2 Mar 2025 09:42:06 +0200 Subject: [PATCH 7/7] add test --- pyiceberg/manifest.py | 1 + .../test_writes/test_rewrite_manifests.py | 86 +++++++++++++------ 2 files changed, 59 insertions(+), 28 deletions(-) diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index a3387148b4..b1053c154d 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -865,6 +865,7 @@ def existing(self, entry: ManifestEntry) -> ManifestWriter: entry.snapshot_id, entry.sequence_number, entry.file_sequence_number, entry.data_file ) ) + self._existing_files += 1 return self diff --git a/tests/integration/test_writes/test_rewrite_manifests.py b/tests/integration/test_writes/test_rewrite_manifests.py index faf1285058..922a29d30f 100644 --- a/tests/integration/test_writes/test_rewrite_manifests.py +++ b/tests/integration/test_writes/test_rewrite_manifests.py @@ -15,11 +15,14 @@ # specific language governing permissions and limitations # under the License. # pylint:disable=redefined-outer-name +from typing import List import pyarrow as pa import pytest from pyiceberg.catalog import Catalog +from pyiceberg.manifest import ManifestFile +from pyiceberg.table import TableProperties from utils import _create_table @@ -171,49 +174,76 @@ def test_rewrite_small_manifests_non_partitioned_table(session_catalog: Catalog, assert expected_records_count == actual_records_count, "Rows must match" -def test_rewrite_small_manifests_partitioned_table(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: - # Create and append data files - # records1 = [ThreeColumnRecord(1, None, "AAAA"), ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB")] - # records2 = [ThreeColumnRecord(2, "CCCCCCCCCC", "CCCC"), ThreeColumnRecord(2, "DDDDDDDDDD", "DDDD")] - # records3 = [ThreeColumnRecord(3, "EEEEEEEEEE", "EEEE"), ThreeColumnRecord(3, "FFFFFFFFFF", "FFFF")] - # records4 = [ThreeColumnRecord(4, "GGGGGGGGGG", "GGGG"), ThreeColumnRecord(4, "HHHHHHHHHH", "HHHH")] - # self.table.newFastAppend().appendFile(DataFile.from_records(records1)).commit() - # self.table.newFastAppend().appendFile(DataFile.from_records(records2)).commit() - # self.table.newFastAppend().appendFile(DataFile.from_records(records3)).commit() - # self.table.newFastAppend().appendFile(DataFile.from_records(records4)).commit() +def compute_manifest_entry_size_bytes(manifests: List[ManifestFile]) -> float: + total_size = 0 + num_entries = 0 + + for manifest in manifests: + total_size += manifest.manifest_length + num_entries += manifest.added_files_count + manifest.existing_files_count + manifest.deleted_files_count + + return total_size / num_entries if num_entries > 0 else 0 + + +def test_rewrite_small_manifests_partitioned_table(session_catalog: Catalog) -> None: + records1 = pa.Table.from_pydict({"c1": [1, 1], "c2": [None, "BBBBBBBBBB"], "c3": ["AAAA", "BBBB"]}) + + records2 = records2 = pa.Table.from_pydict({"c1": [2, 2], "c2": ["CCCCCCCCCC", "DDDDDDDDDD"], "c3": ["CCCC", "DDDD"]}) + + records3 = records3 = pa.Table.from_pydict({"c1": [3, 3], "c2": ["EEEEEEEEEE", "FFFFFFFFFF"], "c3": ["EEEE", "FFFF"]}) + + records4 = records4 = pa.Table.from_pydict({"c1": [4, 4], "c2": ["GGGGGGGGGG", "HHHHHHHHHG"], "c3": ["GGGG", "HHHH"]}) + + schema = pa.schema( + [ + ("c1", pa.int64()), + ("c2", pa.string()), + ("c3", pa.string()), + ] + ) identifier = "default.test_rewrite_small_manifests_non_partitioned_table" - tbl = _create_table(session_catalog, identifier, {"format-version": "2"}) - tbl.append(arrow_table_with_null) - tbl.append(arrow_table_with_null) - tbl.append(arrow_table_with_null) - tbl.append(arrow_table_with_null) + tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, schema=schema) + + tbl.append(records1) + tbl.append(records2) + tbl.append(records3) + tbl.append(records4) + tbl.refresh() tbl.refresh() manifests = tbl.current_snapshot().manifests(tbl.io) assert len(manifests) == 4, "Should have 4 manifests before rewrite" - # Perform the rewrite manifests action - # actions = SparkActions.get() + # manifest_entry_size_bytes = compute_manifest_entry_size_bytes(manifests) + target_manifest_size_bytes = 5200 * 2 + 100 + tbl = ( + tbl.transaction() + .set_properties({TableProperties.MANIFEST_TARGET_SIZE_BYTES: str(target_manifest_size_bytes)}) + .commit_transaction() + ) + result = tbl.rewrite_manifests() + tbl.refresh() assert len(result.rewritten_manifests) == 4, "Action should rewrite 4 manifests" assert len(result.added_manifests) == 2, "Action should add 2 manifests" - tbl.refresh() new_manifests = tbl.current_snapshot().manifests(tbl.io) assert len(new_manifests) == 2, "Should have 2 manifests after rewrite" assert new_manifests[0].existing_files_count == 4 assert new_manifests[0].added_files_count == 0 assert new_manifests[0].deleted_files_count == 0 - # - # assertnew_manifests[1].existingFilesCount(), 4) - # self.assertFalse(new_manifests[1].hasAddedFiles()) - # self.assertFalse(new_manifests[1].hasDeletedFiles()) - # - # # Validate the records - # expected_records = records1 + records2 + records3 + records4 - # result_df = tbl.read() - # actual_records = result_df.collect() - # self.assertEqual(actual_records, expected_records, "Rows must match") + + assert new_manifests[1].existing_files_count == 4 + assert new_manifests[1].added_files_count == 0 + assert new_manifests[1].deleted_files_count == 0 + + sorted_df = tbl.scan().to_pandas().sort_values(["c1", "c2"], ascending=[False, False]) + expectedRecords = ( + pa.concat_tables([records1, records2, records3, records4]).to_pandas().sort_values(["c1", "c2"], ascending=[False, False]) + ) + from pandas.testing import assert_frame_equal + + assert_frame_equal(sorted_df.reset_index(drop=True), expectedRecords.reset_index(drop=True))