From 753aebd074d3d0ff7f24d173448e8d415c8b977b Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Tue, 27 Feb 2024 11:52:13 -0800 Subject: [PATCH] Improving our MySQL source work for additional data types (#209) Co-authored-by: Nathan Villaescusa --- lib/debezium/converters/decimal.go | 37 +++++++++--------- lib/debezium/converters/decimal_test.go | 50 +++++++++++++++++++++++++ lib/mysql/schema/convert.go | 12 ++++++ lib/mysql/schema/convert_test.go | 24 ++++++++++++ lib/mysql/schema/schema.go | 18 ++++++++- lib/mysql/schema/schema_test.go | 4 ++ sources/mysql/adapter/adapter.go | 10 +++-- 7 files changed, 131 insertions(+), 24 deletions(-) create mode 100644 lib/debezium/converters/decimal_test.go diff --git a/lib/debezium/converters/decimal.go b/lib/debezium/converters/decimal.go index 4b075478..42e5b6c6 100644 --- a/lib/debezium/converters/decimal.go +++ b/lib/debezium/converters/decimal.go @@ -2,44 +2,41 @@ package converters import ( "fmt" + transferDBZ "github.com/artie-labs/transfer/lib/debezium" - "github.com/artie-labs/transfer/lib/debezium" + "github.com/artie-labs/reader/lib/debezium" ) type decimalConverter struct { - scale *int + scale int precision *int } -func NewDecimalConverter(scale, precision *int) decimalConverter { +func NewDecimalConverter(scale int, precision *int) decimalConverter { return decimalConverter{scale: scale, precision: precision} } -func (d decimalConverter) ToField(name string) debezium.Field { - field := debezium.Field{ +func (d decimalConverter) ToField(name string) transferDBZ.Field { + field := transferDBZ.Field{ FieldName: name, - DebeziumType: string(debezium.KafkaDecimalType), + DebeziumType: string(transferDBZ.KafkaDecimalType), + Parameters: map[string]any{ + "scale": fmt.Sprint(d.scale), + }, } - if d.scale != nil || d.precision != nil { - field.Parameters = make(map[string]any) - - if d.scale != nil { - field.Parameters["scale"] = fmt.Sprint(*d.scale) - } - - if d.precision != nil { - field.Parameters[debezium.KafkaDecimalPrecisionKey] = fmt.Sprint(*d.precision) - } + if d.precision != nil { + field.Parameters[transferDBZ.KafkaDecimalPrecisionKey] = fmt.Sprint(*d.precision) } return field } -func (decimalConverter) Convert(value any) (any, error) { +func (d decimalConverter) Convert(value any) (any, error) { castValue, isOk := value.(string) - if isOk { - return castValue, nil + if !isOk { + return nil, fmt.Errorf("expected string got %T with value: %v", value, value) } - return nil, fmt.Errorf("expected string got %T with value: %v", value, value) + + return debezium.EncodeDecimalToBase64(castValue, d.scale) } diff --git a/lib/debezium/converters/decimal_test.go b/lib/debezium/converters/decimal_test.go new file mode 100644 index 00000000..085c0153 --- /dev/null +++ b/lib/debezium/converters/decimal_test.go @@ -0,0 +1,50 @@ +package converters + +import ( + "fmt" + "testing" + + "github.com/artie-labs/transfer/lib/debezium" + "github.com/artie-labs/transfer/lib/ptr" + "github.com/stretchr/testify/assert" +) + +func TestDecimalConverter_ToField(t *testing.T) { + { + // Without precision + converter := NewDecimalConverter(2, nil) + expected := debezium.Field{ + FieldName: "col", + DebeziumType: "org.apache.kafka.connect.data.Decimal", + Parameters: map[string]any{ + "scale": "2", + }, + } + assert.Equal(t, expected, converter.ToField("col")) + } + { + // With precision + converter := NewDecimalConverter(2, ptr.ToInt(3)) + expected := debezium.Field{ + FieldName: "col", + DebeziumType: "org.apache.kafka.connect.data.Decimal", + Parameters: map[string]any{ + "connect.decimal.precision": "3", + "scale": "2", + }, + } + assert.Equal(t, expected, converter.ToField("col")) + } +} + +func TestDecimalConverter_Convert(t *testing.T) { + converter := NewDecimalConverter(2, nil) + { + converted, err := converter.Convert("1.23") + assert.NoError(t, err) + + actualValue, err := converter.ToField("").DecodeDecimal(fmt.Sprint(converted)) + assert.NoError(t, err) + assert.Equal(t, "1.23", fmt.Sprint(actualValue)) + } +} diff --git a/lib/mysql/schema/convert.go b/lib/mysql/schema/convert.go index fa34663a..e3bd8aef 100644 --- a/lib/mysql/schema/convert.go +++ b/lib/mysql/schema/convert.go @@ -24,6 +24,15 @@ func ConvertValue(value any, colType DataType) (any, error) { return nil, fmt.Errorf("bit value is invalid: %v", value) } return castValue[0] == 1, nil + case Boolean: + castVal, ok := value.(int64) + if !ok { + return nil, fmt.Errorf("expected int64 got %T for value: %v", value, value) + } + if castVal > 1 || castVal < 0 { + return nil, fmt.Errorf("boolean value not in [0, 1]: %v", value) + } + return castVal == 1, nil case TinyInt, SmallInt, MediumInt, @@ -82,6 +91,9 @@ func ConvertValue(value any, colType DataType) (any, error) { Char, Varchar, Text, + TinyText, + MediumText, + LongText, Enum, Set, JSON: diff --git a/lib/mysql/schema/convert_test.go b/lib/mysql/schema/convert_test.go index f063ac5b..92cb2d4d 100644 --- a/lib/mysql/schema/convert_test.go +++ b/lib/mysql/schema/convert_test.go @@ -45,6 +45,30 @@ func TestConvertValue(t *testing.T) { value: []byte{byte(1), byte(1)}, expectedErr: "bit value is invalid", }, + { + name: "boolean - 0", + dataType: Boolean, + value: int64(0), + expected: false, + }, + { + name: "boolean - 1", + dataType: Boolean, + value: int64(1), + expected: true, + }, + { + name: "boolean - -2", + dataType: Boolean, + value: int64(-2), + expectedErr: "boolean value not in [0, 1]: -2", + }, + { + name: "boolean - 2", + dataType: Boolean, + value: int64(2), + expectedErr: "boolean value not in [0, 1]: 2", + }, { name: "tiny int", dataType: TinyInt, diff --git a/lib/mysql/schema/schema.go b/lib/mysql/schema/schema.go index 572e9129..cf0b9643 100644 --- a/lib/mysql/schema/schema.go +++ b/lib/mysql/schema/schema.go @@ -27,6 +27,7 @@ const ( Double // Bit-Value Type Bit + Boolean // Date and Time Data Types Date DateTime @@ -40,6 +41,9 @@ const ( Varbinary Blob Text + TinyText + MediumText + LongText Enum Set // JSON @@ -117,9 +121,14 @@ func parseColumnDataType(s string) (DataType, *Opts, error) { metadata = s[parenIndex+1 : len(s)-1] s = s[:parenIndex] } - + switch s { case "tinyint": + // Boolean, bool are aliases for tinyint(1) + if metadata == "1" { + return Boolean, nil, nil + } + return TinyInt, nil, nil case "smallint": return SmallInt, nil, nil @@ -177,6 +186,12 @@ func parseColumnDataType(s string) (DataType, *Opts, error) { return Blob, nil, nil case "text": return Text, nil, nil + case "tinytext": + return TinyText, nil, nil + case "mediumtext": + return MediumText, nil, nil + case "longtext": + return LongText, nil, nil case "enum": return Enum, nil, nil case "set": @@ -184,6 +199,7 @@ func parseColumnDataType(s string) (DataType, *Opts, error) { case "json": return JSON, nil, nil default: + slog.Warn("Unknown data type", slog.String("type", s)) return InvalidDataType, nil, nil } } diff --git a/lib/mysql/schema/schema_test.go b/lib/mysql/schema/schema_test.go index e1f1f326..b7c86b8d 100644 --- a/lib/mysql/schema/schema_test.go +++ b/lib/mysql/schema/schema_test.go @@ -26,6 +26,10 @@ func TestParseColumnDataType(t *testing.T) { input: "int", expectedType: Int, }, + { + input: "tinyint(1)", + expectedType: Boolean, + }, { input: "varchar(255)", expectedType: Varchar, diff --git a/sources/mysql/adapter/adapter.go b/sources/mysql/adapter/adapter.go index ad91238c..a0254309 100644 --- a/sources/mysql/adapter/adapter.go +++ b/sources/mysql/adapter/adapter.go @@ -63,7 +63,7 @@ func (m mysqlAdapter) ConvertRowToDebezium(row map[string]any) (map[string]any, func valueConverterForType(d schema.DataType, opts *schema.Opts) (converters.ValueConverter, error) { switch d { - case schema.Bit: + case schema.Bit, schema.Boolean: return converters.BooleanPassthrough{}, nil case schema.TinyInt, schema.SmallInt: return converters.Int16Passthrough{}, nil @@ -76,8 +76,12 @@ func valueConverterForType(d schema.DataType, opts *schema.Opts) (converters.Val case schema.Double: return converters.DoublePassthrough{}, nil case schema.Decimal: - return converters.NewDecimalConverter(opts.Scale, opts.Precision), nil - case schema.Char, schema.Text, schema.Varchar: + if opts.Scale == nil { + return nil, fmt.Errorf("scale is required for decimal type") + } + + return converters.NewDecimalConverter(*opts.Scale, opts.Precision), nil + case schema.Char, schema.Text, schema.Varchar, schema.TinyText, schema.MediumText, schema.LongText: return converters.StringPassthrough{}, nil case schema.Binary, schema.Varbinary, schema.Blob: return converters.BytesPassthrough{}, nil