From 7be99a3ba3a8416ccbcf9847c1361003b850db0a Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 9 Dec 2024 10:44:03 -0500 Subject: [PATCH] Get N diffs working Signed-off-by: Matt Lord --- go/mysql/binlog/binlog_json.go | 92 +++++++------- go/mysql/binlog_event_mysql56_test.go | 172 ++++++++++++++------------ go/mysql/binlog_event_rbr.go | 3 + 3 files changed, 143 insertions(+), 124 deletions(-) diff --git a/go/mysql/binlog/binlog_json.go b/go/mysql/binlog/binlog_json.go index 5a32365fcc9..6f02456e495 100644 --- a/go/mysql/binlog/binlog_json.go +++ b/go/mysql/binlog/binlog_json.go @@ -74,53 +74,61 @@ func ParseBinaryJSON(data []byte) (*json.Value, error) { // ParseBinaryJSONDiff provides the parsing function from the MySQL JSON // diff representation to an SQL expression. func ParseBinaryJSONDiff(data []byte) (sqltypes.Value, error) { - pos := 0 - opType := jsonDiffOp(data[pos]) - pos++ diff := bytes.Buffer{} + diff.Grow(int(float32(len(data)) * 1.5)) + pos := 0 + outer := false + innerStr := "" - switch opType { - case jsonDiffOpReplace: - diff.WriteString("JSON_REPLACE(") - case jsonDiffOpInsert: - diff.WriteString("JSON_INSERT(") - case jsonDiffOpRemove: - diff.WriteString("JSON_REMOVE(") - } - diff.WriteString("%s, ") // This will later be replaced by the field name - - pathLen, readTo, ok := readLenEncInt(data, pos) - if !ok { - return sqltypes.Value{}, fmt.Errorf("cannot read JSON diff path length") - } - pos = readTo - - path := data[pos : uint64(pos)+pathLen] - pos += int(pathLen) - // We have to specify the unicode character set for the strings we - // use in the expression as the connection can be using a different - // character set (e.g. vreplication always uses set names binary). - diff.WriteString(fmt.Sprintf("_utf8mb4'%s', ", path)) - - if opType == jsonDiffOpRemove { // No value for remove - diff.WriteString(")") - return sqltypes.MakeTrusted(sqltypes.Expression, diff.Bytes()), nil - } + for pos < len(data) { + if outer { + innerStr = diff.String() + diff.Reset() + } + opType := jsonDiffOp(data[pos]) + pos++ + switch opType { + case jsonDiffOpReplace: + diff.WriteString("JSON_REPLACE(") + case jsonDiffOpInsert: + diff.WriteString("JSON_INSERT(") + case jsonDiffOpRemove: + diff.WriteString("JSON_REMOVE(") + } + if outer { + diff.WriteString(innerStr) + diff.WriteString(", ") + } else { // Only the inner most function has the field name + diff.WriteString("%s, ") // This will later be replaced by the field name + } - valueLen, readTo, ok := readLenEncInt(data, pos) - if !ok { - return sqltypes.Value{}, fmt.Errorf("cannot read JSON diff path length") - } - pos = readTo + pathLen, readTo := readVariableLength(data, pos) + pos = readTo + path := data[pos : pos+pathLen] + pos += pathLen + // We have to specify the unicode character set for the strings we + // use in the expression as the connection can be using a different + // character set (e.g. vreplication always uses set names binary). + diff.WriteString(fmt.Sprintf("_utf8mb4'%s'", path)) + + if opType == jsonDiffOpRemove { // No value for remove + diff.WriteString(")") + } else { + valueLen, readTo := readVariableLength(data, pos) + pos = readTo + value, err := ParseBinaryJSON(data[pos : pos+valueLen]) + if err != nil { + return sqltypes.Value{}, fmt.Errorf("cannot read JSON diff value for path %s: %w", path, err) + } + pos += valueLen + if value.Type() == json.TypeString { + diff.WriteString(", _utf8mb4") + } + diff.WriteString(fmt.Sprintf("%s)", value)) + } - value, err := ParseBinaryJSON(data[pos : uint64(pos)+valueLen]) - if err != nil { - return sqltypes.Value{}, fmt.Errorf("cannot read JSON diff value for path %s: %w", path, err) - } - if value.Type() == json.TypeString { - diff.WriteString("_utf8mb4") + outer = true } - diff.WriteString(fmt.Sprintf("%s)", value)) return sqltypes.MakeTrusted(sqltypes.Expression, diff.Bytes()), nil } diff --git a/go/mysql/binlog_event_mysql56_test.go b/go/mysql/binlog_event_mysql56_test.go index 4a61da04a20..b7725e69691 100644 --- a/go/mysql/binlog_event_mysql56_test.go +++ b/go/mysql/binlog_event_mysql56_test.go @@ -251,78 +251,6 @@ func TestMysql56SemiSyncAck(t *testing.T) { } func TestMySQL56PartialUpdateRowsEvent(t *testing.T) { - // The mysqlbinlog -vvv --base64-output=decode-rows output for the following event: - // ### UPDATE `vt_commerce`.`customer` - // ### WHERE - // ### @1=1 /* LONGINT meta=0 nullable=0 is_null=0 */ - // ### @2='alice@domain.com' /* VARSTRING(128) meta=128 nullable=1 is_null=0 */ - // ### @3='{"salary": 100}' /* JSON meta=4 nullable=1 is_null=0 */ - // ### SET - // ### @1=1 /* LONGINT meta=0 nullable=0 is_null=0 */ - // ### @2='alice@domain.com' /* VARSTRING(128) meta=128 nullable=1 is_null=0 */ - // ### @3=JSON_INSERT(@3, '$.role', 'manager') /* JSON meta=4 nullable=1 is_null=0 */ - // ### UPDATE `vt_commerce`.`customer` - // ### WHERE - // ### @1=2 /* LONGINT meta=0 nullable=0 is_null=0 */ - // ### @2='bob@domain.com' /* VARSTRING(128) meta=128 nullable=1 is_null=0 */ - // ### @3='{"salary": 99}' /* JSON meta=4 nullable=1 is_null=0 */ - // ### SET - // ### @1=2 /* LONGINT meta=0 nullable=0 is_null=0 */ - // ### @2='bob@domain.com' /* VARSTRING(128) meta=128 nullable=1 is_null=0 */ - // ### @3=JSON_INSERT(@3, '$.role', 'manager') /* JSON meta=4 nullable=1 is_null=0 */ - // ### UPDATE `vt_commerce`.`customer` - // ### WHERE - // ### @1=3 /* LONGINT meta=0 nullable=0 is_null=0 */ - // ### @2='charlie@domain.com' /* VARSTRING(128) meta=128 nullable=1 is_null=0 */ - // ### @3='{"salary": 99}' /* JSON meta=4 nullable=1 is_null=0 */ - // ### SET - // ### @1=3 /* LONGINT meta=0 nullable=0 is_null=0 */ - // ### @2='charlie@domain.com' /* VARSTRING(128) meta=128 nullable=1 is_null=0 */ - // ### @3=JSON_INSERT(@3, '$.role', 'manager') /* JSON meta=4 nullable=1 is_null=0 */ - // ### UPDATE `vt_commerce`.`customer` - // ### WHERE - // ### @1=4 /* LONGINT meta=0 nullable=0 is_null=0 */ - // ### @2='dan@domain.com' /* VARSTRING(128) meta=128 nullable=1 is_null=0 */ - // ### @3='{"salary": 99}' /* JSON meta=4 nullable=1 is_null=0 */ - // ### SET - // ### @1=4 /* LONGINT meta=0 nullable=0 is_null=0 */ - // ### @2='dan@domain.com' /* VARSTRING(128) meta=128 nullable=1 is_null=0 */ - // ### @3=JSON_INSERT(@3, '$.role', 'manager') /* JSON meta=4 nullable=1 is_null=0 */ - // ### UPDATE `vt_commerce`.`customer` - // ### WHERE - // ### @1=5 /* LONGINT meta=0 nullable=0 is_null=0 */ - // ### @2='eve@domain.com' /* VARSTRING(128) meta=128 nullable=1 is_null=0 */ - // ### @3='{"salary": 100}' /* JSON meta=4 nullable=1 is_null=0 */ - // ### SET - // ### @1=5 /* LONGINT meta=0 nullable=0 is_null=0 */ - // ### @2='eve@domain.com' /* VARSTRING(128) meta=128 nullable=1 is_null=0 */ - // ### @3=JSON_INSERT(@3, '$.role', 'manager') /* JSON meta=4 nullable=1 is_null=0 */ - mysql56PartialUpdateRowEvent := NewMysql56BinlogEvent([]byte{0x67, 0xc4, 0x54, 0x67, 0x27, 0x1f, 0x91, 0x10, 0x76, 0x14, 0x02, 0x00, 0x00, 0x19, 0x69, - 0x00, 0x00, 0x00, 0x00, 0xb0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x02, 0x00, 0x03, 0xff, 0xff, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, - 0x00, 0x00, 0x10, 0x61, 0x6c, 0x69, 0x63, 0x65, 0x40, 0x64, 0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x2e, 0x63, 0x6f, 0x6d, 0x16, 0x00, 0x00, 0x00, 0x00, - 0x01, 0x00, 0x15, 0x00, 0x0b, 0x00, 0x03, 0x00, 0x0c, 0x0e, 0x00, 0x73, 0x65, 0x78, 0x06, 0x66, 0x65, 0x6d, 0x61, 0x6c, 0x65, 0x01, 0x01, 0x00, - 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x10, 0x61, 0x6c, 0x69, 0x63, 0x65, 0x40, 0x64, 0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x2e, 0x63, 0x6f, - 0x6d, 0x12, 0x00, 0x00, 0x00, 0x01, 0x06, 0x24, 0x2e, 0x72, 0x6f, 0x6c, 0x65, 0x09, 0x0c, 0x07, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x00, - 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0e, 0x62, 0x6f, 0x62, 0x40, 0x64, 0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x2e, 0x63, 0x6f, 0x6d, 0x14, - 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x13, 0x00, 0x0b, 0x00, 0x03, 0x00, 0x0c, 0x0e, 0x00, 0x73, 0x65, 0x78, 0x04, 0x6d, 0x61, 0x6c, 0x65, 0x01, - 0x01, 0x00, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0e, 0x62, 0x6f, 0x62, 0x40, 0x64, 0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x2e, 0x63, 0x6f, - 0x6d, 0x12, 0x00, 0x00, 0x00, 0x01, 0x06, 0x24, 0x2e, 0x72, 0x6f, 0x6c, 0x65, 0x09, 0x0c, 0x07, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x00, - 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x12, 0x63, 0x68, 0x61, 0x72, 0x6c, 0x69, 0x65, 0x40, 0x64, 0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x2e, - 0x63, 0x6f, 0x6d, 0x14, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x13, 0x00, 0x0b, 0x00, 0x03, 0x00, 0x0c, 0x0e, 0x00, 0x73, 0x65, 0x78, 0x04, 0x6d, - 0x61, 0x6c, 0x65, 0x01, 0x01, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x12, 0x63, 0x68, 0x61, 0x72, 0x6c, 0x69, 0x65, 0x40, 0x64, - 0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x2e, 0x63, 0x6f, 0x6d, 0x12, 0x00, 0x00, 0x00, 0x01, 0x06, 0x24, 0x2e, 0x72, 0x6f, 0x6c, 0x65, 0x09, 0x0c, 0x07, - 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0e, 0x64, 0x61, 0x6e, 0x40, 0x64, 0x6f, 0x6d, - 0x61, 0x69, 0x6e, 0x2e, 0x63, 0x6f, 0x6d, 0x14, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x13, 0x00, 0x0b, 0x00, 0x03, 0x00, 0x0c, 0x0e, 0x00, 0x73, - 0x65, 0x78, 0x04, 0x6d, 0x61, 0x6c, 0x65, 0x01, 0x01, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0e, 0x64, 0x61, 0x6e, 0x40, 0x64, - 0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x2e, 0x63, 0x6f, 0x6d, 0x12, 0x00, 0x00, 0x00, 0x01, 0x06, 0x24, 0x2e, 0x72, 0x6f, 0x6c, 0x65, 0x09, 0x0c, 0x07, - 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x00, 0x05, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0e, 0x65, 0x76, 0x65, 0x40, 0x64, 0x6f, 0x6d, - 0x61, 0x69, 0x6e, 0x2e, 0x63, 0x6f, 0x6d, 0x16, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x15, 0x00, 0x0b, 0x00, 0x03, 0x00, 0x0c, 0x0e, 0x00, 0x73, - 0x65, 0x78, 0x06, 0x66, 0x65, 0x6d, 0x61, 0x6c, 0x65, 0x01, 0x01, 0x00, 0x05, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0e, 0x65, 0x76, 0x65, - 0x40, 0x64, 0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x2e, 0x63, 0x6f, 0x6d, 0x12, 0x00, 0x00, 0x00, 0x01, 0x06, 0x24, 0x2e, 0x72, 0x6f, 0x6c, 0x65, 0x09, - 0x0c, 0x07, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72}) - - require.True(t, mysql56PartialUpdateRowEvent.IsPartialUpdateRows()) - format := BinlogFormat{ HeaderSizes: []byte{ 0, 13, 0, 8, 0, 0, 0, 0, 4, 0, 4, 0, 0, 0, 98, 0, 4, 26, 8, 0, 0, 0, 8, 8, 8, 2, 0, 0, 0, 10, 10, 10, 42, 42, 0, 18, 52, 0, 10, 40, 0, @@ -344,16 +272,96 @@ func TestMySQL56PartialUpdateRowsEvent(t *testing.T) { Metadata: []uint16{0, 128, 4}, ColumnCollationIDs: []collations.ID{63}, } - ev, err := mysql56PartialUpdateRowEvent.Rows(format, tm) - require.NoError(t, err) - assert.Equal(t, 5, len(ev.Rows)) - require.NoError(t, err) - for i := range ev.Rows { - vals, err := ev.StringValuesForTests(tm, i) - require.NoError(t, err) - // The third column is the JSON column. - require.Equal(t, `JSON_INSERT(%s, _utf8mb4'$.role', _utf8mb4"manager")`, vals[2]) - t.Logf("Rows: %v", vals) + testCases := []struct { + name string + rawEvent []byte + want string + }{ + { + name: "REMOVE and REPLACE", + // The mysqlbinlog -vvv --base64-output=decode-rows output for the following event: + // ### UPDATE `vt_commerce`.`customer` + // ### WHERE + // ### @1=1 /* LONGINT meta=0 nullable=0 is_null=0 */ + // ### @2='alice@domain.com' /* VARSTRING(128) meta=128 nullable=1 is_null=0 */ + // ### @3='{"day": "friday", "role": "manager", "color": "red", "salary": 100, "favorite_color": "black"}' /* JSON meta=4 nullable=1 is_null=0 */ + // ### SET + // ### @1=1 /* LONGINT meta=0 nullable=0 is_null=0 */ + // ### @2='alice@domain.com' /* VARSTRING(128) meta=128 nullable=1 is_null=0 */ + // ### @3=JSON_REMOVE( + // ### JSON_REPLACE(@3, '$.day', 'monday'), + // ### '$.favorite_color') /* JSON meta=4 nullable=1 is_null=0 */ + // ### UPDATE `vt_commerce`.`customer` + // ### WHERE + // ### @1=2 /* LONGINT meta=0 nullable=0 is_null=0 */ + // ### @2='bob@domain.com' /* VARSTRING(128) meta=128 nullable=1 is_null=0 */ + // ### @3='{"day": "friday", "role": "manager", "color": "red", "salary": 99, "favorite_color": "black"}' /* JSON meta=4 nullable=1 is_null=0 */ + // ### SET + // ### @1=2 /* LONGINT meta=0 nullable=0 is_null=0 */ + // ### @2='bob@domain.com' /* VARSTRING(128) meta=128 nullable=1 is_null=0 */ + // ### @3=JSON_REMOVE( + // ### JSON_REPLACE(@3, '$.day', 'monday'), + // ### '$.favorite_color') /* JSON meta=4 nullable=1 is_null=0 */ + // ### UPDATE `vt_commerce`.`customer` + // ### WHERE + // ### @1=3 /* LONGINT meta=0 nullable=0 is_null=0 */ + // ### @2='charlie@domain.com' /* VARSTRING(128) meta=128 nullable=1 is_null=0 */ + // ### @3='{"day": "friday", "role": "manager", "color": "red", "salary": 99, "favorite_color": "black"}' /* JSON meta=4 nullable=1 is_null=0 */ + // ### SET + // ### @1=3 /* LONGINT meta=0 nullable=0 is_null=0 */ + // ### @2='charlie@domain.com' /* VARSTRING(128) meta=128 nullable=1 is_null=0 */ + // ### @3=JSON_REMOVE( + // ### JSON_REPLACE(@3, '$.day', 'monday'), + // ### '$.favorite_color') /* JSON meta=4 nullable=1 is_null=0 */ + // ### UPDATE `vt_commerce`.`customer` + // ### WHERE + // ### @1=4 /* LONGINT meta=0 nullable=0 is_null=0 */ + // ### @2='dan@domain.com' /* VARSTRING(128) meta=128 nullable=1 is_null=0 */ + // ### @3='{"day": "friday", "role": "manager", "color": "red", "salary": 99, "favorite_color": "black"}' /* JSON meta=4 nullable=1 is_null=0 */ + // ### SET + // ### @1=4 /* LONGINT meta=0 nullable=0 is_null=0 */ + // ### @2='dan@domain.com' /* VARSTRING(128) meta=128 nullable=1 is_null=0 */ + // ### @3=JSON_REMOVE( + // ### JSON_REPLACE(@3, '$.day', 'monday'), + // ### '$.favorite_color') /* JSON meta=4 nullable=1 is_null=0 */ + // ### UPDATE `vt_commerce`.`customer` + // ### WHERE + // ### @1=5 /* LONGINT meta=0 nullable=0 is_null=0 */ + // ### @2='eve@domain.com' /* VARSTRING(128) meta=128 nullable=1 is_null=0 */ + // ### @3='{"day": "friday", "role": "manager", "color": "red", "salary": 100, "favorite_color": "black"}' /* JSON meta=4 nullable=1 is_null=0 */ + // ### SET + // ### @1=5 /* LONGINT meta=0 nullable=0 is_null=0 */ + // ### @2='eve@domain.com' /* VARSTRING(128) meta=128 nullable=1 is_null=0 */ + // ### @3=JSON_REMOVE( + // ### JSON_REPLACE(@3, '$.day', 'monday'), + // ### '$.favorite_color') /* JSON meta=4 nullable=1 is_null=0 */ + rawEvent: []byte{ + 227, 240, 86, 103, 39, 74, 58, 208, 33, 225, 3, 0, 0, 173, 122, 0, 0, 0, 0, 176, 0, 0, 0, 0, 0, 1, 0, 2, 0, 3, 255, 255, 0, 1, 0, 0, 0, 0, + 0, 0, 0, 16, 97, 108, 105, 99, 101, 64, 100, 111, 109, 97, 105, 110, 46, 99, 111, 109, 97, 0, 0, 0, 0, 5, 0, 96, 0, 39, 0, 3, 0, 42, 0, 4, + 0, 46, 0, 5, 0, 51, 0, 6, 0, 57, 0, 14, 0, 12, 71, 0, 12, 78, 0, 12, 86, 0, 5, 100, 0, 12, 90, 0, 100, 97, 121, 114, 111, 108, 101, 99, 111, 108, 111, 114, 115, 97, 108, 97, 114, 121, 102, 97, 118, 111, 114, 105, 116, 101, 95, 99, 111, 108, 111, 114, 6, 102, 114, 105, 100, 97, 121, 7, 109, 97, 110, 97, 103, 101, 114, 3, 114, 101, 100, 5, 98, 108, 97, 99, 107, 1, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 16, 97, 108, 105, 99, 101, 64, 100, 111, 109, 97, 105, 110, 46, 99, 111, 109, 34, 0, 0, 0, 0, 5, 36, 46, 100, 97, 121, 8, 12, 6, 109, 111, 110, 100, 97, 121, 2, 16, 36, 46, 102, 97, 118, 111, 114, 105, 116, 101, 95, 99, 111, 108, 111, 114, 0, 2, 0, 0, 0, 0, 0, 0, 0, 14, 98, 111, 98, 64, 100, 111, 109, 97, 105, 110, 46, 99, 111, 109, 97, 0, 0, 0, 0, 5, 0, 96, 0, 39, 0, 3, 0, 42, 0, 4, 0, 46, 0, 5, 0, 51, 0, 6, 0, 57, 0, 14, 0, 12, 71, 0, 12, 78, 0, 12, 86, 0, 5, 99, 0, 12, 90, 0, 100, 97, 121, 114, 111, 108, 101, 99, 111, 108, 111, 114, 115, 97, 108, 97, 114, 121, 102, 97, 118, 111, 114, 105, 116, 101, 95, 99, 111, 108, 111, 114, 6, 102, 114, 105, 100, 97, 121, 7, 109, 97, 110, 97, 103, 101, 114, 3, 114, 101, 100, 5, 98, 108, 97, 99, 107, 1, 1, 0, 2, 0, 0, 0, 0, 0, 0, 0, 14, 98, 111, 98, 64, 100, 111, 109, 97, 105, 110, 46, 99, 111, 109, 34, 0, 0, 0, 0, 5, 36, 46, 100, 97, 121, 8, 12, 6, 109, 111, 110, 100, 97, 121, 2, 16, 36, 46, 102, 97, 118, 111, 114, 105, 116, 101, 95, 99, 111, 108, 111, 114, 0, 3, 0, 0, 0, 0, 0, 0, 0, 18, 99, 104, 97, 114, 108, 105, 101, 64, 100, 111, 109, 97, 105, 110, 46, 99, 111, 109, 97, 0, 0, 0, 0, 5, 0, 96, 0, 39, 0, 3, 0, 42, 0, 4, 0, 46, 0, 5, 0, 51, 0, 6, 0, 57, 0, 14, 0, 12, 71, 0, 12, 78, 0, 12, 86, 0, 5, 99, 0, 12, 90, 0, 100, 97, 121, 114, 111, 108, 101, 99, 111, 108, 111, 114, 115, 97, 108, 97, 114, 121, 102, 97, 118, 111, 114, 105, 116, 101, 95, 99, 111, 108, 111, 114, 6, 102, 114, 105, 100, 97, 121, 7, 109, 97, 110, 97, 103, 101, 114, 3, 114, 101, 100, 5, 98, 108, 97, 99, 107, 1, 1, 0, 3, 0, 0, 0, 0, 0, 0, 0, 18, 99, 104, 97, 114, 108, 105, 101, 64, 100, 111, 109, 97, 105, 110, 46, 99, 111, 109, 34, 0, 0, 0, 0, 5, 36, 46, 100, 97, 121, 8, 12, 6, 109, 111, 110, 100, 97, 121, 2, 16, 36, 46, 102, 97, 118, 111, 114, 105, 116, 101, 95, 99, 111, 108, 111, 114, 0, 4, 0, 0, 0, 0, 0, 0, 0, 14, 100, 97, 110, 64, 100, 111, 109, 97, 105, 110, 46, 99, 111, 109, 97, 0, 0, 0, 0, 5, 0, 96, 0, 39, 0, 3, 0, 42, 0, 4, 0, 46, 0, 5, 0, 51, 0, 6, 0, 57, 0, 14, 0, 12, 71, 0, 12, 78, 0, 12, 86, 0, 5, 99, 0, 12, 90, 0, 100, 97, 121, 114, 111, 108, 101, 99, 111, 108, 111, 114, 115, 97, 108, 97, 114, 121, 102, 97, 118, 111, 114, 105, 116, 101, 95, 99, 111, 108, 111, 114, 6, 102, 114, 105, 100, 97, 121, 7, 109, 97, 110, 97, 103, 101, 114, 3, 114, 101, 100, 5, 98, 108, 97, 99, 107, 1, 1, 0, 4, 0, 0, 0, 0, 0, 0, 0, 14, 100, 97, 110, 64, 100, 111, 109, 97, 105, 110, 46, 99, 111, 109, 34, 0, 0, 0, 0, 5, 36, 46, 100, 97, 121, 8, 12, 6, 109, 111, 110, 100, 97, 121, 2, 16, 36, 46, 102, 97, 118, 111, 114, 105, 116, 101, 95, 99, 111, 108, 111, 114, 0, 5, 0, 0, 0, 0, 0, 0, 0, 14, 101, 118, 101, 64, 100, 111, 109, 97, 105, 110, 46, 99, 111, 109, 97, 0, 0, 0, 0, 5, 0, 96, 0, 39, 0, 3, 0, 42, 0, 4, 0, 46, 0, 5, 0, 51, 0, 6, 0, 57, 0, 14, 0, 12, 71, 0, 12, 78, 0, 12, 86, 0, 5, 100, 0, 12, 90, 0, 100, 97, 121, 114, 111, 108, 101, 99, 111, 108, 111, 114, 115, 97, 108, 97, 114, 121, 102, 97, 118, 111, 114, 105, 116, 101, 95, 99, 111, 108, 111, 114, 6, 102, 114, 105, 100, 97, 121, 7, 109, 97, 110, 97, 103, 101, 114, 3, 114, 101, 100, 5, 98, 108, 97, 99, 107, 1, 1, 0, 5, 0, 0, 0, 0, 0, 0, 0, 14, 101, 118, 101, 64, 100, 111, 109, 97, 105, 110, 46, 99, 111, 109, 34, 0, 0, 0, 0, 5, 36, 46, 100, 97, 121, 8, 12, 6, 109, 111, 110, 100, 97, 121, 2, 16, 36, 46, 102, 97, 118, 111, 114, 105, 116, 101, 95, 99, 111, 108, 111, 114, + }, + want: "JSON_REMOVE(JSON_REPLACE(%s, _utf8mb4'$.day', _utf8mb4\"monday\"), _utf8mb4'$.favorite_color')", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + mysql56PartialUpdateRowEvent := NewMysql56BinlogEvent(tc.rawEvent) + require.True(t, mysql56PartialUpdateRowEvent.IsPartialUpdateRows()) + + ev, err := mysql56PartialUpdateRowEvent.Rows(format, tm) + require.NoError(t, err) + + assert.Equal(t, 5, len(ev.Rows)) + require.NoError(t, err) + for i := range ev.Rows { + vals, err := ev.StringValuesForTests(tm, i) + require.NoError(t, err) + // The third column is the JSON column. + require.Equal(t, `JSON_REMOVE(JSON_REPLACE(%s, _utf8mb4'$.day', _utf8mb4"monday"), _utf8mb4'$.favorite_color')`, vals[2]) + t.Logf("Rows: %v", vals) + } + }) } } diff --git a/go/mysql/binlog_event_rbr.go b/go/mysql/binlog_event_rbr.go index 0e0ad0c458f..259d0441751 100644 --- a/go/mysql/binlog_event_rbr.go +++ b/go/mysql/binlog_event_rbr.go @@ -287,6 +287,9 @@ func (ev binlogEvent) Rows(f BinlogFormat, tm *TableMap) (Rows, error) { typ == eDeleteRowsEventV1 || typ == eDeleteRowsEventV2 hasData := typ == eWriteRowsEventV1 || typ == eWriteRowsEventV2 || typ == eUpdateRowsEventV1 || typ == ePartialUpdateRowsEvent || typ == eUpdateRowsEventV2 + //if typ == ePartialUpdateRowsEvent { + // log.Errorf("DEBUG: PartialUpdateRowsEvent bytes: %v", ev.Bytes()) + //} result := Rows{} pos := 6