-
Notifications
You must be signed in to change notification settings - Fork 30
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add support for catalogs with glue implementation to start #51
Changes from all commits
f7f933d
56c761b
1bed31e
e992a05
4f5ce6d
b9d4c1d
bc30773
6327be2
1aa77dd
1efa2d0
da70023
595e0af
b1c3974
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -44,3 +44,5 @@ lib/ | |
*.ipr | ||
*.iws | ||
*.iml | ||
|
||
.envrc* |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -49,6 +49,8 @@ | |
|
||
| Operation | REST | Hive | DynamoDB | Glue | | ||
| :----------------------- | :--: | :--: | :------: | :--: | | ||
| Load Table | | | | X | | ||
| List Tables | | | | X | | ||
| Create Table | | | | | | ||
| Update Current Snapshot | | | | | | ||
| Create New Snapshot | | | | | | ||
|
@@ -63,8 +65,8 @@ | |
### Read/Write Data Support | ||
|
||
* No intrinsic support for reading/writing data yet | ||
* Data can be manually read currently by retrieving data files via Manifests. | ||
* Plan to add [Apache Arrow](https://pkg.go.dev/github.com/apache/arrow/go/[email protected]) support eventually. | ||
* Data can be manually read currently by retrieving data files via Manifests. | ||
* Plan to add [Apache Arrow](https://pkg.go.dev/github.com/apache/arrow/go/[email protected]) support eventually. | ||
|
||
# Get in Touch | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one | ||
// or more contributor license agreements. See the NOTICE file | ||
// distributed with this work for additional information | ||
// regarding copyright ownership. The ASF licenses this file | ||
// to you under the Apache License, Version 2.0 (the | ||
// "License"); you may not use this file except in compliance | ||
// with the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
package catalog | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
|
||
"github.com/apache/iceberg-go/table" | ||
"github.com/aws/aws-sdk-go-v2/aws" | ||
) | ||
|
||
type CatalogType string | ||
|
||
const ( | ||
REST CatalogType = "rest" | ||
Hive CatalogType = "hive" | ||
Glue CatalogType = "glue" | ||
DynamoDB CatalogType = "dynamodb" | ||
SQL CatalogType = "sql" | ||
) | ||
|
||
var ( | ||
// ErrNoSuchTable is returned when a table does not exist in the catalog. | ||
ErrNoSuchTable = errors.New("table does not exist") | ||
) | ||
|
||
// WithAwsConfig sets the AWS configuration for the catalog. | ||
func WithAwsConfig(cfg aws.Config) Option { | ||
return func(o *options) { | ||
o.awsConfig = cfg | ||
} | ||
} | ||
|
||
type Option func(*options) | ||
|
||
type options struct { | ||
awsConfig aws.Config | ||
} | ||
|
||
// Catalog for iceberg table operations like create, drop, load, list and others. | ||
type Catalog interface { | ||
// ListTables returns a list of table identifiers in the catalog, with the returned | ||
// identifiers containing the information required to load the table via that catalog. | ||
ListTables(ctx context.Context, namespace table.Identifier) ([]table.Identifier, error) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. just for my ignorance in go, having namespace havng type as There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @jackye1995 yeah I think that Good call out, and something I did want to chat with @zeroshade about. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i'd be fine with shifting There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. +1 to doing that |
||
// LoadTable loads a table from the catalog and returns a Table with the metadata. | ||
LoadTable(ctx context.Context, identifier table.Identifier, props map[string]string) (*table.Table, error) | ||
// CatalogType returns the type of the catalog. | ||
CatalogType() CatalogType | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,186 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one | ||
// or more contributor license agreements. See the NOTICE file | ||
// distributed with this work for additional information | ||
// regarding copyright ownership. The ASF licenses this file | ||
// to you under the Apache License, Version 2.0 (the | ||
// "License"); you may not use this file except in compliance | ||
// with the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
package catalog | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"fmt" | ||
|
||
"github.com/apache/iceberg-go/io" | ||
"github.com/apache/iceberg-go/table" | ||
"github.com/aws/aws-sdk-go-v2/aws" | ||
"github.com/aws/aws-sdk-go-v2/service/glue" | ||
"github.com/aws/aws-sdk-go-v2/service/glue/types" | ||
) | ||
|
||
const glueTableTypeIceberg = "ICEBERG" | ||
|
||
var ( | ||
_ Catalog = (*GlueCatalog)(nil) | ||
) | ||
|
||
type glueAPI interface { | ||
GetTable(ctx context.Context, params *glue.GetTableInput, optFns ...func(*glue.Options)) (*glue.GetTableOutput, error) | ||
GetTables(ctx context.Context, params *glue.GetTablesInput, optFns ...func(*glue.Options)) (*glue.GetTablesOutput, error) | ||
Comment on lines
+39
to
+40
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this is describing the AWS Glue API, which has There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @Fokko yeah @jackye1995 is 100% correct on this, I am using an interface to enable mocking of just the operations in the AWS I want to consume. This pattern is covered here and is widely used in Go. https://aws.github.io/aws-sdk-go-v2/docs/unit-testing/#mocking-client-operations There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. One thing to note is this interface is private as well thanks to feedback from @zeroshade . |
||
} | ||
|
||
type GlueCatalog struct { | ||
glueSvc glueAPI | ||
} | ||
|
||
func NewGlueCatalog(opts ...Option) *GlueCatalog { | ||
glueOps := &options{} | ||
|
||
for _, o := range opts { | ||
o(glueOps) | ||
} | ||
|
||
return &GlueCatalog{ | ||
glueSvc: glue.NewFromConfig(glueOps.awsConfig), | ||
} | ||
} | ||
|
||
// ListTables returns a list of iceberg tables in the given Glue database. | ||
// | ||
// The namespace should just contain the Glue database name. | ||
func (c *GlueCatalog) ListTables(ctx context.Context, namespace table.Identifier) ([]table.Identifier, error) { | ||
database, err := identifierToGlueDatabase(namespace) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
params := &glue.GetTablesInput{DatabaseName: aws.String(database)} | ||
|
||
var icebergTables []table.Identifier | ||
|
||
for { | ||
tblsRes, err := c.glueSvc.GetTables(ctx, params) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to list tables in namespace %s: %w", database, err) | ||
} | ||
|
||
icebergTables = append(icebergTables, | ||
filterTableListByType(database, tblsRes.TableList, glueTableTypeIceberg)...) | ||
|
||
if tblsRes.NextToken == nil { | ||
break | ||
} | ||
|
||
params.NextToken = tblsRes.NextToken | ||
} | ||
|
||
return icebergTables, nil | ||
} | ||
|
||
// LoadTable loads a table from the catalog table details. | ||
// | ||
// The identifier should contain the Glue database name, then glue table name. | ||
func (c *GlueCatalog) LoadTable(ctx context.Context, identifier table.Identifier, props map[string]string) (*table.Table, error) { | ||
database, tableName, err := identifierToGlueTable(identifier) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
if props == nil { | ||
props = map[string]string{} | ||
} | ||
|
||
location, err := c.getTable(ctx, database, tableName) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
// TODO: consider providing a way to directly access the S3 iofs to enable testing of the catalog. | ||
iofs, err := io.LoadFS(props, location) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to load table %s.%s: %w", database, tableName, err) | ||
} | ||
|
||
icebergTable, err := table.NewFromLocation([]string{tableName}, location, iofs) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to create table from location %s.%s: %w", database, tableName, err) | ||
} | ||
|
||
return icebergTable, nil | ||
} | ||
|
||
func (c *GlueCatalog) CatalogType() CatalogType { | ||
return Glue | ||
} | ||
|
||
// GetTable loads a table from the Glue Catalog using the given database and table name. | ||
func (c *GlueCatalog) getTable(ctx context.Context, database, tableName string) (string, error) { | ||
tblRes, err := c.glueSvc.GetTable(ctx, | ||
&glue.GetTableInput{ | ||
DatabaseName: aws.String(database), | ||
Name: aws.String(tableName), | ||
}, | ||
) | ||
if err != nil { | ||
if errors.Is(err, &types.EntityNotFoundException{}) { | ||
return "", fmt.Errorf("failed to get table %s.%s: %w", database, tableName, ErrNoSuchTable) | ||
} | ||
return "", fmt.Errorf("failed to get table %s.%s: %w", database, tableName, err) | ||
} | ||
|
||
if tblRes.Table.Parameters["table_type"] != "ICEBERG" { | ||
return "", errors.New("table is not an iceberg table") | ||
} | ||
|
||
return tblRes.Table.Parameters["metadata_location"], nil | ||
} | ||
|
||
func identifierToGlueTable(identifier table.Identifier) (string, string, error) { | ||
if len(identifier) != 2 { | ||
return "", "", fmt.Errorf("invalid identifier, missing database name: %v", identifier) | ||
} | ||
|
||
return identifier[0], identifier[1], nil | ||
} | ||
|
||
func identifierToGlueDatabase(identifier table.Identifier) (string, error) { | ||
if len(identifier) != 1 { | ||
return "", fmt.Errorf("invalid identifier, missing database name: %v", identifier) | ||
} | ||
|
||
return identifier[0], nil | ||
} | ||
|
||
// GlueTableIdentifier returns a glue table identifier for an iceberg table in the format [database, table]. | ||
func GlueTableIdentifier(database string, tableName string) table.Identifier { | ||
return []string{database, tableName} | ||
} | ||
|
||
// GlueDatabaseIdentifier returns a database identifier for a Glue database in the format [database]. | ||
func GlueDatabaseIdentifier(database string) table.Identifier { | ||
return []string{database} | ||
} | ||
Comment on lines
+166
to
+173
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could these be made generic rather than There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @zeroshade I wanted to provide some helpers for handling this identifier tuple for Glue to make it SUPER clear and use the terminology of Glue, rather than providing some vague external table mapping indexes in a tuple to a given catalogues expectation. Users of Glue should be able to clearly see their terminology in the API as it will get them running faster, and make troubleshooting easier. The use of a tuple for identifier feels a bit odd in a Go API, ideally I think these semantics would be in the identifier using a struct with a dash of generics. Catalogs could then extend this when needed but yeah There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Okay, that makes sense. We can always adjust later if we find we're duplicating things. |
||
|
||
func filterTableListByType(database string, tableList []types.Table, tableType string) []table.Identifier { | ||
var filtered []table.Identifier | ||
|
||
for _, tbl := range tableList { | ||
if tbl.Parameters["table_type"] != tableType { | ||
continue | ||
} | ||
filtered = append(filtered, GlueTableIdentifier(database, aws.ToString(tbl.Name))) | ||
} | ||
|
||
return filtered | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not a go expert, but typically we do something like:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Fokko Yeah I have done this in python, and Java, however in Go it is more common to have these static error types for things like
NotFound
so they can be used to match a specific error. This paired the being forced to handle the error in the same block means you can "wrap" the error at that point and include the table name.A consumer of the API would do something like this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@wolfeidau Ideally
cat.GetTable
would returnfmt.Errorf("failed to get table (%s): %w", identifier, ErrNoSuchTable)
rather than making the consumer do that wrapping itself.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@zeroshade good point, fix added to wrap sentinel not found error.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems reasonable. Thanks for the explanation 👍