Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Column allowlist for MySQL and MSSQL #501

Merged
merged 2 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions config/mssql.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ type MSSQLTable struct {
OptionalPrimaryKeyValStart string `yaml:"optionalPrimaryKeyValStart,omitempty"`
OptionalPrimaryKeyValEnd string `yaml:"optionalPrimaryKeyValEnd,omitempty"`
ExcludeColumns []string `yaml:"excludeColumns,omitempty"`
// IncludeColumns - List of columns that should be included in the change event record.
IncludeColumns []string `yaml:"includeColumns,omitempty"`
}

func (m *MSSQL) ToDSN() string {
Expand Down Expand Up @@ -95,6 +97,11 @@ func (m *MSSQL) Validate() error {
if stringutil.Empty(table.Name, table.Schema) {
return fmt.Errorf("table name and schema must be passed in")
}

// You should not be able to filter and exclude columns at the same time
if len(table.ExcludeColumns) > 0 && len(table.IncludeColumns) > 0 {
return fmt.Errorf("cannot exclude and include columns at the same time")
}
}

return nil
Expand Down
20 changes: 20 additions & 0 deletions config/mssql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,26 @@ func TestMSSQL_Validate(t *testing.T) {

assert.ErrorContains(t, m.Validate(), "table name and schema must be passed in")
}
{
// Exclude and include columns at the same time
m := &MSSQL{
Host: "host",
Port: 1,
Username: "username",
Password: "password",
Database: "database",
Tables: []*MSSQLTable{
{
Name: "name",
Schema: "schema",
IncludeColumns: []string{"foo"},
ExcludeColumns: []string{"bar"},
},
},
}

assert.ErrorContains(t, m.Validate(), "cannot exclude and include columns at the same time")
}
{
// Valid
m := &MSSQL{
Expand Down
7 changes: 7 additions & 0 deletions config/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ type MySQLTable struct {
OptionalPrimaryKeyValStart string `yaml:"optionalPrimaryKeyValStart,omitempty"`
OptionalPrimaryKeyValEnd string `yaml:"optionalPrimaryKeyValEnd,omitempty"`
ExcludeColumns []string `yaml:"excludeColumns,omitempty"`
// IncludeColumns - List of columns that should be included in the change event record.
IncludeColumns []string `yaml:"includeColumns,omitempty"`
}

func (m *MySQLTable) GetBatchSize() uint {
Expand Down Expand Up @@ -91,6 +93,11 @@ func (m *MySQL) Validate() error {
if table.Name == "" {
return fmt.Errorf("table name must be passed in")
}

// You should not be able to filter and exclude columns at the same time
if len(table.ExcludeColumns) > 0 && len(table.IncludeColumns) > 0 {
return fmt.Errorf("cannot exclude and include columns at the same time")
}
}

return nil
Expand Down
11 changes: 11 additions & 0 deletions config/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,17 @@ func TestMySQL_Validate(t *testing.T) {
c.Tables = append(c.Tables, &MySQLTable{})
assert.ErrorContains(t, c.Validate(), "table name must be passed in")
}
{
// exclude and include at the same time
c := createValidConfig()
c.Tables = append(c.Tables, &MySQLTable{
Name: "foo",
IncludeColumns: []string{"foo"},
ExcludeColumns: []string{"bar"},
})

assert.ErrorContains(t, c.Validate(), "cannot exclude and include columns at the same time")
}
}

func TestMySQL_ToDSN(t *testing.T) {
Expand Down
7 changes: 7 additions & 0 deletions sources/mssql/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,18 @@ func NewMSSQLAdapter(db *sql.DB, dbName string, tableCfg config.MSSQLTable) (MSS
return MSSQLAdapter{}, fmt.Errorf("failed to load metadata for table %s.%s: %w", tableCfg.Schema, tableCfg.Name, err)
}

// Exclude columns (if any) from the table metadata
columns, err := column.FilterOutExcludedColumns(table.Columns(), tableCfg.ExcludeColumns, table.PrimaryKeys())
if err != nil {
return MSSQLAdapter{}, err
}

// Include columns (if any) from the table metadata
columns, err = column.FilterForIncludedColumns(columns, tableCfg.IncludeColumns, table.PrimaryKeys())
if err != nil {
return MSSQLAdapter{}, err
}

fieldConverters := make([]transformer.FieldConverter, len(columns))
for i, col := range columns {
converter, err := valueConverterForType(col.Type, col.Opts)
Expand Down
7 changes: 7 additions & 0 deletions sources/mysql/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,18 @@ func NewMySQLAdapter(db *sql.DB, dbName string, tableCfg config.MySQLTable) (MyS
return MySQLAdapter{}, fmt.Errorf("failed to load metadata for table %q: %w", tableCfg.Name, err)
}

// Exclude columns (if any) from the table metadata
columns, err := column.FilterOutExcludedColumns(table.Columns, tableCfg.ExcludeColumns, table.PrimaryKeys)
if err != nil {
return MySQLAdapter{}, err
}

// Include columns (if any) from the table metadata
columns, err = column.FilterForIncludedColumns(columns, tableCfg.IncludeColumns, table.PrimaryKeys)
if err != nil {
return MySQLAdapter{}, err
}

return newMySQLAdapter(db, dbName, *table, columns, tableCfg.ToScannerConfig(defaultErrorRetries))
}

Expand Down
Loading