diff --git a/lib/rdbms/column/column.go b/lib/rdbms/column/column.go index 0a79405e..42fc0c1a 100644 --- a/lib/rdbms/column/column.go +++ b/lib/rdbms/column/column.go @@ -49,3 +49,18 @@ func FilterOutExcludedColumns[T ~int, O any](columns []Column[T, O], excludeName } return result, nil } + +// FilterForIncludedColumns returns a list of columns including only those that match `includeNames`. +func FilterForIncludedColumns[T ~int, O any](columns []Column[T, O], includeNames []string) ([]Column[T, O], error) { + if len(includeNames) == 0 { + return columns, nil + } + + var result []Column[T, O] + for _, column := range columns { + if slices.Contains(includeNames, column.Name) { + result = append(result, column) + } + } + return result, nil +} diff --git a/lib/rdbms/column/column_test.go b/lib/rdbms/column/column_test.go index 405382f3..32672cc3 100644 --- a/lib/rdbms/column/column_test.go +++ b/lib/rdbms/column/column_test.go @@ -158,3 +158,24 @@ func TestFilterOutExcludedColumns(t *testing.T) { assert.ErrorContains(t, err, `cannot exclude primary key column "bar"`) } } + +func TestFilterForIncludedColumns(t *testing.T) { + { + // Empty `includeNames` + value, err := FilterForIncludedColumns([]mockColumn{{Name: "foo"}}, []string{}) + assert.NoError(t, err) + assert.Equal(t, value, []mockColumn{{Name: "foo"}}) + } + { + // Non-empty `includeNames`, included column is not in list + value, err := FilterForIncludedColumns([]mockColumn{{Name: "foo"}}, []string{"bar"}) + assert.NoError(t, err) + assert.Equal(t, value, []mockColumn(nil)) + } + { + // Non-empty `includeNames`, included column is in list + value, err := FilterForIncludedColumns([]mockColumn{{Name: "foo"}, {Name: "bar"}}, []string{"bar"}) + assert.NoError(t, err) + assert.Equal(t, value, []mockColumn{{Name: "bar"}}) + } +} diff --git a/sources/postgres/adapter/adapter.go b/sources/postgres/adapter/adapter.go index 96e0bf9d..5914c770 100644 --- a/sources/postgres/adapter/adapter.go +++ b/sources/postgres/adapter/adapter.go @@ -31,11 +31,18 @@ func NewPostgresAdapter(db *sql.DB, tableCfg config.PostgreSQLTable) (PostgresAd return PostgresAdapter{}, fmt.Errorf("failed to load metadata for table %s.%s: %w", tableCfg.Schema, tableCfg.Name, err) } + // Exclude columns (if any) from the table metadata columns, err := column.FilterOutExcludedColumns(table.Columns, tableCfg.ExcludeColumns, table.PrimaryKeys) if err != nil { return PostgresAdapter{}, err } + // Include columns (if any) from the table metadata + columns, err = column.FilterForIncludedColumns(columns, tableCfg.IncludeColumns) + if err != nil { + return PostgresAdapter{}, err + } + fieldConverters := make([]transformer.FieldConverter, len(columns)) for i, col := range columns { converter, err := valueConverterForType(col.Type, col.Opts)