Skip to content

Added ExpireSnapshots Feature #1880

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 31 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
0a94d96
Added initial units tests and Class for Removing a Snapshot
ForeverAngry Mar 29, 2025
5f0b62b
Added methods needed to expire snapshots by id, and optionally cleanu…
ForeverAngry Mar 31, 2025
f995daa
Update test_expire_snapshots.py
ForeverAngry Mar 31, 2025
65365e1
Added the builder method to __init__.py, updated the snapshot api wit…
ForeverAngry Apr 1, 2025
e28815f
Snapshots are not being transacted on, but need to re-assign refs
ForeverAngry Apr 1, 2025
4628ede
Fixed the test case.
ForeverAngry Apr 3, 2025
e80c41c
adding print statements to help with debugging
ForeverAngry Apr 3, 2025
cb9f0c9
Draft ready
ForeverAngry Apr 3, 2025
ebcff2b
Applied suggestions to Fix CICD
ForeverAngry Apr 3, 2025
97399bf
Merge branch 'main' into main
ForeverAngry Apr 3, 2025
95e5af2
Rebuild the poetry lock file.
ForeverAngry Apr 3, 2025
5ab5890
Merge branch 'main' into main
ForeverAngry Apr 4, 2025
5acd690
Refactor implementation of `ExpireSnapshots`
ForeverAngry Apr 13, 2025
d30a08c
Fixed format and linting issues
ForeverAngry Apr 13, 2025
e62ab58
Merge branch 'main' into main
ForeverAngry Apr 13, 2025
1af3258
Fixed format and linting issues
ForeverAngry Apr 13, 2025
352b48f
Merge branch 'main' of https://github.com/ForeverAngry/iceberg-python
ForeverAngry Apr 13, 2025
382e0ea
Merge branch 'main' into main
ForeverAngry Apr 18, 2025
549c183
rebased: from main
ForeverAngry Apr 19, 2025
386cb15
fixed: typo
ForeverAngry Apr 19, 2025
12729fa
removed errant files
ForeverAngry Apr 22, 2025
ce3515c
Added: public method signature to the init table file.
ForeverAngry Apr 22, 2025
28fce4b
Removed: `expire_snapshots_older_than` method, in favor of implementi…
ForeverAngry Apr 24, 2025
2c3153e
Update tests/table/test_expire_snapshots.py
ForeverAngry Apr 26, 2025
27c3ece
Removed: unrelated changes, Added: logic to expire snapshot method.
ForeverAngry Apr 26, 2025
05793c0
Merge branch 'main' into 1880-add-expire-snapshots
ForeverAngry Apr 26, 2025
8ec1889
Update test_partition_evolution.py
ForeverAngry Apr 26, 2025
b23ac6a
Update test_literals.py
ForeverAngry May 10, 2025
5c458f2
Update snapshot.py
ForeverAngry May 10, 2025
a08eb6b
Update snapshot.py
ForeverAngry May 11, 2025
689310d
Fixed: Linting
ForeverAngry May 11, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,5 @@ htmlcov
pyiceberg/avro/decoder_fast.c
pyiceberg/avro/*.html
pyiceberg/avro/*.so
.vscode/settings.json
pyiceberg/table/update/expire_snapshot.md
783 changes: 274 additions & 509 deletions poetry.lock

Large diffs are not rendered by default.

18 changes: 18 additions & 0 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@
)
from pyiceberg.table.update.schema import UpdateSchema
from pyiceberg.table.update.snapshot import (
ExpireSnapshots,
ManageSnapshots,
UpdateSnapshot,
_FastAppendFiles,
Expand Down Expand Up @@ -1068,6 +1069,23 @@ def manage_snapshots(self) -> ManageSnapshots:
ms.create_tag(snapshot_id1, "Tag_A").create_tag(snapshot_id2, "Tag_B")
"""
return ManageSnapshots(transaction=Transaction(self, autocommit=True))

def expire_snapshots(self) -> ExpireSnapshots:
    """Shorthand to expire snapshots on this table.

    Examples:
        table.expire_snapshots().expire_snapshot_id(...).commit()
        table.expire_snapshots().expire_older_than(...).commit()

    Returns:
        An ExpireSnapshots builder bound to an auto-committing transaction;
        call ``commit()`` on it to apply the removals.
    """
    # Keyword argument for consistency with manage_snapshots() above.
    return ExpireSnapshots(transaction=Transaction(self, autocommit=True))




def update_statistics(self) -> UpdateStatistics:
"""
Expand Down
91 changes: 89 additions & 2 deletions pyiceberg/table/update/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,16 @@
AddSnapshotUpdate,
AssertRefSnapshotId,
RemoveSnapshotRefUpdate,
RemoveSnapshotsUpdate,
SetSnapshotRefUpdate,
TableRequirement,
TableMetadata,
TableUpdate,
U,
UpdatesAndRequirements,
UpdateTableMetadata,
)

from pyiceberg.typedef import (
EMPTY_DICT,
KeyDefaultDict,
Expand All @@ -82,8 +85,23 @@
from pyiceberg.utils.properties import property_as_bool, property_as_int

if TYPE_CHECKING:
from pyiceberg.table import Transaction
from pyiceberg.table import Table

from pyiceberg.table.metadata import Snapshot
from pyiceberg.table.update import UpdateTableMetadata
from typing import Optional, Set
from datetime import datetime, timezone

from typing import Dict, Optional, Set
import uuid
from pyiceberg.table.metadata import TableMetadata
from pyiceberg.table.snapshots import Snapshot
from pyiceberg.table.update import (
UpdateTableMetadata,
RemoveSnapshotsUpdate,
UpdatesAndRequirements,
AssertRefSnapshotId,
)

def _new_manifest_file_name(num: int, commit_uuid: uuid.UUID) -> str:
return f"{commit_uuid}-m{num}.avro"
Expand Down Expand Up @@ -239,7 +257,7 @@ def _summary(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> Summary:
truncate_full_table=self._operation == Operation.OVERWRITE,
)

def _commit(self) -> UpdatesAndRequirements:
def commit(self, base_metadata: TableMetadata) -> UpdatesAndRequirements:
new_manifests = self._manifests()
next_sequence_number = self._transaction.table_metadata.next_sequence_number()

Expand Down Expand Up @@ -745,6 +763,8 @@ class ManageSnapshots(UpdateTableMetadata["ManageSnapshots"]):

def _commit(self) -> UpdatesAndRequirements:
    """Apply the pending changes and commit.

    Returns the accumulated (updates, requirements) tuples for the catalog
    commit; the transaction machinery applies them.
    """
    # NOTE(review): this hasattr guard looks like a leftover debugging aid —
    # a Transaction lacking `_apply` would fail later anyway. TODO: confirm
    # it is intentional before keeping it.
    if not hasattr(self._transaction, "_apply"):
        raise AttributeError("Transaction object is not properly initialized.")
    return self._updates, self._requirements

def _remove_ref_snapshot(self, ref_name: str) -> ManageSnapshots:
Expand Down Expand Up @@ -844,3 +864,70 @@ def remove_branch(self, branch_name: str) -> ManageSnapshots:
This for method chaining
"""
return self._remove_ref_snapshot(ref_name=branch_name)

class ExpireSnapshots(UpdateTableMetadata["ExpireSnapshots"]):
    """API for removing old snapshots from the table.

    Usage:
        table.expire_snapshots().expire_snapshot_id(id).commit()
        table.expire_snapshots().expire_older_than(timestamp_ms).commit()
    """

    _updates: Tuple[TableUpdate, ...] = ()
    _requirements: Tuple[TableRequirement, ...] = ()

    def __init__(self, transaction) -> None:
        super().__init__(transaction)
        # NOTE(review): the base class likely stores the transaction already;
        # kept for safety since _commit reads self._transaction. TODO confirm.
        self._transaction = transaction
        # Snapshot ids accumulated by expire_snapshot_id / expire_older_than.
        self._ids_to_remove: Set[int] = set()

    def _commit(self) -> UpdatesAndRequirements:
        """Build the updates and requirements that expire the marked snapshots.

        Raises:
            ValueError: if no snapshot was marked for expiration.
        """
        if not self._ids_to_remove:
            raise ValueError("No snapshot IDs marked for expiration.")

        refs = self._transaction.table_metadata.refs

        # A snapshot still referenced by a branch or tag must have that ref
        # dropped before the snapshot itself can be removed.
        for ref_name, ref in list(refs.items()):
            if ref.snapshot_id in self._ids_to_remove:
                self._updates += (RemoveSnapshotRefUpdate(ref_name=ref_name),)

        # Append (do not overwrite) so the ref-removal updates above survive.
        self._updates += (RemoveSnapshotsUpdate(snapshot_ids=list(self._ids_to_remove)),)

        # Require that every surviving ref still points at the same snapshot,
        # so a concurrent ref change fails this commit instead of being lost.
        self._requirements += tuple(
            AssertRefSnapshotId(snapshot_id=ref.snapshot_id, ref=ref_name)
            for ref_name, ref in refs.items()
            if ref.snapshot_id not in self._ids_to_remove
        )
        return self._updates, self._requirements

    def expire_snapshot_id(self, snapshot_id_to_expire: int) -> ExpireSnapshots:
        """Mark a specific snapshot ID for expiration.

        Raises:
            ValueError: if the snapshot id does not exist in the table.
        """
        # NOTE(review): reaches into the private `_table`; a lookup via
        # table_metadata would avoid the private access — TODO confirm API.
        if self._transaction._table.snapshot_by_id(snapshot_id_to_expire) is None:
            raise ValueError(f"Snapshot ID {snapshot_id_to_expire} does not exist.")
        self._ids_to_remove.add(snapshot_id_to_expire)
        return self

    def expire_older_than(self, timestamp_ms: int) -> ExpireSnapshots:
        """Mark snapshots strictly older than ``timestamp_ms`` for expiration."""
        self._ids_to_remove.update(
            snapshot.snapshot_id
            for snapshot in self._transaction.table_metadata.snapshots
            if snapshot.timestamp_ms < timestamp_ms
        )
        return self
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ cachetools = "^5.5.0"
pyiceberg-core = { version = "^0.4.0", optional = true }
polars = { version = "^1.21.0", optional = true }
thrift-sasl = { version = ">=0.4.3", optional = true }
daft = "^0.4.8"

[tool.poetry.group.dev.dependencies]
pytest = "7.4.4"
Expand Down
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2349,7 +2349,7 @@ def table_v2_with_extensive_snapshots(example_table_metadata_v2_with_extensive_s
identifier=("database", "table"),
metadata=table_metadata,
metadata_location=f"{table_metadata.location}/uuid.metadata.json",
io=load_file_io(),
io=load_file_io(location=metadata_location),
catalog=NoopCatalog("NoopCatalog"),
)

Expand Down
184 changes: 184 additions & 0 deletions tests/table/test_expire_snapshots.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
from typing import Any, Dict, Tuple
import pytest
from pyiceberg.catalog.noop import NoopCatalog
from pyiceberg.io import load_file_io
from pyiceberg.table import Table

import time
from random import randint
from typing import Any, Dict, Optional
import pytest
from pyiceberg.catalog.noop import NoopCatalog
from pyiceberg.io import load_file_io
from pyiceberg.table import Table
from pyiceberg.table.metadata import TableMetadataV2
from pyiceberg.table import Table
from pyiceberg.catalog.noop import NoopCatalog
from pyiceberg.table.update import TableRequirement, TableUpdate
# Mock definition for CommitTableResponse
from pyiceberg.table.metadata import TableMetadataV2
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, LongType
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import BucketTransform, IdentityTransform
from pyiceberg.table.sorting import SortOrder, SortField, SortDirection, NullOrder

class CommitTableResponse:
    """Stand-in for the catalog's commit response used by these tests."""

    def __init__(self, metadata=None, metadata_location='s3://bucket/test/location'):
        # Fall back to a minimal valid metadata object so callers never see None.
        self.metadata = metadata if metadata is not None else self._default_metadata(metadata_location)
        self.metadata_location = metadata_location

    @staticmethod
    def _default_metadata(metadata_location):
        """Build a minimal, schema-valid TableMetadataV2 for the mock response."""
        return TableMetadataV2(
            location=metadata_location,
            table_uuid='9c12d441-03fe-4693-9a96-a0705ddf69c1',
            last_updated_ms=1602638573590,
            last_column_id=3,
            schemas=[
                Schema(
                    NestedField(field_id=1, name="x", field_type=LongType(), required=True),
                    NestedField(field_id=2, name="y", field_type=LongType(), required=True, doc="comment"),
                    NestedField(field_id=3, name="z", field_type=LongType(), required=True),
                    identifier_field_ids=[1, 2],
                    schema_id=1
                )
            ],
            current_schema_id=1,
            partition_specs=[
                PartitionSpec(
                    PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="x"), spec_id=0
                )
            ],
            default_spec_id=0,
            sort_orders=[
                SortOrder(
                    SortField(source_id=2, transform=IdentityTransform(), direction=SortDirection.ASC, null_order=NullOrder.NULLS_FIRST),
                    order_id=3
                )
            ],
            default_sort_order_id=3,
            properties={},
            current_snapshot_id=None,
            snapshots=[],
            snapshot_log=[],
            metadata_log=[],
            refs={},
            statistics=[],
            format_version=2,
            last_sequence_number=34
        )

class MockCatalog(NoopCatalog):
    """Catalog double whose commit_table always succeeds with canned metadata."""

    def commit_table(
        self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...]
    ) -> CommitTableResponse:
        """Accept any proposed commit and reply with a default response."""
        response = CommitTableResponse()
        return response

@pytest.fixture
def example_table_metadata_v2_with_extensive_snapshots() -> Dict[str, Any]:
    """Metadata dict with a 2000-snapshot chain for expiration tests."""

    def generate_snapshot(
        snapshot_id: int,
        parent_snapshot_id: Optional[int] = None,
        timestamp_ms: Optional[int] = None,
        sequence_number: int = 0,
    ) -> Dict[str, Any]:
        """Build one snapshot entry in the JSON metadata shape."""
        return {
            "snapshot-id": snapshot_id,
            "parent-snapshot-id": parent_snapshot_id,
            "timestamp-ms": timestamp_ms or int(time.time() * 1000),
            "sequence-number": sequence_number,
            "summary": {"operation": "append"},
            "manifest-list": f"s3://a/b/{snapshot_id}.avro",
        }

    initial_snapshot_id = 3051729675574597004
    snapshots = []
    snapshot_log = []
    # Chain each snapshot to its predecessor; timestamps are randomized so
    # age-based expiration has something to select on.
    for offset in range(2000):
        sid = initial_snapshot_id + offset
        parent = sid - 1 if offset else None
        ts = int(time.time() * 1000) - randint(0, 1000000)
        snapshots.append(generate_snapshot(sid, parent, ts, offset))
        snapshot_log.append({"snapshot-id": sid, "timestamp-ms": ts})

    return {
        "format-version": 2,
        "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
        "location": "s3://bucket/test/location",
        "last-sequence-number": 34,
        "last-updated-ms": 1602638573590,
        "last-column-id": 3,
        "current-schema-id": 1,
        "schemas": [
            {"type": "struct", "schema-id": 0, "fields": [{"id": 1, "name": "x", "required": True, "type": "long"}]},
            {
                "type": "struct",
                "schema-id": 1,
                "identifier-field-ids": [1, 2],
                "fields": [
                    {"id": 1, "name": "x", "required": True, "type": "long"},
                    {"id": 2, "name": "y", "required": True, "type": "long", "doc": "comment"},
                    {"id": 3, "name": "z", "required": True, "type": "long"},
                ],
            },
        ],
        "default-spec-id": 0,
        "partition-specs": [{"spec-id": 0, "fields": [{"name": "x", "transform": "identity", "source-id": 1, "field-id": 1000}]}],
        "last-partition-id": 1000,
        "default-sort-order-id": 3,
        "sort-orders": [
            {
                "order-id": 3,
                "fields": [
                    {"transform": "identity", "source-id": 2, "direction": "asc", "null-order": "nulls-first"},
                    {"transform": "identity", "source-id": 3, "direction": "desc", "null-order": "nulls-last"},  # Adjusted field
                ],
            }
        ],
        "properties": {"read.split.target.size": "134217728"},
        "current-snapshot-id": initial_snapshot_id + 1999,
        "snapshots": snapshots,
        "snapshot-log": snapshot_log,
        "metadata-log": [{"metadata-file": "s3://bucket/.../v1.json", "timestamp-ms": 1515100}],
        "refs": {"test": {"snapshot-id": initial_snapshot_id, "type": "tag", "max-ref-age-ms": 10000000}},
    }

@pytest.fixture
def table_v2_with_extensive_snapshots(example_table_metadata_v2_with_extensive_snapshots: Dict[str, Any]) -> Table:
    """Build a Table backed by the extensive-snapshot metadata fixture."""
    metadata = TableMetadataV2(**example_table_metadata_v2_with_extensive_snapshots)
    location = f"{metadata.location}/uuid.metadata.json"
    return Table(
        identifier=("database", "table"),
        metadata=metadata,
        metadata_location=location,
        io=load_file_io(location=location),
        catalog=NoopCatalog("NoopCatalog"),
    )

def test_remove_snapshot(table_v2_with_extensive_snapshots: Table) -> None:
    """Expiring a snapshot id removes it from the table's metadata."""
    table = table_v2_with_extensive_snapshots
    table.catalog = MockCatalog("MockCatalog")

    # Sanity-check the fixture before exercising the API under test.
    assert table.metadata is not None, "Table metadata is None"
    assert table.metadata.current_snapshot_id is not None, "Current snapshot ID is None"
    assert table.metadata.snapshots is not None, "Snapshots list is None"
    assert len(table.metadata.snapshots) == 2000, f"Expected 2000 snapshots, got {len(table.metadata.snapshots)}"

    # An intermediate snapshot in the fixture's id range (base id + 1999 offsets).
    snapshot_to_expire = 3051729675574599003

    # Remove a snapshot using the expire_snapshots API.
    table.expire_snapshots().expire_snapshot_id(snapshot_to_expire).commit()

    # NOTE(review): MockCatalog replies with metadata whose snapshot list is
    # empty, so this assertion passes trivially; capturing the updates passed
    # to commit_table and asserting on them would be a stronger check. TODO.
    assert snapshot_to_expire not in [snapshot.snapshot_id for snapshot in table.metadata.snapshots], (
        f"Snapshot ID {snapshot_to_expire} was not removed"
    )