From 77e0014bcc142a6cb0f47f363927594caa324f2b Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Thu, 21 Mar 2024 21:45:52 -0400 Subject: [PATCH] Fix MySQL Kafka Topic (#314) --- integration_tests/mysql/main.go | 18 +++++++++--------- sources/mysql/adapter/adapter.go | 10 ++++++---- sources/mysql/adapter/adapter_test.go | 8 ++++---- sources/mysql/snapshot.go | 4 ++-- 4 files changed, 21 insertions(+), 19 deletions(-) diff --git a/integration_tests/mysql/main.go b/integration_tests/mysql/main.go index 905ac645..3e4b3fec 100644 --- a/integration_tests/mysql/main.go +++ b/integration_tests/mysql/main.go @@ -49,22 +49,22 @@ func main() { logger.Fatal("Unable to change sql_mode", slog.Any("err", err)) } - if err = testTypes(db); err != nil { + if err = testTypes(db, mysqlCfg.Database); err != nil { logger.Fatal("Types test failed", slog.Any("err", err)) } - if err = testScan(db); err != nil { + if err = testScan(db, mysqlCfg.Database); err != nil { logger.Fatal("Scan test failed", slog.Any("err", err)) } } -func readTable(db *sql.DB, tableName string, batchSize int) ([]lib.RawMessage, error) { +func readTable(db *sql.DB, dbName, tableName string, batchSize int) ([]lib.RawMessage, error) { tableCfg := config.MySQLTable{ Name: tableName, BatchSize: uint(batchSize), } - dbzAdapter, err := adapter.NewMySQLAdapter(db, tableCfg) + dbzAdapter, err := adapter.NewMySQLAdapter(db, dbName, tableCfg) if err != nil { return nil, err } @@ -441,12 +441,12 @@ const expectedPayloadTemplate = `{ }` // testTypes checks that MySQL data types are handled correctly. -func testTypes(db *sql.DB) error { +func testTypes(db *sql.DB, dbName string) error { tempTableName, dropTableFunc := utils.CreateTemporaryTable(db, testTypesCreateTableQuery) defer dropTableFunc() // Check reading an empty table - _, err := readTable(db, tempTableName, 100) + _, err := readTable(db, dbName, tempTableName, 100) if err == nil { return fmt.Errorf("expected an error") } else if !errors.Is(err, rdbms.ErrNoPkValuesForEmptyTable) { @@ -458,7 +458,7 @@ func testTypes(db *sql.DB) error { return fmt.Errorf("unable to insert data: %w", err) } - rows, err := readTable(db, tempTableName, 100) + rows, err := readTable(db, dbName, tempTableName, 100) if err != nil { return err } @@ -526,7 +526,7 @@ INSERT INTO %s VALUES ` // testScan checks that we're fetching all the data from MySQL. -func testScan(db *sql.DB) error { +func testScan(db *sql.DB, dbName string) error { tempTableName, dropTableFunc := utils.CreateTemporaryTable(db, testScanCreateTableQuery) defer dropTableFunc() @@ -592,7 +592,7 @@ func testScan(db *sql.DB) error { for _, batchSize := range []int{1, 2, 5, 6, 24, 25, 26} { slog.Info(fmt.Sprintf("Testing scan with batch size of %d...", batchSize)) - rows, err := readTable(db, tempTableName, batchSize) + rows, err := readTable(db, dbName, tempTableName, batchSize) if err != nil { return err } diff --git a/sources/mysql/adapter/adapter.go b/sources/mysql/adapter/adapter.go index ccd129c1..55963b96 100644 --- a/sources/mysql/adapter/adapter.go +++ b/sources/mysql/adapter/adapter.go @@ -20,13 +20,14 @@ const defaultErrorRetries = 10 type mysqlAdapter struct { db *sql.DB + dbName string table mysql.Table columns []schema.Column fieldConverters []transformer.FieldConverter scannerCfg scan.ScannerConfig } -func NewMySQLAdapter(db *sql.DB, tableCfg config.MySQLTable) (mysqlAdapter, error) { +func NewMySQLAdapter(db *sql.DB, dbName string, tableCfg config.MySQLTable) (mysqlAdapter, error) { slog.Info("Loading metadata for table") table, err := mysql.LoadTable(db, tableCfg.Name) if err != nil { @@ -38,10 +39,10 @@ func NewMySQLAdapter(db *sql.DB, tableCfg config.MySQLTable) (mysqlAdapter, erro return mysqlAdapter{}, err } - return newMySQLAdapter(db, *table, columns, tableCfg.ToScannerConfig(defaultErrorRetries)) + return newMySQLAdapter(db, dbName, *table, columns, tableCfg.ToScannerConfig(defaultErrorRetries)) } -func newMySQLAdapter(db *sql.DB, table mysql.Table, columns []schema.Column, scannerCfg scan.ScannerConfig) (mysqlAdapter, error) { +func newMySQLAdapter(db *sql.DB, dbName string, table mysql.Table, columns []schema.Column, scannerCfg scan.ScannerConfig) (mysqlAdapter, error) { fieldConverters := make([]transformer.FieldConverter, len(columns)) for i, col := range columns { converter, err := valueConverterForType(col.Type, col.Opts) @@ -53,6 +54,7 @@ func newMySQLAdapter(db *sql.DB, table mysql.Table, columns []schema.Column, sca return mysqlAdapter{ db: db, + dbName: dbName, table: table, columns: columns, fieldConverters: fieldConverters, @@ -65,7 +67,7 @@ func (m mysqlAdapter) TableName() string { } func (m mysqlAdapter) TopicSuffix() string { - return strings.ReplaceAll(m.table.Name, `"`, ``) + return fmt.Sprintf("%s.%s", m.dbName, strings.ReplaceAll(m.table.Name, `"`, ``)) } func (m mysqlAdapter) FieldConverters() []transformer.FieldConverter { diff --git a/sources/mysql/adapter/adapter_test.go b/sources/mysql/adapter/adapter_test.go index 7249789a..5cefa00d 100644 --- a/sources/mysql/adapter/adapter_test.go +++ b/sources/mysql/adapter/adapter_test.go @@ -16,7 +16,7 @@ func TestMySQLAdapter_TableName(t *testing.T) { table := mysql.Table{ Name: "table1", } - adapter, err := newMySQLAdapter(nil, table, []schema.Column{}, scan.ScannerConfig{}) + adapter, err := newMySQLAdapter(nil, "foo", table, []schema.Column{}, scan.ScannerConfig{}) assert.NoError(t, err) assert.Equal(t, "table1", adapter.TableName()) } @@ -32,18 +32,18 @@ func TestMySQLAdapter_TopicSuffix(t *testing.T) { table: mysql.Table{ Name: "table1", }, - expected: "table1", + expected: "db.table1", }, { table: mysql.Table{ Name: `"PublicStatus"`, }, - expected: "PublicStatus", + expected: "db.PublicStatus", }, } for _, tc := range tcs { - adapter, err := newMySQLAdapter(nil, tc.table, []schema.Column{}, scan.ScannerConfig{}) + adapter, err := newMySQLAdapter(nil, "db", tc.table, []schema.Column{}, scan.ScannerConfig{}) assert.NoError(t, err) assert.Equal(t, tc.expected, adapter.TopicSuffix()) } diff --git a/sources/mysql/snapshot.go b/sources/mysql/snapshot.go index fd05d58c..57d66f69 100644 --- a/sources/mysql/snapshot.go +++ b/sources/mysql/snapshot.go @@ -47,10 +47,10 @@ func (s *Source) Run(ctx context.Context, writer kafkalib.BatchWriter) error { } func (s Source) snapshotTable(ctx context.Context, writer kafkalib.BatchWriter, tableCfg config.MySQLTable) error { - logger := slog.With(slog.String("table", tableCfg.Name)) + logger := slog.With(slog.String("table", tableCfg.Name), slog.String("database", s.cfg.Database)) snapshotStartTime := time.Now() - adapter, err := adapter.NewMySQLAdapter(s.db, tableCfg) + adapter, err := adapter.NewMySQLAdapter(s.db, s.cfg.Database, tableCfg) if err != nil { return fmt.Errorf("failed to create MySQL adapter: %w", err) }