From 92839b020695860ad72feda62815eadf8073e40c Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Thu, 26 Sep 2024 19:10:07 -0400 Subject: [PATCH 1/8] feat(table): add conversion from Arrow Schema to Iceberg --- go.mod | 38 ++-- go.sum | 70 +++++-- literals.go | 4 +- literals_test.go | 4 +- schema.go | 146 +++++++++++++ table/arrow_utils.go | 406 +++++++++++++++++++++++++++++++++++++ table/arrow_utils_test.go | 371 +++++++++++++++++++++++++++++++++ table/name_mapping.go | 296 +++++++++++++++++++++++++++ table/name_mapping_test.go | 145 +++++++++++++ transforms.go | 2 +- types.go | 4 +- visitors_test.go | 2 +- 12 files changed, 1447 insertions(+), 41 deletions(-) create mode 100644 table/arrow_utils.go create mode 100644 table/arrow_utils_test.go create mode 100644 table/name_mapping.go create mode 100644 table/name_mapping_test.go diff --git a/go.mod b/go.mod index 4ed0447..8aef72b 100644 --- a/go.mod +++ b/go.mod @@ -17,10 +17,12 @@ module github.com/apache/iceberg-go -go 1.21 +go 1.22.0 + +toolchain go1.23.1 require ( - github.com/apache/arrow/go/v16 v16.1.0 + github.com/apache/arrow-go/v18 v18.0.0-20240924011512-14844aea3205 github.com/aws/aws-sdk-go-v2 v1.30.5 github.com/aws/aws-sdk-go-v2/config v1.27.33 github.com/aws/aws-sdk-go-v2/credentials v1.17.32 @@ -29,18 +31,22 @@ require ( github.com/aws/smithy-go v1.20.4 github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815 github.com/google/uuid v1.6.0 - github.com/hamba/avro/v2 v2.23.0 + github.com/hamba/avro/v2 v2.25.1 github.com/pterm/pterm v0.12.79 github.com/stretchr/testify v1.9.0 github.com/twmb/murmur3 v1.1.8 github.com/wolfeidau/s3iofs v1.5.2 - golang.org/x/exp v0.0.0-20240222234643-814bf88cf225 + golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 + golang.org/x/sync v0.8.0 ) require ( atomicgo.dev/cursor v0.2.0 // indirect atomicgo.dev/keyboard v0.2.9 // indirect atomicgo.dev/schedule v0.1.0 // indirect + github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect + 
github.com/andybalholm/brotli v1.1.0 // indirect + github.com/apache/thrift v0.20.0 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.4 // indirect github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.13 // indirect github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.17 // indirect @@ -56,28 +62,36 @@ require ( github.com/aws/aws-sdk-go-v2/service/sts v1.30.7 // indirect github.com/containerd/console v1.0.3 // indirect github.com/davecgh/go-spew v1.1.1 // indirect - github.com/goccy/go-json v0.10.2 // indirect + github.com/goccy/go-json v0.10.3 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/flatbuffers v24.3.25+incompatible // indirect github.com/gookit/color v1.5.4 // indirect github.com/json-iterator/go v1.1.12 // indirect + github.com/klauspost/asmfmt v1.3.2 // indirect github.com/klauspost/compress v1.17.9 // indirect + github.com/klauspost/cpuid/v2 v2.2.8 // indirect github.com/lithammer/fuzzysearch v1.1.8 // indirect github.com/mattn/go-runewidth v0.0.15 // indirect + github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect + github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rivo/uniseg v0.4.4 // indirect github.com/stretchr/objx v0.5.2 // indirect github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect - golang.org/x/mod v0.19.0 // indirect - golang.org/x/net v0.27.0 // indirect - golang.org/x/sync v0.7.0 // indirect - golang.org/x/sys v0.22.0 // indirect - golang.org/x/term v0.22.0 // indirect - golang.org/x/text v0.16.0 // indirect - golang.org/x/tools v0.23.0 // indirect + github.com/zeebo/xxh3 v1.0.2 // indirect + golang.org/x/mod v0.21.0 // indirect + 
golang.org/x/net v0.29.0 // indirect + golang.org/x/sys v0.25.0 // indirect + golang.org/x/term v0.24.0 // indirect + golang.org/x/text v0.18.0 // indirect + golang.org/x/tools v0.25.0 // indirect golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de // indirect + google.golang.org/grpc v1.63.2 // indirect + google.golang.org/protobuf v1.34.2 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 35add03..d7cd9dc 100644 --- a/go.sum +++ b/go.sum @@ -6,6 +6,8 @@ atomicgo.dev/keyboard v0.2.9 h1:tOsIid3nlPLZ3lwgG8KZMp/SFmr7P0ssEN5JUsm78K8= atomicgo.dev/keyboard v0.2.9/go.mod h1:BC4w9g00XkxH/f1HXhW2sXmJFOCWbKn9xrOunSFtExQ= atomicgo.dev/schedule v0.1.0 h1:nTthAbhZS5YZmgYbb2+DH8uQIZcTlIrd4eYr3UQxEjs= atomicgo.dev/schedule v0.1.0/go.mod h1:xeUa3oAkiuHYh8bKiQBRojqAMq3PXXbJujjb0hw8pEU= +github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c h1:RGWPOewvKIROun94nF7v2cua9qP+thov/7M50KEoeSU= +github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c/go.mod h1:X0CRv0ky0k6m906ixxpzmDRLvX58TFUKS2eePweuyxk= github.com/MarvinJWendt/testza v0.1.0/go.mod h1:7AxNvlfeHP7Z/hDQ5JtE3OKYT3XFUeLCDE2DQninSqs= github.com/MarvinJWendt/testza v0.2.1/go.mod h1:God7bhG8n6uQxwdScay+gjm9/LnO4D3kkcZX4hv9Rp8= github.com/MarvinJWendt/testza v0.2.8/go.mod h1:nwIcjmr0Zz+Rcwfh3/4UhBp7ePKVhuBExvZqnKYWlII= @@ -15,8 +17,12 @@ github.com/MarvinJWendt/testza v0.3.0/go.mod h1:eFcL4I0idjtIx8P9C6KkAuLgATNKpX4/ github.com/MarvinJWendt/testza v0.4.2/go.mod h1:mSdhXiKH8sg/gQehJ63bINcCKp7RtYewEjXsvsVUPbE= github.com/MarvinJWendt/testza v0.5.2 h1:53KDo64C1z/h/d/stCYCPY69bt/OSwjq5KpFNwi+zB4= github.com/MarvinJWendt/testza v0.5.2/go.mod h1:xu53QFE5sCdjtMCKk8YMQ2MnymimEctc4n3EjyIYvEY= -github.com/apache/arrow/go/v16 v16.1.0 h1:dwgfOya6s03CzH9JrjCBx6bkVb4yPD4ma3haj9p7FXI= -github.com/apache/arrow/go/v16 v16.1.0/go.mod h1:9wnc9mn6vEDTRIm4+27pEjQpRKuTvBaessPoEXQzxWA= 
+github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M= +github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY= +github.com/apache/arrow-go/v18 v18.0.0-20240924011512-14844aea3205 h1:/tq9JMJI+i/MO016cGVdKn9c7od1/Ui2uwF78vojPW4= +github.com/apache/arrow-go/v18 v18.0.0-20240924011512-14844aea3205/go.mod h1:MXqyiBhPPITRK1sWzJeXiPh8S+xSCAJVlmzTeMY7l1M= +github.com/apache/thrift v0.20.0 h1:631+KvYbsBZxmuJjYwhezVsrfc/TbqtZV4QcxOX1fOI= +github.com/apache/thrift v0.20.0/go.mod h1:hOk1BQqcp2OLzGsyVXdfMk7YFlMxK3aoEVhjD06QhB8= github.com/atomicgo/cursor v0.0.1/go.mod h1:cBON2QmmrysudxNBFthvMtN32r3jxVRIvzkUiF/RuIk= github.com/aws/aws-sdk-go-v2 v1.30.5 h1:mWSRTwQAb0aLE17dSzztCVJWI9+cRMgqebndjwDyK0g= github.com/aws/aws-sdk-go-v2 v1.30.5/go.mod h1:CT+ZPWXbYrci8chcARI3OmI/qgd+f6WtuLOoaIA8PR0= @@ -63,12 +69,14 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815 h1:bWDMxwH3px2JBh6AyO7hdCn/PkvCZXii8TGj7sbtEbQ= github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE= -github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= -github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= +github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA= +github.com/goccy/go-json v0.10.3/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/flatbuffers v24.3.25+incompatible h1:CX395cjN9Kke9mmalRoL3d81AtFUxJM+yDthflgJGkI= github.com/google/flatbuffers v24.3.25+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= 
+github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -76,17 +84,19 @@ github.com/gookit/color v1.4.2/go.mod h1:fqRyamkC1W8uxl+lxCQxOT09l/vYfZ+QeiX3rKQ github.com/gookit/color v1.5.0/go.mod h1:43aQb+Zerm/BWh2GnrgOQm7ffz7tvQXEKV6BFMl7wAo= github.com/gookit/color v1.5.4 h1:FZmqs7XOyGgCAxmWyPslpiok1k05wmY3SJTytgvYFs0= github.com/gookit/color v1.5.4/go.mod h1:pZJOeOS8DM43rXbp4AZo1n9zCU2qjpcRko0b6/QJi9w= -github.com/hamba/avro/v2 v2.23.0 h1:DYWz6UqNCi21JflaZlcwNfW+rK+D/CwnrWWJtfmO4vw= -github.com/hamba/avro/v2 v2.23.0/go.mod h1:7vDfy/2+kYCE8WUHoj2et59GTv0ap7ptktMXu0QHePI= +github.com/hamba/avro/v2 v2.25.1 h1:t8cOyv0wkNAPF6/khArMtR0nK9HtGa+WKbp9q+KdFZQ= +github.com/hamba/avro/v2 v2.25.1/go.mod h1:I8glyswHnpED3Nlx2ZdUe+4LJnCOOyiCzLMno9i/Uu0= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK4= +github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE= github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.0.10/go.mod h1:g2LTdtYhdyuGPqyWyv7qRAmj1WBqxuObKfj5c0PQa7c= github.com/klauspost/cpuid/v2 v2.0.12/go.mod h1:g2LTdtYhdyuGPqyWyv7qRAmj1WBqxuObKfj5c0PQa7c= -github.com/klauspost/cpuid/v2 v2.2.7 
h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM= -github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= +github.com/klauspost/cpuid/v2 v2.2.8 h1:+StwCXwm9PdpiEkPyzBXIy+M9KUb4ODm0Zarf1kS5BM= +github.com/klauspost/cpuid/v2 v2.2.8/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= @@ -98,6 +108,10 @@ github.com/lithammer/fuzzysearch v1.1.8/go.mod h1:IdqeyBClc3FFqSzYq/MXESsS4S0FsZ github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZgg3U= github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= +github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs= +github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY= +github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI= +github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -105,6 +119,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 
h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= +github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pterm/pterm v0.12.27/go.mod h1:PhQ89w4i95rhgE+xedAoqous6K9X+r6aSOI2eFF7DZI= @@ -138,6 +154,8 @@ github.com/xo/terminfo v0.0.0-20210125001918-ca9a967f8778/go.mod h1:2MuV+tbUrU1z github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e h1:JVG44RsyaB9T2KIHavMF/ppJZNG9ZpyihvCd0w101no= github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e/go.mod h1:RbqR21r5mrJuqunuUZ/Dhy/avygyECGrLceyNeo4LiM= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= +github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= @@ -146,19 +164,19 @@ golang.org/x/exp v0.0.0-20240222234643-814bf88cf225 h1:LfspQV/FYTatPTr/3HzIcmiUF golang.org/x/exp v0.0.0-20240222234643-814bf88cf225/go.mod h1:CxmFvTBINI24O/j8iY7H1xHzx2i4OsyguNBmN/uPtqc= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= -golang.org/x/mod v0.19.0 h1:fEdghXQSo20giMthA7cd28ZC+jts4amQ3YMXiP5oMQ8= -golang.org/x/mod v0.19.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= +golang.org/x/mod 
v0.21.0 h1:vvrHzRwRfVKSiLrG+d4FMl/Qi4ukBCE6kZlTUkDYRT0= +golang.org/x/mod v0.21.0/go.mod h1:6SkKJ3Xj0I0BrPOZoBy3bdMptDDU9oJrpohJ3eWZ1fY= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= -golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys= -golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE= +golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= +golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= -golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -169,31 +187,39 @@ golang.org/x/sys v0.0.0-20220319134239-a9b59b0215f8/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= 
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= -golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= +golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= -golang.org/x/term v0.22.0 h1:BbsgPEJULsl2fV/AT3v15Mjva5yXKQDyKf+TbDz7QJk= -golang.org/x/term v0.22.0/go.mod h1:F3qCibpT5AMpCRfhfT53vVJwhLtIVHhB9XDjfFvnMI4= +golang.org/x/term v0.24.0 h1:Mh5cbb+Zk2hqqXNO7S1iTjEphVL+jb8ZWaqh/g+JWkM= +golang.org/x/term v0.24.0/go.mod h1:lOBK/LVxemqiMij05LGJ0tzNr8xlmwBRJ81PX6wVLH8= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= -golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= -golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= +golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= +golang.org/x/text v0.18.0/go.mod 
h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= -golang.org/x/tools v0.23.0 h1:SGsXPZ+2l4JsgaCKkx+FQ9YZ5XEtA1GZYuoDjenLjvg= -golang.org/x/tools v0.23.0/go.mod h1:pnu6ufv6vQkll6szChhK3C3L/ruaIv5eBeztNG8wtsI= +golang.org/x/tools v0.25.0 h1:oFU9pkj/iJgs+0DT+VMHrx+oBKs/LJMV+Uvg78sl+fE= +golang.org/x/tools v0.25.0/go.mod h1:/vtpO8WL1N9cQC3FN5zPqb//fRXskFHbLKk4OW1Q7rg= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 h1:+cNy6SZtPcJQH3LJVLOSmiC7MMxXNOb3PU/VUEz+EhU= golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90= +gonum.org/v1/gonum v0.15.1 h1:FNy7N6OUZVUaWG9pTiD+jlhdQ3lMP+/LcTpJ6+a8sQ0= +gonum.org/v1/gonum v0.15.1/go.mod h1:eZTZuRFrzu5pcyjN5wJhcIhnUdNijYxX1T2IcrOGY0o= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de h1:cZGRis4/ot9uVm639a+rHCUaG0JJHEsdyzSQTMX+suY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de/go.mod h1:H4O17MA/PE9BsGx3w+a+W2VOLLD1Qf7oJneAoU6WktY= +google.golang.org/grpc v1.63.2 h1:MUeiw1B2maTVZthpU5xvASfTh3LDbxHd6IJ6QQVU+xM= +google.golang.org/grpc v1.63.2/go.mod h1:WAX/8DgncnokcFUldAxq7GeB5DXHDbMF+lLvDomNkRA= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 
h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/literals.go b/literals.go index 1150524..2e16d02 100644 --- a/literals.go +++ b/literals.go @@ -31,8 +31,8 @@ import ( "time" "unsafe" - "github.com/apache/arrow/go/v16/arrow" - "github.com/apache/arrow/go/v16/arrow/decimal128" + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/decimal128" "github.com/google/uuid" ) diff --git a/literals_test.go b/literals_test.go index 1f9baa9..4dbb7f2 100644 --- a/literals_test.go +++ b/literals_test.go @@ -23,8 +23,8 @@ import ( "testing" "time" - "github.com/apache/arrow/go/v16/arrow" - "github.com/apache/arrow/go/v16/arrow/decimal128" + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/decimal128" "github.com/apache/iceberg-go" "github.com/google/uuid" "github.com/stretchr/testify/assert" diff --git a/schema.go b/schema.go index a204b54..4748184 100644 --- a/schema.go +++ b/schema.go @@ -1019,3 +1019,149 @@ func (buildPosAccessors) Primitive(PrimitiveType) map[int]accessor { func buildAccessors(schema *Schema) (map[int]accessor, error) { return Visit(schema, buildPosAccessors{}) } + +type SchemaWithPartnerVisitor[T, P any] interface { + Schema(sc *Schema, schemaPartner P, structResult T) T + Struct(st StructType, structPartner P, fieldResults []T) T + Field(field NestedField, fieldPartner P, fieldResult T) T + List(l ListType, listPartner P, elemResult T) T + Map(m MapType, mapPartner P, keyResult, valResult T) T + Primitive(p PrimitiveType, primitivePartner P) T +} + +type PartnerAccessor[P any] interface { + SchemaPartner(P) P + FieldPartner(partnerStruct P, fieldID int, fieldName string) P + ListElementPartner(P) P + MapKeyPartner(P) P + MapValuePartner(P) P +} + +func VisitSchemaWithPartner[T, P any](sc *Schema, partner P, visitor SchemaWithPartnerVisitor[T, P], accessor PartnerAccessor[P]) (res T, 
err error) { + if sc == nil { + err = fmt.Errorf("%w: cannot visit nil schema", ErrInvalidArgument) + return + } + + if visitor == nil || accessor == nil { + err = fmt.Errorf("%w: cannot visit with nil visitor or accessor", ErrInvalidArgument) + return + } + + defer func() { + if r := recover(); r != nil { + switch e := r.(type) { + case string: + err = fmt.Errorf("error encountered during schema visitor: %s", e) + case error: + err = fmt.Errorf("error encountered during schema visitor: %w", e) + } + } + }() + + structPartner := accessor.SchemaPartner(partner) + return visitor.Schema(sc, partner, visitStructWithPartner(sc.AsStruct(), structPartner, visitor, accessor)), nil +} + +func visitStructWithPartner[T, P any](st StructType, partner P, visitor SchemaWithPartnerVisitor[T, P], accessor PartnerAccessor[P]) T { + type ( + beforeField interface { + BeforeField(NestedField, P) + } + afterField interface { + AfterField(NestedField, P) + } + ) + + bf, _ := visitor.(beforeField) + af, _ := visitor.(afterField) + + fieldResults := make([]T, len(st.FieldList)) + + for i, f := range st.FieldList { + fieldPartner := accessor.FieldPartner(partner, f.ID, f.Name) + if bf != nil { + bf.BeforeField(f, fieldPartner) + } + fieldResult := visitTypeWithPartner(f.Type, fieldPartner, visitor, accessor) + fieldResults[i] = visitor.Field(f, fieldPartner, fieldResult) + if af != nil { + af.AfterField(f, fieldPartner) + } + } + + return visitor.Struct(st, partner, fieldResults) +} + +func visitListWithPartner[T, P any](listType ListType, partner P, visitor SchemaWithPartnerVisitor[T, P], accessor PartnerAccessor[P]) T { + type ( + beforeListElem interface { + BeforeListElement(NestedField, P) + } + afterListElem interface { + AfterListElement(NestedField, P) + } + ) + + elemPartner := accessor.ListElementPartner(partner) + if ble, ok := visitor.(beforeListElem); ok { + ble.BeforeListElement(listType.ElementField(), elemPartner) + } + elemResult := visitTypeWithPartner(listType.Element, 
elemPartner, visitor, accessor) + if ale, ok := visitor.(afterListElem); ok { + ale.AfterListElement(listType.ElementField(), elemPartner) + } + + return visitor.List(listType, partner, elemResult) +} + +func visitMapWithPartner[T, P any](m MapType, partner P, visitor SchemaWithPartnerVisitor[T, P], accessor PartnerAccessor[P]) T { + type ( + beforeMapKey interface { + BeforeMapKey(NestedField, P) + } + afterMapKey interface { + AfterMapKey(NestedField, P) + } + + beforeMapValue interface { + BeforeMapValue(NestedField, P) + } + afterMapValue interface { + AfterMapValue(NestedField, P) + } + ) + + keyPartner := accessor.MapKeyPartner(partner) + if bmk, ok := visitor.(beforeMapKey); ok { + bmk.BeforeMapKey(m.KeyField(), keyPartner) + } + keyResult := visitTypeWithPartner(m.KeyType, keyPartner, visitor, accessor) + if amk, ok := visitor.(afterMapKey); ok { + amk.AfterMapKey(m.KeyField(), keyPartner) + } + + valPartner := accessor.MapValuePartner(partner) + if bmv, ok := visitor.(beforeMapValue); ok { + bmv.BeforeMapValue(m.ValueField(), valPartner) + } + valResult := visitTypeWithPartner(m.ValueType, valPartner, visitor, accessor) + if amv, ok := visitor.(afterMapValue); ok { + amv.AfterMapValue(m.ValueField(), valPartner) + } + + return visitor.Map(m, partner, keyResult, valResult) +} + +func visitTypeWithPartner[T, P any](t Type, fieldPartner P, visitor SchemaWithPartnerVisitor[T, P], accessor PartnerAccessor[P]) T { + switch t := t.(type) { + case *ListType: + return visitListWithPartner(*t, fieldPartner, visitor, accessor) + case *StructType: + return visitStructWithPartner(*t, fieldPartner, visitor, accessor) + case *MapType: + return visitMapWithPartner(*t, fieldPartner, visitor, accessor) + default: + return visitor.Primitive(t.(PrimitiveType), fieldPartner) + } +} diff --git a/table/arrow_utils.go b/table/arrow_utils.go new file mode 100644 index 0000000..7c07720 --- /dev/null +++ b/table/arrow_utils.go @@ -0,0 +1,406 @@ +// Licensed to the Apache Software 
Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package table + +import ( + "fmt" + "slices" + "strconv" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/iceberg-go" +) + +const ( + ArrowFieldDocKey = "doc" + ArrowParquetFieldIDKey = "PARQUET:field_id" +) + +type ArrowSchemaVisitor[T any] interface { + Schema(*arrow.Schema, T) T + Struct(*arrow.StructType, []T) T + Field(arrow.Field, T) T + List(arrow.ListLikeType, T) T + Map(mt *arrow.MapType, keyResult T, valueResult T) T + Primitive(arrow.DataType) T +} + +func recoverError(err *error) { + if r := recover(); r != nil { + switch e := r.(type) { + case string: + *err = fmt.Errorf("error encountered during arrow schema visitor: %s", e) + case error: + *err = fmt.Errorf("error encountered during arrow schema visitor: %w", e) + } + } +} + +func VisitArrowSchema[T any](sc *arrow.Schema, visitor ArrowSchemaVisitor[T]) (res T, err error) { + if sc == nil { + err = fmt.Errorf("%w: cannot visit nil arrow schema", iceberg.ErrInvalidArgument) + return + } + + defer recoverError(&err) + + return visitor.Schema(sc, visitArrowStruct(arrow.StructOf(sc.Fields()...), visitor)), err +} + +func visitArrowField[T any](f arrow.Field, visitor ArrowSchemaVisitor[T]) T { + switch typ := f.Type.(type) { + 
case *arrow.StructType: + return visitArrowStruct(typ, visitor) + case *arrow.MapType: + return visitArrowMap(typ, visitor) + case arrow.ListLikeType: + return visitArrowList(typ, visitor) + default: + return visitor.Primitive(typ) + } +} + +func visitArrowStruct[T any](dt *arrow.StructType, visitor ArrowSchemaVisitor[T]) T { + type ( + beforeField interface { + BeforeField(arrow.Field) + } + afterField interface { + AfterField(arrow.Field) + } + ) + + results := make([]T, dt.NumFields()) + bf, _ := visitor.(beforeField) + af, _ := visitor.(afterField) + + for i, f := range dt.Fields() { + if bf != nil { + bf.BeforeField(f) + } + + res := visitArrowField(f, visitor) + + if af != nil { + af.AfterField(f) + } + + results[i] = visitor.Field(f, res) + } + + return visitor.Struct(dt, results) +} + +func visitArrowMap[T any](dt *arrow.MapType, visitor ArrowSchemaVisitor[T]) T { + type ( + beforeMapKey interface { + BeforeMapKey(arrow.Field) + } + beforeMapValue interface { + BeforeMapValue(arrow.Field) + } + afterMapKey interface { + AfterMapKey(arrow.Field) + } + afterMapValue interface { + AfterMapValue(arrow.Field) + } + ) + + key, val := dt.KeyField(), dt.ItemField() + + if bmk, ok := visitor.(beforeMapKey); ok { + bmk.BeforeMapKey(key) + } + + keyResult := visitArrowField(key, visitor) + + if amk, ok := visitor.(afterMapKey); ok { + amk.AfterMapKey(key) + } + + if bmv, ok := visitor.(beforeMapValue); ok { + bmv.BeforeMapValue(val) + } + + valueResult := visitArrowField(val, visitor) + + if amv, ok := visitor.(afterMapValue); ok { + amv.AfterMapValue(val) + } + + return visitor.Map(dt, keyResult, valueResult) +} + +func visitArrowList[T any](dt arrow.ListLikeType, visitor ArrowSchemaVisitor[T]) T { + type ( + beforeListElem interface { + BeforeListElement(arrow.Field) + } + afterListElem interface { + AfterListElement(arrow.Field) + } + ) + + elemField := dt.ElemField() + + if bl, ok := visitor.(beforeListElem); ok { + bl.BeforeListElement(elemField) + } + + res := 
visitArrowField(elemField, visitor) + + if al, ok := visitor.(afterListElem); ok { + al.AfterListElement(elemField) + } + + return visitor.List(dt, res) +} + +func getFieldID(f arrow.Field) *int { + if !f.HasMetadata() { + return nil + } + + fieldIDStr, ok := f.Metadata.GetValue(ArrowParquetFieldIDKey) + if !ok { + return nil + } + + id, err := strconv.Atoi(fieldIDStr) + if err != nil { + return nil + } + + return &id +} + +type hasIDs struct{} + +func (hasIDs) Schema(sc *arrow.Schema, result bool) bool { + return result +} + +func (hasIDs) Struct(st *arrow.StructType, results []bool) bool { + return !slices.Contains(results, false) +} + +func (hasIDs) Field(f arrow.Field, result bool) bool { + return getFieldID(f) != nil +} + +func (hasIDs) List(dt arrow.ListLikeType, elem bool) bool { + elemField := dt.ElemField() + return elem && getFieldID(elemField) != nil +} + +func (hasIDs) Map(m *arrow.MapType, key, val bool) bool { + return key && val && + getFieldID(m.KeyField()) != nil && getFieldID(m.ItemField()) != nil +} + +func (hasIDs) Primitive(arrow.DataType) bool { return true } + +type convertToIceberg struct { + downcastTimestamp bool + + fieldID func(arrow.Field) int +} + +func (convertToIceberg) Schema(_ *arrow.Schema, result iceberg.NestedField) iceberg.NestedField { + return result +} + +func (convertToIceberg) Struct(_ *arrow.StructType, results []iceberg.NestedField) iceberg.NestedField { + return iceberg.NestedField{ + Type: &iceberg.StructType{FieldList: results}, + } +} + +func (c convertToIceberg) Field(field arrow.Field, result iceberg.NestedField) iceberg.NestedField { + result.ID = c.fieldID(field) + if field.HasMetadata() { + if doc, ok := field.Metadata.GetValue(ArrowFieldDocKey); ok { + result.Doc = doc + } + } + + result.Required = !field.Nullable + result.Name = field.Name + return result +} + +func (c convertToIceberg) List(dt arrow.ListLikeType, elemResult iceberg.NestedField) iceberg.NestedField { + elemField := dt.ElemField() + elemID := 
c.fieldID(elemField) + + return iceberg.NestedField{ + Type: &iceberg.ListType{ + ElementID: elemID, + Element: elemResult.Type, + ElementRequired: !elemField.Nullable, + }, + } +} + +func (c convertToIceberg) Map(m *arrow.MapType, keyResult, valueResult iceberg.NestedField) iceberg.NestedField { + keyField, valField := m.KeyField(), m.ItemField() + keyID, valID := c.fieldID(keyField), c.fieldID(valField) + + return iceberg.NestedField{ + Type: &iceberg.MapType{ + KeyID: keyID, + KeyType: keyResult.Type, + ValueID: valID, + ValueType: valueResult.Type, + ValueRequired: !valField.Nullable, + }, + } +} + +var ( + utcAliases = []string{"UTC", "+00:00", "Etc/UTC", "Z"} +) + +func (c convertToIceberg) Primitive(dt arrow.DataType) (result iceberg.NestedField) { + switch dt := dt.(type) { + case *arrow.DictionaryType: + if _, ok := dt.ValueType.(arrow.NestedType); ok { + panic(fmt.Errorf("%w: unsupported arrow type for conversion - %s", iceberg.ErrInvalidSchema, dt)) + } + return c.Primitive(dt.ValueType) + case *arrow.RunEndEncodedType: + if _, ok := dt.Encoded().(arrow.NestedType); ok { + panic(fmt.Errorf("%w: unsupported arrow type for conversion - %s", iceberg.ErrInvalidSchema, dt)) + } + return c.Primitive(dt.Encoded()) + case *arrow.BooleanType: + result.Type = iceberg.PrimitiveTypes.Bool + case *arrow.Uint8Type, *arrow.Uint16Type, *arrow.Uint32Type, + *arrow.Int8Type, *arrow.Int16Type, *arrow.Int32Type: + result.Type = iceberg.PrimitiveTypes.Int32 + case *arrow.Uint64Type, *arrow.Int64Type: + result.Type = iceberg.PrimitiveTypes.Int64 + case *arrow.Float16Type, *arrow.Float32Type: + result.Type = iceberg.PrimitiveTypes.Float32 + case *arrow.Float64Type: + result.Type = iceberg.PrimitiveTypes.Float64 + case *arrow.Decimal32Type, *arrow.Decimal64Type, *arrow.Decimal128Type: + dec := dt.(arrow.DecimalType) + result.Type = iceberg.DecimalTypeOf(int(dec.GetPrecision()), int(dec.GetScale())) + case *arrow.StringType, *arrow.LargeStringType: + result.Type = 
iceberg.PrimitiveTypes.String + case *arrow.BinaryType, *arrow.LargeBinaryType: + result.Type = iceberg.PrimitiveTypes.Binary + case *arrow.Date32Type: + result.Type = iceberg.PrimitiveTypes.Date + case *arrow.Time64Type: + if dt.Unit == arrow.Microsecond { + result.Type = iceberg.PrimitiveTypes.Time + } else { + panic(fmt.Errorf("%w: unsupported arrow type for conversion - %s", iceberg.ErrInvalidSchema, dt)) + } + case *arrow.TimestampType: + if dt.Unit == arrow.Nanosecond { + if !c.downcastTimestamp { + panic(fmt.Errorf("%w: 'ns' timestamp precision not supported", iceberg.ErrType)) + } + // TODO: log something + } + + if slices.Contains(utcAliases, dt.TimeZone) { + result.Type = iceberg.PrimitiveTypes.TimestampTz + } else if dt.TimeZone == "" { + result.Type = iceberg.PrimitiveTypes.Timestamp + } else { + panic(fmt.Errorf("%w: unsupported arrow type for conversion - %s", iceberg.ErrInvalidSchema, dt)) + } + case *arrow.FixedSizeBinaryType: + result.Type = iceberg.FixedTypeOf(dt.ByteWidth) + case arrow.ExtensionType: + if dt.ExtensionName() == "arrow.uuid" { + result.Type = iceberg.PrimitiveTypes.UUID + } else { + panic(fmt.Errorf("%w: unsupported arrow type for conversion - %s", iceberg.ErrInvalidSchema, dt)) + } + default: + panic(fmt.Errorf("%w: unsupported arrow type for conversion - %s", iceberg.ErrInvalidSchema, dt)) + } + + return +} + +func ArrowTypeToIceberg(dt arrow.DataType, downcastNsTimestamp bool) (iceberg.Type, error) { + sc := arrow.NewSchema([]arrow.Field{{Type: dt, + Metadata: arrow.NewMetadata([]string{ArrowParquetFieldIDKey}, []string{"1"})}}, nil) + + out, err := VisitArrowSchema(sc, convertToIceberg{ + downcastTimestamp: downcastNsTimestamp, + fieldID: func(field arrow.Field) int { + if id := getFieldID(field); id != nil { + return *id + } + + panic(fmt.Errorf("%w: cannot convert %s to Iceberg field, missing field_id", + iceberg.ErrInvalidSchema, field)) + }, + }) + if err != nil { + return nil, err + } + + return 
out.Type.(*iceberg.StructType).FieldList[0].Type, nil +} + +func ArrowSchemaToIceberg(sc *arrow.Schema, downcastNsTimestamp bool, nameMapping NameMapping) (*iceberg.Schema, error) { + hasIDs, _ := VisitArrowSchema(sc, hasIDs{}) + + switch { + case hasIDs: + out, err := VisitArrowSchema(sc, convertToIceberg{ + downcastTimestamp: downcastNsTimestamp, + fieldID: func(field arrow.Field) int { + if id := getFieldID(field); id != nil { + return *id + } + + panic(fmt.Errorf("%w: cannot convert %s to Iceberg field, missing field_id", + iceberg.ErrInvalidSchema, field)) + }, + }) + if err != nil { + return nil, err + } + + return iceberg.NewSchema(0, out.Type.(*iceberg.StructType).FieldList...), nil + case nameMapping != nil: + withoutIDs, err := VisitArrowSchema(sc, convertToIceberg{ + downcastTimestamp: downcastNsTimestamp, + fieldID: func(_ arrow.Field) int { return -1 }, + }) + if err != nil { + return nil, err + } + + schemaWithoutIDs := iceberg.NewSchema(0, withoutIDs.Type.(*iceberg.StructType).FieldList...) + return ApplyNameMapping(schemaWithoutIDs, nameMapping) + default: + return nil, fmt.Errorf("%w: arrow schema does not have field-ids and no name mapping provided", + iceberg.ErrInvalidSchema) + } +} diff --git a/table/arrow_utils_test.go b/table/arrow_utils_test.go new file mode 100644 index 0000000..1d8173e --- /dev/null +++ b/table/arrow_utils_test.go @@ -0,0 +1,371 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package table_test + +import ( + "testing" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/extensions" + "github.com/apache/iceberg-go" + "github.com/apache/iceberg-go/table" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func fieldIDMeta(id string) arrow.Metadata { + return arrow.MetadataFrom(map[string]string{table.ArrowParquetFieldIDKey: id}) +} + +func TestArrowToIceberg(t *testing.T) { + tests := []struct { + dt arrow.DataType + ice iceberg.Type + err string + }{ + {&arrow.FixedSizeBinaryType{ByteWidth: 23}, iceberg.FixedTypeOf(23), ""}, + {&arrow.Decimal32Type{Precision: 8, Scale: 9}, iceberg.DecimalTypeOf(8, 9), ""}, + {&arrow.Decimal64Type{Precision: 15, Scale: 14}, iceberg.DecimalTypeOf(15, 14), ""}, + {&arrow.Decimal128Type{Precision: 26, Scale: 20}, iceberg.DecimalTypeOf(26, 20), ""}, + {&arrow.Decimal256Type{Precision: 8, Scale: 9}, nil, "unsupported arrow type for conversion - decimal256(8, 9)"}, + {arrow.FixedWidthTypes.Boolean, iceberg.PrimitiveTypes.Bool, ""}, + {arrow.PrimitiveTypes.Int8, iceberg.PrimitiveTypes.Int32, ""}, + {arrow.PrimitiveTypes.Uint8, iceberg.PrimitiveTypes.Int32, ""}, + {arrow.PrimitiveTypes.Int16, iceberg.PrimitiveTypes.Int32, ""}, + {arrow.PrimitiveTypes.Uint16, iceberg.PrimitiveTypes.Int32, ""}, + {arrow.PrimitiveTypes.Int32, iceberg.PrimitiveTypes.Int32, ""}, + {arrow.PrimitiveTypes.Uint32, iceberg.PrimitiveTypes.Int32, ""}, + {arrow.PrimitiveTypes.Int64, iceberg.PrimitiveTypes.Int64, ""}, + {arrow.PrimitiveTypes.Uint64, 
iceberg.PrimitiveTypes.Int64, ""}, + {arrow.FixedWidthTypes.Float16, iceberg.PrimitiveTypes.Float32, ""}, + {arrow.PrimitiveTypes.Float32, iceberg.PrimitiveTypes.Float32, ""}, + {arrow.PrimitiveTypes.Float64, iceberg.PrimitiveTypes.Float64, ""}, + {arrow.FixedWidthTypes.Date32, iceberg.PrimitiveTypes.Date, ""}, + {arrow.FixedWidthTypes.Date64, nil, "unsupported arrow type for conversion - date64"}, + {arrow.FixedWidthTypes.Time32s, nil, "unsupported arrow type for conversion - time32[s]"}, + {arrow.FixedWidthTypes.Time32ms, nil, "unsupported arrow type for conversion - time32[ms]"}, + {arrow.FixedWidthTypes.Time64us, iceberg.PrimitiveTypes.Time, ""}, + {arrow.FixedWidthTypes.Time64ns, nil, "unsupported arrow type for conversion - time64[ns]"}, + {arrow.FixedWidthTypes.Timestamp_s, iceberg.PrimitiveTypes.TimestampTz, ""}, + {arrow.FixedWidthTypes.Timestamp_ms, iceberg.PrimitiveTypes.TimestampTz, ""}, + {arrow.FixedWidthTypes.Timestamp_us, iceberg.PrimitiveTypes.TimestampTz, ""}, + {arrow.FixedWidthTypes.Timestamp_ns, nil, "'ns' timestamp precision not supported"}, + {&arrow.TimestampType{Unit: arrow.Second}, iceberg.PrimitiveTypes.Timestamp, ""}, + {&arrow.TimestampType{Unit: arrow.Millisecond}, iceberg.PrimitiveTypes.Timestamp, ""}, + {&arrow.TimestampType{Unit: arrow.Microsecond}, iceberg.PrimitiveTypes.Timestamp, ""}, + {&arrow.TimestampType{Unit: arrow.Nanosecond}, nil, "'ns' timestamp precision not supported"}, + {&arrow.TimestampType{Unit: arrow.Microsecond, TimeZone: "US/Pacific"}, nil, "unsupported arrow type for conversion - timestamp[us, tz=US/Pacific]"}, + {arrow.BinaryTypes.String, iceberg.PrimitiveTypes.String, ""}, + {arrow.BinaryTypes.LargeString, iceberg.PrimitiveTypes.String, ""}, + {arrow.BinaryTypes.StringView, nil, "unsupported arrow type for conversion - string_view"}, + {arrow.BinaryTypes.Binary, iceberg.PrimitiveTypes.Binary, ""}, + {arrow.BinaryTypes.LargeBinary, iceberg.PrimitiveTypes.Binary, ""}, + {arrow.BinaryTypes.BinaryView, nil, 
"unsupported arrow type for conversion - binary_view"}, + {extensions.NewUUIDType(), iceberg.PrimitiveTypes.UUID, ""}, + {arrow.StructOf(arrow.Field{ + Name: "foo", + Type: arrow.BinaryTypes.LargeString, + Nullable: true, + Metadata: arrow.MetadataFrom(map[string]string{ + table.ArrowParquetFieldIDKey: "1", table.ArrowFieldDocKey: "foo doc", + }), + }, arrow.Field{ + Name: "bar", + Type: arrow.PrimitiveTypes.Int32, + Metadata: fieldIDMeta("2"), + }, arrow.Field{ + Name: "baz", + Type: arrow.FixedWidthTypes.Boolean, + Nullable: true, + Metadata: fieldIDMeta("3"), + }), &iceberg.StructType{ + FieldList: []iceberg.NestedField{ + {ID: 1, Name: "foo", Type: iceberg.PrimitiveTypes.String, Required: false, Doc: "foo doc"}, + {ID: 2, Name: "bar", Type: iceberg.PrimitiveTypes.Int32, Required: true}, + {ID: 3, Name: "baz", Type: iceberg.PrimitiveTypes.Bool, Required: false}, + }}, ""}, + {arrow.ListOfField(arrow.Field{ + Name: "element", + Type: arrow.PrimitiveTypes.Int32, + Nullable: false, + Metadata: fieldIDMeta("1"), + }), &iceberg.ListType{ + ElementID: 1, + Element: iceberg.PrimitiveTypes.Int32, + ElementRequired: true, + }, ""}, + {arrow.LargeListOfField(arrow.Field{ + Name: "element", + Type: arrow.PrimitiveTypes.Int32, + Nullable: false, + Metadata: fieldIDMeta("1"), + }), &iceberg.ListType{ + ElementID: 1, + Element: iceberg.PrimitiveTypes.Int32, + ElementRequired: true, + }, ""}, + {arrow.FixedSizeListOfField(1, arrow.Field{ + Name: "element", + Type: arrow.PrimitiveTypes.Int32, + Nullable: false, + Metadata: fieldIDMeta("1"), + }), &iceberg.ListType{ + ElementID: 1, + Element: iceberg.PrimitiveTypes.Int32, + ElementRequired: true, + }, ""}, + {arrow.MapOfWithMetadata(arrow.PrimitiveTypes.Int32, + fieldIDMeta("1"), + arrow.BinaryTypes.String, fieldIDMeta("2")), + &iceberg.MapType{ + KeyID: 1, KeyType: iceberg.PrimitiveTypes.Int32, + ValueID: 2, ValueType: iceberg.PrimitiveTypes.String, ValueRequired: false, + }, ""}, + {&arrow.DictionaryType{IndexType: 
arrow.PrimitiveTypes.Int32, + ValueType: arrow.BinaryTypes.String}, iceberg.PrimitiveTypes.String, ""}, + {&arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Int32, + ValueType: arrow.PrimitiveTypes.Int32}, iceberg.PrimitiveTypes.Int32, ""}, + {&arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Int64, + ValueType: arrow.PrimitiveTypes.Float64}, iceberg.PrimitiveTypes.Float64, ""}, + {arrow.RunEndEncodedOf(arrow.PrimitiveTypes.Int32, arrow.BinaryTypes.String), iceberg.PrimitiveTypes.String, ""}, + {arrow.RunEndEncodedOf(arrow.PrimitiveTypes.Int32, arrow.PrimitiveTypes.Float64), iceberg.PrimitiveTypes.Float64, ""}, + {arrow.RunEndEncodedOf(arrow.PrimitiveTypes.Int32, arrow.PrimitiveTypes.Int16), iceberg.PrimitiveTypes.Int32, ""}, + } + + for _, tt := range tests { + t.Run(tt.dt.String(), func(t *testing.T) { + out, err := table.ArrowTypeToIceberg(tt.dt, false) + if tt.err == "" { + require.NoError(t, err) + assert.True(t, out.Equals(tt.ice), out.String(), tt.ice.String()) + } else { + assert.ErrorContains(t, err, tt.err) + } + }) + } +} + +func TestArrowSchemaToIceb(t *testing.T) { + tests := []struct { + name string + sc *arrow.Schema + expected string + err string + }{ + {"simple", arrow.NewSchema([]arrow.Field{ + {Name: "foo", Nullable: true, Type: arrow.BinaryTypes.String, + Metadata: fieldIDMeta("1")}, + {Name: "bar", Nullable: false, Type: arrow.PrimitiveTypes.Int32, + Metadata: fieldIDMeta("2")}, + {Name: "baz", Nullable: true, Type: arrow.FixedWidthTypes.Boolean, + Metadata: fieldIDMeta("3")}, + }, nil), `table { + 1: foo: optional string + 2: bar: required int + 3: baz: optional boolean +}`, ""}, + {"nested", arrow.NewSchema([]arrow.Field{ + {Name: "qux", Nullable: false, Metadata: fieldIDMeta("4"), + Type: arrow.ListOfField(arrow.Field{ + Name: "element", + Type: arrow.BinaryTypes.String, + Metadata: fieldIDMeta("5"), + })}, + {Name: "quux", Nullable: false, Metadata: fieldIDMeta("6"), + Type: arrow.MapOfWithMetadata(arrow.BinaryTypes.String, 
fieldIDMeta("7"), + arrow.MapOfWithMetadata(arrow.BinaryTypes.String, fieldIDMeta("9"), + arrow.PrimitiveTypes.Int32, fieldIDMeta("10")), fieldIDMeta("8"))}, + {Name: "location", Nullable: false, Metadata: fieldIDMeta("11"), + Type: arrow.ListOfField( + arrow.Field{ + Name: "element", Metadata: fieldIDMeta("12"), + Type: arrow.StructOf( + arrow.Field{Name: "latitude", Nullable: true, + Type: arrow.PrimitiveTypes.Float32, Metadata: fieldIDMeta("13")}, + arrow.Field{Name: "longitude", Nullable: true, + Type: arrow.PrimitiveTypes.Float32, Metadata: fieldIDMeta("14")}, + )})}, + {Name: "person", Nullable: true, Metadata: fieldIDMeta("15"), + Type: arrow.StructOf( + arrow.Field{Name: "name", Type: arrow.BinaryTypes.String, Nullable: true, Metadata: fieldIDMeta("16")}, + arrow.Field{Name: "age", Type: arrow.PrimitiveTypes.Int32, Metadata: fieldIDMeta("17")}, + )}, + }, nil), `table { + 4: qux: required list + 6: quux: required map> + 11: location: required list> + 15: person: optional struct<16: name: optional string, 17: age: required int> +}`, ""}, + {"missing ids", arrow.NewSchema([]arrow.Field{ + {Name: "foo", Type: arrow.BinaryTypes.String, Nullable: false}, + }, nil), "", "arrow schema does not have field-ids and no name mapping provided"}, + {"missing ids partial", arrow.NewSchema([]arrow.Field{ + {Name: "foo", Type: arrow.BinaryTypes.String, Metadata: fieldIDMeta("1")}, + {Name: "bar", Type: arrow.PrimitiveTypes.Int32, Nullable: false}, + }, nil), "", "arrow schema does not have field-ids and no name mapping provided"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + out, err := table.ArrowSchemaToIceberg(tt.sc, true, nil) + if tt.err == "" { + require.NoError(t, err) + assert.Equal(t, tt.expected, out.String()) + } else { + assert.ErrorContains(t, err, tt.err) + } + }) + } +} + +func makeID(v int) *int { return &v } + +var ( + icebergSchemaNested = iceberg.NewSchema(0, + iceberg.NestedField{ + ID: 1, Name: "foo", Type: 
iceberg.PrimitiveTypes.String, Required: true}, + iceberg.NestedField{ + ID: 2, Name: "bar", Type: iceberg.PrimitiveTypes.Int32, Required: true}, + iceberg.NestedField{ + ID: 3, Name: "baz", Type: iceberg.PrimitiveTypes.Bool, Required: false}, + iceberg.NestedField{ + ID: 4, Name: "qux", Required: true, Type: &iceberg.ListType{ + ElementID: 5, Element: iceberg.PrimitiveTypes.String, ElementRequired: false}}, + iceberg.NestedField{ + ID: 6, Name: "quux", + Type: &iceberg.MapType{ + KeyID: 7, + KeyType: iceberg.PrimitiveTypes.String, + ValueID: 8, + ValueType: &iceberg.MapType{ + KeyID: 9, + KeyType: iceberg.PrimitiveTypes.String, + ValueID: 10, + ValueType: iceberg.PrimitiveTypes.Int32, + ValueRequired: false, + }, + ValueRequired: false, + }, + Required: true}, + iceberg.NestedField{ + ID: 11, Name: "location", Type: &iceberg.ListType{ + ElementID: 12, Element: &iceberg.StructType{ + FieldList: []iceberg.NestedField{ + {ID: 13, Name: "latitude", Type: iceberg.PrimitiveTypes.Float32, Required: true}, + {ID: 14, Name: "longitude", Type: iceberg.PrimitiveTypes.Float32, Required: true}, + }, + }, + ElementRequired: false}, + Required: true}, + iceberg.NestedField{ + ID: 15, + Name: "person", + Type: &iceberg.StructType{ + FieldList: []iceberg.NestedField{ + {ID: 16, Name: "name", Type: iceberg.PrimitiveTypes.String, Required: false}, + {ID: 17, Name: "age", Type: iceberg.PrimitiveTypes.Int32, Required: true}, + }, + }, + Required: false, + }, + ) + + icebergSchemaSimple = iceberg.NewSchema(0, + iceberg.NestedField{ID: 1, Name: "foo", Type: iceberg.PrimitiveTypes.String}, + iceberg.NestedField{ID: 2, Name: "bar", Type: iceberg.PrimitiveTypes.Int32, Required: true}, + iceberg.NestedField{ID: 3, Name: "baz", Type: iceberg.PrimitiveTypes.Bool}, + ) +) + +func TestArrowSchemaWithNameMapping(t *testing.T) { + schemaWithoutIDs := arrow.NewSchema([]arrow.Field{ + {Name: "foo", Type: arrow.BinaryTypes.String, Nullable: true}, + {Name: "bar", Type: arrow.PrimitiveTypes.Int32, 
Nullable: false}, + {Name: "baz", Type: arrow.FixedWidthTypes.Boolean, Nullable: true}, + }, nil) + + schemaNestedWithoutIDs := arrow.NewSchema([]arrow.Field{ + {Name: "foo", Type: arrow.BinaryTypes.String, Nullable: false}, + {Name: "bar", Type: arrow.PrimitiveTypes.Int32, Nullable: false}, + {Name: "baz", Type: arrow.FixedWidthTypes.Boolean, Nullable: true}, + {Name: "qux", Type: arrow.ListOf(arrow.BinaryTypes.String), Nullable: false}, + {Name: "quux", Type: arrow.MapOf(arrow.BinaryTypes.String, + arrow.MapOf(arrow.BinaryTypes.String, arrow.PrimitiveTypes.Int32)), Nullable: false}, + {Name: "location", Type: arrow.ListOf(arrow.StructOf( + arrow.Field{Name: "latitude", Type: arrow.PrimitiveTypes.Float32, Nullable: false}, + arrow.Field{Name: "longitude", Type: arrow.PrimitiveTypes.Float32, Nullable: false}, + )), Nullable: false}, + {Name: "person", Type: arrow.StructOf( + arrow.Field{Name: "name", Type: arrow.BinaryTypes.String, Nullable: true}, + arrow.Field{Name: "age", Type: arrow.PrimitiveTypes.Int32, Nullable: false}, + ), Nullable: true}, + }, nil) + + tests := []struct { + name string + schema *arrow.Schema + mapping table.NameMapping + expected *iceberg.Schema + err string + }{ + {"simple", schemaWithoutIDs, table.NameMapping{ + {FieldID: makeID(1), Names: []string{"foo"}}, + {FieldID: makeID(2), Names: []string{"bar"}}, + {FieldID: makeID(3), Names: []string{"baz"}}, + }, icebergSchemaSimple, ""}, + {"field missing", schemaWithoutIDs, table.NameMapping{ + {FieldID: makeID(1), Names: []string{"foo"}}, + }, nil, "field missing from name mapping: bar"}, + {"nested schema", schemaNestedWithoutIDs, table.NameMapping{ + {FieldID: makeID(1), Names: []string{"foo"}}, + {FieldID: makeID(2), Names: []string{"bar"}}, + {FieldID: makeID(3), Names: []string{"baz"}}, + {FieldID: makeID(4), Names: []string{"qux"}, + Fields: []table.MappedField{{FieldID: makeID(5), Names: []string{"element"}}}}, + {FieldID: makeID(6), Names: []string{"quux"}, Fields: 
[]table.MappedField{ + {FieldID: makeID(7), Names: []string{"key"}}, + {FieldID: makeID(8), Names: []string{"value"}, Fields: []table.MappedField{ + {FieldID: makeID(9), Names: []string{"key"}}, + {FieldID: makeID(10), Names: []string{"value"}}, + }}, + }}, + {FieldID: makeID(11), Names: []string{"location"}, Fields: []table.MappedField{ + {FieldID: makeID(12), Names: []string{"element"}, Fields: []table.MappedField{ + {FieldID: makeID(13), Names: []string{"latitude"}}, + {FieldID: makeID(14), Names: []string{"longitude"}}, + }}, + }}, + {FieldID: makeID(15), Names: []string{"person"}, Fields: []table.MappedField{ + {FieldID: makeID(16), Names: []string{"name"}}, + {FieldID: makeID(17), Names: []string{"age"}}, + }}, + }, icebergSchemaNested, ""}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + out, err := table.ArrowSchemaToIceberg(tt.schema, false, tt.mapping) + if tt.err != "" { + assert.ErrorContains(t, err, tt.err) + } else { + require.NoError(t, err) + assert.True(t, tt.expected.Equals(out), out.String(), tt.expected.String()) + } + }) + } +} diff --git a/table/name_mapping.go b/table/name_mapping.go new file mode 100644 index 0000000..b71b7d3 --- /dev/null +++ b/table/name_mapping.go @@ -0,0 +1,296 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package table + +import ( + "fmt" + "slices" + "strconv" + "strings" + + "github.com/apache/iceberg-go" +) + +type MappedField struct { + Names []string `json:"names"` + // iceberg spec says this is optional, but I don't see any examples + // of this being left empty. Does pyiceberg need to be updated or should + // the spec not say field-id is optional? + FieldID *int `json:"field-id,omitempty"` + Fields []MappedField `json:"fields,omitempty"` +} + +func (m *MappedField) Len() int { return len(m.Fields) } + +func (m *MappedField) String() string { + var bldr strings.Builder + bldr.WriteString("([") + bldr.WriteString(strings.Join(m.Names, ", ")) + bldr.WriteString("] -> ") + + if m.FieldID != nil { + bldr.WriteString(strconv.Itoa(*m.FieldID)) + } else { + bldr.WriteByte('?') + } + + if len(m.Fields) > 0 { + bldr.WriteByte(' ') + for i, f := range m.Fields { + if i != 0 { + bldr.WriteString(", ") + } + bldr.WriteString(f.String()) + } + } + + bldr.WriteByte(')') + return bldr.String() +} + +type NameMapping []MappedField + +func (nm NameMapping) String() string { + var bldr strings.Builder + bldr.WriteString("[\n") + for _, f := range nm { + bldr.WriteByte('\t') + bldr.WriteString(f.String()) + bldr.WriteByte('\n') + } + bldr.WriteByte(']') + return bldr.String() +} + +type NameMappingVisitor[S, T any] interface { + Mapping(nm NameMapping, fieldResults S) S + Fields(st []MappedField, fieldResults []T) S + Field(field MappedField, fieldResult S) T +} + +func VisitNameMapping[S, T any](obj NameMapping, visitor NameMappingVisitor[S, T]) (res S, err error) { + if obj == nil { + err = fmt.Errorf("%w: cannot visit nil NameMapping", iceberg.ErrInvalidArgument) + return + } + + defer recoverError(&err) + + return visitor.Mapping(obj, visitMappedFields([]MappedField(obj), visitor)), err +} + +func VisitMappedFields[S, T any](fields []MappedField, visitor 
NameMappingVisitor[S, T]) (res S, err error) { + defer recoverError(&err) + + return visitMappedFields(fields, visitor), err +} + +func visitMappedFields[S, T any](fields []MappedField, visitor NameMappingVisitor[S, T]) S { + results := make([]T, len(fields)) + for i, f := range fields { + results[i] = visitor.Field(f, visitMappedFields(f.Fields, visitor)) + } + + return visitor.Fields(fields, results) +} + +type NameMappingAccessor struct{} + +func (NameMappingAccessor) SchemaPartner(partner *MappedField) *MappedField { + return partner +} + +func (NameMappingAccessor) getField(p *MappedField, field string) *MappedField { + for _, f := range p.Fields { + if slices.Contains(f.Names, field) { + return &f + } + } + + return nil +} + +func (n NameMappingAccessor) FieldPartner(partnerStruct *MappedField, _ int, fieldName string) *MappedField { + if partnerStruct == nil { + return nil + } + + return n.getField(partnerStruct, fieldName) +} + +func (n NameMappingAccessor) ListElementPartner(partnerList *MappedField) *MappedField { + if partnerList == nil { + return nil + } + + return n.getField(partnerList, "element") +} + +func (n NameMappingAccessor) MapKeyPartner(partnerMap *MappedField) *MappedField { + if partnerMap == nil { + return nil + } + + return n.getField(partnerMap, "key") +} + +func (n NameMappingAccessor) MapValuePartner(partnerMap *MappedField) *MappedField { + if partnerMap == nil { + return nil + } + + return n.getField(partnerMap, "value") +} + +type nameMapProjectVisitor struct { + currentPath []string +} + +func (n *nameMapProjectVisitor) popPath() { + n.currentPath = n.currentPath[:len(n.currentPath)-1] +} + +func (n *nameMapProjectVisitor) BeforeField(f iceberg.NestedField, _ *MappedField) { + n.currentPath = append(n.currentPath, f.Name) +} + +func (n *nameMapProjectVisitor) AfterField(iceberg.NestedField, *MappedField) { + n.popPath() +} + +func (n *nameMapProjectVisitor) BeforeListElement(iceberg.NestedField, *MappedField) { + n.currentPath = 
append(n.currentPath, "element") +} + +func (n *nameMapProjectVisitor) AfterListElement(iceberg.NestedField, *MappedField) { + n.popPath() +} + +func (n *nameMapProjectVisitor) BeforeMapKey(iceberg.NestedField, *MappedField) { + n.currentPath = append(n.currentPath, "key") +} + +func (n *nameMapProjectVisitor) AfterMapKey(iceberg.NestedField, *MappedField) { + n.popPath() +} + +func (n *nameMapProjectVisitor) BeforeMapValue(iceberg.NestedField, *MappedField) { + n.currentPath = append(n.currentPath, "value") +} + +func (n *nameMapProjectVisitor) AfterMapValue(iceberg.NestedField, *MappedField) { + n.popPath() +} + +func (n *nameMapProjectVisitor) Schema(_ *iceberg.Schema, _ *MappedField, structResult iceberg.NestedField) iceberg.NestedField { + return structResult +} + +func (n *nameMapProjectVisitor) Struct(_ iceberg.StructType, _ *MappedField, fieldResults []iceberg.NestedField) iceberg.NestedField { + return iceberg.NestedField{ + Type: &iceberg.StructType{FieldList: fieldResults}, + } +} + +func (n *nameMapProjectVisitor) Field(field iceberg.NestedField, fieldPartner *MappedField, fieldResult iceberg.NestedField) iceberg.NestedField { + if fieldPartner == nil { + panic(fmt.Errorf("%w: field missing from name mapping: %s", + iceberg.ErrInvalidArgument, strings.Join(n.currentPath, "."))) + } + + return iceberg.NestedField{ + ID: *fieldPartner.FieldID, + Name: field.Name, + Type: fieldResult.Type, + Required: field.Required, + Doc: field.Doc, + InitialDefault: field.InitialDefault, + WriteDefault: field.WriteDefault, + } +} + +func (nameMapProjectVisitor) mappedFieldID(mapped *MappedField, name string) int { + for _, f := range mapped.Fields { + if slices.Contains(f.Names, name) { + if f.FieldID != nil { + return *f.FieldID + } + return -1 + } + } + + return -1 +} + +func (n *nameMapProjectVisitor) List(lt iceberg.ListType, listPartner *MappedField, elemResult iceberg.NestedField) iceberg.NestedField { + if listPartner == nil { + panic(fmt.Errorf("%w: field 
missing from name mapping: %s", + iceberg.ErrInvalidArgument, strings.Join(n.currentPath, "."))) + } + + elementID := n.mappedFieldID(listPartner, "element") + + return iceberg.NestedField{ + Type: &iceberg.ListType{ + ElementID: elementID, + Element: elemResult.Type, + ElementRequired: lt.ElementRequired, + }, + } +} + +func (n *nameMapProjectVisitor) Map(m iceberg.MapType, mapPartner *MappedField, keyResult, valResult iceberg.NestedField) iceberg.NestedField { + if mapPartner == nil { + panic(fmt.Errorf("%w: field missing from name mapping: %s", + iceberg.ErrInvalidArgument, strings.Join(n.currentPath, "."))) + } + + keyID := n.mappedFieldID(mapPartner, "key") + valID := n.mappedFieldID(mapPartner, "value") + return iceberg.NestedField{ + Type: &iceberg.MapType{ + KeyID: keyID, + KeyType: keyResult.Type, + ValueID: valID, + ValueType: valResult.Type, + ValueRequired: m.ValueRequired, + }, + } +} + +func (n *nameMapProjectVisitor) Primitive(p iceberg.PrimitiveType, primitivePartner *MappedField) iceberg.NestedField { + if primitivePartner == nil { + panic(fmt.Errorf("%w: field missing from name mapping: %s", + iceberg.ErrInvalidArgument, strings.Join(n.currentPath, "."))) + } + + return iceberg.NestedField{Type: p} +} + +func ApplyNameMapping(schemaWithoutIDs *iceberg.Schema, nameMapping NameMapping) (*iceberg.Schema, error) { + top, err := iceberg.VisitSchemaWithPartner[iceberg.NestedField, *MappedField](schemaWithoutIDs, + &MappedField{Fields: nameMapping}, + &nameMapProjectVisitor{currentPath: make([]string, 0, 1)}, + NameMappingAccessor{}) + if err != nil { + return nil, err + } + + return iceberg.NewSchema(schemaWithoutIDs.ID, + top.Type.(*iceberg.StructType).FieldList...), nil +} diff --git a/table/name_mapping_test.go b/table/name_mapping_test.go new file mode 100644 index 0000000..bbef128 --- /dev/null +++ b/table/name_mapping_test.go @@ -0,0 +1,145 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license 
agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package table_test + +import ( + "encoding/json" + "testing" + + "github.com/apache/iceberg-go/table" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +var ( + tableNameMappingNested = table.NameMapping{ + {FieldID: makeID(1), Names: []string{"foo"}}, + {FieldID: makeID(2), Names: []string{"bar"}}, + {FieldID: makeID(3), Names: []string{"baz"}}, + {FieldID: makeID(4), Names: []string{"qux"}, + Fields: []table.MappedField{{FieldID: makeID(5), Names: []string{"element"}}}}, + {FieldID: makeID(6), Names: []string{"quux"}, Fields: []table.MappedField{ + {FieldID: makeID(7), Names: []string{"key"}}, + {FieldID: makeID(8), Names: []string{"value"}, Fields: []table.MappedField{ + {FieldID: makeID(9), Names: []string{"key"}}, + {FieldID: makeID(10), Names: []string{"value"}}, + }}, + }}, + {FieldID: makeID(11), Names: []string{"location"}, Fields: []table.MappedField{ + {FieldID: makeID(12), Names: []string{"element"}, Fields: []table.MappedField{ + {FieldID: makeID(13), Names: []string{"latitude"}}, + {FieldID: makeID(14), Names: []string{"longitude"}}, + }}, + }}, + {FieldID: makeID(15), Names: []string{"person"}, Fields: []table.MappedField{ + {FieldID: makeID(16), Names: []string{"name"}}, + {FieldID: makeID(17), Names: 
[]string{"age"}}, + }}, + } +) + +func TestJsonMappedField(t *testing.T) { + tests := []struct { + name string + str string + exp table.MappedField + }{ + {"simple", `{"field-id": 1, "names": ["id", "record_id"]}`, + table.MappedField{FieldID: makeID(1), Names: []string{"id", "record_id"}}}, + {"with null fields", `{"field-id": 1, "names": ["id", "record_id"], "fields": null}`, + table.MappedField{FieldID: makeID(1), Names: []string{"id", "record_id"}}}, + {"no names", `{"field-id": 1, "names": []}`, table.MappedField{FieldID: makeID(1), Names: []string{}}}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var n table.MappedField + require.NoError(t, json.Unmarshal([]byte(tt.str), &n)) + assert.Equal(t, tt.exp, n) + }) + } +} + +func TestNameMappingFromJson(t *testing.T) { + mapping := `[ + {"names": ["foo", "bar"]}, + {"field-id": 1, "names": ["id", "record_id"]}, + {"field-id": 2, "names": ["data"]}, + {"field-id": 3, "names": ["location"], "fields": [ + {"field-id": 4, "names": ["latitude", "lat"]}, + {"field-id": 5, "names": ["longitude", "long"]} + ]} + ]` + + var nm table.NameMapping + require.NoError(t, json.Unmarshal([]byte(mapping), &nm)) + + assert.Equal(t, nm, table.NameMapping{ + {FieldID: nil, Names: []string{"foo", "bar"}}, + {FieldID: makeID(1), Names: []string{"id", "record_id"}}, + {FieldID: makeID(2), Names: []string{"data"}}, + {FieldID: makeID(3), Names: []string{"location"}, Fields: []table.MappedField{ + {FieldID: makeID(4), Names: []string{"latitude", "lat"}}, + {FieldID: makeID(5), Names: []string{"longitude", "long"}}, + }}, + }) +} + +func TestNameMappingToJson(t *testing.T) { + result, err := json.Marshal(tableNameMappingNested) + require.NoError(t, err) + assert.JSONEq(t, `[ + {"field-id": 1, "names": ["foo"]}, + {"field-id": 2, "names": ["bar"]}, + {"field-id": 3, "names": ["baz"]}, + {"field-id": 4, "names": ["qux"], "fields": [{"field-id": 5, "names": ["element"]}]}, + {"field-id": 6, "names": ["quux"], 
"fields": [ + {"field-id": 7, "names": ["key"]}, + {"field-id": 8, "names": ["value"], "fields": [ + {"field-id": 9, "names": ["key"]}, + {"field-id": 10, "names": ["value"]} + ]} + ]}, + {"field-id": 11, "names": ["location"], "fields": [ + {"field-id": 12, "names": ["element"], "fields": [ + {"field-id": 13, "names": ["latitude"]}, + {"field-id": 14, "names": ["longitude"]} + ]} + ]}, + {"field-id": 15, "names": ["person"], "fields": [ + {"field-id": 16, "names": ["name"]}, + {"field-id": 17, "names": ["age"]} + ]} +]`, string(result)) +} + +func TestNameMappingToString(t *testing.T) { + assert.Equal(t, `[ + ([foo] -> ?) + ([id, record_id] -> 1) + ([data] -> 2) + ([location] -> 3 ([lat, latitude] -> 4), ([long, longitude] -> 5)) +]`, table.NameMapping{ + {Names: []string{"foo"}}, + {FieldID: makeID(1), Names: []string{"id", "record_id"}}, + {FieldID: makeID(2), Names: []string{"data"}}, + {FieldID: makeID(3), Names: []string{"location"}, Fields: []table.MappedField{ + {FieldID: makeID(4), Names: []string{"lat", "latitude"}}, + {FieldID: makeID(5), Names: []string{"long", "longitude"}}, + }}}.String()) +} diff --git a/transforms.go b/transforms.go index 887d46b..477ef18 100644 --- a/transforms.go +++ b/transforms.go @@ -28,7 +28,7 @@ import ( "time" "unsafe" - "github.com/apache/arrow/go/v16/arrow/decimal128" + "github.com/apache/arrow-go/v18/arrow/decimal128" "github.com/google/uuid" "github.com/twmb/murmur3" ) diff --git a/types.go b/types.go index e7a8b4d..6729964 100644 --- a/types.go +++ b/types.go @@ -25,7 +25,7 @@ import ( "strings" "time" - "github.com/apache/arrow/go/v16/arrow/decimal128" + "github.com/apache/arrow-go/v18/arrow/decimal128" "golang.org/x/exp/slices" ) @@ -239,6 +239,8 @@ func (s *StructType) String() string { f.ID, f.Name) if f.Required { b.WriteString("required ") + } else { + b.WriteString("optional ") } b.WriteString(f.Type.String()) if f.Doc != "" { diff --git a/visitors_test.go b/visitors_test.go index 8b44236..cd93a60 100644 --- 
a/visitors_test.go +++ b/visitors_test.go @@ -22,7 +22,7 @@ import ( "strings" "testing" - "github.com/apache/arrow/go/v16/arrow/decimal128" + "github.com/apache/arrow-go/v18/arrow/decimal128" "github.com/apache/iceberg-go" "github.com/google/uuid" "github.com/stretchr/testify/assert" From f10722a96b1fa0160778b50eb7fb7490c8ecfb87 Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Thu, 26 Sep 2024 19:16:46 -0400 Subject: [PATCH 2/8] cleanup --- .github/workflows/go-ci.yml | 2 +- .github/workflows/go-integration.yml | 2 +- go.sum | 4 ++-- schema.go | 2 +- table/arrow_utils.go | 8 +++++++- table/snapshots.go | 2 +- 6 files changed, 13 insertions(+), 7 deletions(-) diff --git a/.github/workflows/go-ci.yml b/.github/workflows/go-ci.yml index 3f90c29..578cb8c 100644 --- a/.github/workflows/go-ci.yml +++ b/.github/workflows/go-ci.yml @@ -39,7 +39,7 @@ jobs: strategy: fail-fast: false matrix: - go: [ '1.21', '1.22' ] + go: [ '1.22', '1.23' ] os: [ 'ubuntu-latest', 'windows-latest', 'macos-latest' ] steps: - uses: actions/checkout@v4 diff --git a/.github/workflows/go-integration.yml b/.github/workflows/go-integration.yml index b736ef3..7dd5e75 100644 --- a/.github/workflows/go-integration.yml +++ b/.github/workflows/go-integration.yml @@ -42,7 +42,7 @@ jobs: - name: Install Go uses: actions/setup-go@v4 with: - go-version: 1.22 + go-version: 1.23 cache: true cache-dependency-path: go.sum diff --git a/go.sum b/go.sum index d7cd9dc..55d0cc0 100644 --- a/go.sum +++ b/go.sum @@ -160,8 +160,8 @@ github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/exp v0.0.0-20240222234643-814bf88cf225 h1:LfspQV/FYTatPTr/3HzIcmiUFH7PGP+OQ6mgDYo3yuQ= 
-golang.org/x/exp v0.0.0-20240222234643-814bf88cf225/go.mod h1:CxmFvTBINI24O/j8iY7H1xHzx2i4OsyguNBmN/uPtqc= +golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 h1:e66Fs6Z+fZTbFBAxKfP3PALWBtpfqks2bwGcexMxgtk= +golang.org/x/exp v0.0.0-20240909161429-701f63a606c0/go.mod h1:2TbTHSBQa924w8M6Xs1QcRcFwyucIwBGpK1p2f1YFFY= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/mod v0.21.0 h1:vvrHzRwRfVKSiLrG+d4FMl/Qi4ukBCE6kZlTUkDYRT0= diff --git a/schema.go b/schema.go index 4748184..7ea7757 100644 --- a/schema.go +++ b/schema.go @@ -20,11 +20,11 @@ package iceberg import ( "encoding/json" "fmt" + "maps" "strings" "sync" "sync/atomic" - "golang.org/x/exp/maps" "golang.org/x/exp/slices" ) diff --git a/table/arrow_utils.go b/table/arrow_utils.go index 7c07720..6104fc6 100644 --- a/table/arrow_utils.go +++ b/table/arrow_utils.go @@ -26,11 +26,17 @@ import ( "github.com/apache/iceberg-go" ) +// constants to look for as Keys in Arrow field metadata const ( - ArrowFieldDocKey = "doc" + ArrowFieldDocKey = "doc" + // Arrow schemas that are generated from the Parquet library will utilize + // this key to identify the field id of the source Parquet field. 
+ // We use this when converting to Iceberg to provide field IDs ArrowParquetFieldIDKey = "PARQUET:field_id" ) +// ArrowSchemaVisitor is an interface that can be implemented and used to +// call VisitArrowSchema for iterating over an Arrow schema. type ArrowSchemaVisitor[T any] interface { Schema(*arrow.Schema, T) T Struct(*arrow.StructType, []T) T diff --git a/table/snapshots.go b/table/snapshots.go index 26dc8d2..c880d7d 100644 --- a/table/snapshots.go +++ b/table/snapshots.go @@ -21,11 +21,11 @@ import ( "encoding/json" "errors" "fmt" + "maps" "strconv" "github.com/apache/iceberg-go" "github.com/apache/iceberg-go/io" - "golang.org/x/exp/maps" ) type Operation string From c944ad9de50df9b192be39765ae229178d4c4bcb Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Fri, 27 Sep 2024 09:26:21 -0400 Subject: [PATCH 3/8] use appropriate staticcheck --- .github/workflows/go-ci.yml | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/.github/workflows/go-ci.yml b/.github/workflows/go-ci.yml index 578cb8c..85bb202 100644 --- a/.github/workflows/go-ci.yml +++ b/.github/workflows/go-ci.yml @@ -40,7 +40,7 @@ jobs: fail-fast: false matrix: go: [ '1.22', '1.23' ] - os: [ 'ubuntu-latest', 'windows-latest', 'macos-latest' ] + os: [ 'ubuntu-latest', 'windows-latest', 'macos-latest' ] steps: - uses: actions/checkout@v4 - name: Install Go @@ -50,7 +50,11 @@ jobs: cache: true cache-dependency-path: go.sum - name: Install staticcheck - run: go install honnef.co/go/tools/cmd/staticcheck@latest + if: matrix.go == '1.22' + run: go install honnef.co/go/tools/cmd/staticcheck@v0.4.7 + - name: Install staticcheck + if: matrix.go == '1.23' + run: go install honnef.co/go/tools/cmd/staticcheck@v0.5.1 - name: Lint run: staticcheck ./... 
- name: Run tests From 5d813e45e3ba14b2ef784555409f2261fb71b1ca Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Fri, 27 Sep 2024 09:31:04 -0400 Subject: [PATCH 4/8] cleanups --- go.mod | 4 +--- schema_test.go | 4 ++-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/go.mod b/go.mod index 8aef72b..bf4fa85 100644 --- a/go.mod +++ b/go.mod @@ -17,9 +17,7 @@ module github.com/apache/iceberg-go -go 1.22.0 - -toolchain go1.23.1 +go 1.22 require ( github.com/apache/arrow-go/v18 v18.0.0-20240924011512-14844aea3205 diff --git a/schema_test.go b/schema_test.go index 9190d8b..4e8e746 100644 --- a/schema_test.go +++ b/schema_test.go @@ -106,8 +106,8 @@ func TestNestedFieldToString(t *testing.T) { {2, "3: baz: optional boolean"}, {3, "4: qux: required list"}, {4, "6: quux: required map>"}, - {5, "11: location: required list>"}, - {6, "15: person: optional struct<16: name: string, 17: age: required int>"}, + {5, "11: location: required list>"}, + {6, "15: person: optional struct<16: name: optional string, 17: age: required int>"}, } for _, tt := range tests { From d2665efa2c47e5a68495dbc607b5ca629eb2f2e2 Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Fri, 27 Sep 2024 09:33:13 -0400 Subject: [PATCH 5/8] update go.mod --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index bf4fa85..dfe2240 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ module github.com/apache/iceberg-go -go 1.22 +go 1.22.7 require ( github.com/apache/arrow-go/v18 v18.0.0-20240924011512-14844aea3205 From 3f70186339a1f70795a52201f4f7df03541a2dd6 Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Fri, 27 Sep 2024 09:59:13 -0400 Subject: [PATCH 6/8] fix some tests --- go.mod | 2 +- go.sum | 2 ++ manifest.go | 7 ++++++- table/scanner.go | 2 +- table/scanner_test.go | 7 ++----- 5 files changed, 12 insertions(+), 8 deletions(-) diff --git a/go.mod b/go.mod index dfe2240..83a7fcd 100644 --- a/go.mod +++ b/go.mod @@ -29,7 +29,7 @@ require ( github.com/aws/smithy-go 
v1.20.4 github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815 github.com/google/uuid v1.6.0 - github.com/hamba/avro/v2 v2.25.1 + github.com/hamba/avro/v2 v2.26.0 github.com/pterm/pterm v0.12.79 github.com/stretchr/testify v1.9.0 github.com/twmb/murmur3 v1.1.8 diff --git a/go.sum b/go.sum index 55d0cc0..89d60de 100644 --- a/go.sum +++ b/go.sum @@ -86,6 +86,8 @@ github.com/gookit/color v1.5.4 h1:FZmqs7XOyGgCAxmWyPslpiok1k05wmY3SJTytgvYFs0= github.com/gookit/color v1.5.4/go.mod h1:pZJOeOS8DM43rXbp4AZo1n9zCU2qjpcRko0b6/QJi9w= github.com/hamba/avro/v2 v2.25.1 h1:t8cOyv0wkNAPF6/khArMtR0nK9HtGa+WKbp9q+KdFZQ= github.com/hamba/avro/v2 v2.25.1/go.mod h1:I8glyswHnpED3Nlx2ZdUe+4LJnCOOyiCzLMno9i/Uu0= +github.com/hamba/avro/v2 v2.26.0 h1:IaT5l6W3zh7K67sMrT2+RreJyDTllBGVJm4+Hedk9qE= +github.com/hamba/avro/v2 v2.26.0/go.mod h1:I8glyswHnpED3Nlx2ZdUe+4LJnCOOyiCzLMno9i/Uu0= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK4= diff --git a/manifest.go b/manifest.go index 9a0d8b7..b8320e3 100644 --- a/manifest.go +++ b/manifest.go @@ -666,7 +666,12 @@ func avroPartitionData(input map[string]any) map[string]any { } } default: - out[k] = v + switch v := v.(type) { + case time.Time: + out[k] = Timestamp(v.UTC().UnixMicro()) + default: + out[k] = v + } } } return out diff --git a/table/scanner.go b/table/scanner.go index 4417fc1..ea33372 100644 --- a/table/scanner.go +++ b/table/scanner.go @@ -342,7 +342,7 @@ Loop: for { select { case <-ctx.Done(): - return nil, ctx.Err() + return nil, context.Cause(ctx) case entries, ok := <-entryChan: if !ok { // closed! 
diff --git a/table/scanner_test.go b/table/scanner_test.go index a35d2dd..0f7f639 100644 --- a/table/scanner_test.go +++ b/table/scanner_test.go @@ -54,14 +54,11 @@ func TestScanner(t *testing.T) { {"test_partitioned_by_years", iceberg.LessThan(iceberg.Reference("dt"), "2023-03-05"), 1}, {"test_partitioned_by_years", iceberg.GreaterThanEqual(iceberg.Reference("dt"), "2023-03-05"), 1}, {"test_partitioned_by_months", iceberg.GreaterThanEqual(iceberg.Reference("dt"), "2023-03-05"), 1}, - {"test_partitioned_by_days", iceberg.GreaterThanEqual(iceberg.Reference("ts"), "2023-03-05T00:00:00+00:00"), 8}, + {"test_partitioned_by_days", iceberg.GreaterThanEqual(iceberg.Reference("ts"), "2023-03-05T00:00:00+00:00"), 4}, {"test_partitioned_by_hours", iceberg.GreaterThanEqual(iceberg.Reference("ts"), "2023-03-05T00:00:00+00:00"), 8}, {"test_partitioned_by_truncate", iceberg.GreaterThanEqual(iceberg.Reference("letter"), "e"), 8}, {"test_partitioned_by_bucket", iceberg.GreaterThanEqual(iceberg.Reference("number"), int32(5)), 6}, - // for some reason when I run the provisioning locally i get 5 data files - // but GHA CI running spark provisioning ends up with only 4 files? - // anyone know why? 
- {"test_uuid_and_fixed_unpartitioned", iceberg.AlwaysTrue{}, 4}, + {"test_uuid_and_fixed_unpartitioned", iceberg.AlwaysTrue{}, 5}, {"test_uuid_and_fixed_unpartitioned", iceberg.EqualTo(iceberg.Reference("uuid_col"), "102cb62f-e6f8-4eb0-9973-d9b012ff0967"), 1}, } From 188b431c213c72af5e665579d3961bd5d559837f Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Fri, 27 Sep 2024 10:03:55 -0400 Subject: [PATCH 7/8] fix test --- table/scanner_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/table/scanner_test.go b/table/scanner_test.go index 0f7f639..af4b8f6 100644 --- a/table/scanner_test.go +++ b/table/scanner_test.go @@ -58,7 +58,7 @@ func TestScanner(t *testing.T) { {"test_partitioned_by_hours", iceberg.GreaterThanEqual(iceberg.Reference("ts"), "2023-03-05T00:00:00+00:00"), 8}, {"test_partitioned_by_truncate", iceberg.GreaterThanEqual(iceberg.Reference("letter"), "e"), 8}, {"test_partitioned_by_bucket", iceberg.GreaterThanEqual(iceberg.Reference("number"), int32(5)), 6}, - {"test_uuid_and_fixed_unpartitioned", iceberg.AlwaysTrue{}, 5}, + {"test_uuid_and_fixed_unpartitioned", iceberg.AlwaysTrue{}, 4}, {"test_uuid_and_fixed_unpartitioned", iceberg.EqualTo(iceberg.Reference("uuid_col"), "102cb62f-e6f8-4eb0-9973-d9b012ff0967"), 1}, } From e84d4b99de678290978b49eba861173e10588c37 Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Mon, 7 Oct 2024 16:36:41 -0600 Subject: [PATCH 8/8] Update .github/workflows/go-ci.yml Co-authored-by: Eduard Tudenhoefner --- .github/workflows/go-ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/go-ci.yml b/.github/workflows/go-ci.yml index 85bb202..2b158b5 100644 --- a/.github/workflows/go-ci.yml +++ b/.github/workflows/go-ci.yml @@ -40,7 +40,7 @@ jobs: fail-fast: false matrix: go: [ '1.22', '1.23' ] - os: [ 'ubuntu-latest', 'windows-latest', 'macos-latest' ] + os: [ 'ubuntu-latest', 'windows-latest', 'macos-latest' ] steps: - uses: actions/checkout@v4 - name: Install Go