From 512725109f5e156a83dabda3bfd1d694082a44b9 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Thu, 26 Sep 2024 12:48:01 -0700 Subject: [PATCH 1/2] Adding MySQL. --- config/mysql.go | 7 +++++++ config/mysql_test.go | 11 +++++++++++ sources/mysql/adapter/adapter.go | 7 +++++++ 3 files changed, 25 insertions(+) diff --git a/config/mysql.go b/config/mysql.go index 11f67b31..fede9503 100644 --- a/config/mysql.go +++ b/config/mysql.go @@ -39,6 +39,8 @@ type MySQLTable struct { OptionalPrimaryKeyValStart string `yaml:"optionalPrimaryKeyValStart,omitempty"` OptionalPrimaryKeyValEnd string `yaml:"optionalPrimaryKeyValEnd,omitempty"` ExcludeColumns []string `yaml:"excludeColumns,omitempty"` + // IncludeColumns - List of columns that should be included in the change event record. + IncludeColumns []string `yaml:"includeColumns,omitempty"` } func (m *MySQLTable) GetBatchSize() uint { @@ -91,6 +93,11 @@ func (m *MySQL) Validate() error { if table.Name == "" { return fmt.Errorf("table name must be passed in") } + + // You should not be able to filter and exclude columns at the same time + if len(table.ExcludeColumns) > 0 && len(table.IncludeColumns) > 0 { + return fmt.Errorf("cannot exclude and include columns at the same time") + } } return nil diff --git a/config/mysql_test.go b/config/mysql_test.go index 7bcce090..75372b5c 100644 --- a/config/mysql_test.go +++ b/config/mysql_test.go @@ -96,6 +96,17 @@ func TestMySQL_Validate(t *testing.T) { c.Tables = append(c.Tables, &MySQLTable{}) assert.ErrorContains(t, c.Validate(), "table name must be passed in") } + { + // exclude and include at the same time + c := createValidConfig() + c.Tables = append(c.Tables, &MySQLTable{ + Name: "foo", + IncludeColumns: []string{"foo"}, + ExcludeColumns: []string{"bar"}, + }) + + assert.ErrorContains(t, c.Validate(), "cannot exclude and include columns at the same time") + } } func TestMySQL_ToDSN(t *testing.T) { diff --git a/sources/mysql/adapter/adapter.go b/sources/mysql/adapter/adapter.go index 565a8647..793557ed 100644 --- a/sources/mysql/adapter/adapter.go +++ b/sources/mysql/adapter/adapter.go @@ -33,11 +33,18 @@ func NewMySQLAdapter(db *sql.DB, dbName string, tableCfg config.MySQLTable) (MyS return MySQLAdapter{}, fmt.Errorf("failed to load metadata for table %q: %w", tableCfg.Name, err) } + // Exclude columns (if any) from the table metadata columns, err := column.FilterOutExcludedColumns(table.Columns, tableCfg.ExcludeColumns, table.PrimaryKeys) if err != nil { return MySQLAdapter{}, err } + // Include columns (if any) from the table metadata + columns, err = column.FilterForIncludedColumns(columns, tableCfg.IncludeColumns, table.PrimaryKeys) + if err != nil { + return MySQLAdapter{}, err + } + return newMySQLAdapter(db, dbName, *table, columns, tableCfg.ToScannerConfig(defaultErrorRetries)) } From 1e9d7da61e5549387961277ddbf185d4ecaf6854 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Thu, 26 Sep 2024 12:49:45 -0700 Subject: [PATCH 2/2] Implement MSSQL. --- config/mssql.go | 7 +++++++ config/mssql_test.go | 20 ++++++++++++++++++++ sources/mssql/adapter/adapter.go | 7 +++++++ 3 files changed, 34 insertions(+) diff --git a/config/mssql.go b/config/mssql.go index e39e2539..aa88ef5d 100644 --- a/config/mssql.go +++ b/config/mssql.go @@ -29,6 +29,8 @@ type MSSQLTable struct { OptionalPrimaryKeyValStart string `yaml:"optionalPrimaryKeyValStart,omitempty"` OptionalPrimaryKeyValEnd string `yaml:"optionalPrimaryKeyValEnd,omitempty"` ExcludeColumns []string `yaml:"excludeColumns,omitempty"` + // IncludeColumns - List of columns that should be included in the change event record. + IncludeColumns []string `yaml:"includeColumns,omitempty"` } func (m *MSSQL) ToDSN() string { @@ -95,6 +97,11 @@ func (m *MSSQL) Validate() error { if stringutil.Empty(table.Name, table.Schema) { return fmt.Errorf("table name and schema must be passed in") } + + // You should not be able to filter and exclude columns at the same time + if len(table.ExcludeColumns) > 0 && len(table.IncludeColumns) > 0 { + return fmt.Errorf("cannot exclude and include columns at the same time") + } } return nil diff --git a/config/mssql_test.go b/config/mssql_test.go index 54decdfa..3704169e 100644 --- a/config/mssql_test.go +++ b/config/mssql_test.go @@ -129,6 +129,26 @@ func TestMSSQL_Validate(t *testing.T) { assert.ErrorContains(t, m.Validate(), "table name and schema must be passed in") } + { + // Exclude and include columns at the same time + m := &MSSQL{ + Host: "host", + Port: 1, + Username: "username", + Password: "password", + Database: "database", + Tables: []*MSSQLTable{ + { + Name: "name", + Schema: "schema", + IncludeColumns: []string{"foo"}, + ExcludeColumns: []string{"bar"}, + }, + }, + } + + assert.ErrorContains(t, m.Validate(), "cannot exclude and include columns at the same time") + } { // Valid m := &MSSQL{ diff --git a/sources/mssql/adapter/adapter.go b/sources/mssql/adapter/adapter.go index 25f104b1..1e7cc738 100644 --- a/sources/mssql/adapter/adapter.go +++ b/sources/mssql/adapter/adapter.go @@ -31,11 +31,18 @@ func NewMSSQLAdapter(db *sql.DB, dbName string, tableCfg config.MSSQLTable) (MSS return MSSQLAdapter{}, fmt.Errorf("failed to load metadata for table %s.%s: %w", tableCfg.Schema, tableCfg.Name, err) } + // Exclude columns (if any) from the table metadata columns, err := column.FilterOutExcludedColumns(table.Columns(), tableCfg.ExcludeColumns, table.PrimaryKeys()) if err != nil { return MSSQLAdapter{}, err } + // Include columns (if any) from the table metadata + columns, err = column.FilterForIncludedColumns(columns, tableCfg.IncludeColumns, table.PrimaryKeys()) + if err != nil { + return MSSQLAdapter{}, err + } + fieldConverters := make([]transformer.FieldConverter, len(columns)) for i, col := range columns { converter, err := valueConverterForType(col.Type, col.Opts)