Skip to content

Commit

Permalink
[debezium] Return []byte from decimal converters (#302)
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie authored Mar 18, 2024
1 parent 709c91c commit 98cfb38
Show file tree
Hide file tree
Showing 8 changed files with 39 additions and 46 deletions.
2 changes: 1 addition & 1 deletion integration_tests/postgres/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -679,7 +679,7 @@ const expectedPayloadTemplate = `{
"c_money": "T30t",
"c_numeric": "AYHN",
"c_numeric_variable": {
"scale": "5",
"scale": 5,
"value": "QX3UWQ=="
},
"c_numrange": "[11.1,22.2)",
Expand Down
20 changes: 9 additions & 11 deletions lib/debezium/converters/decimal.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ func (d decimalConverter) Convert(value any) (any, error) {
if !isOk {
return nil, fmt.Errorf("expected string got %T with value: %v", value, value)
}

return debezium.EncodeDecimalToBase64(castValue, d.scale)
return debezium.EncodeDecimalToBytes(castValue, d.scale), nil
}

type VariableNumericConverter struct{}
Expand All @@ -53,21 +52,20 @@ func (VariableNumericConverter) ToField(name string) transferDBZ.Field {
}
}

type VariableScaleDecimal struct {
Scale int32 `json:"scale"`
Value []byte `json:"value"`
}

func (VariableNumericConverter) Convert(value any) (any, error) {
stringValue, ok := value.(string)
if !ok {
return nil, fmt.Errorf("expected string got %T with value: %v", value, value)
}

scale := debezium.GetScale(stringValue)

encodedValue, err := debezium.EncodeDecimalToBase64(stringValue, scale)
if err != nil {
return nil, fmt.Errorf("failed to encode decimal to b64: %w", err)
}

return map[string]string{
"scale": fmt.Sprint(scale),
"value": encodedValue,
return VariableScaleDecimal{
Scale: int32(scale),
Value: debezium.EncodeDecimalToBytes(stringValue, scale),
}, nil
}
17 changes: 8 additions & 9 deletions lib/debezium/converters/decimal_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package converters

import (
"encoding/base64"
"fmt"
"strconv"
"testing"

"github.com/artie-labs/transfer/lib/debezium"
Expand Down Expand Up @@ -45,8 +45,9 @@ func TestDecimalConverter_Convert(t *testing.T) {
{
converted, err := converter.Convert("1.23")
assert.NoError(t, err)

actualValue, err := converter.ToField("").DecodeDecimal(fmt.Sprint(converted))
bytes, ok := converted.([]byte)
assert.True(t, ok)
actualValue, err := converter.ToField("").DecodeDecimal(base64.StdEncoding.EncodeToString(bytes))
assert.NoError(t, err)
assert.Equal(t, "1.23", fmt.Sprint(actualValue))
}
Expand All @@ -72,14 +73,12 @@ func TestVariableNumericConverter_Convert(t *testing.T) {
// Happy path
converted, err := converter.Convert("12.34")
assert.NoError(t, err)
convertedMap, ok := converted.(map[string]string)
convertedMap, ok := converted.(VariableScaleDecimal)
assert.True(t, ok)
assert.Equal(t, map[string]string{"scale": "2", "value": "BNI="}, convertedMap)

scale, err := strconv.Atoi(convertedMap["scale"])
assert.NoError(t, err)
assert.Equal(t, VariableScaleDecimal{Scale: 2, Value: []byte{0x4, 0xd2}}, convertedMap)

actualValue, err := NewDecimalConverter(scale, nil).ToField("").DecodeDecimal(convertedMap["value"])
decimalConverter := NewDecimalConverter(int(convertedMap.Scale), nil).ToField("")
actualValue, err := decimalConverter.DecodeDecimal(base64.StdEncoding.EncodeToString(convertedMap.Value))
assert.NoError(t, err)
assert.Equal(t, "12.34", fmt.Sprint(actualValue))
}
Expand Down
8 changes: 2 additions & 6 deletions lib/debezium/numeric.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package debezium

import (
"encoding/base64"
"math/big"
"strings"
)
Expand All @@ -21,7 +20,7 @@ func GetScale(value string) int {
return scale
}

func EncodeDecimalToBase64(value string, scale int) (string, error) {
func EncodeDecimalToBytes(value string, scale int) []byte {
scaledValue := new(big.Int).Exp(big.NewInt(10), big.NewInt(int64(scale)), nil)
bigFloatValue := new(big.Float)
bigFloatValue.SetString(value)
Expand Down Expand Up @@ -56,8 +55,5 @@ func EncodeDecimalToBase64(value string, scale int) (string, error) {
data = append([]byte{0x00}, data...)
}
}

// Encode to base64
encoded := base64.StdEncoding.EncodeToString(data)
return encoded, nil
return data
}
6 changes: 3 additions & 3 deletions lib/debezium/numeric_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package debezium

import (
"encoding/base64"
"testing"

"github.com/artie-labs/transfer/lib/debezium"
Expand Down Expand Up @@ -101,15 +102,14 @@ func TestEncodeDecimalToBase64(t *testing.T) {
}

for _, tc := range tcs {
actualEncodedValue, err := EncodeDecimalToBase64(tc.value, tc.scale)
assert.NoError(t, err, tc.name)
actualEncodedValue := EncodeDecimalToBytes(tc.value, tc.scale)
field := debezium.Field{
Parameters: map[string]any{
"scale": tc.scale,
},
}

actualValue, err := field.DecodeDecimal(actualEncodedValue)
actualValue, err := field.DecodeDecimal(base64.StdEncoding.EncodeToString(actualEncodedValue))
assert.NoError(t, err, tc.name)
assert.Equal(t, tc.value, actualValue.String(), tc.name)
}
Expand Down
9 changes: 6 additions & 3 deletions sources/postgres/adapter/adapter_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package adapter

import (
"fmt"
"encoding/base64"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/artie-labs/reader/lib/debezium/converters"
"github.com/artie-labs/reader/lib/postgres"
"github.com/artie-labs/reader/lib/postgres/schema"
"github.com/artie-labs/transfer/lib/debezium"
Expand Down Expand Up @@ -229,7 +230,7 @@ func TestValueConverterForType_Convert(t *testing.T) {
name: "numeric (postgres.Numeric) - variable numeric",
col: schema.Column{Name: "variable_numeric_col", Type: schema.VariableNumeric},
value: "123.98",
expectedValue: map[string]string{"scale": "2", "value": "MG4="},
expectedValue: converters.VariableScaleDecimal{Scale: 2, Value: []byte{0x30, 0x6e}},
},
{
name: "string",
Expand Down Expand Up @@ -261,8 +262,10 @@ func TestValueConverterForType_Convert(t *testing.T) {
} else {
assert.NoError(t, actualErr, tc.name)
if tc.numericValue {
bytes, ok := actualValue.([]byte)
assert.True(t, ok)
field := converter.ToField(tc.col.Name)
val, err := field.DecodeDecimal(fmt.Sprint(actualValue))
val, err := field.DecodeDecimal(base64.StdEncoding.EncodeToString(bytes))
assert.NoError(t, err, tc.name)
assert.Equal(t, tc.expectedValue, val.String(), tc.name)
} else {
Expand Down
7 changes: 1 addition & 6 deletions sources/postgres/adapter/converters.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,7 @@ func (MoneyConverter) ToField(name string) transferDbz.Field {

func (MoneyConverter) Convert(value any) (any, error) {
stringValue := stringutil.ParseMoneyIntoString(fmt.Sprint(value))

stringValue, err := debezium.EncodeDecimalToBase64(stringValue, moneyScale)
if err != nil {
return nil, fmt.Errorf("failed to encode decimal to b64: %w", err)
}
return stringValue, nil
return debezium.EncodeDecimalToBytes(stringValue, moneyScale), nil
}

type PgTimeConverter struct{}
Expand Down
16 changes: 9 additions & 7 deletions sources/postgres/adapter/converters_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package adapter

import (
"encoding/base64"
"math"
"testing"
"time"
Expand All @@ -26,10 +27,11 @@ func TestMoneyConverter_ToField(t *testing.T) {
}

func TestMoneyConverter_Convert(t *testing.T) {
decimalField := converters.NewDecimalConverter(moneyScale, nil).ToField("")
decodeValue := func(value any) string {
stringValue, ok := value.(string)
bytes, ok := value.([]byte)
assert.True(t, ok)
val, err := converters.NewDecimalConverter(moneyScale, nil).ToField("").DecodeDecimal(stringValue)
val, err := decimalField.DecodeDecimal(base64.StdEncoding.EncodeToString(bytes))
assert.NoError(t, err)
return val.String()
}
Expand All @@ -39,35 +41,35 @@ func TestMoneyConverter_Convert(t *testing.T) {
// int
converted, err := converter.Convert(1234)
assert.NoError(t, err)
assert.Equal(t, "AeII", converted)
assert.Equal(t, []byte{0x1, 0xe2, 0x8}, converted)
assert.Equal(t, "1234.00", decodeValue(converted))
}
{
// float
converted, err := converter.Convert(1234.56)
assert.NoError(t, err)
assert.Equal(t, "AeJA", converted)
assert.Equal(t, []byte{0x1, 0xe2, 0x40}, converted)
assert.Equal(t, "1234.56", decodeValue(converted))
}
{
// string
converted, err := converter.Convert("1234.56")
assert.NoError(t, err)
assert.Equal(t, "AeJA", converted)
assert.Equal(t, []byte{0x1, 0xe2, 0x40}, converted)
assert.Equal(t, "1234.56", decodeValue(converted))
}
{
// string with $ and comma
converted, err := converter.Convert("$1,234.567")
assert.NoError(t, err)
assert.Equal(t, "AeJA", converted)
assert.Equal(t, []byte{0x1, 0xe2, 0x40}, converted)
assert.Equal(t, "1234.56", decodeValue(converted))
}
{
// string with $, comma, and no cents
converted, err := converter.Convert("$1000,234")
assert.NoError(t, err)
assert.Equal(t, "BfY8aA==", converted)
assert.Equal(t, []byte{0x5, 0xf6, 0x3c, 0x68}, converted)
assert.Equal(t, "1000234.00", decodeValue(converted))
}
}
Expand Down

0 comments on commit 98cfb38

Please sign in to comment.