Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for catalogs with glue implementation to start #51

Merged
merged 13 commits into from
Jan 31, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,5 @@ lib/
*.ipr
*.iws
*.iml

.envrc*
51 changes: 51 additions & 0 deletions catalog/catalog.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package catalog

import (
"context"
"errors"

"github.com/apache/iceberg-go/table"
)

type CatalogType string

const (
REST CatalogType = "rest"
Hive CatalogType = "hive"
Glue CatalogType = "glue"
DynamoDB CatalogType = "dynamodb"
SQL CatalogType = "sql"
)

var (
// ErrNoSuchTable is returned when a table does not exist in the catalog.
ErrNoSuchTable = errors.New("table does not exist")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not a go expert, but typically we do something like:

Suggested change
ErrNoSuchTable = errors.New("table does not exist")
ErrNoSuchTable = errors.New("Table does not exist: {namespace.table_name}")

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Fokko Yeah I have done this in python, and Java, however in Go it is more common to have these static error types for things like NotFound so they can be used to match a specific error. This paired the being forced to handle the error in the same block means you can "wrap" the error at that point and include the table name.

A consumer of the API would do something like this.

table, err := cat.GetTable(identfier)
if err != nil {
    // do something specific on not found
   if errors.Is(err, catalog.ErrNoSuchTable) {
      // something here
   }
   
   return fmt.Errorf("failed to get table (%s): %w", identifier, err)
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wolfeidau Ideally cat.GetTable would return fmt.Errorf("failed to get table (%s): %w", identifier, ErrNoSuchTable) rather than making the consumer do that wrapping itself.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zeroshade good point, fix added to wrap sentinel not found error.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems reasonable. Thanks for the explanation 👍

)

// Catalog for iceberg table operations like create, drop, load, list and others.
type Catalog interface {
// ListTables returns a list of table identifiers in the catalog, with the returned
// identifiers containing the information required to load the table via that catalog.
ListTables(ctx context.Context, namespace table.Identifier) ([]table.Identifier, error)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just for my ignorance in go, having namespace havng type as table.Identifier feels really confusing. Should we have something like typedef.Identifier like pyiceberg?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jackye1995 yeah I think that Identifier will need to moved at some point, either to the root namespace being iceberg, which is a common convention, or somewhere else to avoid cyclic dependencies which will happen as this evolves.

Good call out, and something I did want to chat with @zeroshade about.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'd be fine with shifting table.Identifier to instead be iceberg.Identifier if that would make things easier to understand and more reasonable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to doing that

// LoadTable loads a table from the catalog and returns a Table with the metadata.
LoadTable(ctx context.Context, identifier table.Identifier) (*table.Table, error)
// CatalogType returns the type of the catalog.
CatalogType() CatalogType
}
162 changes: 162 additions & 0 deletions catalog/glue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package catalog

import (
"context"
"errors"
"fmt"

"github.com/apache/iceberg-go/io"
"github.com/apache/iceberg-go/table"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/glue"
"github.com/aws/aws-sdk-go-v2/service/glue/types"
)

var (
_ Catalog = (*GlueCatalog)(nil)
)

type GlueAPI interface {
GetTable(ctx context.Context, params *glue.GetTableInput, optFns ...func(*glue.Options)) (*glue.GetTableOutput, error)
GetTables(ctx context.Context, params *glue.GetTablesInput, optFns ...func(*glue.Options)) (*glue.GetTablesOutput, error)
Comment on lines +39 to +40
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think LoadTable is clearer, as I generally try to avoid Get in the name. Why do we need GetTables? Wouldn't calling LoadTable multiple times give the same result? This also ties in with the comment on ErrNoSuchTable above; if we don't give the table name in the exception, you don't know which one doesn't exists.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is describing the AWS Glue API, which has GetTable and GetTables API methods

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Fokko yeah @jackye1995 is 100% correct on this, I am using an interface to enable mocking of just the operations in the AWS I want to consume. This pattern is covered here and is widely used in Go. https://aws.github.io/aws-sdk-go-v2/docs/unit-testing/#mocking-client-operations

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thing to note is this interface is private as well thanks to feedback from @zeroshade .

}

type GlueCatalog struct {
glueSvc GlueAPI
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we intending for these to be used externally? I think it would make more sense for all interactions with the catalog to be through the Catalog interface and that we shouldn't export these at all. Thoughts?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very good point, last thing you want is that interface in the public API, great pick up, and fixed 😅


func NewGlueCatalog(awscfg aws.Config) *GlueCatalog {
return &GlueCatalog{
glueSvc: glue.NewFromConfig(awscfg),
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than NewGlueCatalog like this, maybe we follow the more typical iceberg setup (similar to pyiceberg) and create a LoadCatalog function taking a uri + map[string]string (properties/options) and return the Catalog interface directly.

We can then infer which type of catalog to create based on the uri or from a type property. I have an example of this in my older iceberg implementation of stuff located here: https://github.com/zeroshade/icegopher/blob/main/table/catalog/catalog.go also containing an implementation for the rest catalog there.

Thoughts?

Copy link
Contributor Author

@wolfeidau wolfeidau Jan 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zeroshade yeah this abstraction seems a bit more than I would expect in a Go library, I prefer the more common functional options where you inject specific overrides. https://dave.cheney.net/2014/10/17/functional-options-for-friendly-apis

Providing maps of properties just result in you needing to re-implement the myriad of overrides provided by say the aws.Config, or similar configs for other providers.

Creating an abstraction where "everything is created" feels a bit unnecessary, especially when each of these things already has a rich creation API, like the aws SDK as can be seen in it's configuration section https://aws.github.io/aws-sdk-go-v2/docs/configuring-sdk/.

As an integration point, I think it would be better to provide a set of functional options and strip back the abstraction to specific create methods for each catalogue as it is more typical of a Go API. Go tends to encourage a flatter API with fewer abstractions like these.

That said, these abstractions may make sense in the future, but again function options are more common in Go.

I have moved NewGlueCatalog(awscfg aws.Config) *GlueCatalog to use functional options for the AWS configuration as it is cleaner, more extensible, and removes this coupling directly to aws.Config.

@zeroshade Just my two cents, and hopefully explains why I kept this catalogue implementation simple.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As an integration point, I think it would be better to provide a set of functional options and strip back the abstraction to specific create methods for each catalogue as it is more typical of a Go API. Go tends to encourage a flatter API with fewer abstractions like these.

That's fair and I'm fine with that. Perhaps in the future we'd add the more generic abstraction to help people convert from existing iceberg libraries but a per-catalog-type New functions will work for now.

Copy link
Contributor Author

@wolfeidau wolfeidau Jan 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zeroshade The current model feels like it is following the factory method pattern https://en.wikipedia.org/wiki/Factory_method_pattern, which is great pattern for certain use cases, but in this case there are lots of other things to consider when creating these catalogs, like identity, custom platform specific configuration ect.

I prefer the Go model which although using common interfaces, encourages a more explicit creation for these things as this avoids "magic", which is important for reliable, deterministic operation of systems.

You still have common configuration/metrics/logging components, however these are instantiated based on the New routine in that catalog implementation.

Really it comes down to migrations being more of a considered action in most Go libraries, this ensures the configuration and security of these implementations is respected.

For example accessing an S3 data store could require specific role configuration for authentication, endpoints and event hooks, which may differ from the information used between different tables. A developer can do all this with the AWS SDK config, then pass it in when creating an IO for that particular store, and this is clearly specified in their code.

That said the operation of the catalog using a common interface is great, this i where other common components interact and operate so it is nice to have.

Again as I said before, this is just something I have observed as idiomatic in Go libraries, like the AWS SDK.


// ListTables returns a list of iceberg tables in the given Glue database.
//
// The namespace should just contain the Glue database name.
func (c *GlueCatalog) ListTables(ctx context.Context, namespace table.Identifier) ([]table.Identifier, error) {
database, err := identifierToGlueDatabase(namespace)
if err != nil {
return nil, err
}

params := &glue.GetTablesInput{DatabaseName: aws.String(database)}

tblsRes, err := c.glueSvc.GetTables(ctx, params)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@HonahX good call, this is now fixed and matches the existing implementations.

if err != nil {
return nil, fmt.Errorf("failed to list tables in namespace %s: %w", database, err)
}

var icebergTables []table.Identifier

for _, tbl := range tblsRes.TableList {
// skip non iceberg tables
// TODO: consider what this would look like for non ICEBERG tables as you can convert them to ICEBERG tables via the Glue catalog API.
if tbl.Parameters["table_type"] != "ICEBERG" {
continue
}

icebergTables = append(icebergTables,
GlueTableIdentifier(database, aws.ToString(tbl.Name)),
)
}

return icebergTables, nil
}

// LoadTable loads a table from the catalog table details.
//
// The identifier should contain the Glue database name, then glue table name.
func (c *GlueCatalog) LoadTable(ctx context.Context, identifier table.Identifier) (*table.Table, error) {
database, tableName, err := identifierToGlueTable(identifier)
if err != nil {
return nil, err
}

location, err := c.getTable(ctx, database, tableName)
if err != nil {
return nil, err
}

// TODO: consider providing a way to directly access the S3 iofs to enable testing of the catalog.
iofs, err := io.LoadFS(map[string]string{}, location)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add optional key-value pairs as a slice of options arguments to this to pass to LoadFS instead of hard coding the empty map?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zeroshade apologies, this is fixed, should have passed it in in the first place 😅

if err != nil {
return nil, fmt.Errorf("failed to load table %s.%s: %w", database, tableName, err)
}

icebergTable, err := table.NewFromLocation([]string{tableName}, location, iofs)
if err != nil {
return nil, fmt.Errorf("failed to create table from location %s.%s: %w", database, tableName, err)
}

return icebergTable, nil
}

func (c *GlueCatalog) CatalogType() CatalogType {
return Glue
}

// GetTable loads a table from the Glue Catalog using the given database and table name.
func (c *GlueCatalog) getTable(ctx context.Context, database, tableName string) (string, error) {
tblRes, err := c.glueSvc.GetTable(ctx,
&glue.GetTableInput{
DatabaseName: aws.String(database),
Name: aws.String(tableName),
},
)
if err != nil {
if errors.Is(err, &types.EntityNotFoundException{}) {
return "", ErrNoSuchTable
}
return "", fmt.Errorf("failed to get table %s.%s: %w", database, tableName, err)
}

if tblRes.Table.Parameters["table_type"] != "ICEBERG" {
return "", errors.New("table is not an iceberg table")
}

return tblRes.Table.Parameters["metadata_location"], nil
}

func identifierToGlueTable(identifier table.Identifier) (string, string, error) {
if len(identifier) != 2 {
return "", "", fmt.Errorf("invalid identifier, missing database name: %v", identifier)
}

return identifier[0], identifier[1], nil
}

func identifierToGlueDatabase(identifier table.Identifier) (string, error) {
if len(identifier) != 1 {
return "", fmt.Errorf("invalid identifier, missing database name: %v", identifier)
}

return identifier[0], nil
}

// GlueTableIdentifier returns a glue table identifier for an iceberg table in the format [database, table].
func GlueTableIdentifier(database string, table string) table.Identifier {
return []string{database, table}
}

// GlueDatabaseIdentifier returns a database identifier for a Glue database in the format [database].
func GlueDatabaseIdentifier(database string) table.Identifier {
return []string{database}
}
141 changes: 141 additions & 0 deletions catalog/glue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package catalog

import (
"context"
"os"
"testing"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/glue"
"github.com/aws/aws-sdk-go-v2/service/glue/types"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)

type mockGlueClient struct {
mock.Mock
}

func (m *mockGlueClient) GetTable(ctx context.Context, params *glue.GetTableInput, optFns ...func(*glue.Options)) (*glue.GetTableOutput, error) {
args := m.Called(ctx, params, optFns)
return args.Get(0).(*glue.GetTableOutput), args.Error(1)
}

func (m *mockGlueClient) GetTables(ctx context.Context, params *glue.GetTablesInput, optFns ...func(*glue.Options)) (*glue.GetTablesOutput, error) {
args := m.Called(ctx, params, optFns)
return args.Get(0).(*glue.GetTablesOutput), args.Error(1)
}

func TestGlueGetTable(t *testing.T) {
assert := require.New(t)

mockGlueSvc := &mockGlueClient{}

mockGlueSvc.On("GetTable", mock.Anything, &glue.GetTableInput{
DatabaseName: aws.String("test_database"),
Name: aws.String("test_table"),
}, mock.Anything).Return(&glue.GetTableOutput{
Table: &types.Table{
Parameters: map[string]string{
"table_type": "ICEBERG",
"metadata_location": "s3://test-bucket/test_table/metadata/abc123-123.metadata.json",
},
},
}, nil)

glueCatalog := &GlueCatalog{
glueSvc: mockGlueSvc,
}

location, err := glueCatalog.getTable(context.TODO(), "test_database", "test_table")
assert.NoError(err)
assert.Equal("s3://test-bucket/test_table/metadata/abc123-123.metadata.json", location)
}

func TestGlueListTables(t *testing.T) {
assert := require.New(t)

mockGlueSvc := &mockGlueClient{}

mockGlueSvc.On("GetTables", mock.Anything, &glue.GetTablesInput{
DatabaseName: aws.String("test_database"),
}, mock.Anything).Return(&glue.GetTablesOutput{
TableList: []types.Table{
{
Name: aws.String("test_table"),
Parameters: map[string]string{
"table_type": "ICEBERG",
"metadata_location": "s3://test-bucket/test_table/metadata/abc123-123.metadata.json",
},
},
},
}, nil)

glueCatalog := &GlueCatalog{
glueSvc: mockGlueSvc,
}

tables, err := glueCatalog.ListTables(context.TODO(), GlueDatabaseIdentifier("test_database"))
assert.NoError(err)
assert.Equal([]string{"test_database", "test_table"}, tables[0])
}

func TestGlueListTableIntegration(t *testing.T) {
if os.Getenv("TEST_DATABASE_NAME") == "" {
t.Skip()
}
if os.Getenv("TEST_TABLE_NAME") == "" {
t.Skip()
}
assert := require.New(t)

awscfg, err := config.LoadDefaultConfig(context.TODO(), config.WithClientLogMode(aws.LogRequest|aws.LogResponse))
assert.NoError(err)

catalog := NewGlueCatalog(awscfg)

tables, err := catalog.ListTables(context.TODO(), GlueDatabaseIdentifier(os.Getenv("TEST_DATABASE_NAME")))
assert.NoError(err)
assert.Equal([]string{os.Getenv("TEST_DATABASE_NAME"), os.Getenv("TEST_TABLE_NAME")}, tables[1])
}

func TestGlueLoadTableIntegration(t *testing.T) {
if os.Getenv("TEST_DATABASE_NAME") == "" {
t.Skip()
}
if os.Getenv("TEST_TABLE_NAME") == "" {
t.Skip()
}
if os.Getenv("TEST_TABLE_LOCATION") == "" {
t.Skip()
}

assert := require.New(t)

awscfg, err := config.LoadDefaultConfig(context.TODO(), config.WithClientLogMode(aws.LogRequest|aws.LogResponse))
assert.NoError(err)

catalog := NewGlueCatalog(awscfg)

table, err := catalog.LoadTable(context.TODO(), []string{os.Getenv("TEST_DATABASE_NAME"), os.Getenv("TEST_TABLE_NAME")})
assert.NoError(err)
assert.Equal([]string{os.Getenv("TEST_TABLE_NAME")}, table.Identifier())
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ require (
github.com/aws/aws-sdk-go-v2 v1.24.1
github.com/aws/aws-sdk-go-v2/config v1.26.3
github.com/aws/aws-sdk-go-v2/credentials v1.16.14
github.com/aws/aws-sdk-go-v2/service/glue v1.73.1
github.com/aws/aws-sdk-go-v2/service/s3 v1.48.0
github.com/google/uuid v1.3.1
github.com/hamba/avro/v2 v2.16.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ github.com/aws/aws-sdk-go-v2/internal/ini v1.7.2 h1:GrSw8s0Gs/5zZ0SX+gX4zQjRnRsM
github.com/aws/aws-sdk-go-v2/internal/ini v1.7.2/go.mod h1:6fQQgfuGmw8Al/3M2IgIllycxV7ZW7WCdVSqfBeUiCY=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.2.10 h1:5oE2WzJE56/mVveuDZPJESKlg/00AaS2pY2QZcnxg4M=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.2.10/go.mod h1:FHbKWQtRBYUz4vO5WBWjzMD2by126ny5y/1EoaWoLfI=
github.com/aws/aws-sdk-go-v2/service/glue v1.73.1 h1:z/NBYW8RygzWrDgNWib10fuLUBl0SLj0KruGoEHxnKQ=
github.com/aws/aws-sdk-go-v2/service/glue v1.73.1/go.mod h1:F3B9DC5FsIHAxUtHZdY5KUeqN+tHoGlRPzSSYdXjC38=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.10.4 h1:/b31bi3YVNlkzkBrm9LfpaKoaYZUxIAj4sHfOTmLfqw=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.10.4/go.mod h1:2aGXHFmbInwgP9ZfpmdIfOELL79zhdNYNmReK8qDfdQ=
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.2.10 h1:L0ai8WICYHozIKK+OtPzVJBugL7culcuM4E4JOpIEm8=
Expand Down