diff --git a/integration_tests/mysql/main.go b/integration_tests/mysql/main.go index c7eb44f6..a7cbc824 100644 --- a/integration_tests/mysql/main.go +++ b/integration_tests/mysql/main.go @@ -104,7 +104,8 @@ CREATE TABLE %s ( c_text TEXT, c_enum ENUM('x-small', 'small', 'medium', 'large', 'x-large'), c_set SET('one', 'two', 'three'), - c_json JSON + c_json JSON, + c_point POINT ) ` @@ -171,7 +172,9 @@ INSERT INTO %s VALUES ( -- c_set 'one,two', -- c_json - '{"key1": "value1", "key2": "value2"}' + '{"key1": "value1", "key2": "value2"}', + -- c_point + POINT(12.34, 56.78) ) ` @@ -435,6 +438,14 @@ const expectedPayloadTemplate = `{ "field": "c_json", "name": "io.debezium.data.Json", "parameters": null + }, + { + "type": "struct", + "optional": false, + "default": null, + "field": "c_point", + "name": "io.debezium.data.geometry.Point", + "parameters": null } ], "optional": false, @@ -464,6 +475,10 @@ const expectedPayloadTemplate = `{ "c_mediumint": 3, "c_mediumint_unsigned": 4, "c_numeric": "AN3M", + "c_point": { + "x": 12.34, + "y": 56.78 + }, "c_set": "one,two", "c_smallint": 2, "c_smallint_unsigned": 3, diff --git a/lib/mysql/schema/convert.go b/lib/mysql/schema/convert.go index 3cfafe7f..66208477 100644 --- a/lib/mysql/schema/convert.go +++ b/lib/mysql/schema/convert.go @@ -1,7 +1,9 @@ package schema import ( + "encoding/binary" "fmt" + "math" "time" ) @@ -112,6 +114,33 @@ func ConvertValue(value any, colType DataType) (any, error) { default: return nil, fmt.Errorf("expected []byte got %T for value: %v", value, value) } + case Point: + bytes, ok := value.([]byte) + if !ok { + return nil, fmt.Errorf("expected []byte got %T for value: %v", value, value) + } + + // Byte format is https://dev.mysql.com/doc/refman/8.4/en/gis-data-formats.html#:~:text=the%20OpenGIS%20specification.-,Internal%20Geometry%20Storage%20Format,-MySQL%20stores%20geometry + if len(bytes) != 25 { + return nil, fmt.Errorf("expected []byte with length 25, length is %d", len(bytes)) + } + + if srid := binary.LittleEndian.Uint32(bytes[0:4]); srid != 0 { + return nil, fmt.Errorf("expected SRID to be 0, SRID is %d", srid) + } + + if byteOrder := bytes[5]; byteOrder != 1 { + return nil, fmt.Errorf("expected byte order to be 1 (little-endian), byte order is %d", byteOrder) + } + + if integerType := binary.LittleEndian.Uint32(bytes[5:9]); integerType != 1 { + return nil, fmt.Errorf("expected integer type 1 (POINT), got %d", integerType) + } + + return map[string]any{ + "x": math.Float64frombits(binary.LittleEndian.Uint64(bytes[9:17])), + "y": math.Float64frombits(binary.LittleEndian.Uint64(bytes[17:25])), + }, nil } return nil, fmt.Errorf("could not convert DataType(%d) %T value: %v", colType, value, value) diff --git a/lib/mysql/schema/convert_test.go b/lib/mysql/schema/convert_test.go index 195d9acc..6c2f3b19 100644 --- a/lib/mysql/schema/convert_test.go +++ b/lib/mysql/schema/convert_test.go @@ -1,12 +1,21 @@ package schema import ( + "encoding/base64" "testing" "time" "github.com/stretchr/testify/assert" ) +func mustDecodeBase64(value string) []byte { + result, err := base64.StdEncoding.DecodeString(value) + if err != nil { + panic(err) + } + return result +} + func TestConvertValue(t *testing.T) { tests := []struct { name string @@ -249,6 +258,24 @@ func TestConvertValue(t *testing.T) { value: []byte(`{"foo": "bar", "baz": "1234"}`), expected: `{"foo": "bar", "baz": "1234"}`, }, + { + name: "point - zero values", + dataType: Point, + value: mustDecodeBase64("AAAAAAEBAAAAAAAAAAAAAAAAAAAAAAAAAA=="), + expected: map[string]any{"x": 0.0, "y": 0.0}, + }, + { + name: "point - positive values", + dataType: Point, + value: mustDecodeBase64("AAAAAAEBAAAArkfhehSuKECkcD0K12NMQA=="), + expected: map[string]any{"x": 12.34, "y": 56.78}, + }, + { + name: "point - negative values", + dataType: Point, + value: mustDecodeBase64("AAAAAAEBAAAASOF6FK5IocDD9ShcjzmqwA=="), + expected: map[string]any{"x": -2212.34, "y": -3356.78}, + }, } for _, tc := range tests { diff --git a/lib/mysql/schema/schema.go b/lib/mysql/schema/schema.go index cd5806af..e7dde779 100644 --- a/lib/mysql/schema/schema.go +++ b/lib/mysql/schema/schema.go @@ -52,6 +52,8 @@ const ( Set // JSON JSON + // Misc + Point ) type Opts struct { @@ -213,6 +215,8 @@ func parseColumnDataType(originalS string) (DataType, *Opts, error) { return Set, nil, nil case "json": return JSON, nil, nil + case "point": + return Point, nil, nil default: return -1, nil, fmt.Errorf("unknown data type: %q", originalS) } diff --git a/sources/mysql/adapter/adapter.go b/sources/mysql/adapter/adapter.go index c11748d1..427d5656 100644 --- a/sources/mysql/adapter/adapter.go +++ b/sources/mysql/adapter/adapter.go @@ -121,6 +121,8 @@ func valueConverterForType(d schema.DataType, opts *schema.Opts) (converters.Val return converters.EnumSetConverter{}, nil case schema.JSON: return converters.JSONConverter{}, nil + case schema.Point: + return converters.NewPointConverter(), nil } return nil, fmt.Errorf("unable get value converter for DataType(%d)", d) }