Skip to content

Commit

Permalink
[debezium] bytes type for org.apache.kafka.connect.data.Decimal (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie authored Mar 17, 2024
1 parent da57cad commit 709c91c
Show file tree
Hide file tree
Showing 8 changed files with 11 additions and 5 deletions.
4 changes: 2 additions & 2 deletions integration_tests/mysql/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ const expectedPayloadTemplate = `{
"parameters": null
},
{
"type": "",
"type": "bytes",
"optional": false,
"default": null,
"field": "c_decimal",
Expand All @@ -228,7 +228,7 @@ const expectedPayloadTemplate = `{
}
},
{
"type": "",
"type": "bytes",
"optional": false,
"default": null,
"field": "c_numeric",
Expand Down
4 changes: 2 additions & 2 deletions integration_tests/postgres/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ const expectedPayloadTemplate = `{
"parameters": null
},
{
"type": "",
"type": "bytes",
"optional": false,
"default": null,
"field": "c_money",
Expand All @@ -439,7 +439,7 @@ const expectedPayloadTemplate = `{
}
},
{
"type": "",
"type": "bytes",
"optional": false,
"default": null,
"field": "c_numeric",
Expand Down
1 change: 1 addition & 0 deletions lib/debezium/converters/decimal.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ func NewDecimalConverter(scale int, precision *int) decimalConverter {
func (d decimalConverter) ToField(name string) transferDBZ.Field {
field := transferDBZ.Field{
FieldName: name,
Type: "bytes",
DebeziumType: string(transferDBZ.KafkaDecimalType),
Parameters: map[string]any{
"scale": fmt.Sprint(d.scale),
Expand Down
2 changes: 2 additions & 0 deletions lib/debezium/converters/decimal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ func TestDecimalConverter_ToField(t *testing.T) {
// Without precision
converter := NewDecimalConverter(2, nil)
expected := debezium.Field{
Type: "bytes",
FieldName: "col",
DebeziumType: "org.apache.kafka.connect.data.Decimal",
Parameters: map[string]any{
Expand All @@ -27,6 +28,7 @@ func TestDecimalConverter_ToField(t *testing.T) {
// With precision
converter := NewDecimalConverter(2, ptr.ToInt(3))
expected := debezium.Field{
Type: "bytes",
FieldName: "col",
DebeziumType: "org.apache.kafka.connect.data.Decimal",
Parameters: map[string]any{
Expand Down
1 change: 1 addition & 0 deletions sources/mysql/adapter/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ func TestValueConverterForType(t *testing.T) {
Precision: ptr.ToInt(5),
},
expected: debezium.Field{
Type: "bytes",
DebeziumType: "org.apache.kafka.connect.data.Decimal",
FieldName: colName,
Parameters: map[string]interface{}{
Expand Down
2 changes: 1 addition & 1 deletion sources/postgres/adapter/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func TestValueConverterForType_ToField(t *testing.T) {
Precision: 10,
},
expected: debezium.Field{
Type: "",
Type: "bytes",
FieldName: "numeric_col",
DebeziumType: string(debezium.KafkaDecimalType),
Parameters: map[string]any{"scale": "2", "connect.decimal.precision": "10"},
Expand Down
1 change: 1 addition & 0 deletions sources/postgres/adapter/converters.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type MoneyConverter struct{}
func (MoneyConverter) ToField(name string) transferDbz.Field {
return transferDbz.Field{
FieldName: name,
Type: "bytes",
DebeziumType: string(transferDbz.KafkaDecimalType),
Parameters: map[string]any{
"scale": fmt.Sprint(moneyScale),
Expand Down
1 change: 1 addition & 0 deletions sources/postgres/adapter/converters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ func TestMoneyConverter_ToField(t *testing.T) {
converter := MoneyConverter{}
expected := transferDbz.Field{
FieldName: "col",
Type: "bytes",
DebeziumType: "org.apache.kafka.connect.data.Decimal",
Parameters: map[string]any{
"scale": "2",
Expand Down

0 comments on commit 709c91c

Please sign in to comment.