From ce9ad846d4d114d950477e1fd33f2005fd00f027 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Fri, 2 Feb 2024 17:19:18 -0800 Subject: [PATCH] [Postgres] Fix native parsing (#78) --- lib/debezium/numeric_test.go | 5 ++++ lib/postgres/parse.go | 40 ++++++++++++++++---------------- lib/postgres/parse/parse.go | 8 +++---- lib/postgres/parse/parse_test.go | 12 +++++----- lib/postgres/parse_test.go | 20 ++-------------- lib/postgres/scan.go | 2 +- 6 files changed, 37 insertions(+), 50 deletions(-) diff --git a/lib/debezium/numeric_test.go b/lib/debezium/numeric_test.go index 165265fb..110936c3 100644 --- a/lib/debezium/numeric_test.go +++ b/lib/debezium/numeric_test.go @@ -93,6 +93,11 @@ func TestEncodeDecimalToBase64(t *testing.T) { value: "-81.76254098", scale: 8, }, + { + name: "amount", + value: "6408.355", + scale: 3, + }, } for _, tc := range tcs { diff --git a/lib/postgres/parse.go b/lib/postgres/parse.go index c74ca6ba..86c198d0 100644 --- a/lib/postgres/parse.go +++ b/lib/postgres/parse.go @@ -48,24 +48,24 @@ func (c *Config) ParseValue(args ParseValueArgs) (ValueWrapper, error) { colKind := c.Fields.GetDataType(args.ColName) switch colKind { case debezium.Geometry: - valBytes, isOk := args.Value().([]byte) + valString, isOk := args.Value().(string) if !isOk { - return NewValueWrapper(nil), fmt.Errorf("value: %v not of []byte type for geometry", args.Value()) + return NewValueWrapper(nil), fmt.Errorf("value: %v not of string type for geometry", args.Value()) } - geometry, err := parse.ToGeography(valBytes) + geometry, err := parse.ToGeography([]byte(valString)) if err != nil { return NewValueWrapper(nil), fmt.Errorf("failed to parse geometry, err: %v", err) } return NewValueWrapper(geometry), nil case debezium.Point: - valBytes, isOk := args.Value().([]byte) + valString, isOk := args.Value().(string) if !isOk { - return NewValueWrapper(nil), fmt.Errorf("value: %v not of []byte type for POINT", args.Value()) + return NewValueWrapper(nil), fmt.Errorf("value: %v not of string type for POINT", args.Value()) } - point, err := parse.ToPoint(valBytes) + point, err := parse.ToPoint(valString) if err != nil { return NewValueWrapper(nil), fmt.Errorf("failed to parse POINT, err: %v", err) } @@ -74,11 +74,11 @@ func (c *Config) ParseValue(args ParseValueArgs) (ValueWrapper, error) { case debezium.Bit: // This will be 0 (false) or 1 (true) - valBytes, isOk := args.Value().([]byte) + valString, isOk := args.Value().(string) if isOk { - return NewValueWrapper(string(valBytes) == "1"), nil + return NewValueWrapper(valString == "1"), nil } - return NewValueWrapper(nil), fmt.Errorf("value: %v not of []byte type for bit", args.Value()) + return NewValueWrapper(nil), fmt.Errorf("value: %v not of string type for bit", args.Value()) case debezium.JSON: // Debezium sends JSON as a JSON string byteSlice, isByteSlice := args.Value().([]byte) @@ -87,13 +87,13 @@ func (c *Config) ParseValue(args ParseValueArgs) (ValueWrapper, error) { } return NewValueWrapper(string(byteSlice)), nil - case debezium.VariableNumeric, debezium.Numeric: - byteSlice, isByteSlice := args.ValueWrapper.Value.([]byte) - if isByteSlice { - return NewValueWrapper(string(byteSlice)), nil + case debezium.Numeric, debezium.VariableNumeric: + stringVal, isStringVal := args.Value().(string) + if isStringVal { + return NewValueWrapper(stringVal), nil } - return NewValueWrapper(nil), fmt.Errorf("value: %v not of []byte type for VariableNumeric", args.Value()) + return NewValueWrapper(nil), fmt.Errorf("value: %v not of string type for Numeric or VariableNumeric", args.Value()) case debezium.Array: var arr []interface{} if reflect.TypeOf(args.Value()).Kind() == reflect.Slice { @@ -107,12 +107,12 @@ func (c *Config) ParseValue(args ParseValueArgs) (ValueWrapper, error) { } return NewValueWrapper(arr), nil case debezium.UUID: - byteSlice, isOk := args.Value().([]byte) + stringVal, isOk := args.Value().(string) if !isOk { - return NewValueWrapper(nil), fmt.Errorf("value: %v not of []byte() type", args.Value()) + return NewValueWrapper(nil), fmt.Errorf("value: %v not of string type", args.Value()) } - _uuid, err := uuid.ParseBytes(byteSlice) + _uuid, err := uuid.Parse(stringVal) if err != nil { return NewValueWrapper(nil), fmt.Errorf("failed to cast uuid into *uuid.UUID, err: %v", err) } @@ -134,12 +134,12 @@ func (c *Config) ParseValue(args ParseValueArgs) (ValueWrapper, error) { return NewValueWrapper(jsonMap), nil case debezium.UserDefinedText: - byteSlice, isOk := args.Value().([]byte) + stringSlice, isOk := args.Value().(string) if !isOk { - return NewValueWrapper(nil), fmt.Errorf("value: %v not of []byte() type", args.Value()) + return NewValueWrapper(nil), fmt.Errorf("value: %v not of slice type", args.Value()) } - return NewValueWrapper(string(byteSlice)), nil + return NewValueWrapper(stringSlice), nil default: // This is needed because we need to cast the time.Time object into a string for pagination. if args.ParseTime { diff --git a/lib/postgres/parse/parse.go b/lib/postgres/parse/parse.go index fdde097a..529b0d2d 100644 --- a/lib/postgres/parse/parse.go +++ b/lib/postgres/parse/parse.go @@ -19,15 +19,13 @@ func (p *Point) ToMap() map[string]interface{} { } } -func ToPoint(data []byte) (*Point, error) { - dataString := string(data) - - if !(strings.HasPrefix(dataString, "(") && strings.HasSuffix(dataString, ")")) { +func ToPoint(data string) (*Point, error) { + if !(strings.HasPrefix(data, "(") && strings.HasSuffix(data, ")")) { return nil, fmt.Errorf("invalid point format") } // Trim `(` and `)` - trimmed := strings.Trim(dataString, "()") + trimmed := strings.Trim(data, "()") // Split the string by the comma parts := strings.Split(trimmed, ",") diff --git a/lib/postgres/parse/parse_test.go b/lib/postgres/parse/parse_test.go index fc063897..34fd5f10 100644 --- a/lib/postgres/parse/parse_test.go +++ b/lib/postgres/parse/parse_test.go @@ -9,7 +9,7 @@ import ( func TestToPoint(t *testing.T) { type _tc struct { name string - input []byte + input string output *Point expectError bool } @@ -17,27 +17,27 @@ func TestToPoint(t *testing.T) { tcs := []_tc{ { name: "Valid point", - input: []byte("(2.2945,48.8584)"), + input: "(2.2945,48.8584)", output: &Point{X: 2.2945, Y: 48.8584}, }, { name: "Invalid format", - input: []byte("2.2945,48.8584"), + input: "2.2945,48.8584", expectError: true, }, { name: "Invalid X coordinate", - input: []byte("(abc,48.8584)"), + input: "(abc,48.8584)", expectError: true, }, { name: "Invalid Y coordinate", - input: []byte("(2.2945,xyz)"), + input: "(2.2945,xyz)", expectError: true, }, { name: "Empty input", - input: []byte(""), + input: "", expectError: true, }, } diff --git a/lib/postgres/parse_test.go b/lib/postgres/parse_test.go index ae6d6dbe..19adfd14 100644 --- a/lib/postgres/parse_test.go +++ b/lib/postgres/parse_test.go @@ -22,7 +22,7 @@ func TestParse(t *testing.T) { colName: "bit_test (true)", colKind: "bit", value: ValueWrapper{ - Value: []byte("1"), + Value: "1", }, expectedValue: true, }, @@ -30,18 +30,10 @@ func TestParse(t *testing.T) { colName: "bit_test (false)", colKind: "bit", value: ValueWrapper{ - Value: []byte("0"), + Value: "0", }, expectedValue: false, }, - { - colName: "numeric_test", - colKind: "numeric", - value: ValueWrapper{ - Value: []byte{49, 48, 48, 48, 49, 49}, - }, - expectedValue: "100011", - }, { colName: "foo", colKind: "ARRAY", @@ -58,14 +50,6 @@ func TestParse(t *testing.T) { }, expectedValue: "hello", }, - { - colName: "uuid (errors out when it's not already parsed)", - colKind: "uuid", - value: ValueWrapper{ - Value: "a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11", - }, - expectErr: true, - }, { colName: "uuid (already parsed, so skip parsing)", colKind: "uuid", diff --git a/lib/postgres/scan.go b/lib/postgres/scan.go index b1700d9c..de28a966 100644 --- a/lib/postgres/scan.go +++ b/lib/postgres/scan.go @@ -78,7 +78,7 @@ func (s *scanner) scan(errorAttempts int) ([]map[string]interface{}, error) { slog.Info(fmt.Sprintf("Query looks like: %v", query)) rows, err := s.db.Query(query) if err != nil { - if attemptsLeft := (s.errorRetries - errorAttempts); attemptsLeft > 0 { + if attemptsLeft := s.errorRetries - errorAttempts; attemptsLeft > 0 { sleepMs := lib.JitterMs(jitterBaseMs, jitterMaxMs, errorAttempts) slog.Info(fmt.Sprintf("We still have %v attempts", attemptsLeft), slog.Int("sleepMs", sleepMs), slog.Any("err", err)) time.Sleep(time.Duration(sleepMs) * time.Millisecond)