diff --git a/lib/postgres/columns.go b/lib/postgres/columns.go index 2ad52a69..6c8916ac 100644 --- a/lib/postgres/columns.go +++ b/lib/postgres/columns.go @@ -42,7 +42,7 @@ func (t *Table) RetrieveColumns(db *sql.DB) error { slog.String("colKind", colKind), ) } else { - t.Config.Fields.AddField(colName, dataType, opts) + t.Fields.AddField(colName, dataType, opts) } } @@ -60,7 +60,7 @@ func (t *Table) RetrieveColumns(db *sql.DB) error { for _, column := range columns { // Add to original columns before mutation t.OriginalColumns = append(t.OriginalColumns, column) - columnKind := t.Config.Fields.GetDataType(column) + columnKind := t.Fields.GetDataType(column) t.ColumnsCastedForScanning = append(t.ColumnsCastedForScanning, castColumn(column, columnKind)) } diff --git a/lib/postgres/config.go b/lib/postgres/config.go deleted file mode 100644 index 6d03d7de..00000000 --- a/lib/postgres/config.go +++ /dev/null @@ -1,13 +0,0 @@ -package postgres - -import ( - "github.com/artie-labs/reader/lib/postgres/debezium" -) - -type Config struct { - Fields *debezium.Fields -} - -func NewPostgresConfig() *Config { - return &Config{Fields: debezium.NewFields()} -} diff --git a/lib/postgres/config_test.go b/lib/postgres/config_test.go index 80fe2f59..9ace91fa 100644 --- a/lib/postgres/config_test.go +++ b/lib/postgres/config_test.go @@ -5,6 +5,8 @@ import ( "github.com/artie-labs/transfer/lib/debezium" "github.com/stretchr/testify/assert" + + pgDebezium "github.com/artie-labs/reader/lib/postgres/debezium" ) func TestPostgresConfig_Complete(t *testing.T) { @@ -147,15 +149,15 @@ func TestPostgresConfig_Complete(t *testing.T) { } for _, testCase := range testCases { - cfg := NewPostgresConfig() + fields := pgDebezium.NewFields() // TODO: Add test for hstore dataType, opts := colKindToDataType(testCase.colKind, nil, nil, nil) - cfg.Fields.AddField(testCase.colName, dataType, opts) + fields.AddField(testCase.colName, dataType, opts) - actualEscCol := castColumn(testCase.colName, cfg.Fields.GetDataType(testCase.colName)) + actualEscCol := castColumn(testCase.colName, fields.GetDataType(testCase.colName)) assert.Equal(t, testCase.expectedEscColString, actualEscCol, testCase.name) - field, isOk := cfg.Fields.GetField(testCase.colName) + field, isOk := fields.GetField(testCase.colName) assert.True(t, isOk, testCase.name) assert.Equal(t, testCase.expectedField, field, testCase.name) } diff --git a/lib/postgres/message.go b/lib/postgres/message.go index 2e823f6c..08f7ac52 100644 --- a/lib/postgres/message.go +++ b/lib/postgres/message.go @@ -58,7 +58,7 @@ func (m *MessageBuilder) Next() ([]lib.RawMessage, error) { payload, err := debezium.NewPayload(&debezium.NewArgs{ TableName: m.table.Name, Columns: m.table.OriginalColumns, - Fields: m.table.Config.Fields, + Fields: m.table.Fields, RowData: row, }) if err != nil { diff --git a/lib/postgres/parse.go b/lib/postgres/parse.go index ef914afd..1cd1ef74 100644 --- a/lib/postgres/parse.go +++ b/lib/postgres/parse.go @@ -39,13 +39,13 @@ func NewValueWrapper(value interface{}) ValueWrapper { } } -func (c *Config) ParseValue(args ParseValueArgs) (ValueWrapper, error) { +func ParseValue(fields *debezium.Fields, args ParseValueArgs) (ValueWrapper, error) { // If the value is nil, or already parsed - just return. if args.Value() == nil || args.ValueWrapper.parsed { return args.ValueWrapper, nil } - colKind := c.Fields.GetDataType(args.ColName) + colKind := fields.GetDataType(args.ColName) switch colKind { case debezium.Geometry, debezium.Geography: valString, isOk := args.Value().(string) diff --git a/lib/postgres/parse_test.go b/lib/postgres/parse_test.go index 3fd805ec..89f54ef0 100644 --- a/lib/postgres/parse_test.go +++ b/lib/postgres/parse_test.go @@ -5,8 +5,9 @@ import ( "time" "github.com/artie-labs/transfer/lib/ptr" - "github.com/stretchr/testify/assert" + + pgDebezium "github.com/artie-labs/reader/lib/postgres/debezium" ) func TestParse(t *testing.T) { @@ -94,11 +95,11 @@ func TestParse(t *testing.T) { } for _, tc := range tcs { - cfg := NewPostgresConfig() + fields := pgDebezium.NewFields() dataType, opts := colKindToDataType(tc.colKind, nil, nil, tc.udtName) - cfg.Fields.AddField(tc.colName, dataType, opts) + fields.AddField(tc.colName, dataType, opts) - value, err := cfg.ParseValue(ParseValueArgs{ + value, err := ParseValue(fields, ParseValueArgs{ ColName: tc.colName, ValueWrapper: tc.value, ParseTime: tc.parseTime, @@ -112,7 +113,7 @@ func TestParse(t *testing.T) { // if there are no errors, let's iterate over this a few times to make sure it's deterministic. for i := 0; i < 5; i++ { - value, err = cfg.ParseValue(ParseValueArgs{ + value, err = ParseValue(fields, ParseValueArgs{ ColName: tc.colName, ValueWrapper: value, ParseTime: tc.parseTime, diff --git a/lib/postgres/scan.go b/lib/postgres/scan.go index 9dab131d..3024b2b8 100644 --- a/lib/postgres/scan.go +++ b/lib/postgres/scan.go @@ -56,8 +56,8 @@ func (s *scanner) scan(errorAttempts int) ([]map[string]interface{}, error) { secondWhereClause = queries.GreaterThanEqualTo } - startKeys := s.primaryKeys.KeysToValueList(s.table.Config.Fields.GetOptionalSchema(), false) - endKeys := s.primaryKeys.KeysToValueList(s.table.Config.Fields.GetOptionalSchema(), true) + startKeys := s.primaryKeys.KeysToValueList(s.table.Fields.GetOptionalSchema(), false) + endKeys := s.primaryKeys.KeysToValueList(s.table.Fields.GetOptionalSchema(), true) query := queries.ScanTableQuery(queries.ScanTableQueryArgs{ Schema: s.table.Schema, @@ -111,7 +111,7 @@ func (s *scanner) scan(errorAttempts int) ([]map[string]interface{}, error) { row := make(map[string]ValueWrapper) for k, v := range values { colName := columns[k] - value, err := s.table.Config.ParseValue(ParseValueArgs{ + value, err := ParseValue(s.table.Fields, ParseValueArgs{ ColName: colName, ValueWrapper: ValueWrapper{ Value: v, @@ -131,7 +131,7 @@ func (s *scanner) scan(errorAttempts int) ([]map[string]interface{}, error) { // Update the starting key so that the next scan will pick off where we last left off. for _, pk := range s.primaryKeys.Keys() { - val, err := s.table.Config.ParseValue(ParseValueArgs{ + val, err := ParseValue(s.table.Fields, ParseValueArgs{ ColName: pk, ValueWrapper: lastRow[pk], ParseTime: true, diff --git a/lib/postgres/table.go b/lib/postgres/table.go index 62c37c54..d9f93cce 100644 --- a/lib/postgres/table.go +++ b/lib/postgres/table.go @@ -10,6 +10,7 @@ import ( "github.com/artie-labs/transfer/lib/ptr" "github.com/artie-labs/reader/config" + "github.com/artie-labs/reader/lib/postgres/debezium" "github.com/artie-labs/reader/lib/postgres/primary_key" "github.com/artie-labs/reader/lib/postgres/queries" ) @@ -23,7 +24,7 @@ type Table struct { // TODO: `OriginalColumns` and `ColumnsCastedForScanning` can be merged later to be more concise. OriginalColumns []string ColumnsCastedForScanning []string - Config *Config + Fields *debezium.Fields OptionalPrimaryKeyValStart string OptionalPrimaryKeyValEnd string @@ -34,7 +35,7 @@ func NewTable(cfgTable *config.PostgreSQLTable) *Table { Name: cfgTable.Name, Schema: cfgTable.Schema, PrimaryKeys: primary_key.NewKeys(), - Config: NewPostgresConfig(), + Fields: debezium.NewFields(), OptionalPrimaryKeyValStart: cfgTable.OptionalPrimaryKeyValStart, OptionalPrimaryKeyValEnd: cfgTable.OptionalPrimaryKeyValEnd, } @@ -108,7 +109,7 @@ func (t *Table) FindStartAndEndPrimaryKeys(db *sql.DB) error { } for idx, maxValue := range values { - val, err := t.Config.ParseValue(ParseValueArgs{ + val, err := ParseValue(t.Fields, ParseValueArgs{ ColName: keys[idx], ValueWrapper: ValueWrapper{ Value: maxValue, @@ -143,7 +144,7 @@ func (t *Table) FindStartAndEndPrimaryKeys(db *sql.DB) error { } for idx, minValue := range minValues { - val, err := t.Config.ParseValue(ParseValueArgs{ + val, err := ParseValue(t.Fields, ParseValueArgs{ ColName: keys[idx], ValueWrapper: ValueWrapper{ Value: minValue,