Skip to content

Commit

Permalink
Implement MSSQL.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Sep 26, 2024
1 parent 5127251 commit 1e9d7da
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 0 deletions.
7 changes: 7 additions & 0 deletions config/mssql.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ type MSSQLTable struct {
OptionalPrimaryKeyValStart string `yaml:"optionalPrimaryKeyValStart,omitempty"`
OptionalPrimaryKeyValEnd string `yaml:"optionalPrimaryKeyValEnd,omitempty"`
ExcludeColumns []string `yaml:"excludeColumns,omitempty"`
// IncludeColumns - List of columns that should be included in the change event record.
IncludeColumns []string `yaml:"includeColumns,omitempty"`
}

func (m *MSSQL) ToDSN() string {
Expand Down Expand Up @@ -95,6 +97,11 @@ func (m *MSSQL) Validate() error {
if stringutil.Empty(table.Name, table.Schema) {
return fmt.Errorf("table name and schema must be passed in")
}

// You should not be able to filter and exclude columns at the same time
if len(table.ExcludeColumns) > 0 && len(table.IncludeColumns) > 0 {
return fmt.Errorf("cannot exclude and include columns at the same time")
}
}

return nil
Expand Down
20 changes: 20 additions & 0 deletions config/mssql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,26 @@ func TestMSSQL_Validate(t *testing.T) {

assert.ErrorContains(t, m.Validate(), "table name and schema must be passed in")
}
{
// Exclude and include columns at the same time
m := &MSSQL{
Host: "host",
Port: 1,
Username: "username",
Password: "password",
Database: "database",
Tables: []*MSSQLTable{
{
Name: "name",
Schema: "schema",
IncludeColumns: []string{"foo"},
ExcludeColumns: []string{"bar"},
},
},
}

assert.ErrorContains(t, m.Validate(), "cannot exclude and include columns at the same time")
}
{
// Valid
m := &MSSQL{
Expand Down
7 changes: 7 additions & 0 deletions sources/mssql/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,18 @@ func NewMSSQLAdapter(db *sql.DB, dbName string, tableCfg config.MSSQLTable) (MSS
return MSSQLAdapter{}, fmt.Errorf("failed to load metadata for table %s.%s: %w", tableCfg.Schema, tableCfg.Name, err)
}

// Exclude columns (if any) from the table metadata
columns, err := column.FilterOutExcludedColumns(table.Columns(), tableCfg.ExcludeColumns, table.PrimaryKeys())
if err != nil {
return MSSQLAdapter{}, err
}

// Include columns (if any) from the table metadata
columns, err = column.FilterForIncludedColumns(columns, tableCfg.IncludeColumns, table.PrimaryKeys())
if err != nil {
return MSSQLAdapter{}, err
}

fieldConverters := make([]transformer.FieldConverter, len(columns))
for i, col := range columns {
converter, err := valueConverterForType(col.Type, col.Opts)
Expand Down

0 comments on commit 1e9d7da

Please sign in to comment.