Skip to content

Commit

Permalink
Add V3 read support (#1554)
Browse files Browse the repository at this point in the history
  • Loading branch information
Fokko authored Jan 21, 2025
1 parent 0638493 commit 5a3c346
Show file tree
Hide file tree
Showing 8 changed files with 233 additions and 22 deletions.
14 changes: 14 additions & 0 deletions pyiceberg/partitioning.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
Field,
PlainSerializer,
WithJsonSchema,
model_validator,
)
from typing_extensions import Annotated

Expand Down Expand Up @@ -111,6 +112,19 @@ def __init__(

super().__init__(**data)

@model_validator(mode="before")
@classmethod
def map_source_ids_onto_source_id(cls, data: Any) -> Any:
    """Translate the V3 `source-ids` list into the single `source-id` key.

    Input that is not a dict, or that already carries `source-id`, is
    returned untouched. When only `source-ids` is present and it is a list,
    its sole element is copied to `source-id` on the same dict (the input
    is mutated in place).

    Args:
        data: Raw input handed to the model, typically a dict keyed by alias.

    Returns:
        The same `data` object, possibly with `source-id` filled in.

    Raises:
        ValueError: If `source-ids` is an empty list, or holds more than one
            id (multi-argument transforms are not supported yet).
    """
    if not isinstance(data, dict) or "source-id" in data:
        return data

    # `.get` instead of indexing: a payload with neither key must fall through
    # to regular field validation rather than fail here with a bare KeyError.
    source_ids = data.get("source-ids")
    if not isinstance(source_ids, list):
        return data

    # Check emptiness explicitly; guarding on truthiness first would let an
    # empty list slip past and make this error unreachable.
    if not source_ids:
        raise ValueError("Empty source-ids is not allowed")
    if len(source_ids) > 1:
        raise ValueError("Multi argument transforms are not yet supported")

    data["source-id"] = source_ids[0]
    return data

def __str__(self) -> str:
    """Return the field rendered as `<field_id>: <name>: <transform>(<source_id>)`."""
    return f"{self.field_id}: {self.name}: {self.transform}({self.source_id})"
Expand Down
119 changes: 98 additions & 21 deletions pyiceberg/table/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -459,9 +459,8 @@ def to_v2(self) -> TableMetadataV2:
return TableMetadataV2.model_validate(metadata)

format_version: Literal[1] = Field(alias="format-version", default=1)
"""An integer version number for the format. Currently, this can be 1 or 2
based on the spec. Implementations must throw an exception if a table’s
version is higher than the supported version."""
"""An integer version number for the format. Implementations must throw
an exception if a table’s version is higher than the supported version."""

schema_: Schema = Field(alias="schema")
"""The table’s current schema. (Deprecated: use schemas and
Expand Down Expand Up @@ -507,16 +506,74 @@ def construct_refs(cls, table_metadata: TableMetadata) -> TableMetadata:
return construct_refs(table_metadata)

format_version: Literal[2] = Field(alias="format-version", default=2)
"""An integer version number for the format. Currently, this can be 1 or 2
based on the spec. Implementations must throw an exception if a table’s
version is higher than the supported version."""
"""An integer version number for the format. Implementations must throw
an exception if a table’s version is higher than the supported version."""

last_sequence_number: int = Field(alias="last-sequence-number", default=INITIAL_SEQUENCE_NUMBER)
"""The table’s highest assigned sequence number, a monotonically
increasing long that tracks the order of snapshots in a table."""


TableMetadata = Annotated[Union[TableMetadataV1, TableMetadataV2], Field(discriminator="format_version")]
class TableMetadataV3(TableMetadataCommonFields, IcebergBaseModel):
    """Represents version 3 of the Table Metadata.

    Version 3 of the Iceberg spec extends data types and existing metadata structures to add new capabilities:

    - New data types: nanosecond timestamp(tz), unknown
    - Default value support for columns
    - Multi-argument transforms for partitioning and sorting
    - Row Lineage tracking
    - Binary deletion vectors

    For more information:
    https://iceberg.apache.org/spec/?column-projection#version-3-extended-types-and-capabilities

    Serialization through `model_dump_json` is not available on this class;
    it raises NotImplementedError.
    """

    # NOTE: inside each validator body the bare name (e.g. `cleanup_snapshot_id`)
    # is looked up in module scope, not class scope, so every validator below
    # delegates to the module-level helper that shares its name.

    @model_validator(mode="before")
    def cleanup_snapshot_id(cls, data: Dict[str, Any]) -> Dict[str, Any]:
        """Pass the raw input through the module-level `cleanup_snapshot_id` helper."""
        return cleanup_snapshot_id(data)

    @model_validator(mode="after")
    def check_schemas(cls, table_metadata: TableMetadata) -> TableMetadata:
        """Pass the validated model through the module-level `check_schemas` helper."""
        return check_schemas(table_metadata)

    @model_validator(mode="after")
    def check_partition_specs(cls, table_metadata: TableMetadata) -> TableMetadata:
        """Pass the validated model through the module-level `check_partition_specs` helper."""
        return check_partition_specs(table_metadata)

    @model_validator(mode="after")
    def check_sort_orders(cls, table_metadata: TableMetadata) -> TableMetadata:
        """Pass the validated model through the module-level `check_sort_orders` helper."""
        return check_sort_orders(table_metadata)

    @model_validator(mode="after")
    def construct_refs(cls, table_metadata: TableMetadata) -> TableMetadata:
        """Pass the validated model through the module-level `construct_refs` helper."""
        return construct_refs(table_metadata)

    format_version: Literal[3] = Field(alias="format-version", default=3)
    """An integer version number for the format. Implementations must throw
    an exception if a table’s version is higher than the supported version."""

    last_sequence_number: int = Field(alias="last-sequence-number", default=INITIAL_SEQUENCE_NUMBER)
    """The table’s highest assigned sequence number, a monotonically
    increasing long that tracks the order of snapshots in a table."""

    row_lineage: bool = Field(alias="row-lineage", default=False)
    """Indicates that row-lineage is enabled on the table
    For more information:
    https://iceberg.apache.org/spec/?column-projection#row-lineage
    """

    next_row_id: Optional[int] = Field(alias="next-row-id", default=None)
    """A long higher than all assigned row IDs; the next snapshot's `first-row-id`."""

    def model_dump_json(
        self, exclude_none: bool = True, exclude: Optional[Any] = None, by_alias: bool = True, **kwargs: Any
    ) -> str:
        """Refuse to serialize: writing V3 metadata is not implemented yet.

        All arguments are accepted and ignored.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError("Writing V3 is not yet supported, see: https://github.com/apache/iceberg-python/issues/1551")


TableMetadata = Annotated[Union[TableMetadataV1, TableMetadataV2, TableMetadataV3], Field(discriminator="format_version")]


def new_table_metadata(
Expand Down Expand Up @@ -553,20 +610,36 @@ def new_table_metadata(
last_partition_id=fresh_partition_spec.last_assigned_field_id,
table_uuid=table_uuid,
)

return TableMetadataV2(
location=location,
schemas=[fresh_schema],
last_column_id=fresh_schema.highest_field_id,
current_schema_id=fresh_schema.schema_id,
partition_specs=[fresh_partition_spec],
default_spec_id=fresh_partition_spec.spec_id,
sort_orders=[fresh_sort_order],
default_sort_order_id=fresh_sort_order.order_id,
properties=properties,
last_partition_id=fresh_partition_spec.last_assigned_field_id,
table_uuid=table_uuid,
)
elif format_version == 2:
return TableMetadataV2(
location=location,
schemas=[fresh_schema],
last_column_id=fresh_schema.highest_field_id,
current_schema_id=fresh_schema.schema_id,
partition_specs=[fresh_partition_spec],
default_spec_id=fresh_partition_spec.spec_id,
sort_orders=[fresh_sort_order],
default_sort_order_id=fresh_sort_order.order_id,
properties=properties,
last_partition_id=fresh_partition_spec.last_assigned_field_id,
table_uuid=table_uuid,
)
elif format_version == 3:
return TableMetadataV3(
location=location,
schemas=[fresh_schema],
last_column_id=fresh_schema.highest_field_id,
current_schema_id=fresh_schema.schema_id,
partition_specs=[fresh_partition_spec],
default_spec_id=fresh_partition_spec.spec_id,
sort_orders=[fresh_sort_order],
default_sort_order_id=fresh_sort_order.order_id,
properties=properties,
last_partition_id=fresh_partition_spec.last_assigned_field_id,
table_uuid=table_uuid,
)
else:
raise ValidationError(f"Unknown format version: {format_version}")


class TableMetadataWrapper(IcebergRootModel[TableMetadata]):
Expand All @@ -593,6 +666,8 @@ def parse_obj(data: Dict[str, Any]) -> TableMetadata:
return TableMetadataV1(**data)
elif format_version == 2:
return TableMetadataV2(**data)
elif format_version == 3:
return TableMetadataV3(**data)
else:
raise ValidationError(f"Unknown format version: {format_version}")

Expand All @@ -609,6 +684,8 @@ def _construct_without_validation(table_metadata: TableMetadata) -> TableMetadat
return TableMetadataV1.model_construct(**dict(table_metadata))
elif table_metadata.format_version == 2:
return TableMetadataV2.model_construct(**dict(table_metadata))
elif table_metadata.format_version == 3:
return TableMetadataV3.model_construct(**dict(table_metadata))
else:
raise ValidationError(f"Unknown format version: {table_metadata.format_version}")

Expand Down
13 changes: 13 additions & 0 deletions pyiceberg/table/sorting.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,19 @@ def set_null_order(cls, values: Dict[str, Any]) -> Dict[str, Any]:
values["null-order"] = NullOrder.NULLS_FIRST if values["direction"] == SortDirection.ASC else NullOrder.NULLS_LAST
return values

@model_validator(mode="before")
@classmethod
def map_source_ids_onto_source_id(cls, data: Any) -> Any:
    """Fill `source-id` from a single-element `source-ids` list.

    The V3 layout carries a list under `source-ids`, whereas this model reads
    the scalar `source-id`. Anything that is not a dict, or that already has
    `source-id`, is returned as-is. Otherwise a list-valued `source-ids` with
    exactly one element is copied onto `source-id` in the same dict (the
    input is mutated in place).

    Args:
        data: Raw input handed to the model, typically a dict keyed by alias.

    Returns:
        The same `data` object, possibly with `source-id` filled in.

    Raises:
        ValueError: If `source-ids` is an empty list, or contains more than
            one id (multi-argument transforms are not supported yet).
    """
    if not isinstance(data, dict) or "source-id" in data:
        return data

    # Look the key up with `.get`: when neither `source-id` nor `source-ids`
    # is supplied, leave the payload alone so normal field validation reports
    # the problem, instead of raising a bare KeyError from here.
    source_ids = data.get("source-ids")
    if not isinstance(source_ids, list):
        return data

    # The emptiness test has to come before any truthiness shortcut,
    # otherwise an empty list would never reach this error.
    if not source_ids:
        raise ValueError("Empty source-ids is not allowed")
    if len(source_ids) > 1:
        raise ValueError("Multi argument transforms are not yet supported")

    data["source-id"] = source_ids[0]
    return data

source_id: int = Field(alias="source-id")
transform: Annotated[ # type: ignore
Transform,
Expand Down
2 changes: 1 addition & 1 deletion pyiceberg/typedef.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,4 +206,4 @@ def __hash__(self) -> int:
return hash(str(self))


TableVersion: TypeAlias = Literal[1, 2]
TableVersion: TypeAlias = Literal[1, 2, 3]
71 changes: 71 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -902,6 +902,72 @@ def generate_snapshot(
"refs": {"test": {"snapshot-id": 3051729675574597004, "type": "tag", "max-ref-age-ms": 10000000}},
}

# Example format-version 3 table metadata used as a test fixture.
# Unlike a V2 payload, the partition-spec and sort-order fields below reference
# their source column through a `source-ids` list rather than a scalar `source-id`.
# The V3-only column types are still commented out in the schema (see the TODO).
EXAMPLE_TABLE_METADATA_V3 = {
    "format-version": 3,
    "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
    "location": "s3://bucket/test/location",
    "last-sequence-number": 34,
    "last-updated-ms": 1602638573590,
    "last-column-id": 3,
    "current-schema-id": 1,
    "schemas": [
        {"type": "struct", "schema-id": 0, "fields": [{"id": 1, "name": "x", "required": True, "type": "long"}]},
        {
            "type": "struct",
            "schema-id": 1,
            "identifier-field-ids": [1, 2],
            "fields": [
                {"id": 1, "name": "x", "required": True, "type": "long"},
                {"id": 2, "name": "y", "required": True, "type": "long", "doc": "comment"},
                {"id": 3, "name": "z", "required": True, "type": "long"},
                # TODO: Add unknown, timestamp(tz)_ns
                # {"id": 4, "name": "u", "required": True, "type": "unknown"},
                # {"id": 5, "name": "ns", "required": True, "type": "timestamp_ns"},
                # {"id": 6, "name": "nstz", "required": True, "type": "timestamptz_ns"},
            ],
        },
    ],
    "default-spec-id": 0,
    "partition-specs": [{"spec-id": 0, "fields": [{"name": "x", "transform": "identity", "source-ids": [1], "field-id": 1000}]}],
    "last-partition-id": 1000,
    "default-sort-order-id": 3,
    "sort-orders": [
        {
            "order-id": 3,
            "fields": [
                {"transform": "identity", "source-ids": [2], "direction": "asc", "null-order": "nulls-first"},
                {"transform": "bucket[4]", "source-ids": [3], "direction": "desc", "null-order": "nulls-last"},
            ],
        }
    ],
    "properties": {"read.split.target.size": "134217728"},
    "current-snapshot-id": 3055729675574597004,
    "snapshots": [
        {
            "snapshot-id": 3051729675574597004,
            "timestamp-ms": 1515100955770,
            "sequence-number": 0,
            "summary": {"operation": "append"},
            "manifest-list": "s3://a/b/1.avro",
        },
        {
            "snapshot-id": 3055729675574597004,
            "parent-snapshot-id": 3051729675574597004,
            "timestamp-ms": 1555100955770,
            "sequence-number": 1,
            "summary": {"operation": "append"},
            "manifest-list": "s3://a/b/2.avro",
            "schema-id": 1,
        },
    ],
    "snapshot-log": [
        {"snapshot-id": 3051729675574597004, "timestamp-ms": 1515100955770},
        {"snapshot-id": 3055729675574597004, "timestamp-ms": 1555100955770},
    ],
    "metadata-log": [{"metadata-file": "s3://bucket/.../v1.json", "timestamp-ms": 1515100}],
    "refs": {"test": {"snapshot-id": 3051729675574597004, "type": "tag", "max-ref-age-ms": 10000000}},
}

TABLE_METADATA_V2_WITH_FIXED_AND_DECIMAL_TYPES = {
"format-version": 2,
"table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
Expand Down Expand Up @@ -1052,6 +1118,11 @@ def table_metadata_v2_with_statistics() -> Dict[str, Any]:
return TABLE_METADATA_V2_WITH_STATISTICS


@pytest.fixture
def example_table_metadata_v3() -> Dict[str, Any]:
    """Return the module-level EXAMPLE_TABLE_METADATA_V3 dict (the same object each call, not a copy)."""
    return EXAMPLE_TABLE_METADATA_V3


@pytest.fixture(scope="session")
def metadata_location(tmp_path_factory: pytest.TempPathFactory) -> str:
from pyiceberg.io.pyarrow import PyArrowFileIO
Expand Down
10 changes: 10 additions & 0 deletions tests/table/test_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
TableMetadataUtil,
TableMetadataV1,
TableMetadataV2,
TableMetadataV3,
new_table_metadata,
)
from pyiceberg.table.refs import SnapshotRef, SnapshotRefType
Expand Down Expand Up @@ -178,6 +179,15 @@ def test_serialize_v2(example_table_metadata_v2: Dict[str, Any]) -> None:
assert table_metadata == expected


def test_serialize_v3(example_table_metadata_v3: Dict[str, Any]) -> None:
    """Check that V3 metadata can be parsed but refuses to serialize to JSON."""
    # Writing will be part of https://github.com/apache/iceberg-python/issues/1551

    # Build the model outside the `raises` block: only the dump is expected to
    # fail, and a NotImplementedError raised while parsing must not let the
    # test pass by accident.
    table_metadata = TableMetadataV3(**example_table_metadata_v3)

    with pytest.raises(NotImplementedError) as exc_info:
        table_metadata.model_dump_json()

    assert "Writing V3 is not yet supported, see: https://github.com/apache/iceberg-python/issues/1551" in str(exc_info.value)


def test_migrate_v1_schemas(example_table_metadata_v1: Dict[str, Any]) -> None:
table_metadata = TableMetadataV1(**example_table_metadata_v1)

Expand Down
14 changes: 14 additions & 0 deletions tests/table/test_partitioning.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,3 +151,17 @@ def test_partition_type(table_schema_simple: Schema) -> None:
NestedField(field_id=1000, name="str_truncate", field_type=StringType(), required=False),
NestedField(field_id=1001, name="int_bucket", field_type=IntegerType(), required=True),
)


def test_deserialize_partition_field_v2() -> None:
    """A payload with a scalar `source-id` parses into the equivalent PartitionField."""
    payload = '{"source-id": 1, "field-id": 1000, "transform": "truncate[19]", "name": "str_truncate"}'
    parsed = PartitionField.model_validate_json(payload)

    expected = PartitionField(
        source_id=1,
        field_id=1000,
        transform=TruncateTransform(width=19),
        name="str_truncate",
    )
    assert parsed == expected


def test_deserialize_partition_field_v3() -> None:
    """A payload with a one-element `source-ids` list parses into the equivalent PartitionField."""
    payload = '{"source-ids": [1], "field-id": 1000, "transform": "truncate[19]", "name": "str_truncate"}'
    parsed = PartitionField.model_validate_json(payload)

    expected = PartitionField(
        source_id=1,
        field_id=1000,
        transform=TruncateTransform(width=19),
        name="str_truncate",
    )
    assert parsed == expected
12 changes: 12 additions & 0 deletions tests/table/test_sorting.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,15 @@ def test_unsorting_to_repr() -> None:
def test_sorting_repr(sort_order: SortOrder) -> None:
    """Evaluating the repr of a sort order must yield an object equal to the original."""
    rendered = repr(sort_order)
    rebuilt = eval(rendered)
    assert sort_order == rebuilt


def test_serialize_sort_field_v2() -> None:
    """A sort-field payload with a scalar `source-id` parses into the expected SortField."""
    want = SortField(
        source_id=19,
        transform=IdentityTransform(),
        null_order=NullOrder.NULLS_FIRST,
    )
    raw_json = '{"source-id":19,"transform":"identity","direction":"asc","null-order":"nulls-first"}'

    got = SortField.model_validate_json(raw_json)
    assert got == want


def test_serialize_sort_field_v3() -> None:
    """A sort-field payload with a one-element `source-ids` list parses into the expected SortField."""
    want = SortField(
        source_id=19,
        transform=IdentityTransform(),
        null_order=NullOrder.NULLS_FIRST,
    )
    raw_json = '{"source-ids":[19],"transform":"identity","direction":"asc","null-order":"nulls-first"}'

    got = SortField.model_validate_json(raw_json)
    assert got == want

0 comments on commit 5a3c346

Please sign in to comment.