From d9bd22b31bbaefbb8923e0f31e9db0fbdbc6dc0d Mon Sep 17 00:00:00 2001 From: natusioe Date: Tue, 15 Oct 2024 18:42:34 -0700 Subject: [PATCH] feat(catalog/glue): implement remaining glue catalog --- catalog/glue.go | 315 +++++++++++++++++++++++++++---- catalog/glue_test.go | 429 ++++++++++++++++++++++++++++++++++++++++--- cmd/iceberg/main.go | 55 ++++-- utils.go | 16 ++ utils_test.go | 78 ++++++++ 5 files changed, 824 insertions(+), 69 deletions(-) create mode 100644 utils_test.go diff --git a/catalog/glue.go b/catalog/glue.go index 91b21ff..ee05e6f 100644 --- a/catalog/glue.go +++ b/catalog/glue.go @@ -30,22 +30,40 @@ import ( "github.com/aws/aws-sdk-go-v2/service/glue/types" ) -const glueTypeIceberg = "ICEBERG" +const ( + glueTypeIceberg = "ICEBERG" + databaseTypePropsKey = "database_type" + tableTypePropsKey = "table_type" + descriptionPropsKey = "Description" + + // Database location. + locationPropsKey = "Location" + + // Table metadata location pointer. + metadataLocationPropsKey = "metadata_location" +) var ( _ Catalog = (*GlueCatalog)(nil) ) type glueAPI interface { + CreateTable(ctx context.Context, params *glue.CreateTableInput, optFns ...func(*glue.Options)) (*glue.CreateTableOutput, error) GetTable(ctx context.Context, params *glue.GetTableInput, optFns ...func(*glue.Options)) (*glue.GetTableOutput, error) GetTables(ctx context.Context, params *glue.GetTablesInput, optFns ...func(*glue.Options)) (*glue.GetTablesOutput, error) + DeleteTable(ctx context.Context, params *glue.DeleteTableInput, optFns ...func(*glue.Options)) (*glue.DeleteTableOutput, error) + GetDatabase(ctx context.Context, params *glue.GetDatabaseInput, optFns ...func(*glue.Options)) (*glue.GetDatabaseOutput, error) GetDatabases(ctx context.Context, params *glue.GetDatabasesInput, optFns ...func(*glue.Options)) (*glue.GetDatabasesOutput, error) + CreateDatabase(ctx context.Context, params *glue.CreateDatabaseInput, optFns ...func(*glue.Options)) (*glue.CreateDatabaseOutput, error) + 
DeleteDatabase(ctx context.Context, params *glue.DeleteDatabaseInput, optFns ...func(*glue.Options)) (*glue.DeleteDatabaseOutput, error) + UpdateDatabase(ctx context.Context, params *glue.UpdateDatabaseInput, optFns ...func(*glue.Options)) (*glue.UpdateDatabaseOutput, error) } type GlueCatalog struct { glueSvc glueAPI } +// NewGlueCatalog creates a new instance of GlueCatalog with the given options. func NewGlueCatalog(opts ...Option[GlueCatalog]) *GlueCatalog { glueOps := &options{} @@ -58,7 +76,11 @@ func NewGlueCatalog(opts ...Option[GlueCatalog]) *GlueCatalog { } } -// ListTables returns a list of iceberg tables in the given Glue database. +func (c *GlueCatalog) CatalogType() CatalogType { + return Glue +} + +// ListTables returns a list of Iceberg tables in the given Glue database. // // The namespace should just contain the Glue database name. func (c *GlueCatalog) ListTables(ctx context.Context, namespace table.Identifier) ([]table.Identifier, error) { @@ -92,7 +114,7 @@ func (c *GlueCatalog) ListTables(ctx context.Context, namespace table.Identifier // LoadTable loads a table from the catalog table details. // -// The identifier should contain the Glue database name, then glue table name. +// The identifier should contain the Glue database name, then Glue table name. 
func (c *GlueCatalog) LoadTable(ctx context.Context, identifier table.Identifier, props iceberg.Properties) (*table.Table, error) { database, tableName, err := identifierToGlueTable(identifier) if err != nil { @@ -103,11 +125,16 @@ func (c *GlueCatalog) LoadTable(ctx context.Context, identifier table.Identifier props = map[string]string{} } - location, err := c.getTable(ctx, database, tableName) + glueTable, err := c.getTable(ctx, database, tableName) if err != nil { return nil, err } + location, ok := glueTable.Parameters[metadataLocationPropsKey] + if !ok { + return nil, fmt.Errorf("missing metadata location for table %s.%s", database, tableName) + } + // TODO: consider providing a way to directly access the S3 iofs to enable testing of the catalog. iofs, err := io.LoadFS(props, location) if err != nil { @@ -122,33 +149,93 @@ func (c *GlueCatalog) LoadTable(ctx context.Context, identifier table.Identifier return icebergTable, nil } -func (c *GlueCatalog) CatalogType() CatalogType { - return Glue -} - +// DropTable deletes an Iceberg table from the Glue catalog. func (c *GlueCatalog) DropTable(ctx context.Context, identifier table.Identifier) error { - return fmt.Errorf("%w: [Glue Catalog] drop table", iceberg.ErrNotImplemented) + database, tableName, err := identifierToGlueTable(identifier) + if err != nil { + return err + } + + // Check if the table exists and is an Iceberg table. + _, err = c.getTable(ctx, database, tableName) + if err != nil { + return err + } + + params := &glue.DeleteTableInput{ + DatabaseName: aws.String(database), + Name: aws.String(tableName), + } + _, err = c.glueSvc.DeleteTable(ctx, params) + if err != nil { + return fmt.Errorf("failed to drop table %s.%s: %w", database, tableName, err) + } + + return nil } +// RenameTable renames an Iceberg table in the Glue catalog. 
func (c *GlueCatalog) RenameTable(ctx context.Context, from, to table.Identifier) (*table.Table, error) { - return nil, fmt.Errorf("%w: [Glue Catalog] rename table", iceberg.ErrNotImplemented) -} + fromDatabase, fromTable, err := identifierToGlueTable(from) + if err != nil { + return nil, err + } -func (c *GlueCatalog) CreateNamespace(ctx context.Context, namespace table.Identifier, props iceberg.Properties) error { - return fmt.Errorf("%w: [Glue Catalog] create namespace", iceberg.ErrNotImplemented) -} + toDatabase, toTable, err := identifierToGlueTable(to) + if err != nil { + return nil, err + } -func (c *GlueCatalog) DropNamespace(ctx context.Context, namespace table.Identifier) error { - return fmt.Errorf("%w: [Glue Catalog] drop namespace", iceberg.ErrNotImplemented) -} + if fromDatabase != toDatabase { + return nil, fmt.Errorf("cannot rename table across namespaces: %s -> %s", fromDatabase, toDatabase) + } -func (c *GlueCatalog) LoadNamespaceProperties(ctx context.Context, namespace table.Identifier) (iceberg.Properties, error) { - return nil, fmt.Errorf("%w: [Glue Catalog] load namespace properties", iceberg.ErrNotImplemented) -} + // Fetch the existing Glue table to copy the metadata into the new table. + fromGlueTable, err := c.getTable(ctx, fromDatabase, fromTable) + if err != nil { + return nil, fmt.Errorf("failed to fetch the table %s.%s: %w", fromDatabase, fromTable, err) + } -func (c *GlueCatalog) UpdateNamespaceProperties(ctx context.Context, namespace table.Identifier, - removals []string, updates iceberg.Properties) (PropertiesUpdateSummary, error) { - return PropertiesUpdateSummary{}, fmt.Errorf("%w: [Glue Catalog] update namespace properties", iceberg.ErrNotImplemented) + // Create the new table. 
+ _, err = c.glueSvc.CreateTable(ctx, &glue.CreateTableInput{ + DatabaseName: aws.String(toDatabase), + TableInput: &types.TableInput{ + Name: aws.String(toTable), + Owner: fromGlueTable.Owner, + Description: fromGlueTable.Description, + Parameters: fromGlueTable.Parameters, + StorageDescriptor: fromGlueTable.StorageDescriptor, + }, + }) + if err != nil { + return nil, fmt.Errorf("failed to create the table %s.%s: %w", fromDatabase, fromTable, err) + } + + // Drop the old table. + _, err = c.glueSvc.DeleteTable(ctx, &glue.DeleteTableInput{ + DatabaseName: aws.String(fromDatabase), + Name: aws.String(fromTable), + }) + if err != nil { + // Best-effort rollback the table creation. + _, rollbackErr := c.glueSvc.DeleteTable(ctx, &glue.DeleteTableInput{ + DatabaseName: aws.String(toDatabase), + Name: aws.String(toTable), + }) + if rollbackErr != nil { + fmt.Printf("failed to rollback the new table %s.%s: %v", toDatabase, toTable, rollbackErr) + } + + return nil, fmt.Errorf("failed to rename the table %s.%s: %w", fromDatabase, fromTable, err) + } + + // Load the new table to return. + renamedTable, err := c.LoadTable(ctx, GlueTableIdentifier(toDatabase, toTable), nil) + if err != nil { + return nil, fmt.Errorf("failed to load renamed table %s.%s: %w", toDatabase, toTable, err) + } + + return renamedTable, nil } // ListNamespaces returns a list of Iceberg namespaces from the given Glue catalog. @@ -180,8 +267,155 @@ func (c *GlueCatalog) ListNamespaces(ctx context.Context, parent table.Identifie return icebergNamespaces, nil } +// CreateNamespace creates a new Iceberg namespace in the Glue catalog. 
+func (c *GlueCatalog) CreateNamespace(ctx context.Context, namespace table.Identifier, props iceberg.Properties) error { + database, err := identifierToGlueDatabase(namespace) + if err != nil { + return err + } + + databaseParameters := map[string]string{ + databaseTypePropsKey: glueTypeIceberg, + } + + description := props[descriptionPropsKey] + locationURI := props[locationPropsKey] + + if description != "" { + databaseParameters[descriptionPropsKey] = description + } + if locationURI != "" { + databaseParameters[locationPropsKey] = locationURI + } + + databaseInput := &types.DatabaseInput{ + Name: aws.String(database), + Parameters: databaseParameters, + } + + params := &glue.CreateDatabaseInput{DatabaseInput: databaseInput} + _, err = c.glueSvc.CreateDatabase(ctx, params) + + if err != nil { + return fmt.Errorf("failed to create database %s: %w", database, err) + } + + return nil +} + +// DropNamespace deletes an Iceberg namespace from the Glue catalog. +func (c *GlueCatalog) DropNamespace(ctx context.Context, namespace table.Identifier) error { + databaseName, err := identifierToGlueDatabase(namespace) + if err != nil { + return err + } + + // Check if the database exists and is an iceberg database. + _, err = c.getDatabase(ctx, databaseName) + if err != nil { + return err + } + + params := &glue.DeleteDatabaseInput{Name: aws.String(databaseName)} + _, err = c.glueSvc.DeleteDatabase(ctx, params) + if err != nil { + return fmt.Errorf("failed to drop namespace %s: %w", databaseName, err) + } + + return nil +} + +// LoadNamespaceProperties loads the properties of an Iceberg namespace from the Glue catalog. 
+func (c *GlueCatalog) LoadNamespaceProperties(ctx context.Context, namespace table.Identifier) (iceberg.Properties, error) { + databaseName, err := identifierToGlueDatabase(namespace) + if err != nil { + return nil, err + } + + database, err := c.getDatabase(ctx, databaseName) + if err != nil { + return nil, err + } + + props := make(map[string]string) + if database.Parameters != nil { + for k, v := range database.Parameters { + props[k] = v + } + } + + return props, nil +} + +// UpdateNamespaceProperties updates the properties of an Iceberg namespace in the Glue catalog. +// The removals list contains the keys to remove, and the updates map contains the keys and values to update. +func (c *GlueCatalog) UpdateNamespaceProperties(ctx context.Context, namespace table.Identifier, + removals []string, updates iceberg.Properties) (PropertiesUpdateSummary, error) { + + databaseName, err := identifierToGlueDatabase(namespace) + if err != nil { + return PropertiesUpdateSummary{}, err + } + + database, err := c.getDatabase(ctx, databaseName) + if err != nil { + return PropertiesUpdateSummary{}, err + } + + overlap := []string{} + for _, key := range removals { + if _, exists := updates[key]; exists { + overlap = append(overlap, key) + } + } + if len(overlap) > 0 { + return PropertiesUpdateSummary{}, fmt.Errorf("conflict between removals and updates for keys: %v", overlap) + } + + updatedProperties := make(map[string]string) + if database.Parameters != nil { + for k, v := range database.Parameters { + updatedProperties[k] = v + } + } + + // Removals. + removed := []string{} + for _, key := range removals { + if _, exists := updatedProperties[key]; exists { + delete(updatedProperties, key) + removed = append(removed, key) + } + } + + // Updates. 
+ updated := []string{} + for key, value := range updates { + if updatedProperties[key] != value { + updatedProperties[key] = value + updated = append(updated, key) + } + } + + _, err = c.glueSvc.UpdateDatabase(ctx, &glue.UpdateDatabaseInput{Name: aws.String(databaseName), DatabaseInput: &types.DatabaseInput{ + Name: aws.String(databaseName), + Parameters: updatedProperties, + }}) + if err != nil { + return PropertiesUpdateSummary{}, fmt.Errorf("failed to update namespace properties %s: %w", databaseName, err) + } + + propertiesUpdateSummary := PropertiesUpdateSummary{ + Removed: removed, + Updated: updated, + Missing: iceberg.Difference(removals, removed), + } + + return propertiesUpdateSummary, nil +} + // GetTable loads a table from the Glue Catalog using the given database and table name. -func (c *GlueCatalog) getTable(ctx context.Context, database, tableName string) (string, error) { +func (c *GlueCatalog) getTable(ctx context.Context, database, tableName string) (*types.Table, error) { tblRes, err := c.glueSvc.GetTable(ctx, &glue.GetTableInput{ DatabaseName: aws.String(database), @@ -190,16 +424,33 @@ func (c *GlueCatalog) getTable(ctx context.Context, database, tableName string) ) if err != nil { if errors.Is(err, &types.EntityNotFoundException{}) { - return "", fmt.Errorf("failed to get table %s.%s: %w", database, tableName, ErrNoSuchTable) + return nil, fmt.Errorf("failed to get table %s.%s: %w", database, tableName, ErrNoSuchTable) + } + return nil, fmt.Errorf("failed to get table %s.%s: %w", database, tableName, err) + } + + if tblRes.Table.Parameters[tableTypePropsKey] != glueTypeIceberg { + return nil, fmt.Errorf("table %s.%s is not an iceberg table", database, tableName) + } + + return tblRes.Table, nil +} + +// GetDatabase loads a database from the Glue Catalog using the given database name. 
+func (c *GlueCatalog) getDatabase(ctx context.Context, databaseName string) (*types.Database, error) { + database, err := c.glueSvc.GetDatabase(ctx, &glue.GetDatabaseInput{Name: aws.String(databaseName)}) + if err != nil { + if errors.Is(err, &types.EntityNotFoundException{}) { + return nil, fmt.Errorf("failed to get namespace %s: %w", databaseName, ErrNoSuchNamespace) } - return "", fmt.Errorf("failed to get table %s.%s: %w", database, tableName, err) + return nil, fmt.Errorf("failed to get namespace %s: %w", databaseName, err) } - if tblRes.Table.Parameters["table_type"] != "ICEBERG" { - return "", errors.New("table is not an iceberg table") + if database.Database.Parameters[databaseTypePropsKey] != glueTypeIceberg { + return nil, fmt.Errorf("namespace %s is not an iceberg namespace", databaseName) } - return tblRes.Table.Parameters["metadata_location"], nil + return database.Database, nil } func identifierToGlueTable(identifier table.Identifier) (string, string, error) { @@ -218,7 +469,7 @@ func identifierToGlueDatabase(identifier table.Identifier) (string, error) { return identifier[0], nil } -// GlueTableIdentifier returns a glue table identifier for an iceberg table in the format [database, table]. +// GlueTableIdentifier returns a glue table identifier for an Iceberg table in the format [database, table]. 
func GlueTableIdentifier(database string, tableName string) table.Identifier { return []string{database, tableName} } @@ -232,7 +483,7 @@ func filterTableListByType(database string, tableList []types.Table, tableType s var filtered []table.Identifier for _, tbl := range tableList { - if tbl.Parameters["table_type"] != tableType { + if tbl.Parameters[tableTypePropsKey] != tableType { continue } filtered = append(filtered, GlueTableIdentifier(database, aws.ToString(tbl.Name))) @@ -245,7 +496,7 @@ func filterDatabaseListByType(databases []types.Database, databaseType string) [ var filtered []table.Identifier for _, database := range databases { - if database.Parameters["database_type"] != databaseType { + if database.Parameters[databaseTypePropsKey] != databaseType { continue } filtered = append(filtered, GlueDatabaseIdentifier(aws.ToString(database.Name))) diff --git a/catalog/glue_test.go b/catalog/glue_test.go index 5889537..c08f4cc 100644 --- a/catalog/glue_test.go +++ b/catalog/glue_test.go @@ -19,6 +19,7 @@ package catalog import ( "context" + "errors" "os" "testing" @@ -34,6 +35,11 @@ type mockGlueClient struct { mock.Mock } +func (m *mockGlueClient) CreateTable(ctx context.Context, params *glue.CreateTableInput, optFns ...func(*glue.Options)) (*glue.CreateTableOutput, error) { + args := m.Called(ctx, params, optFns) + return args.Get(0).(*glue.CreateTableOutput), args.Error(1) +} + func (m *mockGlueClient) GetTable(ctx context.Context, params *glue.GetTableInput, optFns ...func(*glue.Options)) (*glue.GetTableOutput, error) { args := m.Called(ctx, params, optFns) return args.Get(0).(*glue.GetTableOutput), args.Error(1) @@ -44,11 +50,51 @@ func (m *mockGlueClient) GetTables(ctx context.Context, params *glue.GetTablesIn return args.Get(0).(*glue.GetTablesOutput), args.Error(1) } +func (m *mockGlueClient) DeleteTable(ctx context.Context, params *glue.DeleteTableInput, optFns ...func(*glue.Options)) (*glue.DeleteTableOutput, error) { + args := m.Called(ctx, params, 
optFns) + return args.Get(0).(*glue.DeleteTableOutput), args.Error(1) +} + +func (m *mockGlueClient) GetDatabase(ctx context.Context, params *glue.GetDatabaseInput, optFns ...func(*glue.Options)) (*glue.GetDatabaseOutput, error) { + args := m.Called(ctx, params, optFns) + return args.Get(0).(*glue.GetDatabaseOutput), args.Error(1) +} + func (m *mockGlueClient) GetDatabases(ctx context.Context, params *glue.GetDatabasesInput, optFns ...func(*glue.Options)) (*glue.GetDatabasesOutput, error) { args := m.Called(ctx, params, optFns) return args.Get(0).(*glue.GetDatabasesOutput), args.Error(1) } +func (m *mockGlueClient) CreateDatabase(ctx context.Context, params *glue.CreateDatabaseInput, optFns ...func(*glue.Options)) (*glue.CreateDatabaseOutput, error) { + args := m.Called(ctx, params, optFns) + return args.Get(0).(*glue.CreateDatabaseOutput), args.Error(1) +} + +func (m *mockGlueClient) DeleteDatabase(ctx context.Context, params *glue.DeleteDatabaseInput, optFns ...func(*glue.Options)) (*glue.DeleteDatabaseOutput, error) { + args := m.Called(ctx, params, optFns) + return args.Get(0).(*glue.DeleteDatabaseOutput), args.Error(1) +} + +func (m *mockGlueClient) UpdateDatabase(ctx context.Context, params *glue.UpdateDatabaseInput, optFns ...func(*glue.Options)) (*glue.UpdateDatabaseOutput, error) { + args := m.Called(ctx, params, optFns) + return args.Get(0).(*glue.UpdateDatabaseOutput), args.Error(1) +} + +var testIcebergGlueTable = types.Table{ + Name: aws.String("test_table"), + Parameters: map[string]string{ + tableTypePropsKey: "ICEBERG", + metadataLocationPropsKey: "s3://test-bucket/test_table/metadata/abc123-123.metadata.json", + }, +} + +var testNonIcebergGlueTable = types.Table{ + Name: aws.String("other_table"), + Parameters: map[string]string{ + metadataLocationPropsKey: "s3://test-bucket/other_table/", + }, +} + func TestGlueGetTable(t *testing.T) { assert := require.New(t) @@ -57,22 +103,15 @@ func TestGlueGetTable(t *testing.T) { mockGlueSvc.On("GetTable", 
mock.Anything, &glue.GetTableInput{ DatabaseName: aws.String("test_database"), Name: aws.String("test_table"), - }, mock.Anything).Return(&glue.GetTableOutput{ - Table: &types.Table{ - Parameters: map[string]string{ - "table_type": "ICEBERG", - "metadata_location": "s3://test-bucket/test_table/metadata/abc123-123.metadata.json", - }, - }, - }, nil) + }, mock.Anything).Return(&glue.GetTableOutput{Table: &testIcebergGlueTable}, nil) glueCatalog := &GlueCatalog{ glueSvc: mockGlueSvc, } - location, err := glueCatalog.getTable(context.TODO(), "test_database", "test_table") + table, err := glueCatalog.getTable(context.TODO(), "test_database", "test_table") assert.NoError(err) - assert.Equal("s3://test-bucket/test_table/metadata/abc123-123.metadata.json", location) + assert.Equal("s3://test-bucket/test_table/metadata/abc123-123.metadata.json", table.Parameters[metadataLocationPropsKey]) } func TestGlueListTables(t *testing.T) { @@ -83,21 +122,7 @@ func TestGlueListTables(t *testing.T) { mockGlueSvc.On("GetTables", mock.Anything, &glue.GetTablesInput{ DatabaseName: aws.String("test_database"), }, mock.Anything).Return(&glue.GetTablesOutput{ - TableList: []types.Table{ - { - Name: aws.String("test_table"), - Parameters: map[string]string{ - "table_type": "ICEBERG", - "metadata_location": "s3://test-bucket/test_table/metadata/abc123-123.metadata.json", - }, - }, - { - Name: aws.String("other_table"), - Parameters: map[string]string{ - "metadata_location": "s3://test-bucket/other_table/", - }, - }, - }, + TableList: []types.Table{testIcebergGlueTable, testNonIcebergGlueTable}, }, nil).Once() glueCatalog := &GlueCatalog{ @@ -140,6 +165,360 @@ func TestGlueListNamespaces(t *testing.T) { assert.Equal([]string{"test_database"}, databases[0]) } +func TestGlueDropTable(t *testing.T) { + assert := require.New(t) + + mockGlueSvc := &mockGlueClient{} + + mockGlueSvc.On("GetTable", mock.Anything, &glue.GetTableInput{ + DatabaseName: aws.String("test_database"), + Name: 
aws.String("test_table"), + }, mock.Anything).Return(&glue.GetTableOutput{ + Table: &testIcebergGlueTable, + }, nil).Once() + + mockGlueSvc.On("DeleteTable", mock.Anything, &glue.DeleteTableInput{ + DatabaseName: aws.String("test_database"), + Name: aws.String("test_table"), + }, mock.Anything).Return(&glue.DeleteTableOutput{}, nil).Once() + + glueCatalog := &GlueCatalog{ + glueSvc: mockGlueSvc, + } + + err := glueCatalog.DropTable(context.TODO(), GlueTableIdentifier("test_database", "test_table")) + assert.NoError(err) +} + +func TestGlueCreateNamespace(t *testing.T) { + assert := require.New(t) + + mockGlueSvc := &mockGlueClient{} + + mockGlueSvc.On("CreateDatabase", mock.Anything, &glue.CreateDatabaseInput{ + DatabaseInput: &types.DatabaseInput{ + Name: aws.String("test_namespace"), + Parameters: map[string]string{ + databaseTypePropsKey: glueTypeIceberg, + descriptionPropsKey: "Test Description", + locationPropsKey: "s3://test-location", + }, + }, + }, mock.Anything).Return(&glue.CreateDatabaseOutput{}, nil).Once() + + glueCatalog := &GlueCatalog{ + glueSvc: mockGlueSvc, + } + + props := map[string]string{ + descriptionPropsKey: "Test Description", + locationPropsKey: "s3://test-location", + } + + err := glueCatalog.CreateNamespace(context.TODO(), GlueDatabaseIdentifier("test_namespace"), props) + assert.NoError(err) +} + +func TestGlueDropNamespace(t *testing.T) { + assert := require.New(t) + + mockGlueSvc := &mockGlueClient{} + + mockGlueSvc.On("GetDatabase", mock.Anything, &glue.GetDatabaseInput{ + Name: aws.String("test_namespace"), + }, mock.Anything).Return(&glue.GetDatabaseOutput{ + Database: &types.Database{ + Name: aws.String("test_namespace"), + Parameters: map[string]string{ + "database_type": "ICEBERG", + }, + }, + }, nil).Once() + + mockGlueSvc.On("DeleteDatabase", mock.Anything, &glue.DeleteDatabaseInput{ + Name: aws.String("test_namespace"), + }, mock.Anything).Return(&glue.DeleteDatabaseOutput{}, nil).Once() + + glueCatalog := &GlueCatalog{ + 
glueSvc: mockGlueSvc, + } + + err := glueCatalog.DropNamespace(context.TODO(), GlueDatabaseIdentifier("test_namespace")) + assert.NoError(err) +} + +func TestGlueUpdateNamespaceProperties(t *testing.T) { + tests := []struct { + name string + initial map[string]string + updates map[string]string + removals []string + expected PropertiesUpdateSummary + shouldError bool + }{ + { + name: "Overlapping removals and updates", + initial: map[string]string{ + "key1": "value1", + "key2": "value2", + }, + updates: map[string]string{ + "key1": "new_value1", + "key3": "value3", + }, + removals: []string{"key1"}, + shouldError: true, + }, + { + name: "Some keys in removals are missing", + initial: map[string]string{ + "key1": "value1", + "key2": "value2", + }, + updates: map[string]string{ + "key3": "value3", + }, + removals: []string{"key4"}, + expected: PropertiesUpdateSummary{ + Removed: []string{}, + Updated: []string{"key3"}, + Missing: []string{"key4"}, + }, + shouldError: false, + }, + { + name: "No changes to some properties", + initial: map[string]string{ + "key1": "value1", + "key2": "value2", + }, + updates: map[string]string{ + "key1": "value1", + "key3": "value3", + }, + removals: []string{}, + expected: PropertiesUpdateSummary{ + Removed: []string{}, + Updated: []string{"key3"}, + Missing: []string{}, + }, + shouldError: false, + }, + { + name: "Happy path with updates and removals", + initial: map[string]string{ + "key1": "value1", + "key2": "value2", + "key4": "value4", + }, + updates: map[string]string{ + "key2": "new_value2", + }, + removals: []string{"key4"}, + expected: PropertiesUpdateSummary{ + Removed: []string{"key4"}, + Updated: []string{"key2"}, + Missing: []string{}, + }, + shouldError: false, + }, + { + name: "Happy path with only updates", + initial: map[string]string{ + "key1": "value1", + "key2": "value2", + }, + updates: map[string]string{ + "key2": "new_value2", + }, + removals: []string{}, + expected: PropertiesUpdateSummary{ + Removed: 
[]string{}, + Updated: []string{"key2"}, + Missing: []string{}, + }, + shouldError: false, + }, + { + name: "Happy path with only removals", + initial: map[string]string{ + "key1": "value1", + "key2": "value2", + "key3": "value3", + }, + updates: map[string]string{}, + removals: []string{"key2", "key3"}, + expected: PropertiesUpdateSummary{ + Removed: []string{"key2", "key3"}, + Updated: []string{}, + Missing: []string{}, + }, + shouldError: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert := require.New(t) + + mockGlueSvc := &mockGlueClient{} + + tt.initial[databaseTypePropsKey] = glueTypeIceberg + + mockGlueSvc.On("GetDatabase", mock.Anything, &glue.GetDatabaseInput{ + Name: aws.String("test_namespace"), + }, mock.Anything).Return(&glue.GetDatabaseOutput{ + Database: &types.Database{ + Name: aws.String("test_namespace"), + Parameters: tt.initial, + }, + }, nil).Once() + + if !tt.shouldError { + mockGlueSvc.On("UpdateDatabase", mock.Anything, mock.Anything, mock.Anything).Return(&glue.UpdateDatabaseOutput{}, nil).Once() + } + + glueCatalog := &GlueCatalog{ + glueSvc: mockGlueSvc, + } + + summary, err := glueCatalog.UpdateNamespaceProperties(context.TODO(), GlueDatabaseIdentifier("test_namespace"), tt.removals, tt.updates) + if tt.shouldError { + assert.Error(err) + } else { + assert.NoError(err) + assert.EqualValues(tt.expected.Removed, summary.Removed) + assert.EqualValues(tt.expected.Updated, summary.Updated) + assert.EqualValues(tt.expected.Missing, summary.Missing) + } + }) + } +} + +func TestGlueRenameTable(t *testing.T) { + t.Skip("Skipping this test temporarily because LoadTable is not testable due to the dependency on the IO.") + + assert := require.New(t) + + mockGlueSvc := &mockGlueClient{} + + // Mock GetTable response + mockGlueSvc.On("GetTable", mock.Anything, &glue.GetTableInput{ + DatabaseName: aws.String("test_database"), + Name: aws.String("test_table"), + }, mock.Anything).Return(&glue.GetTableOutput{ 
+ Table: &types.Table{ + Name: aws.String("test_table"), + Parameters: map[string]string{ + tableTypePropsKey: glueTypeIceberg, + }, + Owner: aws.String("owner"), + Description: aws.String("description"), + StorageDescriptor: &types.StorageDescriptor{}, + }, + }, nil).Once() + + mockGlueSvc.On("GetTable", mock.Anything, &glue.GetTableInput{ + DatabaseName: aws.String("test_database"), + Name: aws.String("new_test_table"), + }, mock.Anything).Return(&glue.GetTableOutput{ + Table: &types.Table{ + Name: aws.String("new_test_table"), + Parameters: map[string]string{ + tableTypePropsKey: glueTypeIceberg, + metadataLocationPropsKey: "s3://test-bucket/new_test_table/metadata/abc123-123.metadata.json", + }, + Owner: aws.String("owner"), + Description: aws.String("description"), + StorageDescriptor: &types.StorageDescriptor{}, + }, + }, nil).Once() + + // Mock CreateTable response + mockGlueSvc.On("CreateTable", mock.Anything, &glue.CreateTableInput{ + DatabaseName: aws.String("test_database"), + TableInput: &types.TableInput{ + Name: aws.String("new_test_table"), + Owner: aws.String("owner"), + Description: aws.String("description"), + Parameters: map[string]string{tableTypePropsKey: glueTypeIceberg}, + StorageDescriptor: &types.StorageDescriptor{}, + }, + }, mock.Anything).Return(&glue.CreateTableOutput{}, nil).Once() + + // Mock DeleteTable response for old table + mockGlueSvc.On("DeleteTable", mock.Anything, &glue.DeleteTableInput{ + DatabaseName: aws.String("test_database"), + Name: aws.String("test_table"), + }, mock.Anything).Return(&glue.DeleteTableOutput{}, nil).Once() + + glueCatalog := &GlueCatalog{ + glueSvc: mockGlueSvc, + } + + renamedTable, err := glueCatalog.RenameTable(context.TODO(), GlueTableIdentifier("test_database", "test_table"), GlueTableIdentifier("test_database", "new_test_table")) + assert.NoError(err) + assert.Equal("new_test_table", renamedTable.Identifier()[1]) +} + +func TestGlueRenameTable_DeleteTableFailureRollback(t *testing.T) { + assert 
:= require.New(t) + + mockGlueSvc := &mockGlueClient{} + + // Mock GetTable response + mockGlueSvc.On("GetTable", mock.Anything, &glue.GetTableInput{ + DatabaseName: aws.String("test_database"), + Name: aws.String("test_table"), + }, mock.Anything).Return(&glue.GetTableOutput{ + Table: &types.Table{ + Name: aws.String("test_table"), + Parameters: map[string]string{ + tableTypePropsKey: glueTypeIceberg, + }, + Owner: aws.String("owner"), + Description: aws.String("description"), + StorageDescriptor: &types.StorageDescriptor{}, + }, + }, nil).Once() + + // Mock CreateTable response + mockGlueSvc.On("CreateTable", mock.Anything, &glue.CreateTableInput{ + DatabaseName: aws.String("test_database"), + TableInput: &types.TableInput{ + Name: aws.String("new_test_table"), + Owner: aws.String("owner"), + Description: aws.String("description"), + Parameters: map[string]string{tableTypePropsKey: glueTypeIceberg}, + StorageDescriptor: &types.StorageDescriptor{}, + }, + }, mock.Anything).Return(&glue.CreateTableOutput{}, nil).Once() + + // Mock DeleteTable response for old table (fail) + mockGlueSvc.On("DeleteTable", mock.Anything, &glue.DeleteTableInput{ + DatabaseName: aws.String("test_database"), + Name: aws.String("test_table"), + }, mock.Anything).Return(&glue.DeleteTableOutput{}, errors.New("delete table failed")).Once() + + // Mock DeleteTable response for rollback (new table) + mockGlueSvc.On("DeleteTable", mock.Anything, &glue.DeleteTableInput{ + DatabaseName: aws.String("test_database"), + Name: aws.String("new_test_table"), + }, mock.Anything).Return(&glue.DeleteTableOutput{}, nil).Once() + + glueCatalog := &GlueCatalog{ + glueSvc: mockGlueSvc, + } + + renamedTable, err := glueCatalog.RenameTable(context.TODO(), GlueTableIdentifier("test_database", "test_table"), GlueTableIdentifier("test_database", "new_test_table")) + assert.Error(err) + assert.Nil(renamedTable) + mockGlueSvc.AssertCalled(t, "DeleteTable", mock.Anything, &glue.DeleteTableInput{ + DatabaseName: 
aws.String("test_database"), + Name: aws.String("new_test_table"), + }, mock.Anything) +} + func TestGlueListTablesIntegration(t *testing.T) { if os.Getenv("TEST_DATABASE_NAME") == "" { t.Skip() diff --git a/cmd/iceberg/main.go b/cmd/iceberg/main.go index fb25618..f270bc7 100644 --- a/cmd/iceberg/main.go +++ b/cmd/iceberg/main.go @@ -37,6 +37,7 @@ Usage: iceberg list [options] [PARENT] iceberg describe [options] [namespace | table] IDENTIFIER iceberg (schema | spec | uuid | location) [options] TABLE_ID + iceberg create [options] (namespace | table) IDENTIFIER iceberg drop [options] (namespace | table) IDENTIFIER iceberg files [options] TABLE_ID [--history] iceberg rename [options] @@ -53,12 +54,14 @@ Arguments: VALUE value to set Options: - -h --help show this helpe messages and exit - --catalog TEXT specify the catalog type [default: rest] - --uri TEXT specify the catalog URI - --output TYPE output type (json/text) [default: text] - --credential TEXT specify credentials for the catalog - --warehouse TEXT specify the warehouse to use` + -h --help show this helpe messages and exit + --catalog TEXT specify the catalog type [default: rest] + --uri TEXT specify the catalog URI + --output TYPE output type (json/text) [default: text] + --credential TEXT specify credentials for the catalog + --warehouse TEXT specify the warehouse to use + --description TEXT specify a description for the namespace + --location-uri TEXT specify a location URI for the namespace` func main() { args, err := docopt.ParseArgs(usage, os.Args[1:], iceberg.Version()) @@ -74,6 +77,7 @@ func main() { Uuid bool `docopt:"uuid"` Location bool `docopt:"location"` Props bool `docopt:"properties"` + Create bool `docopt:"create"` Drop bool `docopt:"drop"` Files bool `docopt:"files"` Rename bool `docopt:"rename"` @@ -94,12 +98,14 @@ func main() { PropName string `docopt:"PROPNAME"` Value string `docopt:"VALUE"` - Catalog string `docopt:"--catalog"` - URI string `docopt:"--uri"` - Output string 
`docopt:"--output"` - History bool `docopt:"--history"` - Cred string `docopt:"--credential"` - Warehouse string `docopt:"--warehouse"` + Catalog string `docopt:"--catalog"` + URI string `docopt:"--uri"` + Output string `docopt:"--output"` + History bool `docopt:"--history"` + Cred string `docopt:"--credential"` + Warehouse string `docopt:"--warehouse"` + Description string `docopt:"--description"` + LocationURI string `docopt:"--location-uri"` }{} if err := args.Bind(&cfg); err != nil { @@ -131,6 +137,9 @@ func main() { if cat, err = catalog.NewRestCatalog("rest", cfg.URI, opts...); err != nil { log.Fatal(err) } + case catalog.Glue: + opts := []catalog.Option[catalog.GlueCatalog]{} + cat = catalog.NewGlueCatalog(opts...) default: log.Fatal("unrecognized catalog type") } @@ -191,6 +200,28 @@ func main() { os.Exit(1) } } + + case cfg.Create: + switch { + case cfg.Namespace: + props := iceberg.Properties{} + if cfg.Description != "" { + props["Description"] = cfg.Description + } + + if cfg.LocationURI != "" { + props["Location"] = cfg.LocationURI + } + + err := cat.CreateNamespace(context.Background(), catalog.ToRestIdentifier(cfg.Ident), props) + if err != nil { + output.Error(err) + os.Exit(1) + } + default: + output.Error(errors.New("not implemented")) + os.Exit(1) + } case cfg.Files: tbl := loadTable(output, cat, cfg.TableID) output.Files(tbl, cfg.History) diff --git a/utils.go b/utils.go index c0a00fe..7807acb 100644 --- a/utils.go +++ b/utils.go @@ -196,3 +196,19 @@ func (l literalSet) All(fn func(Literal) bool) bool { } return true } + +// Helper function to find the difference between two slices (a - b). 
// Difference returns the elements of a that do not appear in b, i.e. the
// set difference a - b.
//
// The relative order of the surviving elements of a is preserved, and a value
// that occurs several times in a is kept each time as long as it is absent
// from b. The returned slice is never nil: when nothing survives (including
// when a is nil or empty) it is an empty, non-nil slice.
func Difference(a, b []string) []string {
	// Index b as a set so every membership test below is a single lookup.
	// Empty-struct values carry no payload, and the size hint lets the map
	// be filled without growing.
	exclude := make(map[string]struct{}, len(b))
	for _, item := range b {
		exclude[item] = struct{}{}
	}

	// At most len(a) elements can survive, so reserve that capacity once
	// while still starting from a non-nil, zero-length slice.
	diff := make([]string, 0, len(a))
	for _, item := range a {
		if _, found := exclude[item]; !found {
			diff = append(diff, item)
		}
	}
	return diff
}
+ +package iceberg + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestDifference(t *testing.T) { + tests := []struct { + name string + a []string + b []string + expected []string + }{ + { + name: "No elements in common", + a: []string{"a", "b", "c"}, + b: []string{"d", "e", "f"}, + expected: []string{"a", "b", "c"}, + }, + { + name: "Some elements in common", + a: []string{"a", "b", "c"}, + b: []string{"b"}, + expected: []string{"a", "c"}, + }, + { + name: "All elements in common", + a: []string{"a", "b", "c"}, + b: []string{"a", "b", "c"}, + expected: []string{}, + }, + { + name: "Empty slice a", + a: []string{}, + b: []string{"a", "b", "c"}, + expected: []string{}, + }, + { + name: "Empty slice b", + a: []string{"a", "b", "c"}, + b: []string{}, + expected: []string{"a", "b", "c"}, + }, + { + name: "No elements in slice b present in slice a", + a: []string{"a", "b", "c"}, + b: []string{"x", "y", "z"}, + expected: []string{"a", "b", "c"}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert := require.New(t) + result := Difference(tt.a, tt.b) + assert.ElementsMatch(tt.expected, result) + }) + } +}