Skip to content

Commit

Permalink
Improving our MySQL source work for additional data types (#209)
Browse files Browse the repository at this point in the history
Co-authored-by: Nathan Villaescusa <[email protected]>
  • Loading branch information
Tang8330 and nathan-artie authored Feb 27, 2024
1 parent 65b6283 commit 753aebd
Show file tree
Hide file tree
Showing 7 changed files with 131 additions and 24 deletions.
37 changes: 17 additions & 20 deletions lib/debezium/converters/decimal.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,44 +2,41 @@ package converters

import (
"fmt"
transferDBZ "github.com/artie-labs/transfer/lib/debezium"

"github.com/artie-labs/transfer/lib/debezium"
"github.com/artie-labs/reader/lib/debezium"
)

type decimalConverter struct {
scale *int
scale int
precision *int
}

func NewDecimalConverter(scale, precision *int) decimalConverter {
func NewDecimalConverter(scale int, precision *int) decimalConverter {
return decimalConverter{scale: scale, precision: precision}
}

func (d decimalConverter) ToField(name string) debezium.Field {
field := debezium.Field{
func (d decimalConverter) ToField(name string) transferDBZ.Field {
field := transferDBZ.Field{
FieldName: name,
DebeziumType: string(debezium.KafkaDecimalType),
DebeziumType: string(transferDBZ.KafkaDecimalType),
Parameters: map[string]any{
"scale": fmt.Sprint(d.scale),
},
}

if d.scale != nil || d.precision != nil {
field.Parameters = make(map[string]any)

if d.scale != nil {
field.Parameters["scale"] = fmt.Sprint(*d.scale)
}

if d.precision != nil {
field.Parameters[debezium.KafkaDecimalPrecisionKey] = fmt.Sprint(*d.precision)
}
if d.precision != nil {
field.Parameters[transferDBZ.KafkaDecimalPrecisionKey] = fmt.Sprint(*d.precision)
}

return field
}

func (decimalConverter) Convert(value any) (any, error) {
func (d decimalConverter) Convert(value any) (any, error) {
castValue, isOk := value.(string)
if isOk {
return castValue, nil
if !isOk {
return nil, fmt.Errorf("expected string got %T with value: %v", value, value)
}
return nil, fmt.Errorf("expected string got %T with value: %v", value, value)

return debezium.EncodeDecimalToBase64(castValue, d.scale)
}
50 changes: 50 additions & 0 deletions lib/debezium/converters/decimal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package converters

import (
"fmt"
"testing"

"github.com/artie-labs/transfer/lib/debezium"
"github.com/artie-labs/transfer/lib/ptr"
"github.com/stretchr/testify/assert"
)

func TestDecimalConverter_ToField(t *testing.T) {
{
// Without precision
converter := NewDecimalConverter(2, nil)
expected := debezium.Field{
FieldName: "col",
DebeziumType: "org.apache.kafka.connect.data.Decimal",
Parameters: map[string]any{
"scale": "2",
},
}
assert.Equal(t, expected, converter.ToField("col"))
}
{
// With precision
converter := NewDecimalConverter(2, ptr.ToInt(3))
expected := debezium.Field{
FieldName: "col",
DebeziumType: "org.apache.kafka.connect.data.Decimal",
Parameters: map[string]any{
"connect.decimal.precision": "3",
"scale": "2",
},
}
assert.Equal(t, expected, converter.ToField("col"))
}
}

func TestDecimalConverter_Convert(t *testing.T) {
converter := NewDecimalConverter(2, nil)
{
converted, err := converter.Convert("1.23")
assert.NoError(t, err)

actualValue, err := converter.ToField("").DecodeDecimal(fmt.Sprint(converted))
assert.NoError(t, err)
assert.Equal(t, "1.23", fmt.Sprint(actualValue))
}
}
12 changes: 12 additions & 0 deletions lib/mysql/schema/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,15 @@ func ConvertValue(value any, colType DataType) (any, error) {
return nil, fmt.Errorf("bit value is invalid: %v", value)
}
return castValue[0] == 1, nil
case Boolean:
castVal, ok := value.(int64)
if !ok {
return nil, fmt.Errorf("expected int64 got %T for value: %v", value, value)
}
if castVal > 1 || castVal < 0 {
return nil, fmt.Errorf("boolean value not in [0, 1]: %v", value)
}
return castVal == 1, nil
case TinyInt,
SmallInt,
MediumInt,
Expand Down Expand Up @@ -82,6 +91,9 @@ func ConvertValue(value any, colType DataType) (any, error) {
Char,
Varchar,
Text,
TinyText,
MediumText,
LongText,
Enum,
Set,
JSON:
Expand Down
24 changes: 24 additions & 0 deletions lib/mysql/schema/convert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,30 @@ func TestConvertValue(t *testing.T) {
value: []byte{byte(1), byte(1)},
expectedErr: "bit value is invalid",
},
{
name: "boolean - 0",
dataType: Boolean,
value: int64(0),
expected: false,
},
{
name: "boolean - 1",
dataType: Boolean,
value: int64(1),
expected: true,
},
{
name: "boolean - -2",
dataType: Boolean,
value: int64(-2),
expectedErr: "boolean value not in [0, 1]: -2",
},
{
name: "boolean - 2",
dataType: Boolean,
value: int64(2),
expectedErr: "boolean value not in [0, 1]: 2",
},
{
name: "tiny int",
dataType: TinyInt,
Expand Down
18 changes: 17 additions & 1 deletion lib/mysql/schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const (
Double
// Bit-Value Type
Bit
Boolean
// Date and Time Data Types
Date
DateTime
Expand All @@ -40,6 +41,9 @@ const (
Varbinary
Blob
Text
TinyText
MediumText
LongText
Enum
Set
// JSON
Expand Down Expand Up @@ -117,9 +121,14 @@ func parseColumnDataType(s string) (DataType, *Opts, error) {
metadata = s[parenIndex+1 : len(s)-1]
s = s[:parenIndex]
}

switch s {
case "tinyint":
// Boolean, bool are aliases for tinyint(1)
if metadata == "1" {
return Boolean, nil, nil
}

return TinyInt, nil, nil
case "smallint":
return SmallInt, nil, nil
Expand Down Expand Up @@ -177,13 +186,20 @@ func parseColumnDataType(s string) (DataType, *Opts, error) {
return Blob, nil, nil
case "text":
return Text, nil, nil
case "tinytext":
return TinyText, nil, nil
case "mediumtext":
return MediumText, nil, nil
case "longtext":
return LongText, nil, nil
case "enum":
return Enum, nil, nil
case "set":
return Set, nil, nil
case "json":
return JSON, nil, nil
default:
slog.Warn("Unknown data type", slog.String("type", s))
return InvalidDataType, nil, nil
}
}
Expand Down
4 changes: 4 additions & 0 deletions lib/mysql/schema/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ func TestParseColumnDataType(t *testing.T) {
input: "int",
expectedType: Int,
},
{
input: "tinyint(1)",
expectedType: Boolean,
},
{
input: "varchar(255)",
expectedType: Varchar,
Expand Down
10 changes: 7 additions & 3 deletions sources/mysql/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (m mysqlAdapter) ConvertRowToDebezium(row map[string]any) (map[string]any,

func valueConverterForType(d schema.DataType, opts *schema.Opts) (converters.ValueConverter, error) {
switch d {
case schema.Bit:
case schema.Bit, schema.Boolean:
return converters.BooleanPassthrough{}, nil
case schema.TinyInt, schema.SmallInt:
return converters.Int16Passthrough{}, nil
Expand All @@ -76,8 +76,12 @@ func valueConverterForType(d schema.DataType, opts *schema.Opts) (converters.Val
case schema.Double:
return converters.DoublePassthrough{}, nil
case schema.Decimal:
return converters.NewDecimalConverter(opts.Scale, opts.Precision), nil
case schema.Char, schema.Text, schema.Varchar:
if opts.Scale == nil {
return nil, fmt.Errorf("scale is required for decimal type")
}

return converters.NewDecimalConverter(*opts.Scale, opts.Precision), nil
case schema.Char, schema.Text, schema.Varchar, schema.TinyText, schema.MediumText, schema.LongText:
return converters.StringPassthrough{}, nil
case schema.Binary, schema.Varbinary, schema.Blob:
return converters.BytesPassthrough{}, nil
Expand Down

0 comments on commit 753aebd

Please sign in to comment.