Skip to content

Commit

Permalink
[debezium] Add NewIterator() to Adapter interface (#236)
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie authored Mar 6, 2024
1 parent 182e8c7 commit e229140
Show file tree
Hide file tree
Showing 13 changed files with 84 additions and 76 deletions.
17 changes: 1 addition & 16 deletions integration_tests/mysql/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/artie-labs/reader/config"
"github.com/artie-labs/reader/integration_tests/utils"
"github.com/artie-labs/reader/lib"
"github.com/artie-labs/reader/lib/debezium"
"github.com/artie-labs/reader/lib/logger"
"github.com/artie-labs/reader/lib/rdbms"
"github.com/artie-labs/reader/sources/mysql/adapter"
Expand Down Expand Up @@ -62,21 +61,7 @@ func readTable(db *sql.DB, tableName string, batchSize int) ([]lib.RawMessage, e
return nil, err
}

scanner, err := dbzAdapter.NewIterator()
if err != nil {
return nil, fmt.Errorf("failed to build scanner: %w", err)
}

dbzTransformer := debezium.NewDebeziumTransformer(dbzAdapter, &scanner)
rows := []lib.RawMessage{}
for dbzTransformer.HasNext() {
batch, err := dbzTransformer.Next()
if err != nil {
logger.Fatal("Failed to get batch", slog.Any("err", err))
}
rows = append(rows, batch...)
}
return rows, nil
return utils.ReadTable(db, dbzAdapter)
}

const testTypesCreateTableQuery = `
Expand Down
16 changes: 1 addition & 15 deletions integration_tests/postgres/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/artie-labs/reader/config"
"github.com/artie-labs/reader/integration_tests/utils"
"github.com/artie-labs/reader/lib"
"github.com/artie-labs/reader/lib/debezium"
"github.com/artie-labs/reader/lib/logger"
"github.com/artie-labs/reader/lib/rdbms"
"github.com/artie-labs/reader/sources/postgres/adapter"
Expand Down Expand Up @@ -68,20 +67,7 @@ func readTable(db *sql.DB, tableName string, batchSize int) ([]lib.RawMessage, e
return nil, err
}

scanner, err := dbzAdapter.NewIterator()
if err != nil {
return nil, fmt.Errorf("failed to build scanner: %w", err)
}
dbzTransformer := debezium.NewDebeziumTransformer(dbzAdapter, &scanner)
rows := []lib.RawMessage{}
for dbzTransformer.HasNext() {
batch, err := dbzTransformer.Next()
if err != nil {
logger.Fatal("Failed to get batch", slog.Any("err", err))
}
rows = append(rows, batch...)
}
return rows, nil
return utils.ReadTable(db, dbzAdapter)
}

const testTypesCreateTableQuery = `
Expand Down
18 changes: 18 additions & 0 deletions integration_tests/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strings"

"github.com/artie-labs/reader/lib"
"github.com/artie-labs/reader/lib/debezium"
"github.com/artie-labs/transfer/lib/cdc/util"
)

Expand All @@ -25,6 +26,23 @@ func CreateTemporaryTable(db *sql.DB, query string) (string, func()) {
}
}

func ReadTable(db *sql.DB, dbzAdapter debezium.Adapter) ([]lib.RawMessage, error) {
dbzTransformer, err := debezium.NewDebeziumTransformer(dbzAdapter)
if err != nil {
return nil, err
}

rows := []lib.RawMessage{}
for dbzTransformer.HasNext() {
batch, err := dbzTransformer.Next()
if err != nil {
return nil, err
}
rows = append(rows, batch...)
}
return rows, nil
}

func GetPayload(message lib.RawMessage) util.SchemaEventPayload {
payloadTyped, ok := message.GetPayload().(util.SchemaEventPayload)
if !ok {
Expand Down
31 changes: 21 additions & 10 deletions lib/debezium/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,37 @@ import (
"github.com/artie-labs/reader/lib"
)

type Row = map[string]any

type RowsIterator interface {
HasNext() bool
Next() ([]Row, error)
}

type Adapter interface {
TableName() string
TopicSuffix() string
PartitionKey(row map[string]any) map[string]any
PartitionKey(row Row) map[string]any
Fields() []debezium.Field
ConvertRowToDebezium(row map[string]any) (map[string]any, error)
NewIterator() (RowsIterator, error)
ConvertRowToDebezium(row Row) (Row, error)
}

type DebeziumTransformer struct {
adapter Adapter
schema debezium.Schema
iter batchRowIterator
iter RowsIterator
}

func NewDebeziumTransformer(adapter Adapter, iter batchRowIterator) *DebeziumTransformer {
func NewDebeziumTransformer(adapter Adapter) (*DebeziumTransformer, error) {
iter, err := adapter.NewIterator()
if err != nil {
return nil, fmt.Errorf("failed to create iterator :%w", err)
}
return NewDebeziumTransformerWithIterator(adapter, iter), nil
}

func NewDebeziumTransformerWithIterator(adapter Adapter, iter RowsIterator) *DebeziumTransformer {
schema := debezium.Schema{
FieldsObject: []debezium.FieldsObject{{
Fields: adapter.Fields(),
Expand All @@ -41,11 +57,6 @@ func NewDebeziumTransformer(adapter Adapter, iter batchRowIterator) *DebeziumTra
}
}

type batchRowIterator interface {
HasNext() bool
Next() ([]map[string]any, error)
}

func (d *DebeziumTransformer) HasNext() bool {
return d != nil && d.iter.HasNext()
}
Expand All @@ -72,7 +83,7 @@ func (d *DebeziumTransformer) Next() ([]lib.RawMessage, error) {
return result, nil
}

func (d *DebeziumTransformer) createPayload(row map[string]any) (util.SchemaEventPayload, error) {
func (d *DebeziumTransformer) createPayload(row Row) (util.SchemaEventPayload, error) {
dbzRow, err := d.adapter.ConvertRowToDebezium(row)
if err != nil {
return util.SchemaEventPayload{}, fmt.Errorf("failed to convert row to Debezium: %w", err)
Expand Down
26 changes: 18 additions & 8 deletions lib/debezium/transformer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
type mockAdatper struct {
partitionKeys []string
fields []debezium.Field
iter RowsIterator
}

func (m mockAdatper) TableName() string {
Expand All @@ -34,6 +35,10 @@ func (m mockAdatper) Fields() []debezium.Field {
return m.fields
}

func (m mockAdatper) NewIterator() (RowsIterator, error) {
return m.iter, nil
}

func (m mockAdatper) ConvertRowToDebezium(row map[string]any) (map[string]any, error) {
newRow := make(map[string]any)
for key, value := range row {
Expand Down Expand Up @@ -63,7 +68,8 @@ func (m *mockIterator) Next() ([]map[string]any, error) {
func TestDebeziumTransformer_Iteration(t *testing.T) {
{
// Empty iterator
transformer := NewDebeziumTransformer(mockAdatper{}, &mockIterator{})
transformer, err := NewDebeziumTransformer(mockAdatper{iter: &mockIterator{}})
assert.NoError(t, err)
assert.False(t, transformer.HasNext())
rows, err := transformer.Next()
assert.NoError(t, err)
Expand All @@ -72,7 +78,8 @@ func TestDebeziumTransformer_Iteration(t *testing.T) {
{
// One empty batch
batches := [][]map[string]any{{}}
transformer := NewDebeziumTransformer(mockAdatper{}, &mockIterator{batches: batches})
transformer, err := NewDebeziumTransformer(mockAdatper{iter: &mockIterator{batches: batches}})
assert.NoError(t, err)
assert.True(t, transformer.HasNext())
rows, err := transformer.Next()
assert.NoError(t, err)
Expand All @@ -88,7 +95,8 @@ func TestDebeziumTransformer_Iteration(t *testing.T) {
batches := [][]map[string]any{{
{"foo": "bar", "qux": "quux"},
}}
transformer := NewDebeziumTransformer(mockAdatper{}, &mockIterator{batches: batches})
transformer, err := NewDebeziumTransformer(mockAdatper{iter: &mockIterator{batches: batches}})
assert.NoError(t, err)
// First batch
assert.True(t, transformer.HasNext())
rows, err := transformer.Next()
Expand All @@ -115,7 +123,8 @@ func TestDebeziumTransformer_Iteration(t *testing.T) {
{"corge": "grault", "garply": "waldo"},
},
}
transformer := NewDebeziumTransformer(mockAdatper{}, &mockIterator{batches: batches})
transformer, err := NewDebeziumTransformer(mockAdatper{iter: &mockIterator{batches: batches}})
assert.NoError(t, err)
// First batch
assert.True(t, transformer.HasNext())
rows, err := transformer.Next()
Expand Down Expand Up @@ -152,10 +161,10 @@ func TestDebeziumTransformer_Next(t *testing.T) {
batches := [][]map[string]any{{
{"foo": "bar", "qux": 12, "baz": "corge"},
}}
transformer := NewDebeziumTransformer(
mockAdatper{fields: fields, partitionKeys: []string{"foo", "qux"}},
&mockIterator{batches: batches},
transformer, err := NewDebeziumTransformer(
mockAdatper{fields: fields, partitionKeys: []string{"foo", "qux"}, iter: &mockIterator{batches: batches}},
)
assert.NoError(t, err)
assert.True(t, transformer.HasNext())
rows, err := transformer.Next()
assert.NoError(t, err)
Expand Down Expand Up @@ -195,7 +204,8 @@ func TestDebeziumTransformer_CreatePayload(t *testing.T) {
{Type: "int"},
}

transformer := NewDebeziumTransformer(mockAdatper{fields: fields}, nil)
transformer, err := NewDebeziumTransformer(mockAdatper{fields: fields, iter: &mockIterator{}})
assert.NoError(t, err)
payload, err := transformer.createPayload(map[string]any{"foo": "bar", "qux": "quux"})
assert.NoError(t, err)
payload.Payload.Source.TsMs = 12345 // Modify source time since it'll be ~now
Expand Down
4 changes: 2 additions & 2 deletions lib/mysql/scanner/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import (
"github.com/artie-labs/reader/lib/rdbms/scan"
)

func NewScanner(db *sql.DB, table mysql.Table, cfg scan.ScannerConfig) (scan.Scanner, error) {
func NewScanner(db *sql.DB, table mysql.Table, cfg scan.ScannerConfig) (*scan.Scanner, error) {
primaryKeyBounds, err := table.GetPrimaryKeysBounds(db)
if err != nil {
return scan.Scanner{}, err
return nil, err
}

adapter := scanAdapter{tableName: table.Name, columns: table.Columns}
Expand Down
4 changes: 2 additions & 2 deletions lib/postgres/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,10 +181,10 @@ func convertToStringForQuery(value any, dataType schema.DataType) (string, error
}
}

func NewScanner(db *sql.DB, table Table, cfg scan.ScannerConfig) (scan.Scanner, error) {
func NewScanner(db *sql.DB, table Table, cfg scan.ScannerConfig) (*scan.Scanner, error) {
primaryKeyBounds, err := table.GetPrimaryKeysBounds(db)
if err != nil {
return scan.Scanner{}, err
return nil, err
}

adapter := scanAdapter{schema: table.Schema, tableName: table.Name, columns: table.Columns}
Expand Down
8 changes: 4 additions & 4 deletions lib/rdbms/scan/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,18 @@ type Scanner struct {
done bool
}

func NewScanner(db *sql.DB, _primaryKeys []primary_key.Key, cfg ScannerConfig, adapter ScanAdapter) (Scanner, error) {
func NewScanner(db *sql.DB, _primaryKeys []primary_key.Key, cfg ScannerConfig, adapter ScanAdapter) (*Scanner, error) {
primaryKeys := primary_key.NewKeys(_primaryKeys)
if err := primaryKeys.LoadValues(cfg.OptionalStartingValues, cfg.OptionalEndingValues); err != nil {
return Scanner{}, fmt.Errorf("failed to override primary key values: %w", err)
return nil, fmt.Errorf("failed to override primary key values: %w", err)
}

retryCfg, err := retry.NewJitterRetryConfig(jitterBaseMs, jitterMaxMs, cfg.ErrorRetries, retry.AlwaysRetry)
if err != nil {
return Scanner{}, fmt.Errorf("failed to build retry config: %w", err)
return nil, fmt.Errorf("failed to build retry config: %w", err)
}

return Scanner{
return &Scanner{
db: db,
batchSize: cfg.BatchSize,
retryCfg: retryCfg,
Expand Down
11 changes: 6 additions & 5 deletions sources/mysql/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ import (
"log/slog"
"strings"

"github.com/artie-labs/transfer/lib/debezium"
transferDbz "github.com/artie-labs/transfer/lib/debezium"

"github.com/artie-labs/reader/config"
"github.com/artie-labs/reader/lib/debezium"
"github.com/artie-labs/reader/lib/debezium/converters"
"github.com/artie-labs/reader/lib/mysql"
"github.com/artie-labs/reader/lib/mysql/scanner"
Expand All @@ -21,7 +22,7 @@ const defaultErrorRetries = 10
type mysqlAdapter struct {
db *sql.DB
table mysql.Table
fields []debezium.Field
fields []transferDbz.Field
scannerCfg scan.ScannerConfig
rowConverter converters.RowConverter
}
Expand All @@ -37,7 +38,7 @@ func NewMySQLAdapter(db *sql.DB, tableCfg config.MySQLTable) (mysqlAdapter, erro
}

func newMySQLAdapter(db *sql.DB, table mysql.Table, scannerCfg scan.ScannerConfig) (mysqlAdapter, error) {
fields := make([]debezium.Field, len(table.Columns))
fields := make([]transferDbz.Field, len(table.Columns))
valueConverters := map[string]converters.ValueConverter{}
for i, col := range table.Columns {
converter, err := valueConverterForType(col.Type, col.Opts)
Expand Down Expand Up @@ -65,11 +66,11 @@ func (m mysqlAdapter) TopicSuffix() string {
return strings.ReplaceAll(m.table.Name, `"`, ``)
}

func (m mysqlAdapter) Fields() []debezium.Field {
func (m mysqlAdapter) Fields() []transferDbz.Field {
return m.fields
}

func (m mysqlAdapter) NewIterator() (scan.Scanner, error) {
func (m mysqlAdapter) NewIterator() (debezium.RowsIterator, error) {
return scanner.NewScanner(m.db, m.table, m.scannerCfg)
}

Expand Down
4 changes: 1 addition & 3 deletions sources/mysql/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (s Source) snapshotTable(ctx context.Context, writer kafkalib.BatchWriter,
return fmt.Errorf("failed to create MySQL adapter: %w", err)
}

scanner, err := adapter.NewIterator()
dbzTransformer, err := debezium.NewDebeziumTransformer(adapter)
if err != nil {
if errors.Is(err, rdbms.ErrNoPkValuesForEmptyTable) {
logger.Info("Table does not contain any rows, skipping...")
Expand All @@ -66,8 +66,6 @@ func (s Source) snapshotTable(ctx context.Context, writer kafkalib.BatchWriter,
}

logger.Info("Scanning table", slog.Any("batchSize", tableCfg.BatchSize))

dbzTransformer := debezium.NewDebeziumTransformer(adapter, &scanner)
count, err := writer.WriteIterator(ctx, dbzTransformer)
if err != nil {
return fmt.Errorf("failed to snapshot for table %s: %w", tableCfg.Name, err)
Expand Down
9 changes: 5 additions & 4 deletions sources/postgres/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ import (
"log/slog"
"strings"

"github.com/artie-labs/transfer/lib/debezium"
transferDbz "github.com/artie-labs/transfer/lib/debezium"

"github.com/artie-labs/reader/config"
"github.com/artie-labs/reader/lib/debezium"
"github.com/artie-labs/reader/lib/postgres"
"github.com/artie-labs/reader/lib/rdbms/scan"
)
Expand Down Expand Up @@ -43,15 +44,15 @@ func (p postgresAdapter) TopicSuffix() string {
return fmt.Sprintf("%s.%s", p.table.Schema, strings.ReplaceAll(p.table.Name, `"`, ``))
}

func (p postgresAdapter) Fields() []debezium.Field {
fields := make([]debezium.Field, len(p.table.Columns))
func (p postgresAdapter) Fields() []transferDbz.Field {
fields := make([]transferDbz.Field, len(p.table.Columns))
for i, col := range p.table.Columns {
fields[i] = ColumnToField(col)
}
return fields
}

func (p postgresAdapter) NewIterator() (scan.Scanner, error) {
func (p postgresAdapter) NewIterator() (debezium.RowsIterator, error) {
return postgres.NewScanner(p.db, p.table, p.scannerCfg)
}

Expand Down
Loading

0 comments on commit e229140

Please sign in to comment.