diff --git a/integration_tests/mysql/main.go b/integration_tests/mysql/main.go index 2051231a..20c02197 100644 --- a/integration_tests/mysql/main.go +++ b/integration_tests/mysql/main.go @@ -217,7 +217,7 @@ const expectedPayloadTemplate = `{ "parameters": null }, { - "type": "", + "type": "bytes", "optional": false, "default": null, "field": "c_decimal", @@ -228,7 +228,7 @@ const expectedPayloadTemplate = `{ } }, { - "type": "", + "type": "bytes", "optional": false, "default": null, "field": "c_numeric", diff --git a/integration_tests/postgres/main.go b/integration_tests/postgres/main.go index ab0cea94..3d9d2123 100644 --- a/integration_tests/postgres/main.go +++ b/integration_tests/postgres/main.go @@ -429,7 +429,7 @@ const expectedPayloadTemplate = `{ "parameters": null }, { - "type": "", + "type": "bytes", "optional": false, "default": null, "field": "c_money", @@ -439,7 +439,7 @@ const expectedPayloadTemplate = `{ } }, { - "type": "", + "type": "bytes", "optional": false, "default": null, "field": "c_numeric", diff --git a/lib/debezium/converters/decimal.go b/lib/debezium/converters/decimal.go index 3fb27c70..186d86f5 100644 --- a/lib/debezium/converters/decimal.go +++ b/lib/debezium/converters/decimal.go @@ -20,6 +20,7 @@ func NewDecimalConverter(scale int, precision *int) decimalConverter { func (d decimalConverter) ToField(name string) transferDBZ.Field { field := transferDBZ.Field{ FieldName: name, + Type: "bytes", DebeziumType: string(transferDBZ.KafkaDecimalType), Parameters: map[string]any{ "scale": fmt.Sprint(d.scale), diff --git a/lib/debezium/converters/decimal_test.go b/lib/debezium/converters/decimal_test.go index e21bae88..a0617897 100644 --- a/lib/debezium/converters/decimal_test.go +++ b/lib/debezium/converters/decimal_test.go @@ -15,6 +15,7 @@ func TestDecimalConverter_ToField(t *testing.T) { // Without precision converter := NewDecimalConverter(2, nil) expected := debezium.Field{ + Type: "bytes", FieldName: "col", DebeziumType: "org.apache.kafka.connect.data.Decimal", Parameters: map[string]any{ @@ -27,6 +28,7 @@ func TestDecimalConverter_ToField(t *testing.T) { // With precision converter := NewDecimalConverter(2, ptr.ToInt(3)) expected := debezium.Field{ + Type: "bytes", FieldName: "col", DebeziumType: "org.apache.kafka.connect.data.Decimal", Parameters: map[string]any{ diff --git a/sources/mysql/adapter/adapter_test.go b/sources/mysql/adapter/adapter_test.go index e13d0355..7249789a 100644 --- a/sources/mysql/adapter/adapter_test.go +++ b/sources/mysql/adapter/adapter_test.go @@ -139,6 +139,7 @@ func TestValueConverterForType(t *testing.T) { Precision: ptr.ToInt(5), }, expected: debezium.Field{ + Type: "bytes", DebeziumType: "org.apache.kafka.connect.data.Decimal", FieldName: colName, Parameters: map[string]interface{}{ diff --git a/sources/postgres/adapter/adapter_test.go b/sources/postgres/adapter/adapter_test.go index e4e7586a..27ad7249 100644 --- a/sources/postgres/adapter/adapter_test.go +++ b/sources/postgres/adapter/adapter_test.go @@ -98,7 +98,7 @@ func TestValueConverterForType_ToField(t *testing.T) { Precision: 10, }, expected: debezium.Field{ - Type: "", + Type: "bytes", FieldName: "numeric_col", DebeziumType: string(debezium.KafkaDecimalType), Parameters: map[string]any{"scale": "2", "connect.decimal.precision": "10"}, diff --git a/sources/postgres/adapter/converters.go b/sources/postgres/adapter/converters.go index d9f8b8b1..19fdf753 100644 --- a/sources/postgres/adapter/converters.go +++ b/sources/postgres/adapter/converters.go @@ -19,6 +19,7 @@ type MoneyConverter struct{} func (MoneyConverter) ToField(name string) transferDbz.Field { return transferDbz.Field{ FieldName: name, + Type: "bytes", DebeziumType: string(transferDbz.KafkaDecimalType), Parameters: map[string]any{ "scale": fmt.Sprint(moneyScale), diff --git a/sources/postgres/adapter/converters_test.go b/sources/postgres/adapter/converters_test.go index 1d978922..c136c0e1 100644 --- a/sources/postgres/adapter/converters_test.go +++ b/sources/postgres/adapter/converters_test.go @@ -16,6 +16,7 @@ func TestMoneyConverter_ToField(t *testing.T) { converter := MoneyConverter{} expected := transferDbz.Field{ FieldName: "col", + Type: "bytes", DebeziumType: "org.apache.kafka.connect.data.Decimal", Parameters: map[string]any{ "scale": "2",