Skip to content

Commit

Permalink
Create table fast follow + enabling MySQL (#564)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Dec 2, 2024
1 parent 68c4136 commit 52e11e8
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 64 deletions.
38 changes: 38 additions & 0 deletions lib/transfer/columns.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package transfer

import (
"fmt"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/columns"

"github.com/artie-labs/reader/lib/debezium/transformer"
)

type Adapter interface {
FieldConverters() []transformer.FieldConverter
PartitionKeys() []string
}

func BuildTransferColumns(adapter Adapter) ([]columns.Column, error) {
var cols columns.Columns
for _, fc := range adapter.FieldConverters() {
kd, err := fc.ValueConverter.ToField(fc.Name).ToKindDetails()
if err != nil {
return nil, fmt.Errorf("failed to convert field %q to kind details: %w", fc.Name, err)
}

cols.AddColumn(columns.NewColumn(fc.Name, kd))
}

for _, pk := range adapter.PartitionKeys() {
err := cols.UpsertColumn(pk, columns.UpsertColumnArg{
PrimaryKey: typing.ToPtr(true),
})

if err != nil {
return nil, fmt.Errorf("failed to upsert primary key column %q: %w", pk, err)
}
}

return cols.GetColumns(), nil
}
52 changes: 52 additions & 0 deletions lib/transfer/columns_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package transfer

import (
"testing"

"github.com/artie-labs/transfer/lib/typing"
"github.com/stretchr/testify/assert"

"github.com/artie-labs/reader/lib/debezium/converters"
"github.com/artie-labs/reader/lib/debezium/transformer"
)

type mockAdapter struct {
fieldConverters []transformer.FieldConverter
partitionKeys []string
}

func (m mockAdapter) FieldConverters() []transformer.FieldConverter {
return m.fieldConverters
}

func (m mockAdapter) PartitionKeys() []string {
return m.partitionKeys
}

func TestBuildTransferColumns(t *testing.T) {
adapter := mockAdapter{
partitionKeys: []string{"id"},
fieldConverters: []transformer.FieldConverter{
{
Name: "id",
ValueConverter: converters.StringPassthrough{},
},
{
Name: "name",
ValueConverter: converters.StringPassthrough{},
},
},
}

cols, err := BuildTransferColumns(adapter)
assert.NoError(t, err)

assert.Equal(t, 2, len(cols))
assert.Equal(t, "id", cols[0].Name())
assert.Equal(t, typing.String, cols[0].KindDetails)
assert.True(t, cols[0].PrimaryKey())

assert.Equal(t, "name", cols[1].Name())
assert.Equal(t, typing.String, cols[1].KindDetails)
assert.False(t, cols[1].PrimaryKey())
}
16 changes: 13 additions & 3 deletions sources/mysql/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/artie-labs/reader/config"
"github.com/artie-labs/reader/lib/debezium/transformer"
"github.com/artie-labs/reader/lib/rdbms"
"github.com/artie-labs/reader/lib/transfer"
"github.com/artie-labs/reader/sources/mysql/adapter"
"github.com/artie-labs/reader/writers"
)
Expand All @@ -39,15 +40,24 @@ func (s Snapshot) snapshotTable(ctx context.Context, writer writers.Writer, tabl
logger := slog.With(slog.String("table", tableCfg.Name), slog.String("database", s.cfg.Database))
snapshotStartTime := time.Now()

adapter, err := adapter.NewMySQLAdapter(s.db, s.cfg.Database, tableCfg)
dbzAdapter, err := adapter.NewMySQLAdapter(s.db, s.cfg.Database, tableCfg)
if err != nil {
return fmt.Errorf("failed to create MySQL adapter: %w", err)
}

dbzTransformer, err := transformer.NewDebeziumTransformer(adapter)
dbzTransformer, err := transformer.NewDebeziumTransformer(dbzAdapter)
if err != nil {
if errors.Is(err, rdbms.ErrNoPkValuesForEmptyTable) {
logger.Info("Table does not contain any rows, skipping...")
cols, err := transfer.BuildTransferColumns(dbzAdapter)
if err != nil {
return fmt.Errorf("failed to build transfer columns: %w", err)
}

if err = writer.CreateTable(ctx, dbzAdapter.TableName(), cols); err != nil {
return fmt.Errorf("failed to create table: %w", err)
}

logger.Info("Table has been created, it does not contain any rows")
return nil
} else {
return fmt.Errorf("failed to build Debezium transformer for table %q: %w", tableCfg.Name, err)
Expand Down
27 changes: 0 additions & 27 deletions sources/postgres/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@ import (
"fmt"
"log/slog"

"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/columns"

"github.com/artie-labs/reader/config"
"github.com/artie-labs/reader/lib/debezium/converters"
"github.com/artie-labs/reader/lib/debezium/transformer"
Expand Down Expand Up @@ -64,30 +61,6 @@ func NewPostgresAdapter(db *sql.DB, tableCfg config.PostgreSQLTable) (PostgresAd
}, nil
}

func (p PostgresAdapter) BuildTransferColumns() ([]columns.Column, error) {
var cols columns.Columns
for _, fc := range p.FieldConverters() {
kd, err := fc.ValueConverter.ToField(fc.Name).ToKindDetails()
if err != nil {
return nil, fmt.Errorf("failed to convert field %q to kind details: %w", fc.Name, err)
}

cols.AddColumn(columns.NewColumn(fc.Name, kd))
}

for _, pk := range p.PartitionKeys() {
err := cols.UpsertColumn(pk, columns.UpsertColumnArg{
PrimaryKey: typing.ToPtr(true),
})

if err != nil {
return nil, fmt.Errorf("failed to upsert primary key column %q: %w", pk, err)
}
}

return cols.GetColumns(), nil
}

func (p PostgresAdapter) TableName() string {
return p.table.Name
}
Expand Down
33 changes: 0 additions & 33 deletions sources/postgres/adapter/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,9 @@ import (
"time"

"github.com/artie-labs/transfer/lib/debezium"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/decimal"
"github.com/stretchr/testify/assert"

"github.com/artie-labs/reader/lib/debezium/converters"
"github.com/artie-labs/reader/lib/debezium/transformer"
"github.com/artie-labs/reader/lib/postgres"
"github.com/artie-labs/reader/lib/postgres/schema"
)
Expand Down Expand Up @@ -331,33 +328,3 @@ func TestValueConverterForType_Convert(t *testing.T) {
}
}
}

func TestPostgresAdapter_BuildTransferColumns(t *testing.T) {
adapter := PostgresAdapter{
table: postgres.Table{
PrimaryKeys: []string{"id"},
},
fieldConverters: []transformer.FieldConverter{
{
Name: "id",
ValueConverter: converters.StringPassthrough{},
},
{
Name: "name",
ValueConverter: converters.StringPassthrough{},
},
},
}

cols, err := adapter.BuildTransferColumns()
assert.NoError(t, err)

assert.Equal(t, 2, len(cols))
assert.Equal(t, "id", cols[0].Name())
assert.Equal(t, typing.String, cols[0].KindDetails)
assert.True(t, cols[0].PrimaryKey())

assert.Equal(t, "name", cols[1].Name())
assert.Equal(t, typing.String, cols[1].KindDetails)
assert.False(t, cols[1].PrimaryKey())
}
3 changes: 2 additions & 1 deletion sources/postgres/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/artie-labs/reader/config"
"github.com/artie-labs/reader/lib/debezium/transformer"
"github.com/artie-labs/reader/lib/rdbms"
"github.com/artie-labs/reader/lib/transfer"
"github.com/artie-labs/reader/sources/postgres/adapter"
"github.com/artie-labs/reader/writers"
)
Expand Down Expand Up @@ -51,7 +52,7 @@ func (s *Source) Run(ctx context.Context, writer writers.Writer) error {
dbzTransformer, err := transformer.NewDebeziumTransformer(dbzAdapter)
if err != nil {
if errors.Is(err, rdbms.ErrNoPkValuesForEmptyTable) {
cols, err := dbzAdapter.BuildTransferColumns()
cols, err := transfer.BuildTransferColumns(dbzAdapter)
if err != nil {
return fmt.Errorf("failed to build transfer columns: %w", err)
}
Expand Down

0 comments on commit 52e11e8

Please sign in to comment.