Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(table): Implement converting Iceberg schema and types to Arrow #168

Merged
merged 6 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/go-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ on:
branches:
- 'main'
tags:
- 'v**'
pull_request:
- 'v**'
pull_request:

concurrency:
group: ${{ github.repository }}-${{ github.head_ref || github.sha }}-${{ github.workflow }}
Expand Down
54 changes: 52 additions & 2 deletions schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,25 @@ type AfterMapValueVisitor interface {
AfterMapValue(value NestedField)
}

// SchemaVisitorPerPrimitiveType extends SchemaVisitor with one method per
// primitive type. When the visitor handed to the traversal implements this
// interface, visitField routes each recognized primitive type to the
// matching method below; primitive types it does not recognize still fall
// back to the generic Primitive method.
type SchemaVisitorPerPrimitiveType[T any] interface {
	SchemaVisitor[T]

	// Primitives that carry no parameters.
	VisitBoolean() T
	VisitInt32() T
	VisitInt64() T
	VisitFloat32() T
	VisitFloat64() T
	VisitDate() T
	VisitTime() T
	VisitTimestamp() T
	VisitTimestampTz() T
	VisitString() T
	VisitBinary() T
	VisitUUID() T

	// Parameterized primitives receive the concrete type value.
	VisitFixed(FixedType) T
	VisitDecimal(DecimalType) T
}

// Visit accepts a visitor and performs a post-order traversal of the given schema.
func Visit[T any](sc *Schema, visitor SchemaVisitor[T]) (res T, err error) {
if sc == nil {
Expand Down Expand Up @@ -534,6 +553,38 @@ func visitField[T any](f NestedField, visitor SchemaVisitor[T]) T {
case *MapType:
return visitMap(*typ, visitor)
default: // primitive
if perPrimitive, ok := visitor.(SchemaVisitorPerPrimitiveType[T]); ok {
switch t := typ.(type) {
case BooleanType:
return perPrimitive.VisitBoolean()
case Int32Type:
return perPrimitive.VisitInt32()
case Int64Type:
return perPrimitive.VisitInt64()
case Float32Type:
return perPrimitive.VisitFloat32()
case Float64Type:
return perPrimitive.VisitFloat64()
case DateType:
return perPrimitive.VisitDate()
case TimeType:
return perPrimitive.VisitTime()
case TimestampType:
return perPrimitive.VisitTimestamp()
case TimestampTzType:
return perPrimitive.VisitTimestampTz()
case StringType:
return perPrimitive.VisitString()
case BinaryType:
return perPrimitive.VisitBinary()
case UUIDType:
return perPrimitive.VisitUUID()
case DecimalType:
return perPrimitive.VisitDecimal(t)
case FixedType:
return perPrimitive.VisitFixed(t)
}
}
return visitor.Primitive(typ.(PrimitiveType))
}
}
Expand Down Expand Up @@ -706,8 +757,7 @@ func (i *indexByName) AfterField(field NestedField) {
// PruneColumns visits a schema pruning any columns which do not exist in the
// provided selected set. Parent fields of a selected child will be retained.
func PruneColumns(schema *Schema, selected map[int]Void, selectFullTypes bool) (*Schema, error) {

result, err := Visit[Type](schema, &pruneColVisitor{selected: selected,
result, err := Visit(schema, &pruneColVisitor{selected: selected,
fullTypes: selectFullTypes})
if err != nil {
return nil, err
Expand Down
130 changes: 130 additions & 0 deletions table/arrow_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"strconv"

"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/extensions"
"github.com/apache/iceberg-go"
)

Expand Down Expand Up @@ -410,3 +411,132 @@ func ArrowSchemaToIceberg(sc *arrow.Schema, downcastNsTimestamp bool, nameMappin
iceberg.ErrInvalidSchema)
}
}

// convertToArrow holds the options used by the schema visitor that turns an
// Iceberg schema into Arrow fields.
type convertToArrow struct {
// metadata is applied to the top-level result by the Schema method.
metadata map[string]string
// includeFieldIDs makes Field record the Iceberg field id under
// ArrowParquetFieldIDKey in each converted field's metadata.
includeFieldIDs bool
}

// Schema completes the conversion: it replaces the metadata of the field
// produced for the schema's top-level struct with the converter's metadata
// map and returns that field.
func (c convertToArrow) Schema(_ *iceberg.Schema, result arrow.Field) arrow.Field {
	topLevel := arrow.MetadataFrom(c.metadata)
	result.Metadata = topLevel
	return result
}

// Struct wraps the already-converted child fields in an Arrow struct type.
// Only the Type of the returned field is populated here.
func (c convertToArrow) Struct(_ iceberg.StructType, results []arrow.Field) arrow.Field {
	structType := arrow.StructOf(results...)
	return arrow.Field{Type: structType}
}

// Field decorates an already-converted Arrow field with the attributes of the
// Iceberg field it came from: its name, its nullability (the inverse of
// Required), and optional metadata. The doc string is stored under
// ArrowFieldDocKey when non-empty, and the field id under
// ArrowParquetFieldIDKey when includeFieldIDs is set. If neither applies the
// incoming metadata is left untouched.
func (c convertToArrow) Field(field iceberg.NestedField, result arrow.Field) arrow.Field {
	result.Name = field.Name
	result.Nullable = !field.Required

	kv := map[string]string{}
	if field.Doc != "" {
		kv[ArrowFieldDocKey] = field.Doc
	}
	if c.includeFieldIDs {
		kv[ArrowParquetFieldIDKey] = strconv.Itoa(field.ID)
	}

	if len(kv) == 0 {
		return result
	}

	result.Metadata = arrow.MetadataFrom(kv)
	return result
}

// List builds an Arrow large-list type whose element field is the converted
// element result, decorated via Field with the list's element field
// attributes.
func (c convertToArrow) List(list iceberg.ListType, elemResult arrow.Field) arrow.Field {
	elem := c.Field(list.ElementField(), elemResult)
	listType := arrow.LargeListOfField(elem)
	return arrow.Field{Type: listType}
}

// Map builds an Arrow map type from the converted key and value results.
// Both results are first decorated via Field; only the resulting type and
// metadata of each are handed to arrow.MapOfWithMetadata.
func (c convertToArrow) Map(m iceberg.MapType, keyResult, valResult arrow.Field) arrow.Field {
	key := c.Field(m.KeyField(), keyResult)
	val := c.Field(m.ValueField(), valResult)

	mapType := arrow.MapOfWithMetadata(key.Type, key.Metadata, val.Type, val.Metadata)
	return arrow.Field{Type: mapType}
}

// Primitive is the generic primitive hook of the visitor interface. This
// converter handles primitives through its per-type Visit* methods, so this
// method panics if it is ever invoked.
func (c convertToArrow) Primitive(iceberg.PrimitiveType) arrow.Field {
	panic("shouldn't be called")
}

// VisitFixed maps an Iceberg fixed type to an Arrow fixed-size binary type
// whose byte width is the fixed type's length.
func (c convertToArrow) VisitFixed(f iceberg.FixedType) arrow.Field {
	fixed := arrow.FixedSizeBinaryType{ByteWidth: f.Len()}
	return arrow.Field{Type: &fixed}
}

// VisitDecimal maps an Iceberg decimal type to an Arrow Decimal128 type,
// carrying over the precision and scale (converted to int32).
func (c convertToArrow) VisitDecimal(d iceberg.DecimalType) arrow.Field {
	precision, scale := int32(d.Precision()), int32(d.Scale())
	return arrow.Field{Type: &arrow.Decimal128Type{Precision: precision, Scale: scale}}
}

// VisitBoolean maps the Iceberg boolean type to Arrow's Boolean type.
func (c convertToArrow) VisitBoolean() arrow.Field {
	var out arrow.Field
	out.Type = arrow.FixedWidthTypes.Boolean
	return out
}

// VisitInt32 maps the Iceberg 32-bit integer type to Arrow's Int32 type.
func (c convertToArrow) VisitInt32() arrow.Field {
	var out arrow.Field
	out.Type = arrow.PrimitiveTypes.Int32
	return out
}

// VisitInt64 maps the Iceberg 64-bit integer type to Arrow's Int64 type.
func (c convertToArrow) VisitInt64() arrow.Field {
	var out arrow.Field
	out.Type = arrow.PrimitiveTypes.Int64
	return out
}

// VisitFloat32 maps the Iceberg 32-bit float type to Arrow's Float32 type.
func (c convertToArrow) VisitFloat32() arrow.Field {
	var out arrow.Field
	out.Type = arrow.PrimitiveTypes.Float32
	return out
}

// VisitFloat64 maps the Iceberg 64-bit float type to Arrow's Float64 type.
func (c convertToArrow) VisitFloat64() arrow.Field {
	var out arrow.Field
	out.Type = arrow.PrimitiveTypes.Float64
	return out
}

// VisitDate maps the Iceberg date type to Arrow's Date32 type.
func (c convertToArrow) VisitDate() arrow.Field {
	var out arrow.Field
	out.Type = arrow.FixedWidthTypes.Date32
	return out
}

// VisitTime maps the Iceberg time type to Arrow's predefined Time64us type.
func (c convertToArrow) VisitTime() arrow.Field {
	var out arrow.Field
	out.Type = arrow.FixedWidthTypes.Time64us
	return out
}

// VisitTimestampTz maps the Iceberg timestamp-with-zone type to Arrow's
// predefined Timestamp_us type.
func (c convertToArrow) VisitTimestampTz() arrow.Field {
	var out arrow.Field
	out.Type = arrow.FixedWidthTypes.Timestamp_us
	return out
}

// VisitTimestamp maps the Iceberg timestamp type to an Arrow timestamp type
// with microsecond unit and no time zone set.
func (c convertToArrow) VisitTimestamp() arrow.Field {
	ts := arrow.TimestampType{Unit: arrow.Microsecond}
	return arrow.Field{Type: &ts}
}

// VisitString maps the Iceberg string type to Arrow's LargeString type.
func (c convertToArrow) VisitString() arrow.Field {
	var out arrow.Field
	out.Type = arrow.BinaryTypes.LargeString
	return out
}

// VisitBinary maps the Iceberg binary type to Arrow's LargeBinary type.
func (c convertToArrow) VisitBinary() arrow.Field {
	var out arrow.Field
	out.Type = arrow.BinaryTypes.LargeBinary
	return out
}

// VisitUUID maps the Iceberg UUID type to the type returned by the Arrow
// extensions package's NewUUIDType.
func (c convertToArrow) VisitUUID() arrow.Field {
	uuidType := extensions.NewUUIDType()
	return arrow.Field{Type: uuidType}
}

// SchemaToArrowSchema converts an Iceberg schema into the equivalent Arrow
// schema.
//
// The metadata map is attached as the top-level metadata of the resulting
// schema. When includeFieldIDs is true, every converted field carries a
// PARQUET:field_id metadata entry holding its Iceberg field id.
//
// Any error reported by the schema traversal is returned unchanged, with a
// nil schema.
func SchemaToArrowSchema(sc *iceberg.Schema, metadata map[string]string, includeFieldIDs bool) (*arrow.Schema, error) {
	visitor := convertToArrow{metadata: metadata, includeFieldIDs: includeFieldIDs}

	root, err := iceberg.Visit(sc, visitor)
	if err != nil {
		return nil, err
	}

	// The visitor's Struct method always yields a struct-typed field for the
	// schema root, so its children are the schema's top-level fields.
	rootStruct := root.Type.(*arrow.StructType)
	return arrow.NewSchema(rootStruct.Fields(), &root.Metadata), nil
}

// TypeToArrowType converts a single Iceberg type into the equivalent Arrow
// data type.
//
// For nested types (List, Struct, Map), setting includeFieldIDs to true makes
// the child fields carry a PARQUET:field_id metadata entry holding their
// Iceberg field id.
//
// Any error reported by the schema traversal is returned unchanged, with a
// nil data type.
func TypeToArrowType(t iceberg.Type, includeFieldIDs bool) (arrow.DataType, error) {
	// Wrap the type in a throwaway single-field schema so the schema visitor
	// can be reused as-is; the wrapper is peeled off again below.
	wrapper := iceberg.NewSchema(0, iceberg.NestedField{Type: t})

	root, err := iceberg.Visit(wrapper, convertToArrow{includeFieldIDs: includeFieldIDs})
	if err != nil {
		return nil, err
	}

	wrapped := root.Type.(*arrow.StructType)
	return wrapped.Field(0).Type, nil
}
Loading
Loading