Skip to content

Commit

Permalink
Kill postgres table Config struct (#100)
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie authored Feb 14, 2024
1 parent 86ffd1f commit 3ea8e78
Show file tree
Hide file tree
Showing 8 changed files with 26 additions and 35 deletions.
4 changes: 2 additions & 2 deletions lib/postgres/columns.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (t *Table) RetrieveColumns(db *sql.DB) error {
slog.String("colKind", colKind),
)
} else {
t.Config.Fields.AddField(colName, dataType, opts)
t.Fields.AddField(colName, dataType, opts)
}
}

Expand All @@ -60,7 +60,7 @@ func (t *Table) RetrieveColumns(db *sql.DB) error {
for _, column := range columns {
// Add to original columns before mutation
t.OriginalColumns = append(t.OriginalColumns, column)
columnKind := t.Config.Fields.GetDataType(column)
columnKind := t.Fields.GetDataType(column)
t.ColumnsCastedForScanning = append(t.ColumnsCastedForScanning, castColumn(column, columnKind))
}

Expand Down
13 changes: 0 additions & 13 deletions lib/postgres/config.go

This file was deleted.

10 changes: 6 additions & 4 deletions lib/postgres/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (

"github.com/artie-labs/transfer/lib/debezium"
"github.com/stretchr/testify/assert"

pgDebezium "github.com/artie-labs/reader/lib/postgres/debezium"
)

func TestPostgresConfig_Complete(t *testing.T) {
Expand Down Expand Up @@ -147,15 +149,15 @@ func TestPostgresConfig_Complete(t *testing.T) {
}

for _, testCase := range testCases {
cfg := NewPostgresConfig()
fields := pgDebezium.NewFields()
// TODO: Add test for hstore
dataType, opts := colKindToDataType(testCase.colKind, nil, nil, nil)
cfg.Fields.AddField(testCase.colName, dataType, opts)
fields.AddField(testCase.colName, dataType, opts)

actualEscCol := castColumn(testCase.colName, cfg.Fields.GetDataType(testCase.colName))
actualEscCol := castColumn(testCase.colName, fields.GetDataType(testCase.colName))
assert.Equal(t, testCase.expectedEscColString, actualEscCol, testCase.name)

field, isOk := cfg.Fields.GetField(testCase.colName)
field, isOk := fields.GetField(testCase.colName)
assert.True(t, isOk, testCase.name)
assert.Equal(t, testCase.expectedField, field, testCase.name)
}
Expand Down
2 changes: 1 addition & 1 deletion lib/postgres/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (m *MessageBuilder) Next() ([]lib.RawMessage, error) {
payload, err := debezium.NewPayload(&debezium.NewArgs{
TableName: m.table.Name,
Columns: m.table.OriginalColumns,
Fields: m.table.Config.Fields,
Fields: m.table.Fields,
RowData: row,
})
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions lib/postgres/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ func NewValueWrapper(value interface{}) ValueWrapper {
}
}

func (c *Config) ParseValue(args ParseValueArgs) (ValueWrapper, error) {
func ParseValue(fields *debezium.Fields, args ParseValueArgs) (ValueWrapper, error) {
// If the value is nil, or already parsed - just return.
if args.Value() == nil || args.ValueWrapper.parsed {
return args.ValueWrapper, nil
}

colKind := c.Fields.GetDataType(args.ColName)
colKind := fields.GetDataType(args.ColName)
switch colKind {
case debezium.Geometry, debezium.Geography:
valString, isOk := args.Value().(string)
Expand Down
11 changes: 6 additions & 5 deletions lib/postgres/parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ import (
"time"

"github.com/artie-labs/transfer/lib/ptr"

"github.com/stretchr/testify/assert"

pgDebezium "github.com/artie-labs/reader/lib/postgres/debezium"
)

func TestParse(t *testing.T) {
Expand Down Expand Up @@ -94,11 +95,11 @@ func TestParse(t *testing.T) {
}

for _, tc := range tcs {
cfg := NewPostgresConfig()
fields := pgDebezium.NewFields()
dataType, opts := colKindToDataType(tc.colKind, nil, nil, tc.udtName)
cfg.Fields.AddField(tc.colName, dataType, opts)
fields.AddField(tc.colName, dataType, opts)

value, err := cfg.ParseValue(ParseValueArgs{
value, err := ParseValue(fields, ParseValueArgs{
ColName: tc.colName,
ValueWrapper: tc.value,
ParseTime: tc.parseTime,
Expand All @@ -112,7 +113,7 @@ func TestParse(t *testing.T) {

// if there are no errors, let's iterate over this a few times to make sure it's deterministic.
for i := 0; i < 5; i++ {
value, err = cfg.ParseValue(ParseValueArgs{
value, err = ParseValue(fields, ParseValueArgs{
ColName: tc.colName,
ValueWrapper: value,
ParseTime: tc.parseTime,
Expand Down
8 changes: 4 additions & 4 deletions lib/postgres/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ func (s *scanner) scan(errorAttempts int) ([]map[string]interface{}, error) {
secondWhereClause = queries.GreaterThanEqualTo
}

startKeys := s.primaryKeys.KeysToValueList(s.table.Config.Fields.GetOptionalSchema(), false)
endKeys := s.primaryKeys.KeysToValueList(s.table.Config.Fields.GetOptionalSchema(), true)
startKeys := s.primaryKeys.KeysToValueList(s.table.Fields.GetOptionalSchema(), false)
endKeys := s.primaryKeys.KeysToValueList(s.table.Fields.GetOptionalSchema(), true)

query := queries.ScanTableQuery(queries.ScanTableQueryArgs{
Schema: s.table.Schema,
Expand Down Expand Up @@ -111,7 +111,7 @@ func (s *scanner) scan(errorAttempts int) ([]map[string]interface{}, error) {
row := make(map[string]ValueWrapper)
for k, v := range values {
colName := columns[k]
value, err := s.table.Config.ParseValue(ParseValueArgs{
value, err := ParseValue(s.table.Fields, ParseValueArgs{
ColName: colName,
ValueWrapper: ValueWrapper{
Value: v,
Expand All @@ -131,7 +131,7 @@ func (s *scanner) scan(errorAttempts int) ([]map[string]interface{}, error) {

// Update the starting key so that the next scan will pick off where we last left off.
for _, pk := range s.primaryKeys.Keys() {
val, err := s.table.Config.ParseValue(ParseValueArgs{
val, err := ParseValue(s.table.Fields, ParseValueArgs{
ColName: pk,
ValueWrapper: lastRow[pk],
ParseTime: true,
Expand Down
9 changes: 5 additions & 4 deletions lib/postgres/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/artie-labs/transfer/lib/ptr"

"github.com/artie-labs/reader/config"
"github.com/artie-labs/reader/lib/postgres/debezium"
"github.com/artie-labs/reader/lib/postgres/primary_key"
"github.com/artie-labs/reader/lib/postgres/queries"
)
Expand All @@ -23,7 +24,7 @@ type Table struct {
// TODO: `OriginalColumns` and `ColumnsCastedForScanning` can be merged later to be more concise.
OriginalColumns []string
ColumnsCastedForScanning []string
Config *Config
Fields *debezium.Fields

OptionalPrimaryKeyValStart string
OptionalPrimaryKeyValEnd string
Expand All @@ -34,7 +35,7 @@ func NewTable(cfgTable *config.PostgreSQLTable) *Table {
Name: cfgTable.Name,
Schema: cfgTable.Schema,
PrimaryKeys: primary_key.NewKeys(),
Config: NewPostgresConfig(),
Fields: debezium.NewFields(),
OptionalPrimaryKeyValStart: cfgTable.OptionalPrimaryKeyValStart,
OptionalPrimaryKeyValEnd: cfgTable.OptionalPrimaryKeyValEnd,
}
Expand Down Expand Up @@ -108,7 +109,7 @@ func (t *Table) FindStartAndEndPrimaryKeys(db *sql.DB) error {
}

for idx, maxValue := range values {
val, err := t.Config.ParseValue(ParseValueArgs{
val, err := ParseValue(t.Fields, ParseValueArgs{
ColName: keys[idx],
ValueWrapper: ValueWrapper{
Value: maxValue,
Expand Down Expand Up @@ -143,7 +144,7 @@ func (t *Table) FindStartAndEndPrimaryKeys(db *sql.DB) error {
}

for idx, minValue := range minValues {
val, err := t.Config.ParseValue(ParseValueArgs{
val, err := ParseValue(t.Fields, ParseValueArgs{
ColName: keys[idx],
ValueWrapper: ValueWrapper{
Value: minValue,
Expand Down

0 comments on commit 3ea8e78

Please sign in to comment.