Skip to content

Commit

Permalink
Merge branch 'master' into default-values
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Jan 13, 2025
2 parents 9802856 + 0872788 commit 8638a30
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 10 deletions.
2 changes: 1 addition & 1 deletion integration_tests/mysql/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func readTable(db *sql.DB, dbName, tableName string, batchSize int) ([]kafkalib.
BatchSize: uint(batchSize),
}

dbzAdapter, err := adapter.NewMySQLAdapter(db, dbName, tableCfg)
dbzAdapter, err := adapter.NewMySQLAdapter(db, dbName, tableCfg, nil)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions lib/mysql/schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func GetCreateTableDDL(db *sql.DB, table string) (string, error) {
return createTableDDL, nil
}

func ParseColumnDataType(originalS string, defaultValue *string, optionalSQLMode []string) (DataType, *Opts, error) {
func ParseColumnDataType(originalS string, defaultValue *string, sqlMode []string) (DataType, *Opts, error) {
// Preserve the original value, so we can return the error message without the actual value being mutated.
s := strings.ToLower(originalS)
var metadata string
Expand Down Expand Up @@ -190,7 +190,7 @@ func ParseColumnDataType(originalS string, defaultValue *string, optionalSQLMode
return Float, nil, nil
case "real":
// https://dev.mysql.com/doc/refman/8.4/en/sql-mode.html#sqlmode_real_as_float
if slices.Contains(optionalSQLMode, "REAL_AS_FLOAT") {
if slices.Contains(sqlMode, "REAL_AS_FLOAT") {
return Float, nil, nil
}
return Double, nil, nil
Expand Down
7 changes: 6 additions & 1 deletion lib/mysql/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type Table struct {
PrimaryKeys []string
}

func LoadTable(db *sql.DB, name string) (*Table, error) {
func LoadTable(db *sql.DB, name string, sqlMode []string) (*Table, error) {
tbl := &Table{
Name: name,
}
Expand All @@ -43,8 +43,13 @@ func LoadTable(db *sql.DB, name string) (*Table, error) {
}

for _, col := range createTableResult.GetColumns() {
<<<<<<< HEAD

Check failure on line 46 in lib/mysql/table.go

View workflow job for this annotation

GitHub Actions / test

syntax error: unexpected <<, expected }

Check failure on line 46 in lib/mysql/table.go

View workflow job for this annotation

GitHub Actions / MySQL

syntax error: unexpected <<, expected }
// TODO: Pass in sql mode
dataType, opts, err := schema.ParseColumnDataType(col.DataType, col.DefaultValue, nil)
=======

Check failure on line 49 in lib/mysql/table.go

View workflow job for this annotation

GitHub Actions / test

syntax error: unexpected ==, expected }

Check failure on line 49 in lib/mysql/table.go

View workflow job for this annotation

GitHub Actions / MySQL

syntax error: unexpected ==, expected }
dataType, opts, err := schema.ParseColumnDataType(col.DataType, sqlMode)

>>>>>>> master
if err != nil {
return nil, fmt.Errorf("failed to parse column data type: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions sources/mysql/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ type MySQLAdapter struct {
scannerCfg scan.ScannerConfig
}

func NewMySQLAdapter(db *sql.DB, dbName string, tableCfg config.MySQLTable) (MySQLAdapter, error) {
func NewMySQLAdapter(db *sql.DB, dbName string, tableCfg config.MySQLTable, sqlMode []string) (MySQLAdapter, error) {
slog.Info("Loading metadata for table")
table, err := mysql.LoadTable(db, tableCfg.Name)
table, err := mysql.LoadTable(db, tableCfg.Name, sqlMode)
if err != nil {
return MySQLAdapter{}, fmt.Errorf("failed to load metadata for table %q: %w", tableCfg.Name, err)
}
Expand Down
6 changes: 5 additions & 1 deletion sources/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,9 @@ func Load(ctx context.Context, cfg config.MySQL) (sources.Source, bool, error) {
return stream, true, nil
}

return &Snapshot{cfg: cfg, db: db}, false, nil
return &Snapshot{
cfg: cfg,
db: db,
sqlMode: settings.SQLMode,
}, false, nil
}
7 changes: 4 additions & 3 deletions sources/mysql/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ import (
)

type Snapshot struct {
cfg config.MySQL
db *sql.DB
cfg config.MySQL
db *sql.DB
sqlMode []string
}

func (s Snapshot) Close() error {
Expand All @@ -40,7 +41,7 @@ func (s Snapshot) snapshotTable(ctx context.Context, writer writers.Writer, tabl
logger := slog.With(slog.String("table", tableCfg.Name), slog.String("database", s.cfg.Database))
snapshotStartTime := time.Now()

dbzAdapter, err := adapter.NewMySQLAdapter(s.db, s.cfg.Database, tableCfg)
dbzAdapter, err := adapter.NewMySQLAdapter(s.db, s.cfg.Database, tableCfg, s.sqlMode)
if err != nil {
return fmt.Errorf("failed to create MySQL adapter: %w", err)
}
Expand Down

0 comments on commit 8638a30

Please sign in to comment.