Skip to content

Commit

Permalink
[Postgres] Fix native parsing (#78)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Feb 3, 2024
1 parent 00524c0 commit ce9ad84
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 50 deletions.
5 changes: 5 additions & 0 deletions lib/debezium/numeric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ func TestEncodeDecimalToBase64(t *testing.T) {
value: "-81.76254098",
scale: 8,
},
{
name: "amount",
value: "6408.355",
scale: 3,
},
}

for _, tc := range tcs {
Expand Down
40 changes: 20 additions & 20 deletions lib/postgres/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,24 +48,24 @@ func (c *Config) ParseValue(args ParseValueArgs) (ValueWrapper, error) {
colKind := c.Fields.GetDataType(args.ColName)
switch colKind {
case debezium.Geometry:
valBytes, isOk := args.Value().([]byte)
valString, isOk := args.Value().(string)
if !isOk {
return NewValueWrapper(nil), fmt.Errorf("value: %v not of []byte type for geometry", args.Value())
return NewValueWrapper(nil), fmt.Errorf("value: %v not of string type for geometry", args.Value())
}

geometry, err := parse.ToGeography(valBytes)
geometry, err := parse.ToGeography([]byte(valString))
if err != nil {
return NewValueWrapper(nil), fmt.Errorf("failed to parse geometry, err: %v", err)
}

return NewValueWrapper(geometry), nil
case debezium.Point:
valBytes, isOk := args.Value().([]byte)
valString, isOk := args.Value().(string)
if !isOk {
return NewValueWrapper(nil), fmt.Errorf("value: %v not of []byte type for POINT", args.Value())
return NewValueWrapper(nil), fmt.Errorf("value: %v not of string type for POINT", args.Value())
}

point, err := parse.ToPoint(valBytes)
point, err := parse.ToPoint(valString)
if err != nil {
return NewValueWrapper(nil), fmt.Errorf("failed to parse POINT, err: %v", err)
}
Expand All @@ -74,11 +74,11 @@ func (c *Config) ParseValue(args ParseValueArgs) (ValueWrapper, error) {

case debezium.Bit:
// This will be 0 (false) or 1 (true)
valBytes, isOk := args.Value().([]byte)
valString, isOk := args.Value().(string)
if isOk {
return NewValueWrapper(string(valBytes) == "1"), nil
return NewValueWrapper(valString == "1"), nil
}
return NewValueWrapper(nil), fmt.Errorf("value: %v not of []byte type for bit", args.Value())
return NewValueWrapper(nil), fmt.Errorf("value: %v not of string type for bit", args.Value())
case debezium.JSON:
// Debezium sends JSON as a JSON string
byteSlice, isByteSlice := args.Value().([]byte)
Expand All @@ -87,13 +87,13 @@ func (c *Config) ParseValue(args ParseValueArgs) (ValueWrapper, error) {
}

return NewValueWrapper(string(byteSlice)), nil
case debezium.VariableNumeric, debezium.Numeric:
byteSlice, isByteSlice := args.ValueWrapper.Value.([]byte)
if isByteSlice {
return NewValueWrapper(string(byteSlice)), nil
case debezium.Numeric, debezium.VariableNumeric:
stringVal, isStringVal := args.Value().(string)
if isStringVal {
return NewValueWrapper(stringVal), nil
}

return NewValueWrapper(nil), fmt.Errorf("value: %v not of []byte type for VariableNumeric", args.Value())
return NewValueWrapper(nil), fmt.Errorf("value: %v not of string type for Numeric or VariableNumeric", args.Value())
case debezium.Array:
var arr []interface{}
if reflect.TypeOf(args.Value()).Kind() == reflect.Slice {
Expand All @@ -107,12 +107,12 @@ func (c *Config) ParseValue(args ParseValueArgs) (ValueWrapper, error) {
}
return NewValueWrapper(arr), nil
case debezium.UUID:
byteSlice, isOk := args.Value().([]byte)
stringVal, isOk := args.Value().(string)
if !isOk {
return NewValueWrapper(nil), fmt.Errorf("value: %v not of []byte() type", args.Value())
return NewValueWrapper(nil), fmt.Errorf("value: %v not of string type", args.Value())
}

_uuid, err := uuid.ParseBytes(byteSlice)
_uuid, err := uuid.Parse(stringVal)
if err != nil {
return NewValueWrapper(nil), fmt.Errorf("failed to cast uuid into *uuid.UUID, err: %v", err)
}
Expand All @@ -134,12 +134,12 @@ func (c *Config) ParseValue(args ParseValueArgs) (ValueWrapper, error) {

return NewValueWrapper(jsonMap), nil
case debezium.UserDefinedText:
byteSlice, isOk := args.Value().([]byte)
stringSlice, isOk := args.Value().(string)
if !isOk {
return NewValueWrapper(nil), fmt.Errorf("value: %v not of []byte() type", args.Value())
return NewValueWrapper(nil), fmt.Errorf("value: %v not of slice type", args.Value())
}

return NewValueWrapper(string(byteSlice)), nil
return NewValueWrapper(stringSlice), nil
default:
// This is needed because we need to cast the time.Time object into a string for pagination.
if args.ParseTime {
Expand Down
8 changes: 3 additions & 5 deletions lib/postgres/parse/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,13 @@ func (p *Point) ToMap() map[string]interface{} {
}
}

func ToPoint(data []byte) (*Point, error) {
dataString := string(data)

if !(strings.HasPrefix(dataString, "(") && strings.HasSuffix(dataString, ")")) {
func ToPoint(data string) (*Point, error) {
if !(strings.HasPrefix(data, "(") && strings.HasSuffix(data, ")")) {
return nil, fmt.Errorf("invalid point format")
}

// Trim `(` and `)`
trimmed := strings.Trim(dataString, "()")
trimmed := strings.Trim(data, "()")

// Split the string by the comma
parts := strings.Split(trimmed, ",")
Expand Down
12 changes: 6 additions & 6 deletions lib/postgres/parse/parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,35 +9,35 @@ import (
func TestToPoint(t *testing.T) {
type _tc struct {
name string
input []byte
input string
output *Point
expectError bool
}

tcs := []_tc{
{
name: "Valid point",
input: []byte("(2.2945,48.8584)"),
input: "(2.2945,48.8584)",
output: &Point{X: 2.2945, Y: 48.8584},
},
{
name: "Invalid format",
input: []byte("2.2945,48.8584"),
input: "2.2945,48.8584",
expectError: true,
},
{
name: "Invalid X coordinate",
input: []byte("(abc,48.8584)"),
input: "(abc,48.8584)",
expectError: true,
},
{
name: "Invalid Y coordinate",
input: []byte("(2.2945,xyz)"),
input: "(2.2945,xyz)",
expectError: true,
},
{
name: "Empty input",
input: []byte(""),
input: "",
expectError: true,
},
}
Expand Down
20 changes: 2 additions & 18 deletions lib/postgres/parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,18 @@ func TestParse(t *testing.T) {
colName: "bit_test (true)",
colKind: "bit",
value: ValueWrapper{
Value: []byte("1"),
Value: "1",
},
expectedValue: true,
},
{
colName: "bit_test (false)",
colKind: "bit",
value: ValueWrapper{
Value: []byte("0"),
Value: "0",
},
expectedValue: false,
},
{
colName: "numeric_test",
colKind: "numeric",
value: ValueWrapper{
Value: []byte{49, 48, 48, 48, 49, 49},
},
expectedValue: "100011",
},
{
colName: "foo",
colKind: "ARRAY",
Expand All @@ -58,14 +50,6 @@ func TestParse(t *testing.T) {
},
expectedValue: "hello",
},
{
colName: "uuid (errors out when it's not already parsed)",
colKind: "uuid",
value: ValueWrapper{
Value: "a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11",
},
expectErr: true,
},
{
colName: "uuid (already parsed, so skip parsing)",
colKind: "uuid",
Expand Down
2 changes: 1 addition & 1 deletion lib/postgres/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (s *scanner) scan(errorAttempts int) ([]map[string]interface{}, error) {
slog.Info(fmt.Sprintf("Query looks like: %v", query))
rows, err := s.db.Query(query)
if err != nil {
if attemptsLeft := (s.errorRetries - errorAttempts); attemptsLeft > 0 {
if attemptsLeft := s.errorRetries - errorAttempts; attemptsLeft > 0 {
sleepMs := lib.JitterMs(jitterBaseMs, jitterMaxMs, errorAttempts)
slog.Info(fmt.Sprintf("We still have %v attempts", attemptsLeft), slog.Int("sleepMs", sleepMs), slog.Any("err", err))
time.Sleep(time.Duration(sleepMs) * time.Millisecond)
Expand Down

0 comments on commit ce9ad84

Please sign in to comment.