Skip to content

Commit

Permalink
feat(table): add conversion from Arrow Schema to Iceberg (#155)
Browse files Browse the repository at this point in the history
  • Loading branch information
zeroshade authored Oct 9, 2024
1 parent 2f43ed7 commit 4929eea
Show file tree
Hide file tree
Showing 19 changed files with 1,474 additions and 56 deletions.
8 changes: 6 additions & 2 deletions .github/workflows/go-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ jobs:
strategy:
fail-fast: false
matrix:
go: [ '1.21', '1.22' ]
go: [ '1.22', '1.23' ]
os: [ 'ubuntu-latest', 'windows-latest', 'macos-latest' ]
steps:
- uses: actions/checkout@v4
Expand All @@ -50,7 +50,11 @@ jobs:
cache: true
cache-dependency-path: go.sum
- name: Install staticcheck
run: go install honnef.co/go/tools/cmd/staticcheck@latest
if: matrix.go == '1.22'
run: go install honnef.co/go/tools/cmd/[email protected]
- name: Install staticcheck
if: matrix.go == '1.23'
run: go install honnef.co/go/tools/cmd/[email protected]
- name: Lint
run: staticcheck ./...
- name: Run tests
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/go-integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ jobs:
- name: Install Go
uses: actions/setup-go@v4
with:
go-version: 1.22
go-version: 1.23
cache: true
cache-dependency-path: go.sum

Expand Down
36 changes: 24 additions & 12 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@

module github.com/apache/iceberg-go

go 1.21
go 1.22.7

require (
github.com/apache/arrow/go/v16 v16.1.0
github.com/apache/arrow-go/v18 v18.0.0-20240924011512-14844aea3205
github.com/aws/aws-sdk-go-v2 v1.31.0
github.com/aws/aws-sdk-go-v2/config v1.27.39
github.com/aws/aws-sdk-go-v2/credentials v1.17.37
Expand All @@ -29,18 +29,22 @@ require (
github.com/aws/smithy-go v1.21.0
github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815
github.com/google/uuid v1.6.0
github.com/hamba/avro/v2 v2.23.0
github.com/hamba/avro/v2 v2.26.0
github.com/pterm/pterm v0.12.79
github.com/stretchr/testify v1.9.0
github.com/twmb/murmur3 v1.1.8
github.com/wolfeidau/s3iofs v1.5.2
golang.org/x/exp v0.0.0-20240222234643-814bf88cf225
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0
golang.org/x/sync v0.8.0
)

require (
atomicgo.dev/cursor v0.2.0 // indirect
atomicgo.dev/keyboard v0.2.9 // indirect
atomicgo.dev/schedule v0.1.0 // indirect
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect
github.com/andybalholm/brotli v1.1.0 // indirect
github.com/apache/thrift v0.20.0 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.4 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.14 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.18 // indirect
Expand All @@ -56,28 +60,36 @@ require (
github.com/aws/aws-sdk-go-v2/service/sts v1.31.3 // indirect
github.com/containerd/console v1.0.3 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/goccy/go-json v0.10.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/flatbuffers v24.3.25+incompatible // indirect
github.com/gookit/color v1.5.4 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/asmfmt v1.3.2 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
github.com/lithammer/fuzzysearch v1.1.8 // indirect
github.com/mattn/go-runewidth v0.0.15 // indirect
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rivo/uniseg v0.4.4 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect
golang.org/x/mod v0.19.0 // indirect
golang.org/x/net v0.27.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.22.0 // indirect
golang.org/x/term v0.22.0 // indirect
golang.org/x/text v0.16.0 // indirect
golang.org/x/tools v0.23.0 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
golang.org/x/mod v0.21.0 // indirect
golang.org/x/net v0.29.0 // indirect
golang.org/x/sys v0.25.0 // indirect
golang.org/x/term v0.24.0 // indirect
golang.org/x/text v0.18.0 // indirect
golang.org/x/tools v0.25.0 // indirect
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de // indirect
google.golang.org/grpc v1.63.2 // indirect
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
76 changes: 52 additions & 24 deletions go.sum

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions literals.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ import (
"time"
"unsafe"

"github.com/apache/arrow/go/v16/arrow"
"github.com/apache/arrow/go/v16/arrow/decimal128"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/decimal128"
"github.com/google/uuid"
)

Expand Down
4 changes: 2 additions & 2 deletions literals_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (
"testing"
"time"

"github.com/apache/arrow/go/v16/arrow"
"github.com/apache/arrow/go/v16/arrow/decimal128"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/decimal128"
"github.com/apache/iceberg-go"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
Expand Down
7 changes: 6 additions & 1 deletion manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,12 @@ func avroPartitionData(input map[string]any) map[string]any {
}
}
default:
out[k] = v
switch v := v.(type) {
case time.Time:
out[k] = Timestamp(v.UTC().UnixMicro())
default:
out[k] = v
}
}
}
return out
Expand Down
148 changes: 147 additions & 1 deletion schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ package iceberg
import (
"encoding/json"
"fmt"
"maps"
"strings"
"sync"
"sync/atomic"

"golang.org/x/exp/maps"
"golang.org/x/exp/slices"
)

Expand Down Expand Up @@ -1019,3 +1019,149 @@ func (buildPosAccessors) Primitive(PrimitiveType) map[int]accessor {
func buildAccessors(schema *Schema) (map[int]accessor, error) {
return Visit(schema, buildPosAccessors{})
}

type SchemaWithPartnerVisitor[T, P any] interface {
Schema(sc *Schema, schemaPartner P, structResult T) T
Struct(st StructType, structPartner P, fieldResults []T) T
Field(field NestedField, fieldPartner P, fieldResult T) T
List(l ListType, listPartner P, elemResult T) T
Map(m MapType, mapPartner P, keyResult, valResult T) T
Primitive(p PrimitiveType, primitivePartner P) T
}

type PartnerAccessor[P any] interface {
SchemaPartner(P) P
FieldPartner(partnerStruct P, fieldID int, fieldName string) P
ListElementPartner(P) P
MapKeyPartner(P) P
MapValuePartner(P) P
}

func VisitSchemaWithPartner[T, P any](sc *Schema, partner P, visitor SchemaWithPartnerVisitor[T, P], accessor PartnerAccessor[P]) (res T, err error) {
if sc == nil {
err = fmt.Errorf("%w: cannot visit nil schema", ErrInvalidArgument)
return
}

if visitor == nil || accessor == nil {
err = fmt.Errorf("%w: cannot visit with nil visitor or accessor", ErrInvalidArgument)
return
}

defer func() {
if r := recover(); r != nil {
switch e := r.(type) {
case string:
err = fmt.Errorf("error encountered during schema visitor: %s", e)
case error:
err = fmt.Errorf("error encountered during schema visitor: %w", e)
}
}
}()

structPartner := accessor.SchemaPartner(partner)
return visitor.Schema(sc, partner, visitStructWithPartner(sc.AsStruct(), structPartner, visitor, accessor)), nil
}

func visitStructWithPartner[T, P any](st StructType, partner P, visitor SchemaWithPartnerVisitor[T, P], accessor PartnerAccessor[P]) T {
type (
beforeField interface {
BeforeField(NestedField, P)
}
afterField interface {
AfterField(NestedField, P)
}
)

bf, _ := visitor.(beforeField)
af, _ := visitor.(afterField)

fieldResults := make([]T, len(st.FieldList))

for i, f := range st.FieldList {
fieldPartner := accessor.FieldPartner(partner, f.ID, f.Name)
if bf != nil {
bf.BeforeField(f, fieldPartner)
}
fieldResult := visitTypeWithPartner(f.Type, fieldPartner, visitor, accessor)
fieldResults[i] = visitor.Field(f, fieldPartner, fieldResult)
if af != nil {
af.AfterField(f, fieldPartner)
}
}

return visitor.Struct(st, partner, fieldResults)
}

func visitListWithPartner[T, P any](listType ListType, partner P, visitor SchemaWithPartnerVisitor[T, P], accessor PartnerAccessor[P]) T {
type (
beforeListElem interface {
BeforeListElement(NestedField, P)
}
afterListElem interface {
AfterListElement(NestedField, P)
}
)

elemPartner := accessor.ListElementPartner(partner)
if ble, ok := visitor.(beforeListElem); ok {
ble.BeforeListElement(listType.ElementField(), elemPartner)
}
elemResult := visitTypeWithPartner(listType.Element, elemPartner, visitor, accessor)
if ale, ok := visitor.(afterListElem); ok {
ale.AfterListElement(listType.ElementField(), elemPartner)
}

return visitor.List(listType, partner, elemResult)
}

func visitMapWithPartner[T, P any](m MapType, partner P, visitor SchemaWithPartnerVisitor[T, P], accessor PartnerAccessor[P]) T {
type (
beforeMapKey interface {
BeforeMapKey(NestedField, P)
}
afterMapKey interface {
AfterMapKey(NestedField, P)
}

beforeMapValue interface {
BeforeMapValue(NestedField, P)
}
afterMapValue interface {
AfterMapValue(NestedField, P)
}
)

keyPartner := accessor.MapKeyPartner(partner)
if bmk, ok := visitor.(beforeMapKey); ok {
bmk.BeforeMapKey(m.KeyField(), keyPartner)
}
keyResult := visitTypeWithPartner(m.KeyType, keyPartner, visitor, accessor)
if amk, ok := visitor.(afterMapKey); ok {
amk.AfterMapKey(m.KeyField(), keyPartner)
}

valPartner := accessor.MapValuePartner(partner)
if bmv, ok := visitor.(beforeMapValue); ok {
bmv.BeforeMapValue(m.ValueField(), valPartner)
}
valResult := visitTypeWithPartner(m.ValueType, valPartner, visitor, accessor)
if amv, ok := visitor.(afterMapValue); ok {
amv.AfterMapValue(m.ValueField(), valPartner)
}

return visitor.Map(m, partner, keyResult, valResult)
}

func visitTypeWithPartner[T, P any](t Type, fieldPartner P, visitor SchemaWithPartnerVisitor[T, P], accessor PartnerAccessor[P]) T {
switch t := t.(type) {
case *ListType:
return visitListWithPartner(*t, fieldPartner, visitor, accessor)
case *StructType:
return visitStructWithPartner(*t, fieldPartner, visitor, accessor)
case *MapType:
return visitMapWithPartner(*t, fieldPartner, visitor, accessor)
default:
return visitor.Primitive(t.(PrimitiveType), fieldPartner)
}
}
4 changes: 2 additions & 2 deletions schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ func TestNestedFieldToString(t *testing.T) {
{2, "3: baz: optional boolean"},
{3, "4: qux: required list<string>"},
{4, "6: quux: required map<string, map<string, int>>"},
{5, "11: location: required list<struct<13: latitude: float, 14: longitude: float>>"},
{6, "15: person: optional struct<16: name: string, 17: age: required int>"},
{5, "11: location: required list<struct<13: latitude: optional float, 14: longitude: optional float>>"},
{6, "15: person: optional struct<16: name: optional string, 17: age: required int>"},
}

for _, tt := range tests {
Expand Down
Loading

0 comments on commit 4929eea

Please sign in to comment.