Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for catalogs with glue implementation to start #51

Merged
merged 13 commits into from
Jan 31, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,5 @@ lib/
*.ipr
*.iws
*.iml

.envrc*
65 changes: 65 additions & 0 deletions catalog/catalog.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package catalog

import (
"context"
"errors"

"github.com/apache/iceberg-go/table"
"github.com/aws/aws-sdk-go-v2/aws"
)

type CatalogType string

const (
REST CatalogType = "rest"
Hive CatalogType = "hive"
Glue CatalogType = "glue"
DynamoDB CatalogType = "dynamodb"
SQL CatalogType = "sql"
)

var (
// ErrNoSuchTable is returned when a table does not exist in the catalog.
ErrNoSuchTable = errors.New("table does not exist")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not a go expert, but typically we do something like:

Suggested change
ErrNoSuchTable = errors.New("table does not exist")
ErrNoSuchTable = errors.New("Table does not exist: {namespace.table_name}")

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Fokko Yeah I have done this in python, and Java, however in Go it is more common to have these static error types for things like NotFound so they can be used to match a specific error. This paired the being forced to handle the error in the same block means you can "wrap" the error at that point and include the table name.

A consumer of the API would do something like this.

table, err := cat.GetTable(identfier)
if err != nil {
    // do something specific on not found
   if errors.Is(err, catalog.ErrNoSuchTable) {
      // something here
   }
   
   return fmt.Errorf("failed to get table (%s): %w", identifier, err)
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wolfeidau Ideally cat.GetTable would return fmt.Errorf("failed to get table (%s): %w", identifier, ErrNoSuchTable) rather than making the consumer do that wrapping itself.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zeroshade good point, fix added to wrap sentinel not found error.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems reasonable. Thanks for the explanation 👍

)

// WithAwsConfig sets the AWS configuration for the catalog.
func WithAwsConfig(cfg aws.Config) Option {
return func(o *Options) {
o.awsConfig = cfg
}
}

type Option func(*Options)

type Options struct {
awsConfig aws.Config
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason to export these? We don't expect consumers to create their own Option functions right? If so we'd need to export the members of the Options struct which we don't.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zeroshade Yeah I am a bit on the fence with this, feels half in and half out when the Options is private.

I am questioning whether this should be a Config structure with public members, but I wanted to get a couple more catalog implementations in and adjust it then.

Again keen to get a bit more code in the catalog and adjust it based on needs.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we're gonna go that route, I'd vote making the whole thing private for now, and then if we adjust it we can make it public or a public config struct later.

Personally my default is always to make things private until it's explicitly necessary for it to be public. Keeps the surface API as small as possible

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zeroshade cool, I have made options private so we can think about it once there is more implementations.


// Catalog for iceberg table operations like create, drop, load, list and others.
type Catalog interface {
// ListTables returns a list of table identifiers in the catalog, with the returned
// identifiers containing the information required to load the table via that catalog.
ListTables(ctx context.Context, namespace table.Identifier) ([]table.Identifier, error)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just for my ignorance in go, having namespace havng type as table.Identifier feels really confusing. Should we have something like typedef.Identifier like pyiceberg?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jackye1995 yeah I think that Identifier will need to moved at some point, either to the root namespace being iceberg, which is a common convention, or somewhere else to avoid cyclic dependencies which will happen as this evolves.

Good call out, and something I did want to chat with @zeroshade about.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'd be fine with shifting table.Identifier to instead be iceberg.Identifier if that would make things easier to understand and more reasonable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to doing that

// LoadTable loads a table from the catalog and returns a Table with the metadata.
LoadTable(ctx context.Context, identifier table.Identifier, props map[string]string) (*table.Table, error)
// CatalogType returns the type of the catalog.
CatalogType() CatalogType
}
186 changes: 186 additions & 0 deletions catalog/glue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package catalog

import (
"context"
"errors"
"fmt"

"github.com/apache/iceberg-go/io"
"github.com/apache/iceberg-go/table"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/glue"
"github.com/aws/aws-sdk-go-v2/service/glue/types"
)

const glueTableTypeIceberg = "ICEBERG"

var (
_ Catalog = (*GlueCatalog)(nil)
)

type glueAPI interface {
GetTable(ctx context.Context, params *glue.GetTableInput, optFns ...func(*glue.Options)) (*glue.GetTableOutput, error)
GetTables(ctx context.Context, params *glue.GetTablesInput, optFns ...func(*glue.Options)) (*glue.GetTablesOutput, error)
Comment on lines +39 to +40
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think LoadTable is clearer, as I generally try to avoid Get in the name. Why do we need GetTables? Wouldn't calling LoadTable multiple times give the same result? This also ties in with the comment on ErrNoSuchTable above; if we don't give the table name in the exception, you don't know which one doesn't exists.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is describing the AWS Glue API, which has GetTable and GetTables API methods

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Fokko yeah @jackye1995 is 100% correct on this, I am using an interface to enable mocking of just the operations in the AWS I want to consume. This pattern is covered here and is widely used in Go. https://aws.github.io/aws-sdk-go-v2/docs/unit-testing/#mocking-client-operations

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thing to note is this interface is private as well thanks to feedback from @zeroshade .

}

type GlueCatalog struct {
glueSvc glueAPI
}

func NewGlueCatalog(opts ...Option) *GlueCatalog {
options := &Options{}

for _, o := range opts {
o(options)
}

return &GlueCatalog{
glueSvc: glue.NewFromConfig(options.awsConfig),
}
}

// ListTables returns a list of iceberg tables in the given Glue database.
//
// The namespace should just contain the Glue database name.
func (c *GlueCatalog) ListTables(ctx context.Context, namespace table.Identifier) ([]table.Identifier, error) {
database, err := identifierToGlueDatabase(namespace)
if err != nil {
return nil, err
}

params := &glue.GetTablesInput{DatabaseName: aws.String(database)}

var icebergTables []table.Identifier

for {
tblsRes, err := c.glueSvc.GetTables(ctx, params)
if err != nil {
return nil, fmt.Errorf("failed to list tables in namespace %s: %w", database, err)
}

icebergTables = append(icebergTables,
filterTableListByType(database, tblsRes.TableList, glueTableTypeIceberg)...)

if tblsRes.NextToken == nil {
break
}

params.NextToken = tblsRes.NextToken
}

return icebergTables, nil
}

// LoadTable loads a table from the catalog table details.
//
// The identifier should contain the Glue database name, then glue table name.
func (c *GlueCatalog) LoadTable(ctx context.Context, identifier table.Identifier, props map[string]string) (*table.Table, error) {
database, tableName, err := identifierToGlueTable(identifier)
if err != nil {
return nil, err
}

if props == nil {
props = map[string]string{}
}

location, err := c.getTable(ctx, database, tableName)
if err != nil {
return nil, err
}

// TODO: consider providing a way to directly access the S3 iofs to enable testing of the catalog.
iofs, err := io.LoadFS(props, location)
if err != nil {
return nil, fmt.Errorf("failed to load table %s.%s: %w", database, tableName, err)
}

icebergTable, err := table.NewFromLocation([]string{tableName}, location, iofs)
if err != nil {
return nil, fmt.Errorf("failed to create table from location %s.%s: %w", database, tableName, err)
}

return icebergTable, nil
}

func (c *GlueCatalog) CatalogType() CatalogType {
return Glue
}

// GetTable loads a table from the Glue Catalog using the given database and table name.
func (c *GlueCatalog) getTable(ctx context.Context, database, tableName string) (string, error) {
tblRes, err := c.glueSvc.GetTable(ctx,
&glue.GetTableInput{
DatabaseName: aws.String(database),
Name: aws.String(tableName),
},
)
if err != nil {
if errors.Is(err, &types.EntityNotFoundException{}) {
return "", ErrNoSuchTable
}
return "", fmt.Errorf("failed to get table %s.%s: %w", database, tableName, err)
}

if tblRes.Table.Parameters["table_type"] != "ICEBERG" {
return "", errors.New("table is not an iceberg table")
}

return tblRes.Table.Parameters["metadata_location"], nil
}

func identifierToGlueTable(identifier table.Identifier) (string, string, error) {
if len(identifier) != 2 {
return "", "", fmt.Errorf("invalid identifier, missing database name: %v", identifier)
}

return identifier[0], identifier[1], nil
}

func identifierToGlueDatabase(identifier table.Identifier) (string, error) {
if len(identifier) != 1 {
return "", fmt.Errorf("invalid identifier, missing database name: %v", identifier)
}

return identifier[0], nil
}

// GlueTableIdentifier returns a glue table identifier for an iceberg table in the format [database, table].
func GlueTableIdentifier(database string, tableName string) table.Identifier {
return []string{database, tableName}
}

// GlueDatabaseIdentifier returns a database identifier for a Glue database in the format [database].
func GlueDatabaseIdentifier(database string) table.Identifier {
return []string{database}
}
Comment on lines +166 to +173
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could these be made generic rather than Glue specific? Any catalog specific logic should happen by manipulating the Identifier internally to that catalog, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zeroshade I wanted to provide some helpers for handling this identifier tuple for Glue to make it SUPER clear and use the terminology of Glue, rather than providing some vague external table mapping indexes in a tuple to a given catalogues expectation.

Users of Glue should be able to clearly see their terminology in the API as it will get them running faster, and make troubleshooting easier.

The use of a tuple for identifier feels a bit odd in a Go API, ideally I think these semantics would be in the identifier using a struct with a dash of generics. Catalogs could then extend this when needed but yeah

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, that makes sense. We can always adjust later if we find we're duplicating things.


func filterTableListByType(database string, tableList []types.Table, tableType string) []table.Identifier {
var filtered []table.Identifier

for _, tbl := range tableList {
if tbl.Parameters["table_type"] != tableType {
continue
}
filtered = append(filtered, GlueTableIdentifier(database, aws.ToString(tbl.Name)))
}

return filtered
}
Loading