diff --git a/lib/transfer/columns.go b/lib/transfer/columns.go new file mode 100644 index 00000000..0da5fd23 --- /dev/null +++ b/lib/transfer/columns.go @@ -0,0 +1,38 @@ +package transfer + +import ( + "fmt" + "github.com/artie-labs/transfer/lib/typing" + "github.com/artie-labs/transfer/lib/typing/columns" + + "github.com/artie-labs/reader/lib/debezium/transformer" +) + +type Adapter interface { + FieldConverters() []transformer.FieldConverter + PartitionKeys() []string +} + +func BuildTransferColumns(adapter Adapter) ([]columns.Column, error) { + var cols columns.Columns + for _, fc := range adapter.FieldConverters() { + kd, err := fc.ValueConverter.ToField(fc.Name).ToKindDetails() + if err != nil { + return nil, fmt.Errorf("failed to convert field %q to kind details: %w", fc.Name, err) + } + + cols.AddColumn(columns.NewColumn(fc.Name, kd)) + } + + for _, pk := range adapter.PartitionKeys() { + err := cols.UpsertColumn(pk, columns.UpsertColumnArg{ + PrimaryKey: typing.ToPtr(true), + }) + + if err != nil { + return nil, fmt.Errorf("failed to upsert primary key column %q: %w", pk, err) + } + } + + return cols.GetColumns(), nil +} diff --git a/lib/transfer/columns_test.go b/lib/transfer/columns_test.go new file mode 100644 index 00000000..77f4686e --- /dev/null +++ b/lib/transfer/columns_test.go @@ -0,0 +1,52 @@ +package transfer + +import ( + "testing" + + "github.com/artie-labs/transfer/lib/typing" + "github.com/stretchr/testify/assert" + + "github.com/artie-labs/reader/lib/debezium/converters" + "github.com/artie-labs/reader/lib/debezium/transformer" +) + +type mockAdapter struct { + fieldConverters []transformer.FieldConverter + partitionKeys []string +} + +func (m mockAdapter) FieldConverters() []transformer.FieldConverter { + return m.fieldConverters +} + +func (m mockAdapter) PartitionKeys() []string { + return m.partitionKeys +} + +func TestBuildTransferColumns(t *testing.T) { + adapter := mockAdapter{ + partitionKeys: []string{"id"}, + fieldConverters: []transformer.FieldConverter{ + { + Name: "id", + ValueConverter: converters.StringPassthrough{}, + }, + { + Name: "name", + ValueConverter: converters.StringPassthrough{}, + }, + }, + } + + cols, err := BuildTransferColumns(adapter) + assert.NoError(t, err) + + assert.Equal(t, 2, len(cols)) + assert.Equal(t, "id", cols[0].Name()) + assert.Equal(t, typing.String, cols[0].KindDetails) + assert.True(t, cols[0].PrimaryKey()) + + assert.Equal(t, "name", cols[1].Name()) + assert.Equal(t, typing.String, cols[1].KindDetails) + assert.False(t, cols[1].PrimaryKey()) +} diff --git a/sources/mysql/snapshot.go b/sources/mysql/snapshot.go index 865fcec2..3652285b 100644 --- a/sources/mysql/snapshot.go +++ b/sources/mysql/snapshot.go @@ -13,6 +13,7 @@ import ( "github.com/artie-labs/reader/config" "github.com/artie-labs/reader/lib/debezium/transformer" "github.com/artie-labs/reader/lib/rdbms" + "github.com/artie-labs/reader/lib/transfer" "github.com/artie-labs/reader/sources/mysql/adapter" "github.com/artie-labs/reader/writers" ) @@ -39,15 +40,24 @@ func (s Snapshot) snapshotTable(ctx context.Context, writer writers.Writer, tabl logger := slog.With(slog.String("table", tableCfg.Name), slog.String("database", s.cfg.Database)) snapshotStartTime := time.Now() - adapter, err := adapter.NewMySQLAdapter(s.db, s.cfg.Database, tableCfg) + dbzAdapter, err := adapter.NewMySQLAdapter(s.db, s.cfg.Database, tableCfg) if err != nil { return fmt.Errorf("failed to create MySQL adapter: %w", err) } - dbzTransformer, err := transformer.NewDebeziumTransformer(adapter) + dbzTransformer, err := transformer.NewDebeziumTransformer(dbzAdapter) if err != nil { if errors.Is(err, rdbms.ErrNoPkValuesForEmptyTable) { - logger.Info("Table does not contain any rows, skipping...") + cols, err := transfer.BuildTransferColumns(dbzAdapter) + if err != nil { + return fmt.Errorf("failed to build transfer columns: %w", err) + } + + if err = writer.CreateTable(ctx, dbzAdapter.TableName(), cols); err != nil { + return fmt.Errorf("failed to create table: %w", err) + } + + logger.Info("Table has been created, it does not contain any rows") return nil } else { return fmt.Errorf("failed to build Debezium transformer for table %q: %w", tableCfg.Name, err) diff --git a/sources/postgres/adapter/adapter.go b/sources/postgres/adapter/adapter.go index 79d9a42d..54d99068 100644 --- a/sources/postgres/adapter/adapter.go +++ b/sources/postgres/adapter/adapter.go @@ -5,9 +5,6 @@ import ( "fmt" "log/slog" - "github.com/artie-labs/transfer/lib/typing" - "github.com/artie-labs/transfer/lib/typing/columns" - "github.com/artie-labs/reader/config" "github.com/artie-labs/reader/lib/debezium/converters" "github.com/artie-labs/reader/lib/debezium/transformer" @@ -64,30 +61,6 @@ func NewPostgresAdapter(db *sql.DB, tableCfg config.PostgreSQLTable) (PostgresAd }, nil } -func (p PostgresAdapter) BuildTransferColumns() ([]columns.Column, error) { - var cols columns.Columns - for _, fc := range p.FieldConverters() { - kd, err := fc.ValueConverter.ToField(fc.Name).ToKindDetails() - if err != nil { - return nil, fmt.Errorf("failed to convert field %q to kind details: %w", fc.Name, err) - } - - cols.AddColumn(columns.NewColumn(fc.Name, kd)) - } - - for _, pk := range p.PartitionKeys() { - err := cols.UpsertColumn(pk, columns.UpsertColumnArg{ - PrimaryKey: typing.ToPtr(true), - }) - - if err != nil { - return nil, fmt.Errorf("failed to upsert primary key column %q: %w", pk, err) - } - } - - return cols.GetColumns(), nil -} - func (p PostgresAdapter) TableName() string { return p.table.Name } diff --git a/sources/postgres/adapter/adapter_test.go b/sources/postgres/adapter/adapter_test.go index 676d5358..8d107ebe 100644 --- a/sources/postgres/adapter/adapter_test.go +++ b/sources/postgres/adapter/adapter_test.go @@ -5,12 +5,9 @@ import ( "time" "github.com/artie-labs/transfer/lib/debezium" - "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/decimal" "github.com/stretchr/testify/assert" - "github.com/artie-labs/reader/lib/debezium/converters" - "github.com/artie-labs/reader/lib/debezium/transformer" "github.com/artie-labs/reader/lib/postgres" "github.com/artie-labs/reader/lib/postgres/schema" ) @@ -331,33 +328,3 @@ func TestValueConverterForType_Convert(t *testing.T) { } } } - -func TestPostgresAdapter_BuildTransferColumns(t *testing.T) { - adapter := PostgresAdapter{ - table: postgres.Table{ - PrimaryKeys: []string{"id"}, - }, - fieldConverters: []transformer.FieldConverter{ - { - Name: "id", - ValueConverter: converters.StringPassthrough{}, - }, - { - Name: "name", - ValueConverter: converters.StringPassthrough{}, - }, - }, - } - - cols, err := adapter.BuildTransferColumns() - assert.NoError(t, err) - - assert.Equal(t, 2, len(cols)) - assert.Equal(t, "id", cols[0].Name()) - assert.Equal(t, typing.String, cols[0].KindDetails) - assert.True(t, cols[0].PrimaryKey()) - - assert.Equal(t, "name", cols[1].Name()) - assert.Equal(t, typing.String, cols[1].KindDetails) - assert.False(t, cols[1].PrimaryKey()) -} diff --git a/sources/postgres/snapshot.go b/sources/postgres/snapshot.go index 9444d482..11775922 100644 --- a/sources/postgres/snapshot.go +++ b/sources/postgres/snapshot.go @@ -13,6 +13,7 @@ import ( "github.com/artie-labs/reader/config" "github.com/artie-labs/reader/lib/debezium/transformer" "github.com/artie-labs/reader/lib/rdbms" + "github.com/artie-labs/reader/lib/transfer" "github.com/artie-labs/reader/sources/postgres/adapter" "github.com/artie-labs/reader/writers" ) @@ -51,7 +52,7 @@ func (s *Source) Run(ctx context.Context, writer writers.Writer) error { dbzTransformer, err := transformer.NewDebeziumTransformer(dbzAdapter) if err != nil { if errors.Is(err, rdbms.ErrNoPkValuesForEmptyTable) { - cols, err := dbzAdapter.BuildTransferColumns() + cols, err := transfer.BuildTransferColumns(dbzAdapter) if err != nil { return fmt.Errorf("failed to build transfer columns: %w", err) }