From f7f933d4fe42b14be7e48be82e40dafc40a02f88 Mon Sep 17 00:00:00 2001 From: Mark Wolfe Date: Wed, 17 Jan 2024 12:19:34 +1100 Subject: [PATCH 01/13] feat: add support for catalogs with glue implementation to start --- catalog/catalog.go | 30 ++++++++++ catalog/glue.go | 131 +++++++++++++++++++++++++++++++++++++++++++ catalog/glue_test.go | 47 ++++++++++++++++ go.mod | 1 + go.sum | 2 + 5 files changed, 211 insertions(+) create mode 100644 catalog/catalog.go create mode 100644 catalog/glue.go create mode 100644 catalog/glue_test.go diff --git a/catalog/catalog.go b/catalog/catalog.go new file mode 100644 index 0000000..1ff62c6 --- /dev/null +++ b/catalog/catalog.go @@ -0,0 +1,30 @@ +package catalog + +import ( + "context" + "errors" + + "github.com/apache/iceberg-go/table" +) + +type CatalogType string + +const ( + REST CatalogType = "rest" + Hive CatalogType = "hive" + Glue CatalogType = "glue" + DynamoDB CatalogType = "dynamodb" + SQL CatalogType = "sql" +) + +var ( + // ErrNoSuchTable is returned when a table does not exist in the catalog. + ErrNoSuchTable = errors.New("table does not exist") +) + +// Catalog for iceberg table operations like create, drop, load, list and others. +type Catalog interface { + GetTable(ctx context.Context, identifier table.Identifier) (*table.Table, error) + ListTables(ctx context.Context, identifier table.Identifier) ([]*table.Table, error) + CatalogType() CatalogType +} diff --git a/catalog/glue.go b/catalog/glue.go new file mode 100644 index 0000000..a2cb36a --- /dev/null +++ b/catalog/glue.go @@ -0,0 +1,131 @@ +package catalog + +import ( + "context" + "errors" + "fmt" + + "github.com/apache/iceberg-go/io" + "github.com/apache/iceberg-go/table" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/glue" + "github.com/aws/aws-sdk-go-v2/service/glue/types" +) + +var ( + _ Catalog = (*GlueCatalog)(nil) +) + +type GlueAPI interface { + GetTable(ctx context.Context, params *glue.GetTableInput, optFns ...func(*glue.Options)) (*glue.GetTableOutput, error) + GetTables(ctx context.Context, params *glue.GetTablesInput, optFns ...func(*glue.Options)) (*glue.GetTablesOutput, error) +} + +type GlueCatalog struct { + glueSvc GlueAPI +} + +func NewGlueCatalog(awscfg aws.Config) *GlueCatalog { + return &GlueCatalog{ + glueSvc: glue.NewFromConfig(awscfg), + } +} + +// GetTable loads a table from the Glue Catalog using the given database and table name. +func (c *GlueCatalog) GetTable(ctx context.Context, identifier table.Identifier) (*table.Table, error) { + database, tableName, err := identifierToGlueTable(identifier) + if err != nil { + return nil, err + } + + params := &glue.GetTableInput{DatabaseName: aws.String(database), Name: aws.String(tableName)} + + tblRes, err := c.glueSvc.GetTable(ctx, params) + if err != nil { + if errors.Is(err, &types.EntityNotFoundException{}) { + return nil, ErrNoSuchTable + } + return nil, fmt.Errorf("failed to get table %s.%s: %w", database, tableName, err) + } + + iofs, err := io.LoadFS(map[string]string{}, tblRes.Table.Parameters["metadata_location"]) + if err != nil { + return nil, fmt.Errorf("failed to load table %s.%s: %w", database, tableName, err) + } + + icebergTable, err := table.NewFromLocation([]string{tableName}, tblRes.Table.Parameters["metadata_location"], iofs) + if err != nil { + return nil, fmt.Errorf("failed to create table from location %s.%s: %w", database, tableName, err) + } + + return icebergTable, nil +} + +// ListTables returns a list of iceberg tables in the given Glue database. +func (c *GlueCatalog) ListTables(ctx context.Context, identifier table.Identifier) ([]*table.Table, error) { + database, err := identifierToGlueDatabase(identifier) + if err != nil { + return nil, err + } + + params := &glue.GetTablesInput{DatabaseName: aws.String(database)} + + tblsRes, err := c.glueSvc.GetTables(ctx, params) + if err != nil { + return nil, fmt.Errorf("failed to list tables in namespace %s: %w", database, err) + } + + var icebergTables []*table.Table + + for _, tbl := range tblsRes.TableList { + // skip non iceberg tables + // TODO: consider what this would look like for non ICEBERG tables as you can convert them to ICEBERG tables via the Glue catalog API. + if tbl.Parameters["table_type"] != "ICEBERG" { + continue + } + + iofs, err := io.LoadFS(map[string]string{}, tbl.Parameters["metadata_location"]) + if err != nil { + return nil, fmt.Errorf("failed to load table %s.%s: %w", database, aws.ToString(tbl.Name), err) + } + + icebergTable, err := table.NewFromLocation([]string{*tbl.Name}, tbl.Parameters["metadata_location"], iofs) + if err != nil { + return nil, fmt.Errorf("failed to create table from location %s.%s: %w", database, aws.ToString(tbl.Name), err) + } + + icebergTables = append(icebergTables, icebergTable) + } + + return icebergTables, nil +} + +func (c *GlueCatalog) CatalogType() CatalogType { + return Glue +} + +func identifierToGlueTable(identifier table.Identifier) (string, string, error) { + if len(identifier) != 2 { + return "", "", fmt.Errorf("invalid identifier, missing database name: %v", identifier) + } + + return identifier[0], identifier[1], nil +} + +func identifierToGlueDatabase(identifier table.Identifier) (string, error) { + if len(identifier) != 1 { + return "", fmt.Errorf("invalid identifier, missing database name: %v", identifier) + } + + return identifier[0], nil +} + +// GlueTableIdentifier returns a glue table identifier for an iceberg table in the format [database, table]. +func GlueTableIdentifier(database string, table string) table.Identifier { + return []string{database, table} +} + +// GlueDatabaseIdentifier returns a database identifier for a Glue database in the format [database]. +func GlueDatabaseIdentifier(database string) table.Identifier { + return []string{database} +} diff --git a/catalog/glue_test.go b/catalog/glue_test.go new file mode 100644 index 0000000..34975f9 --- /dev/null +++ b/catalog/glue_test.go @@ -0,0 +1,47 @@ +package catalog + +import ( + "context" + "os" + "testing" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/stretchr/testify/require" +) + +func TestGlueGetTableIntegration(t *testing.T) { + if os.Getenv("TEST_DATABASE_NAME") == "" { + t.Skip() + } + if os.Getenv("TEST_TABLE_NAME") == "" { + t.Skip() + } + assert := require.New(t) + + awscfg, err := config.LoadDefaultConfig(context.TODO(), config.WithClientLogMode(aws.LogRequest|aws.LogResponse)) + assert.NoError(err) + + catalog := NewGlueCatalog(awscfg) + + table, err := catalog.GetTable(context.TODO(), GlueTableIdentifier(os.Getenv("TEST_DATABASE_NAME"), os.Getenv("TEST_TABLE_NAME"))) + assert.NoError(err) + assert.Equal([]string{os.Getenv("TEST_TABLE_NAME")}, table.Identifier()) +} + +func TestGlueListTableIntegration(t *testing.T) { + if os.Getenv("TEST_DATABASE_NAME") == "" { + t.Skip() + } + + assert := require.New(t) + + awscfg, err := config.LoadDefaultConfig(context.TODO(), config.WithClientLogMode(aws.LogRequest|aws.LogResponse)) + assert.NoError(err) + + catalog := NewGlueCatalog(awscfg) + + tables, err := catalog.ListTables(context.TODO(), GlueDatabaseIdentifier(os.Getenv("TEST_DATABASE_NAME"))) + assert.NoError(err) + assert.Equal([]string{os.Getenv("TEST_TABLE_NAME")}, tables[1].Identifier()) +} diff --git a/go.mod b/go.mod index b10f96c..2dca7eb 100644 --- a/go.mod +++ b/go.mod @@ -23,6 +23,7 @@ require ( github.com/aws/aws-sdk-go-v2 v1.24.1 github.com/aws/aws-sdk-go-v2/config v1.26.3 github.com/aws/aws-sdk-go-v2/credentials v1.16.14 + github.com/aws/aws-sdk-go-v2/service/glue v1.73.1 github.com/aws/aws-sdk-go-v2/service/s3 v1.48.0 github.com/google/uuid v1.3.1 github.com/hamba/avro/v2 v2.16.0 diff --git a/go.sum b/go.sum index 462fdde..4e0f385 100644 --- a/go.sum +++ b/go.sum @@ -16,6 +16,8 @@ github.com/aws/aws-sdk-go-v2/internal/ini v1.7.2 h1:GrSw8s0Gs/5zZ0SX+gX4zQjRnRsM github.com/aws/aws-sdk-go-v2/internal/ini v1.7.2/go.mod h1:6fQQgfuGmw8Al/3M2IgIllycxV7ZW7WCdVSqfBeUiCY= github.com/aws/aws-sdk-go-v2/internal/v4a v1.2.10 h1:5oE2WzJE56/mVveuDZPJESKlg/00AaS2pY2QZcnxg4M= github.com/aws/aws-sdk-go-v2/internal/v4a v1.2.10/go.mod h1:FHbKWQtRBYUz4vO5WBWjzMD2by126ny5y/1EoaWoLfI= +github.com/aws/aws-sdk-go-v2/service/glue v1.73.1 h1:z/NBYW8RygzWrDgNWib10fuLUBl0SLj0KruGoEHxnKQ= +github.com/aws/aws-sdk-go-v2/service/glue v1.73.1/go.mod h1:F3B9DC5FsIHAxUtHZdY5KUeqN+tHoGlRPzSSYdXjC38= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.10.4 h1:/b31bi3YVNlkzkBrm9LfpaKoaYZUxIAj4sHfOTmLfqw= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.10.4/go.mod h1:2aGXHFmbInwgP9ZfpmdIfOELL79zhdNYNmReK8qDfdQ= github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.2.10 h1:L0ai8WICYHozIKK+OtPzVJBugL7culcuM4E4JOpIEm8= From 56c761b4bbc44d35333574f638bc977d4242cf13 Mon Sep 17 00:00:00 2001 From: Mark Wolfe Date: Wed, 17 Jan 2024 12:46:14 +1100 Subject: [PATCH 02/13] refactored to seperate table info from loading --- catalog/catalog.go | 11 ++++-- catalog/glue.go | 56 ++++++++++++++---------------- catalog/glue_test.go | 82 ++++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 114 insertions(+), 35 deletions(-) diff --git a/catalog/catalog.go b/catalog/catalog.go index 1ff62c6..49c7d9b 100644 --- a/catalog/catalog.go +++ b/catalog/catalog.go @@ -24,7 +24,14 @@ var ( // Catalog for iceberg table operations like create, drop, load, list and others. type Catalog interface { - GetTable(ctx context.Context, identifier table.Identifier) (*table.Table, error) - ListTables(ctx context.Context, identifier table.Identifier) ([]*table.Table, error) + GetTable(ctx context.Context, identifier table.Identifier) (CatalogTable, error) + ListTables(ctx context.Context, identifier table.Identifier) ([]CatalogTable, error) CatalogType() CatalogType } + +// CatalogTable is the details of a table in a catalog. +type CatalogTable struct { + Identifier table.Identifier // this identifier may vary depending on the catalog implementation + Location string // URL to the table location + CatalogType CatalogType +} diff --git a/catalog/glue.go b/catalog/glue.go index a2cb36a..b0e298f 100644 --- a/catalog/glue.go +++ b/catalog/glue.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" - "github.com/apache/iceberg-go/io" "github.com/apache/iceberg-go/table" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/glue" @@ -32,37 +31,38 @@ func NewGlueCatalog(awscfg aws.Config) *GlueCatalog { } // GetTable loads a table from the Glue Catalog using the given database and table name. -func (c *GlueCatalog) GetTable(ctx context.Context, identifier table.Identifier) (*table.Table, error) { +func (c *GlueCatalog) GetTable(ctx context.Context, identifier table.Identifier) (CatalogTable, error) { database, tableName, err := identifierToGlueTable(identifier) if err != nil { - return nil, err + return CatalogTable{}, err } - params := &glue.GetTableInput{DatabaseName: aws.String(database), Name: aws.String(tableName)} - - tblRes, err := c.glueSvc.GetTable(ctx, params) + tblRes, err := c.glueSvc.GetTable(ctx, + &glue.GetTableInput{ + DatabaseName: aws.String(database), + Name: aws.String(tableName), + }, + ) if err != nil { if errors.Is(err, &types.EntityNotFoundException{}) { - return nil, ErrNoSuchTable + return CatalogTable{}, ErrNoSuchTable } - return nil, fmt.Errorf("failed to get table %s.%s: %w", database, tableName, err) - } - - iofs, err := io.LoadFS(map[string]string{}, tblRes.Table.Parameters["metadata_location"]) - if err != nil { - return nil, fmt.Errorf("failed to load table %s.%s: %w", database, tableName, err) + return CatalogTable{}, fmt.Errorf("failed to get table %s.%s: %w", database, tableName, err) } - icebergTable, err := table.NewFromLocation([]string{tableName}, tblRes.Table.Parameters["metadata_location"], iofs) - if err != nil { - return nil, fmt.Errorf("failed to create table from location %s.%s: %w", database, tableName, err) + if tblRes.Table.Parameters["table_type"] != "ICEBERG" { + return CatalogTable{}, errors.New("table is not an iceberg table") } - return icebergTable, nil + return CatalogTable{ + Identifier: identifier, + Location: tblRes.Table.Parameters["metadata_location"], + CatalogType: Glue, + }, nil } // ListTables returns a list of iceberg tables in the given Glue database. -func (c *GlueCatalog) ListTables(ctx context.Context, identifier table.Identifier) ([]*table.Table, error) { +func (c *GlueCatalog) ListTables(ctx context.Context, identifier table.Identifier) ([]CatalogTable, error) { database, err := identifierToGlueDatabase(identifier) if err != nil { return nil, err @@ -75,7 +75,7 @@ func (c *GlueCatalog) ListTables(ctx context.Context, identifier table.Identifie return nil, fmt.Errorf("failed to list tables in namespace %s: %w", database, err) } - var icebergTables []*table.Table + var icebergTables []CatalogTable for _, tbl := range tblsRes.TableList { // skip non iceberg tables @@ -84,17 +84,13 @@ func (c *GlueCatalog) ListTables(ctx context.Context, identifier table.Identifie continue } - iofs, err := io.LoadFS(map[string]string{}, tbl.Parameters["metadata_location"]) - if err != nil { - return nil, fmt.Errorf("failed to load table %s.%s: %w", database, aws.ToString(tbl.Name), err) - } - - icebergTable, err := table.NewFromLocation([]string{*tbl.Name}, tbl.Parameters["metadata_location"], iofs) - if err != nil { - return nil, fmt.Errorf("failed to create table from location %s.%s: %w", database, aws.ToString(tbl.Name), err) - } - - icebergTables = append(icebergTables, icebergTable) + icebergTables = append(icebergTables, + CatalogTable{ + Identifier: GlueTableIdentifier(database, aws.ToString(tbl.Name)), + Location: tbl.Parameters["metadata_location"], + CatalogType: Glue, + }, + ) } return icebergTables, nil diff --git a/catalog/glue_test.go b/catalog/glue_test.go index 34975f9..e0754b3 100644 --- a/catalog/glue_test.go +++ b/catalog/glue_test.go @@ -7,9 +7,84 @@ import ( "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/glue" + "github.com/aws/aws-sdk-go-v2/service/glue/types" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" ) +type mockGlueClient struct { + mock.Mock +} + +func (m *mockGlueClient) GetTable(ctx context.Context, params *glue.GetTableInput, optFns ...func(*glue.Options)) (*glue.GetTableOutput, error) { + args := m.Called(ctx, params, optFns) + return args.Get(0).(*glue.GetTableOutput), args.Error(1) +} + +func (m *mockGlueClient) GetTables(ctx context.Context, params *glue.GetTablesInput, optFns ...func(*glue.Options)) (*glue.GetTablesOutput, error) { + args := m.Called(ctx, params, optFns) + return args.Get(0).(*glue.GetTablesOutput), args.Error(1) +} + +func TestGlueGetTable(t *testing.T) { + assert := require.New(t) + + mockGlueSvc := &mockGlueClient{} + + mockGlueSvc.On("GetTable", mock.Anything, &glue.GetTableInput{ + DatabaseName: aws.String("test_database"), + Name: aws.String("test_table"), + }, mock.Anything).Return(&glue.GetTableOutput{ + Table: &types.Table{ + Parameters: map[string]string{ + "table_type": "ICEBERG", + "metadata_location": "s3://test-bucket/test_table/metadata/abc123-123.metadata.json", + }, + }, + }, nil) + + glueCatalog := &GlueCatalog{ + glueSvc: mockGlueSvc, + } + + table, err := glueCatalog.GetTable(context.TODO(), GlueTableIdentifier("test_database", "test_table")) + assert.NoError(err) + assert.Equal([]string{"test_database", "test_table"}, table.Identifier) + assert.Equal("s3://test-bucket/test_table/metadata/abc123-123.metadata.json", table.Location) + assert.Equal(table.CatalogType, Glue) +} + +func TestGlueListTables(t *testing.T) { + assert := require.New(t) + + mockGlueSvc := &mockGlueClient{} + + mockGlueSvc.On("GetTables", mock.Anything, &glue.GetTablesInput{ + DatabaseName: aws.String("test_database"), + }, mock.Anything).Return(&glue.GetTablesOutput{ + TableList: []types.Table{ + { + Name: aws.String("test_table"), + Parameters: map[string]string{ + "table_type": "ICEBERG", + "metadata_location": "s3://test-bucket/test_table/metadata/abc123-123.metadata.json", + }, + }, + }, + }, nil) + + glueCatalog := &GlueCatalog{ + glueSvc: mockGlueSvc, + } + + tables, err := glueCatalog.ListTables(context.TODO(), GlueDatabaseIdentifier("test_database")) + assert.NoError(err) + assert.Equal([]string{"test_database", "test_table"}, tables[0].Identifier) + assert.Equal("s3://test-bucket/test_table/metadata/abc123-123.metadata.json", tables[0].Location) + assert.Equal(tables[0].CatalogType, Glue) +} + func TestGlueGetTableIntegration(t *testing.T) { if os.Getenv("TEST_DATABASE_NAME") == "" { t.Skip() @@ -19,14 +94,15 @@ func TestGlueGetTableIntegration(t *testing.T) { } assert := require.New(t) - awscfg, err := config.LoadDefaultConfig(context.TODO(), config.WithClientLogMode(aws.LogRequest|aws.LogResponse)) + awscfg, err := config.LoadDefaultConfig(context.TODO(), config.WithClientLogMode(aws.LogRequest|aws.LogResponseWithBody)) assert.NoError(err) catalog := NewGlueCatalog(awscfg) table, err := catalog.GetTable(context.TODO(), GlueTableIdentifier(os.Getenv("TEST_DATABASE_NAME"), os.Getenv("TEST_TABLE_NAME"))) assert.NoError(err) - assert.Equal([]string{os.Getenv("TEST_TABLE_NAME")}, table.Identifier()) + assert.Equal([]string{os.Getenv("TEST_DATABASE_NAME"), os.Getenv("TEST_TABLE_NAME")}, table.Identifier) + assert.Equal(table.CatalogType, Glue) } func TestGlueListTableIntegration(t *testing.T) { @@ -43,5 +119,5 @@ func TestGlueListTableIntegration(t *testing.T) { tables, err := catalog.ListTables(context.TODO(), GlueDatabaseIdentifier(os.Getenv("TEST_DATABASE_NAME"))) assert.NoError(err) - assert.Equal([]string{os.Getenv("TEST_TABLE_NAME")}, tables[1].Identifier()) + assert.Equal([]string{os.Getenv("TEST_DATABASE_NAME"), os.Getenv("TEST_TABLE_NAME")}, tables[1].Identifier) } From 1bed31e09eb665661c3720e9b01d9337e1708366 Mon Sep 17 00:00:00 2001 From: Mark Wolfe Date: Wed, 17 Jan 2024 12:55:07 +1100 Subject: [PATCH 03/13] add an implementation of load table using the catalog --- catalog/catalog.go | 1 + catalog/glue.go | 22 ++++++++++++++++++++++ 2 files changed, 23 insertions(+) diff --git a/catalog/catalog.go b/catalog/catalog.go index 49c7d9b..65d0e2a 100644 --- a/catalog/catalog.go +++ b/catalog/catalog.go @@ -26,6 +26,7 @@ var ( type Catalog interface { GetTable(ctx context.Context, identifier table.Identifier) (CatalogTable, error) ListTables(ctx context.Context, identifier table.Identifier) ([]CatalogTable, error) + LoadTable(ctx context.Context, table CatalogTable) (*table.Table, error) CatalogType() CatalogType } diff --git a/catalog/glue.go b/catalog/glue.go index b0e298f..d4b54b3 100644 --- a/catalog/glue.go +++ b/catalog/glue.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" + "github.com/apache/iceberg-go/io" "github.com/apache/iceberg-go/table" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/glue" @@ -96,6 +97,27 @@ func (c *GlueCatalog) ListTables(ctx context.Context, identifier table.Identifie return icebergTables, nil } +// LoadTable loads a table from the catalog table details. +func (c *GlueCatalog) LoadTable(ctx context.Context, catalogTable CatalogTable) (*table.Table, error) { + database, tableName, err := identifierToGlueTable(catalogTable.Identifier) + if err != nil { + return nil, err + } + + // TODO: consider providing a way to directly access the S3 iofs to enable testing of the catalog. + iofs, err := io.LoadFS(map[string]string{}, catalogTable.Location) + if err != nil { + return nil, fmt.Errorf("failed to load table %s.%s: %w", database, tableName, err) + } + + icebergTable, err := table.NewFromLocation([]string{tableName}, catalogTable.Location, iofs) + if err != nil { + return nil, fmt.Errorf("failed to create table from location %s.%s: %w", database, tableName, err) + } + + return icebergTable, nil +} + func (c *GlueCatalog) CatalogType() CatalogType { return Glue } From e992a05d99dd20eb499c7d7f0a8a5daa4de5783b Mon Sep 17 00:00:00 2001 From: Mark Wolfe Date: Wed, 17 Jan 2024 13:07:57 +1100 Subject: [PATCH 04/13] added integration test for load table --- .gitignore | 2 ++ catalog/glue.go | 2 ++ catalog/glue_test.go | 31 ++++++++++++++++++++++++++++++- 3 files changed, 34 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index ddbb91e..6c1bcf4 100644 --- a/.gitignore +++ b/.gitignore @@ -44,3 +44,5 @@ lib/ *.ipr *.iws *.iml + +.envrc* diff --git a/catalog/glue.go b/catalog/glue.go index d4b54b3..1c29ad2 100644 --- a/catalog/glue.go +++ b/catalog/glue.go @@ -104,6 +104,8 @@ func (c *GlueCatalog) LoadTable(ctx context.Context, catalogTable CatalogTable) return nil, err } + fmt.Println("catalogTable.Location", catalogTable.Location) + // TODO: consider providing a way to directly access the S3 iofs to enable testing of the catalog. iofs, err := io.LoadFS(map[string]string{}, catalogTable.Location) if err != nil { diff --git a/catalog/glue_test.go b/catalog/glue_test.go index e0754b3..45fbb57 100644 --- a/catalog/glue_test.go +++ b/catalog/glue_test.go @@ -109,7 +109,9 @@ func TestGlueListTableIntegration(t *testing.T) { if os.Getenv("TEST_DATABASE_NAME") == "" { t.Skip() } - + if os.Getenv("TEST_TABLE_NAME") == "" { + t.Skip() + } assert := require.New(t) awscfg, err := config.LoadDefaultConfig(context.TODO(), config.WithClientLogMode(aws.LogRequest|aws.LogResponse)) @@ -121,3 +123,30 @@ func TestGlueListTableIntegration(t *testing.T) { assert.NoError(err) assert.Equal([]string{os.Getenv("TEST_DATABASE_NAME"), os.Getenv("TEST_TABLE_NAME")}, tables[1].Identifier) } + +func TestGlueLoadTableIntegration(t *testing.T) { + if os.Getenv("TEST_DATABASE_NAME") == "" { + t.Skip() + } + if os.Getenv("TEST_TABLE_NAME") == "" { + t.Skip() + } + if os.Getenv("TEST_TABLE_LOCATION") == "" { + t.Skip() + } + + assert := require.New(t) + + awscfg, err := config.LoadDefaultConfig(context.TODO(), config.WithClientLogMode(aws.LogRequest|aws.LogResponse)) + assert.NoError(err) + + catalog := NewGlueCatalog(awscfg) + + table, err := catalog.LoadTable(context.TODO(), CatalogTable{ + Identifier: []string{os.Getenv("TEST_DATABASE_NAME"), os.Getenv("TEST_TABLE_NAME")}, + Location: os.Getenv("TEST_TABLE_LOCATION"), + CatalogType: Glue, + }) + assert.NoError(err) + assert.Equal([]string{os.Getenv("TEST_TABLE_NAME")}, table.Identifier()) +} From 4f5ce6ddd800e519c555673bce7edf44d2f1b28f Mon Sep 17 00:00:00 2001 From: Mark Wolfe Date: Fri, 19 Jan 2024 18:25:46 +1100 Subject: [PATCH 05/13] fix: added missing license headers from added files --- catalog/catalog.go | 17 +++++++++++++++++ catalog/glue.go | 17 +++++++++++++++++ catalog/glue_test.go | 17 +++++++++++++++++ 3 files changed, 51 insertions(+) diff --git a/catalog/catalog.go b/catalog/catalog.go index 65d0e2a..5acff37 100644 --- a/catalog/catalog.go +++ b/catalog/catalog.go @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + package catalog import ( diff --git a/catalog/glue.go b/catalog/glue.go index 1c29ad2..8357bc3 100644 --- a/catalog/glue.go +++ b/catalog/glue.go @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + package catalog import ( diff --git a/catalog/glue_test.go b/catalog/glue_test.go index 45fbb57..e68b8c8 100644 --- a/catalog/glue_test.go +++ b/catalog/glue_test.go @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + package catalog import ( From b9d4c1da6e4c13601d09978cf6505035acb83e45 Mon Sep 17 00:00:00 2001 From: Mark Wolfe Date: Fri, 19 Jan 2024 19:10:54 +1100 Subject: [PATCH 06/13] fix: updates based on feedback to align with python library This has the following changes: * Just provide a load table, which looks up the table in the catalog and returns it with metadata loaded. * Remove the table structure, and follow the python list tables which returns identifiers instead. * Update the interface documentation to ensure it is clear about inputs and outputs like the python library. * Adjust the tests and verify via AWS locally --- catalog/catalog.go | 16 ++++----- catalog/glue.go | 82 ++++++++++++++++++++------------------------ catalog/glue_test.go | 38 +++----------------- 3 files changed, 49 insertions(+), 87 deletions(-) diff --git a/catalog/catalog.go b/catalog/catalog.go index 5acff37..0a232d1 100644 --- a/catalog/catalog.go +++ b/catalog/catalog.go @@ -41,15 +41,11 @@ var ( // Catalog for iceberg table operations like create, drop, load, list and others. type Catalog interface { - GetTable(ctx context.Context, identifier table.Identifier) (CatalogTable, error) - ListTables(ctx context.Context, identifier table.Identifier) ([]CatalogTable, error) - LoadTable(ctx context.Context, table CatalogTable) (*table.Table, error) + // ListTables returns a list of table identifiers in the catalog, with the returned + // identifiers containing the information required to load the table via that catalog. + ListTables(ctx context.Context, namespace table.Identifier) ([]table.Identifier, error) + // LoadTable loads a table from the catalog and returns a Table with the metadata. + LoadTable(ctx context.Context, identifier table.Identifier) (*table.Table, error) + // CatalogType returns the type of the catalog. CatalogType() CatalogType } - -// CatalogTable is the details of a table in a catalog. -type CatalogTable struct { - Identifier table.Identifier // this identifier may vary depending on the catalog implementation - Location string // URL to the table location - CatalogType CatalogType -} diff --git a/catalog/glue.go b/catalog/glue.go index 8357bc3..8afc422 100644 --- a/catalog/glue.go +++ b/catalog/glue.go @@ -48,40 +48,11 @@ func NewGlueCatalog(awscfg aws.Config) *GlueCatalog { } } -// GetTable loads a table from the Glue Catalog using the given database and table name. -func (c *GlueCatalog) GetTable(ctx context.Context, identifier table.Identifier) (CatalogTable, error) { - database, tableName, err := identifierToGlueTable(identifier) - if err != nil { - return CatalogTable{}, err - } - - tblRes, err := c.glueSvc.GetTable(ctx, - &glue.GetTableInput{ - DatabaseName: aws.String(database), - Name: aws.String(tableName), - }, - ) - if err != nil { - if errors.Is(err, &types.EntityNotFoundException{}) { - return CatalogTable{}, ErrNoSuchTable - } - return CatalogTable{}, fmt.Errorf("failed to get table %s.%s: %w", database, tableName, err) - } - - if tblRes.Table.Parameters["table_type"] != "ICEBERG" { - return CatalogTable{}, errors.New("table is not an iceberg table") - } - - return CatalogTable{ - Identifier: identifier, - Location: tblRes.Table.Parameters["metadata_location"], - CatalogType: Glue, - }, nil -} - // ListTables returns a list of iceberg tables in the given Glue database. -func (c *GlueCatalog) ListTables(ctx context.Context, identifier table.Identifier) ([]CatalogTable, error) { - database, err := identifierToGlueDatabase(identifier) +// +// The namespace should just contain the Glue database name. +func (c *GlueCatalog) ListTables(ctx context.Context, namespace table.Identifier) ([]table.Identifier, error) { + database, err := identifierToGlueDatabase(namespace) if err != nil { return nil, err } @@ -93,7 +64,7 @@ func (c *GlueCatalog) ListTables(ctx context.Context, identifier table.Identifie return nil, fmt.Errorf("failed to list tables in namespace %s: %w", database, err) } - var icebergTables []CatalogTable + var icebergTables []table.Identifier for _, tbl := range tblsRes.TableList { // skip non iceberg tables @@ -103,11 +74,7 @@ func (c *GlueCatalog) ListTables(ctx context.Context, identifier table.Identifie } icebergTables = append(icebergTables, - CatalogTable{ - Identifier: GlueTableIdentifier(database, aws.ToString(tbl.Name)), - Location: tbl.Parameters["metadata_location"], - CatalogType: Glue, - }, + GlueTableIdentifier(database, aws.ToString(tbl.Name)), ) } @@ -115,21 +82,26 @@ func (c *GlueCatalog) ListTables(ctx context.Context, identifier table.Identifie } // LoadTable loads a table from the catalog table details. -func (c *GlueCatalog) LoadTable(ctx context.Context, catalogTable CatalogTable) (*table.Table, error) { - database, tableName, err := identifierToGlueTable(catalogTable.Identifier) +// +// The identifier should contain the Glue database name, then glue table name. +func (c *GlueCatalog) LoadTable(ctx context.Context, identifier table.Identifier) (*table.Table, error) { + database, tableName, err := identifierToGlueTable(identifier) if err != nil { return nil, err } - fmt.Println("catalogTable.Location", catalogTable.Location) + location, err := c.getTable(ctx, database, tableName) + if err != nil { + return nil, err + } // TODO: consider providing a way to directly access the S3 iofs to enable testing of the catalog. - iofs, err := io.LoadFS(map[string]string{}, catalogTable.Location) + iofs, err := io.LoadFS(map[string]string{}, location) if err != nil { return nil, fmt.Errorf("failed to load table %s.%s: %w", database, tableName, err) } - icebergTable, err := table.NewFromLocation([]string{tableName}, catalogTable.Location, iofs) + icebergTable, err := table.NewFromLocation([]string{tableName}, location, iofs) if err != nil { return nil, fmt.Errorf("failed to create table from location %s.%s: %w", database, tableName, err) } @@ -141,6 +113,28 @@ func (c *GlueCatalog) CatalogType() CatalogType { return Glue } +// GetTable loads a table from the Glue Catalog using the given database and table name. +func (c *GlueCatalog) getTable(ctx context.Context, database, tableName string) (string, error) { + tblRes, err := c.glueSvc.GetTable(ctx, + &glue.GetTableInput{ + DatabaseName: aws.String(database), + Name: aws.String(tableName), + }, + ) + if err != nil { + if errors.Is(err, &types.EntityNotFoundException{}) { + return "", ErrNoSuchTable + } + return "", fmt.Errorf("failed to get table %s.%s: %w", database, tableName, err) + } + + if tblRes.Table.Parameters["table_type"] != "ICEBERG" { + return "", errors.New("table is not an iceberg table") + } + + return tblRes.Table.Parameters["metadata_location"], nil +} + func identifierToGlueTable(identifier table.Identifier) (string, string, error) { if len(identifier) != 2 { return "", "", fmt.Errorf("invalid identifier, missing database name: %v", identifier) diff --git a/catalog/glue_test.go b/catalog/glue_test.go index e68b8c8..4970aa7 100644 --- a/catalog/glue_test.go +++ b/catalog/glue_test.go @@ -65,11 +65,9 @@ func TestGlueGetTable(t *testing.T) { glueSvc: mockGlueSvc, } - table, err := glueCatalog.GetTable(context.TODO(), GlueTableIdentifier("test_database", "test_table")) + location, err := glueCatalog.getTable(context.TODO(), "test_database", "test_table") assert.NoError(err) - assert.Equal([]string{"test_database", "test_table"}, table.Identifier) - assert.Equal("s3://test-bucket/test_table/metadata/abc123-123.metadata.json", table.Location) - assert.Equal(table.CatalogType, Glue) + assert.Equal("s3://test-bucket/test_table/metadata/abc123-123.metadata.json", location) } func TestGlueListTables(t *testing.T) { @@ -97,29 +95,7 @@ func TestGlueListTables(t *testing.T) { tables, err := glueCatalog.ListTables(context.TODO(), GlueDatabaseIdentifier("test_database")) assert.NoError(err) - assert.Equal([]string{"test_database", "test_table"}, tables[0].Identifier) - assert.Equal("s3://test-bucket/test_table/metadata/abc123-123.metadata.json", tables[0].Location) - assert.Equal(tables[0].CatalogType, Glue) -} - -func TestGlueGetTableIntegration(t *testing.T) { - if os.Getenv("TEST_DATABASE_NAME") == "" { - t.Skip() - } - if os.Getenv("TEST_TABLE_NAME") == "" { - t.Skip() - } - assert := require.New(t) - - awscfg, err := config.LoadDefaultConfig(context.TODO(), config.WithClientLogMode(aws.LogRequest|aws.LogResponseWithBody)) - assert.NoError(err) - - catalog := NewGlueCatalog(awscfg) - - table, err := catalog.GetTable(context.TODO(), GlueTableIdentifier(os.Getenv("TEST_DATABASE_NAME"), os.Getenv("TEST_TABLE_NAME"))) - assert.NoError(err) - assert.Equal([]string{os.Getenv("TEST_DATABASE_NAME"), os.Getenv("TEST_TABLE_NAME")}, table.Identifier) - assert.Equal(table.CatalogType, Glue) + assert.Equal([]string{"test_database", "test_table"}, tables[0]) } func TestGlueListTableIntegration(t *testing.T) { @@ -138,7 +114,7 @@ func TestGlueListTableIntegration(t *testing.T) { tables, err := catalog.ListTables(context.TODO(), GlueDatabaseIdentifier(os.Getenv("TEST_DATABASE_NAME"))) assert.NoError(err) - assert.Equal([]string{os.Getenv("TEST_DATABASE_NAME"), os.Getenv("TEST_TABLE_NAME")}, tables[1].Identifier) + assert.Equal([]string{os.Getenv("TEST_DATABASE_NAME"), os.Getenv("TEST_TABLE_NAME")}, tables[1]) } func TestGlueLoadTableIntegration(t *testing.T) { @@ -159,11 +135,7 @@ func TestGlueLoadTableIntegration(t *testing.T) { catalog := NewGlueCatalog(awscfg) - table, err := catalog.LoadTable(context.TODO(), CatalogTable{ - Identifier: []string{os.Getenv("TEST_DATABASE_NAME"), os.Getenv("TEST_TABLE_NAME")}, - Location: os.Getenv("TEST_TABLE_LOCATION"), - CatalogType: Glue, - }) + table, err := catalog.LoadTable(context.TODO(), []string{os.Getenv("TEST_DATABASE_NAME"), os.Getenv("TEST_TABLE_NAME")}) assert.NoError(err) assert.Equal([]string{os.Getenv("TEST_TABLE_NAME")}, table.Identifier()) } From bc3077360d544107d85df436a5f342da05cf862c Mon Sep 17 00:00:00 2001 From: Mark Wolfe Date: Tue, 23 Jan 2024 08:07:49 +1100 Subject: [PATCH 07/13] fix: do not expose glue client interface in public api --- catalog/glue.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/catalog/glue.go b/catalog/glue.go index 8afc422..8b6b70d 100644 --- a/catalog/glue.go +++ b/catalog/glue.go @@ -33,13 +33,13 @@ var ( _ Catalog = (*GlueCatalog)(nil) ) -type GlueAPI interface { +type glueAPI interface { GetTable(ctx context.Context, params *glue.GetTableInput, optFns ...func(*glue.Options)) (*glue.GetTableOutput, error) GetTables(ctx context.Context, params *glue.GetTablesInput, optFns ...func(*glue.Options)) (*glue.GetTablesOutput, error) } type GlueCatalog struct { - glueSvc GlueAPI + glueSvc glueAPI } func NewGlueCatalog(awscfg aws.Config) *GlueCatalog { From 6327be256c8591f0fb1546d3f3e5ead7fb6570f4 Mon Sep 17 00:00:00 2001 From: Mark Wolfe Date: Tue, 23 Jan 2024 08:42:03 +1100 Subject: [PATCH 08/13] feat: move to functional options to cleanup creation of catalog --- catalog/catalog.go | 14 ++++++++++++++ catalog/glue.go | 10 ++++++++-- catalog/glue_test.go | 4 ++-- 3 files changed, 24 insertions(+), 4 deletions(-) diff --git a/catalog/catalog.go b/catalog/catalog.go index 0a232d1..e68fcdd 100644 --- a/catalog/catalog.go +++ b/catalog/catalog.go @@ -22,6 +22,7 @@ import ( "errors" "github.com/apache/iceberg-go/table" + "github.com/aws/aws-sdk-go-v2/aws" ) type CatalogType string @@ -39,6 +40,19 @@ var ( ErrNoSuchTable = errors.New("table does not exist") ) +// WithAwsConfig sets the AWS configuration for the catalog. +func WithAwsConfig(cfg aws.Config) CatalogOption { + return func(o *CatalogOptions) { + o.awsConfig = cfg + } +} + +type CatalogOption func(*CatalogOptions) + +type CatalogOptions struct { + awsConfig aws.Config +} + // Catalog for iceberg table operations like create, drop, load, list and others. type Catalog interface { // ListTables returns a list of table identifiers in the catalog, with the returned diff --git a/catalog/glue.go b/catalog/glue.go index 8b6b70d..1e78839 100644 --- a/catalog/glue.go +++ b/catalog/glue.go @@ -42,9 +42,15 @@ type GlueCatalog struct { glueSvc glueAPI } -func NewGlueCatalog(awscfg aws.Config) *GlueCatalog { +func NewGlueCatalog(opts ...CatalogOption) *GlueCatalog { + options := &CatalogOptions{} + + for _, o := range opts { + o(options) + } + return &GlueCatalog{ - glueSvc: glue.NewFromConfig(awscfg), + glueSvc: glue.NewFromConfig(options.awsConfig), } } diff --git a/catalog/glue_test.go b/catalog/glue_test.go index 4970aa7..f19fb25 100644 --- a/catalog/glue_test.go +++ b/catalog/glue_test.go @@ -110,7 +110,7 @@ func TestGlueListTableIntegration(t *testing.T) { awscfg, err := config.LoadDefaultConfig(context.TODO(), config.WithClientLogMode(aws.LogRequest|aws.LogResponse)) assert.NoError(err) - catalog := NewGlueCatalog(awscfg) + catalog := NewGlueCatalog(WithAwsConfig(awscfg)) tables, err := catalog.ListTables(context.TODO(), GlueDatabaseIdentifier(os.Getenv("TEST_DATABASE_NAME"))) assert.NoError(err) @@ -133,7 +133,7 @@ func TestGlueLoadTableIntegration(t *testing.T) { awscfg, err := config.LoadDefaultConfig(context.TODO(), config.WithClientLogMode(aws.LogRequest|aws.LogResponse)) assert.NoError(err) - catalog := NewGlueCatalog(awscfg) + catalog := NewGlueCatalog(WithAwsConfig(awscfg)) table, err := catalog.LoadTable(context.TODO(), []string{os.Getenv("TEST_DATABASE_NAME"), os.Getenv("TEST_TABLE_NAME")}) assert.NoError(err) From 1aa77dda55e6d562fc07dd70ce370bc2123a795f Mon Sep 17 00:00:00 2001 From: Mark Wolfe Date: Thu, 25 Jan 2024 14:27:32 +1100 Subject: [PATCH 09/13] fix: update the LoadTable to accept LoadFS props --- catalog/catalog.go | 2 +- catalog/glue.go | 8 ++++++-- catalog/glue_test.go | 2 +- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/catalog/catalog.go b/catalog/catalog.go index e68fcdd..966a4d6 100644 --- a/catalog/catalog.go +++ b/catalog/catalog.go @@ -59,7 +59,7 @@ type Catalog interface { // identifiers containing the information required to load the table via that catalog. ListTables(ctx context.Context, namespace table.Identifier) ([]table.Identifier, error) // LoadTable loads a table from the catalog and returns a Table with the metadata. - LoadTable(ctx context.Context, identifier table.Identifier) (*table.Table, error) + LoadTable(ctx context.Context, identifier table.Identifier, props map[string]string) (*table.Table, error) // CatalogType returns the type of the catalog. CatalogType() CatalogType } diff --git a/catalog/glue.go b/catalog/glue.go index 1e78839..30f776a 100644 --- a/catalog/glue.go +++ b/catalog/glue.go @@ -90,19 +90,23 @@ func (c *GlueCatalog) ListTables(ctx context.Context, namespace table.Identifier // LoadTable loads a table from the catalog table details. // // The identifier should contain the Glue database name, then glue table name. -func (c *GlueCatalog) LoadTable(ctx context.Context, identifier table.Identifier) (*table.Table, error) { +func (c *GlueCatalog) LoadTable(ctx context.Context, identifier table.Identifier, props map[string]string) (*table.Table, error) { database, tableName, err := identifierToGlueTable(identifier) if err != nil { return nil, err } + if props == nil { + props = map[string]string{} + } + location, err := c.getTable(ctx, database, tableName) if err != nil { return nil, err } // TODO: consider providing a way to directly access the S3 iofs to enable testing of the catalog. - iofs, err := io.LoadFS(map[string]string{}, location) + iofs, err := io.LoadFS(props, location) if err != nil { return nil, fmt.Errorf("failed to load table %s.%s: %w", database, tableName, err) } diff --git a/catalog/glue_test.go b/catalog/glue_test.go index f19fb25..ebe2eb3 100644 --- a/catalog/glue_test.go +++ b/catalog/glue_test.go @@ -135,7 +135,7 @@ func TestGlueLoadTableIntegration(t *testing.T) { catalog := NewGlueCatalog(WithAwsConfig(awscfg)) - table, err := catalog.LoadTable(context.TODO(), []string{os.Getenv("TEST_DATABASE_NAME"), os.Getenv("TEST_TABLE_NAME")}) + table, err := catalog.LoadTable(context.TODO(), []string{os.Getenv("TEST_DATABASE_NAME"), os.Getenv("TEST_TABLE_NAME")}, nil) assert.NoError(err) assert.Equal([]string{os.Getenv("TEST_TABLE_NAME")}, table.Identifier()) } From 1efa2d008f8b8a7e4b03d0688f0f5fc0a3b7784f Mon Sep 17 00:00:00 2001 From: Mark Wolfe Date: Thu, 25 Jan 2024 18:50:27 +1100 Subject: [PATCH 10/13] fix: ensure glue catalog list retrieves all table entries using pagination Also addressed a couple of linter suggestions. --- catalog/catalog.go | 8 ++++---- catalog/glue.go | 46 +++++++++++++++++++++++++++++--------------- catalog/glue_test.go | 9 ++++++++- 3 files changed, 42 insertions(+), 21 deletions(-) diff --git a/catalog/catalog.go b/catalog/catalog.go index 966a4d6..b127893 100644 --- a/catalog/catalog.go +++ b/catalog/catalog.go @@ -41,15 +41,15 @@ var ( ) // WithAwsConfig sets the AWS configuration for the catalog. -func WithAwsConfig(cfg aws.Config) CatalogOption { - return func(o *CatalogOptions) { +func WithAwsConfig(cfg aws.Config) Option { + return func(o *Options) { o.awsConfig = cfg } } -type CatalogOption func(*CatalogOptions) +type Option func(*Options) -type CatalogOptions struct { +type Options struct { awsConfig aws.Config } diff --git a/catalog/glue.go b/catalog/glue.go index 30f776a..c48b6b8 100644 --- a/catalog/glue.go +++ b/catalog/glue.go @@ -29,6 +29,8 @@ import ( "github.com/aws/aws-sdk-go-v2/service/glue/types" ) +const glueTableTypeIceberg = "ICEBERG" + var ( _ Catalog = (*GlueCatalog)(nil) ) @@ -42,8 +44,8 @@ type GlueCatalog struct { glueSvc glueAPI } -func NewGlueCatalog(opts ...CatalogOption) *GlueCatalog { - options := &CatalogOptions{} +func NewGlueCatalog(opts ...Option) *GlueCatalog { + options := &Options{} for _, o := range opts { o(options) @@ -65,23 +67,22 @@ func (c *GlueCatalog) ListTables(ctx context.Context, namespace table.Identifier params := &glue.GetTablesInput{DatabaseName: aws.String(database)} - tblsRes, err := c.glueSvc.GetTables(ctx, params) - if err != nil { - return nil, fmt.Errorf("failed to list tables in namespace %s: %w", database, err) - } - var icebergTables []table.Identifier - for _, tbl := range tblsRes.TableList { - // skip non iceberg tables - // TODO: consider what this would look like for non ICEBERG tables as you can convert them to ICEBERG tables via the Glue catalog API. - if tbl.Parameters["table_type"] != "ICEBERG" { - continue + for { + tblsRes, err := c.glueSvc.GetTables(ctx, params) + if err != nil { + return nil, fmt.Errorf("failed to list tables in namespace %s: %w", database, err) } icebergTables = append(icebergTables, - GlueTableIdentifier(database, aws.ToString(tbl.Name)), - ) + filterTableListByType(database, tblsRes.TableList, glueTableTypeIceberg)...) + + if tblsRes.NextToken == nil { + break + } + + params.NextToken = tblsRes.NextToken } return icebergTables, nil @@ -162,11 +163,24 @@ func identifierToGlueDatabase(identifier table.Identifier) (string, error) { } // GlueTableIdentifier returns a glue table identifier for an iceberg table in the format [database, table]. -func GlueTableIdentifier(database string, table string) table.Identifier { - return []string{database, table} +func GlueTableIdentifier(database string, tableName string) table.Identifier { + return []string{database, tableName} } // GlueDatabaseIdentifier returns a database identifier for a Glue database in the format [database]. func GlueDatabaseIdentifier(database string) table.Identifier { return []string{database} } + +func filterTableListByType(database string, tableList []types.Table, tableType string) []table.Identifier { + var filtered []table.Identifier + + for _, tbl := range tableList { + if tbl.Parameters["table_type"] != tableType { + continue + } + filtered = append(filtered, GlueTableIdentifier(database, aws.ToString(tbl.Name))) + } + + return filtered +} diff --git a/catalog/glue_test.go b/catalog/glue_test.go index ebe2eb3..1d3c42f 100644 --- a/catalog/glue_test.go +++ b/catalog/glue_test.go @@ -86,8 +86,14 @@ func TestGlueListTables(t *testing.T) { "metadata_location": "s3://test-bucket/test_table/metadata/abc123-123.metadata.json", }, }, + { + Name: aws.String("other_table"), + Parameters: map[string]string{ + "metadata_location": "s3://test-bucket/other_table/", + }, + }, }, - }, nil) + }, nil).Once() glueCatalog := &GlueCatalog{ glueSvc: mockGlueSvc, @@ -95,6 +101,7 @@ func TestGlueListTables(t *testing.T) { tables, err := glueCatalog.ListTables(context.TODO(), GlueDatabaseIdentifier("test_database")) assert.NoError(err) + assert.Len(tables, 1) assert.Equal([]string{"test_database", "test_table"}, tables[0]) } From da7002347f643a034392541a5309ddd3ad8410c2 Mon Sep 17 00:00:00 2001 From: Mark Wolfe Date: Sat, 27 Jan 2024 05:51:08 +1100 Subject: [PATCH 11/13] fix: wrap not found sentinel error to add more context based on feedback --- catalog/glue.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/catalog/glue.go b/catalog/glue.go index c48b6b8..6bf749b 100644 --- a/catalog/glue.go +++ b/catalog/glue.go @@ -134,7 +134,7 @@ func (c *GlueCatalog) getTable(ctx context.Context, database, tableName string) ) if err != nil { if errors.Is(err, &types.EntityNotFoundException{}) { - return "", ErrNoSuchTable + return "", fmt.Errorf("failed to get table %s.%s: %w", database, tableName, ErrNoSuchTable) } return "", fmt.Errorf("failed to get table %s.%s: %w", database, tableName, err) } From 595e0af546f675d4cfaacd7a6fe9d2d532350ba5 Mon Sep 17 00:00:00 2001 From: Mark Wolfe Date: Sat, 27 Jan 2024 12:18:04 +1100 Subject: [PATCH 12/13] fix: make options private to enable changes later --- catalog/catalog.go | 6 +++--- catalog/glue.go | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/catalog/catalog.go b/catalog/catalog.go index b127893..7b04468 100644 --- a/catalog/catalog.go +++ b/catalog/catalog.go @@ -42,14 +42,14 @@ var ( // WithAwsConfig sets the AWS configuration for the catalog. func WithAwsConfig(cfg aws.Config) Option { - return func(o *Options) { + return func(o *options) { o.awsConfig = cfg } } -type Option func(*Options) +type Option func(*options) -type Options struct { +type options struct { awsConfig aws.Config } diff --git a/catalog/glue.go b/catalog/glue.go index 6bf749b..3caca0f 100644 --- a/catalog/glue.go +++ b/catalog/glue.go @@ -45,14 +45,14 @@ type GlueCatalog struct { } func NewGlueCatalog(opts ...Option) *GlueCatalog { - options := &Options{} + glueOps := &options{} for _, o := range opts { - o(options) + o(glueOps) } return &GlueCatalog{ - glueSvc: glue.NewFromConfig(options.awsConfig), + glueSvc: glue.NewFromConfig(glueOps.awsConfig), } } From b1c3974682d92a53d67039ecff6d0a39e65f28ec Mon Sep 17 00:00:00 2001 From: Mark Wolfe Date: Tue, 30 Jan 2024 21:35:37 +1100 Subject: [PATCH 13/13] docs: updated catalog support table in readme --- README.md | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index a0f6587..c942dc8 100644 --- a/README.md +++ b/README.md @@ -49,6 +49,8 @@ | Operation | REST | Hive | DynamoDB | Glue | | :----------------------- | :--: | :--: | :------: | :--: | +| Load Table | | | | X | +| List Tables | | | | X | | Create Table | | | | | | Update Current Snapshot | | | | | | Create New Snapshot | | | | | @@ -63,8 +65,8 @@ ### Read/Write Data Support * No intrinsic support for reading/writing data yet - * Data can be manually read currently by retrieving data files via Manifests. - * Plan to add [Apache Arrow](https://pkg.go.dev/github.com/apache/arrow/go/v14@v14.0.0) support eventually. +* Data can be manually read currently by retrieving data files via Manifests. +* Plan to add [Apache Arrow](https://pkg.go.dev/github.com/apache/arrow/go/v14@v14.0.0) support eventually. # Get in Touch