From 1e9d7da61e5549387961277ddbf185d4ecaf6854 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Thu, 26 Sep 2024 12:49:45 -0700 Subject: [PATCH] Implement MSSQL. --- config/mssql.go | 7 +++++++ config/mssql_test.go | 20 ++++++++++++++++++++ sources/mssql/adapter/adapter.go | 7 +++++++ 3 files changed, 34 insertions(+) diff --git a/config/mssql.go b/config/mssql.go index e39e2539..aa88ef5d 100644 --- a/config/mssql.go +++ b/config/mssql.go @@ -29,6 +29,8 @@ type MSSQLTable struct { OptionalPrimaryKeyValStart string `yaml:"optionalPrimaryKeyValStart,omitempty"` OptionalPrimaryKeyValEnd string `yaml:"optionalPrimaryKeyValEnd,omitempty"` ExcludeColumns []string `yaml:"excludeColumns,omitempty"` + // IncludeColumns - List of columns that should be included in the change event record. + IncludeColumns []string `yaml:"includeColumns,omitempty"` } func (m *MSSQL) ToDSN() string { @@ -95,6 +97,11 @@ func (m *MSSQL) Validate() error { if stringutil.Empty(table.Name, table.Schema) { return fmt.Errorf("table name and schema must be passed in") } + + // You should not be able to filter and exclude columns at the same time + if len(table.ExcludeColumns) > 0 && len(table.IncludeColumns) > 0 { + return fmt.Errorf("cannot exclude and include columns at the same time") + } } return nil diff --git a/config/mssql_test.go b/config/mssql_test.go index 54decdfa..3704169e 100644 --- a/config/mssql_test.go +++ b/config/mssql_test.go @@ -129,6 +129,26 @@ func TestMSSQL_Validate(t *testing.T) { assert.ErrorContains(t, m.Validate(), "table name and schema must be passed in") } + { + // Exclude and include columns at the same time + m := &MSSQL{ + Host: "host", + Port: 1, + Username: "username", + Password: "password", + Database: "database", + Tables: []*MSSQLTable{ + { + Name: "name", + Schema: "schema", + IncludeColumns: []string{"foo"}, + ExcludeColumns: []string{"bar"}, + }, + }, + } + + assert.ErrorContains(t, m.Validate(), "cannot exclude and include columns at the same time") + } { // Valid m := &MSSQL{ diff --git a/sources/mssql/adapter/adapter.go b/sources/mssql/adapter/adapter.go index 25f104b1..1e7cc738 100644 --- a/sources/mssql/adapter/adapter.go +++ b/sources/mssql/adapter/adapter.go @@ -31,11 +31,18 @@ func NewMSSQLAdapter(db *sql.DB, dbName string, tableCfg config.MSSQLTable) (MSS return MSSQLAdapter{}, fmt.Errorf("failed to load metadata for table %s.%s: %w", tableCfg.Schema, tableCfg.Name, err) } + // Exclude columns (if any) from the table metadata columns, err := column.FilterOutExcludedColumns(table.Columns(), tableCfg.ExcludeColumns, table.PrimaryKeys()) if err != nil { return MSSQLAdapter{}, err } + // Include columns (if any) from the table metadata + columns, err = column.FilterForIncludedColumns(columns, tableCfg.IncludeColumns, table.PrimaryKeys()) + if err != nil { + return MSSQLAdapter{}, err + } + fieldConverters := make([]transformer.FieldConverter, len(columns)) for i, col := range columns { converter, err := valueConverterForType(col.Type, col.Opts)