diff --git a/go/mysql/binlog/binlog_json.go b/go/mysql/binlog/binlog_json.go index 03bf604fb2d..73146abd06b 100644 --- a/go/mysql/binlog/binlog_json.go +++ b/go/mysql/binlog/binlog_json.go @@ -17,7 +17,9 @@ limitations under the License. package binlog import ( + "bytes" "encoding/binary" + "encoding/hex" "fmt" "math" "strconv" @@ -25,9 +27,12 @@ import ( "vitess.io/vitess/go/hack" "vitess.io/vitess/go/mysql/format" "vitess.io/vitess/go/mysql/json" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/vterrors" + querypb "vitess.io/vitess/go/vt/proto/query" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" - "vitess.io/vitess/go/vt/vterrors" ) /* @@ -44,6 +49,14 @@ https://github.com/shyiko/mysql-binlog-connector-java/pull/119/files https://github.com/noplay/python-mysql-replication/blob/175df28cc8b536a68522ff9b09dc5440adad6094/pymysqlreplication/packet.py */ +type jsonDiffOp uint8 + +const ( + jsonDiffOpReplace = jsonDiffOp(0) + jsonDiffOpInsert = jsonDiffOp(1) + jsonDiffOpRemove = jsonDiffOp(2) +) + // ParseBinaryJSON provides the parsing function from the mysql binary json // representation to a JSON value instance. func ParseBinaryJSON(data []byte) (*json.Value, error) { @@ -60,6 +73,62 @@ func ParseBinaryJSON(data []byte) (*json.Value, error) { return node, nil } +// ParseBinaryJSONDiff provides the parsing function from the MySQL JSON +// diff representation to an SQL expression. +func ParseBinaryJSONDiff(data []byte) (sqltypes.Value, error) { + log.Errorf("DEBUG: json diff data hex: %s, string: %s", hex.EncodeToString(data), string(data)) + pos := 0 + opType := jsonDiffOp(data[pos]) + pos++ + diff := bytes.Buffer{} + + switch opType { + case jsonDiffOpReplace: + diff.WriteString("JSON_REPLACE(") + case jsonDiffOpInsert: + diff.WriteString("JSON_INSERT(") + case jsonDiffOpRemove: + diff.WriteString("JSON_REMOVE(") + } + diff.WriteString("%s, ") // This will later be replaced by the field name + + log.Errorf("DEBUG: json diff opType: %d", opType) + + pathLen, readTo, ok := readLenEncInt(data, pos) + if !ok { + return sqltypes.Value{}, fmt.Errorf("cannot read JSON diff path length") + } + pos = readTo + + log.Errorf("DEBUG: json diff path length: %d", pathLen) + + path := data[pos : uint64(pos)+pathLen] + pos += int(pathLen) + log.Errorf("DEBUG: json diff path: %s", string(path)) + diff.WriteString(fmt.Sprintf("'%s', ", path)) + + if opType == jsonDiffOpRemove { // No value for remove + diff.WriteString(")") + return sqltypes.MakeTrusted(sqltypes.Expression, diff.Bytes()), nil + } + + valueLen, readTo, ok := readLenEncInt(data, pos) + if !ok { + return sqltypes.Value{}, fmt.Errorf("cannot read JSON diff path length") + } + pos = readTo + log.Errorf("DEBUG: json diff value length: %d", valueLen) + + value, err := ParseBinaryJSON(data[pos : uint64(pos)+valueLen]) + if err != nil { + return sqltypes.Value{}, fmt.Errorf("cannot read JSON diff value for path %s: %w", path, err) + } + log.Errorf("DEBUG: json diff value: %v", value) + diff.WriteString(fmt.Sprintf("%s)", value)) + + return sqltypes.MakeTrusted(sqltypes.Expression, diff.Bytes()), nil +} + // jsonDataType has the values used in the mysql json binary representation to denote types. // We have string, literal(true/false/null), number, object or array types. // large object => doc size > 64K: you get pointers instead of inline values. @@ -315,7 +384,7 @@ func binparserOpaque(_ jsonDataType, data []byte, pos int) (node *json.Value, er precision := decimalData[0] scale := decimalData[1] metadata := (uint16(precision) << 8) + uint16(scale) - val, _, err := CellValue(decimalData, 2, TypeNewDecimal, metadata, &querypb.Field{Type: querypb.Type_DECIMAL}) + val, _, err := CellValue(decimalData, 2, TypeNewDecimal, metadata, &querypb.Field{Type: querypb.Type_DECIMAL}, false) if err != nil { return nil, err } @@ -394,3 +463,45 @@ func binparserObject(typ jsonDataType, data []byte, pos int) (node *json.Value, return json.NewObject(object), nil } + +func readLenEncInt(data []byte, pos int) (uint64, int, bool) { + if pos >= len(data) { + return 0, 0, false + } + + // reslice to avoid arithmetic below + data = data[pos:] + + switch data[0] { + case 0xfc: + // Encoded in the next 2 bytes. + if 2 >= len(data) { + return 0, 0, false + } + return uint64(data[1]) | + uint64(data[2])<<8, pos + 3, true + case 0xfd: + // Encoded in the next 3 bytes. + if 3 >= len(data) { + return 0, 0, false + } + return uint64(data[1]) | + uint64(data[2])<<8 | + uint64(data[3])<<16, pos + 4, true + case 0xfe: + // Encoded in the next 8 bytes. + if 8 >= len(data) { + return 0, 0, false + } + return uint64(data[1]) | + uint64(data[2])<<8 | + uint64(data[3])<<16 | + uint64(data[4])<<24 | + uint64(data[5])<<32 | + uint64(data[6])<<40 | + uint64(data[7])<<48 | + uint64(data[8])<<56, pos + 9, true + default: + return uint64(data[0]), pos + 1, true + } +} diff --git a/go/mysql/binlog/rbr.go b/go/mysql/binlog/rbr.go index 8b95b0daee9..9f92c18c530 100644 --- a/go/mysql/binlog/rbr.go +++ b/go/mysql/binlog/rbr.go @@ -26,9 +26,11 @@ import ( "time" "vitess.io/vitess/go/sqltypes" - querypb "vitess.io/vitess/go/vt/proto/query" - "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/vterrors" + + querypb "vitess.io/vitess/go/vt/proto/query" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) // ZeroTimestamp is the special value 0 for a timestamp. @@ -130,7 +132,7 @@ func CellLength(data []byte, pos int, typ byte, metadata uint16) (int, error) { uint32(data[pos+2])<<16| uint32(data[pos+3])<<24), nil default: - return 0, vterrors.Errorf(vtrpc.Code_INTERNAL, "unsupported blob/geometry metadata value %v (data: %v pos: %v)", metadata, data, pos) + return 0, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unsupported blob/geometry metadata value %v (data: %v pos: %v)", metadata, data, pos) } case TypeString: // This may do String, Enum, and Set. The type is in @@ -151,7 +153,7 @@ func CellLength(data []byte, pos int, typ byte, metadata uint16) (int, error) { return l + 1, nil default: - return 0, vterrors.Errorf(vtrpc.Code_INTERNAL, "unsupported type %v (data: %v pos: %v)", typ, data, pos) + return 0, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unsupported type %v (data: %v pos: %v)", typ, data, pos) } } @@ -176,7 +178,7 @@ func printTimestamp(v uint32) *bytes.Buffer { // byte to determine general shared aspects of types and the querypb.Field to // determine other info specifically about its underlying column (SQL column // type, column length, charset, etc) -func CellValue(data []byte, pos int, typ byte, metadata uint16, field *querypb.Field) (sqltypes.Value, int, error) { +func CellValue(data []byte, pos int, typ byte, metadata uint16, field *querypb.Field, partialJSON bool) (sqltypes.Value, int, error) { switch typ { case TypeTiny: if sqltypes.IsSigned(field.Type) { @@ -644,7 +646,7 @@ func CellValue(data []byte, pos int, typ byte, metadata uint16, field *querypb.F return sqltypes.MakeTrusted(querypb.Type_ENUM, strconv.AppendUint(nil, uint64(val), 10)), 2, nil default: - return sqltypes.NULL, 0, vterrors.Errorf(vtrpc.Code_INTERNAL, "unexpected enum size: %v", metadata&0xff) + return sqltypes.NULL, 0, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected enum size: %v", metadata&0xff) } case TypeSet: @@ -672,21 +674,29 @@ func CellValue(data []byte, pos int, typ byte, metadata uint16, field *querypb.F uint32(data[pos+2])<<16 | uint32(data[pos+3])<<24) default: - return sqltypes.NULL, 0, vterrors.Errorf(vtrpc.Code_INTERNAL, "unsupported blob metadata value %v (data: %v pos: %v)", metadata, data, pos) + return sqltypes.NULL, 0, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unsupported blob metadata value %v (data: %v pos: %v)", metadata, data, pos) } pos += int(metadata) // For JSON, we parse the data, and emit SQL. if typ == TypeJSON { - var err error jsonData := data[pos : pos+l] - jsonVal, err := ParseBinaryJSON(jsonData) - if err != nil { - panic(err) + if partialJSON { + log.Errorf("DEBUG: partialJSON cell value: %s", string(jsonData)) + val, err := ParseBinaryJSONDiff(jsonData) + if err != nil { + panic(err) + } + log.Errorf("DEBUG: decoded partialJSON cell value: %v", val) + return val, l + int(metadata), nil + } else { + jsonVal, err := ParseBinaryJSON(jsonData) + if err != nil { + panic(err) + } + jd := jsonVal.MarshalTo(nil) + return sqltypes.MakeTrusted(sqltypes.Expression, jd), l + int(metadata), nil } - d := jsonVal.MarshalTo(nil) - return sqltypes.MakeTrusted(sqltypes.Expression, - d), l + int(metadata), nil } return sqltypes.MakeTrusted(querypb.Type_VARBINARY, @@ -710,7 +720,7 @@ func CellValue(data []byte, pos int, typ byte, metadata uint16, field *querypb.F return sqltypes.MakeTrusted(querypb.Type_UINT16, strconv.AppendUint(nil, uint64(val), 10)), 2, nil default: - return sqltypes.NULL, 0, vterrors.Errorf(vtrpc.Code_INTERNAL, "unexpected enum size: %v", metadata&0xff) + return sqltypes.NULL, 0, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected enum size: %v", metadata&0xff) } } if t == TypeSet { @@ -776,13 +786,13 @@ func CellValue(data []byte, pos int, typ byte, metadata uint16, field *querypb.F uint32(data[pos+2])<<16 | uint32(data[pos+3])<<24) default: - return sqltypes.NULL, 0, vterrors.Errorf(vtrpc.Code_INTERNAL, "unsupported geometry metadata value %v (data: %v pos: %v)", metadata, data, pos) + return sqltypes.NULL, 0, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unsupported geometry metadata value %v (data: %v pos: %v)", metadata, data, pos) } pos += int(metadata) return sqltypes.MakeTrusted(querypb.Type_GEOMETRY, data[pos:pos+l]), l + int(metadata), nil default: - return sqltypes.NULL, 0, vterrors.Errorf(vtrpc.Code_INTERNAL, "unsupported type %v", typ) + return sqltypes.NULL, 0, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unsupported type %v", typ) } } diff --git a/go/mysql/binlog/rbr_test.go b/go/mysql/binlog/rbr_test.go index 1dfaf90a33e..ce49e587e6c 100644 --- a/go/mysql/binlog/rbr_test.go +++ b/go/mysql/binlog/rbr_test.go @@ -550,7 +550,7 @@ func TestCellLengthAndData(t *testing.T) { } // Test CellValue. - out, l, err := CellValue(padded, 1, tcase.typ, tcase.metadata, &querypb.Field{Type: tcase.styp}) + out, l, err := CellValue(padded, 1, tcase.typ, tcase.metadata, &querypb.Field{Type: tcase.styp}, false) if err != nil || l != len(tcase.data) || out.Type() != tcase.out.Type() || !bytes.Equal(out.Raw(), tcase.out.Raw()) { t.Errorf("testcase cellData(%v,%v) returned unexpected result: %v %v %v, was expecting %v %v \nwant: %s\ngot: %s", tcase.typ, tcase.data, out, l, err, tcase.out, len(tcase.data), tcase.out.Raw(), out.Raw()) diff --git a/go/mysql/binlog_event.go b/go/mysql/binlog_event.go index 5d472230d0e..76660f4ee90 100644 --- a/go/mysql/binlog_event.go +++ b/go/mysql/binlog_event.go @@ -86,9 +86,20 @@ type BinlogEvent interface { IsWriteRows() bool // IsUpdateRows returns true if this is a UPDATE_ROWS_EVENT. IsUpdateRows() bool + // IsPartialUpdateRows returs true if a partial JSON update event + // is found. These events are only seen in MySQL 8.0 if the mysqld + // instance has binlog_row_value_options=PARTIAL_JSON set. + IsPartialUpdateRows() bool // IsDeleteRows returns true if this is a DELETE_ROWS_EVENT. IsDeleteRows() bool + // IsPseudo is for custom implementations of GTID. + IsPseudo() bool + + // IsTransactionPayload returns true if a compressed transaction + // payload event is found (binlog_transaction_compression=ON). + IsTransactionPayload() bool + // Timestamp returns the timestamp from the event header. Timestamp() uint32 // ServerID returns the server ID from the event header. @@ -123,8 +134,8 @@ type BinlogEvent interface { TableMap(BinlogFormat) (*TableMap, error) // Rows returns a Rows struct representing data from a // {WRITE,UPDATE,DELETE}_ROWS_EVENT. This is only valid if - // IsWriteRows(), IsUpdateRows(), or IsDeleteRows() returns - // true. + // IsWriteRows(), IsUpdateRows(), IsPartialUpdateRows(), or + // IsDeleteRows() returns true. Rows(BinlogFormat, *TableMap) (Rows, error) // TransactionPayload returns a TransactionPayload type which provides // a GetNextEvent() method to iterate over the events contained within @@ -141,13 +152,6 @@ type BinlogEvent interface { // the same event and a nil checksum. StripChecksum(BinlogFormat) (ev BinlogEvent, checksum []byte, err error) - // IsPseudo is for custom implementations of GTID. - IsPseudo() bool - - // IsTransactionPayload returns true if a compressed transaction - // payload event is found (binlog_transaction_compression=ON). - IsTransactionPayload() bool - // Bytes returns the binary representation of the event Bytes() []byte } @@ -266,6 +270,11 @@ type Row struct { // It is only set for UPDATE and DELETE events. Identify []byte + // If this row represents a PartialUpdateRow event there will be a + // shared-image that sits between the before and after images that + // defines how the JSON column values are represented. + JSONPartialValues Bitmap + // Data is the raw data. // It is only set for WRITE and UPDATE events. Data []byte diff --git a/go/mysql/binlog_event_common.go b/go/mysql/binlog_event_common.go index c95873614f0..548875c44f7 100644 --- a/go/mysql/binlog_event_common.go +++ b/go/mysql/binlog_event_common.go @@ -187,6 +187,11 @@ func (ev binlogEvent) IsUpdateRows() bool { ev.Type() == eUpdateRowsEventV2 } +// IsPartialUpdateRows implements BinlogEvent.IsPartialUpdateRows(). +func (ev binlogEvent) IsPartialUpdateRows() bool { + return ev.Type() == ePartialUpdateRowsEvent +} + // IsDeleteRows implements BinlogEvent.IsDeleteRows(). // We do not support v0. func (ev binlogEvent) IsDeleteRows() bool { diff --git a/go/mysql/binlog_event_filepos.go b/go/mysql/binlog_event_filepos.go index c71c8346964..b7e6ed9e0f2 100644 --- a/go/mysql/binlog_event_filepos.go +++ b/go/mysql/binlog_event_filepos.go @@ -203,6 +203,10 @@ func (ev filePosFakeEvent) IsUpdateRows() bool { return false } +func (ev filePosFakeEvent) IsPartialUpdateRows() bool { + return false +} + func (ev filePosFakeEvent) IsDeleteRows() bool { return false } diff --git a/go/mysql/binlog_event_mysql56_test.go b/go/mysql/binlog_event_mysql56_test.go index 5844779de63..be9ae071ded 100644 --- a/go/mysql/binlog_event_mysql56_test.go +++ b/go/mysql/binlog_event_mysql56_test.go @@ -27,6 +27,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "vitess.io/vitess/go/mysql/collations" "vitess.io/vitess/go/mysql/replication" ) @@ -248,3 +249,111 @@ func TestMysql56SemiSyncAck(t *testing.T) { assert.True(t, e.IsQuery()) } } + +func TestMySQL56PartialUpdateRowsEvent(t *testing.T) { + // The mysqlbinlog -vvv --base64-output=decode-rows output for the following event: + // ### UPDATE `vt_commerce`.`customer` + // ### WHERE + // ### @1=1 /* LONGINT meta=0 nullable=0 is_null=0 */ + // ### @2='alice@domain.com' /* VARSTRING(128) meta=128 nullable=1 is_null=0 */ + // ### @3='{"salary": 100}' /* JSON meta=4 nullable=1 is_null=0 */ + // ### SET + // ### @1=1 /* LONGINT meta=0 nullable=0 is_null=0 */ + // ### @2='alice@domain.com' /* VARSTRING(128) meta=128 nullable=1 is_null=0 */ + // ### @3=JSON_INSERT(@3, '$.role', 'manager') /* JSON meta=4 nullable=1 is_null=0 */ + // ### UPDATE `vt_commerce`.`customer` + // ### WHERE + // ### @1=2 /* LONGINT meta=0 nullable=0 is_null=0 */ + // ### @2='bob@domain.com' /* VARSTRING(128) meta=128 nullable=1 is_null=0 */ + // ### @3='{"salary": 99}' /* JSON meta=4 nullable=1 is_null=0 */ + // ### SET + // ### @1=2 /* LONGINT meta=0 nullable=0 is_null=0 */ + // ### @2='bob@domain.com' /* VARSTRING(128) meta=128 nullable=1 is_null=0 */ + // ### @3=JSON_INSERT(@3, '$.role', 'manager') /* JSON meta=4 nullable=1 is_null=0 */ + // ### UPDATE `vt_commerce`.`customer` + // ### WHERE + // ### @1=3 /* LONGINT meta=0 nullable=0 is_null=0 */ + // ### @2='charlie@domain.com' /* VARSTRING(128) meta=128 nullable=1 is_null=0 */ + // ### @3='{"salary": 99}' /* JSON meta=4 nullable=1 is_null=0 */ + // ### SET + // ### @1=3 /* LONGINT meta=0 nullable=0 is_null=0 */ + // ### @2='charlie@domain.com' /* VARSTRING(128) meta=128 nullable=1 is_null=0 */ + // ### @3=JSON_INSERT(@3, '$.role', 'manager') /* JSON meta=4 nullable=1 is_null=0 */ + // ### UPDATE `vt_commerce`.`customer` + // ### WHERE + // ### @1=4 /* LONGINT meta=0 nullable=0 is_null=0 */ + // ### @2='dan@domain.com' /* VARSTRING(128) meta=128 nullable=1 is_null=0 */ + // ### @3='{"salary": 99}' /* JSON meta=4 nullable=1 is_null=0 */ + // ### SET + // ### @1=4 /* LONGINT meta=0 nullable=0 is_null=0 */ + // ### @2='dan@domain.com' /* VARSTRING(128) meta=128 nullable=1 is_null=0 */ + // ### @3=JSON_INSERT(@3, '$.role', 'manager') /* JSON meta=4 nullable=1 is_null=0 */ + // ### UPDATE `vt_commerce`.`customer` + // ### WHERE + // ### @1=5 /* LONGINT meta=0 nullable=0 is_null=0 */ + // ### @2='eve@domain.com' /* VARSTRING(128) meta=128 nullable=1 is_null=0 */ + // ### @3='{"salary": 100}' /* JSON meta=4 nullable=1 is_null=0 */ + // ### SET + // ### @1=5 /* LONGINT meta=0 nullable=0 is_null=0 */ + // ### @2='eve@domain.com' /* VARSTRING(128) meta=128 nullable=1 is_null=0 */ + // ### @3=JSON_INSERT(@3, '$.role', 'manager') /* JSON meta=4 nullable=1 is_null=0 */ + mysql56PartialUpdateRowEvent := NewMysql56BinlogEvent([]byte{0x67, 0xc4, 0x54, 0x67, 0x27, 0x1f, 0x91, 0x10, 0x76, 0x14, 0x02, 0x00, 0x00, 0x19, 0x69, + 0x00, 0x00, 0x00, 0x00, 0xb0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x02, 0x00, 0x03, 0xff, 0xff, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x10, 0x61, 0x6c, 0x69, 0x63, 0x65, 0x40, 0x64, 0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x2e, 0x63, 0x6f, 0x6d, 0x16, 0x00, 0x00, 0x00, 0x00, + 0x01, 0x00, 0x15, 0x00, 0x0b, 0x00, 0x03, 0x00, 0x0c, 0x0e, 0x00, 0x73, 0x65, 0x78, 0x06, 0x66, 0x65, 0x6d, 0x61, 0x6c, 0x65, 0x01, 0x01, 0x00, + 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x10, 0x61, 0x6c, 0x69, 0x63, 0x65, 0x40, 0x64, 0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x2e, 0x63, 0x6f, + 0x6d, 0x12, 0x00, 0x00, 0x00, 0x01, 0x06, 0x24, 0x2e, 0x72, 0x6f, 0x6c, 0x65, 0x09, 0x0c, 0x07, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x00, + 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0e, 0x62, 0x6f, 0x62, 0x40, 0x64, 0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x2e, 0x63, 0x6f, 0x6d, 0x14, + 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x13, 0x00, 0x0b, 0x00, 0x03, 0x00, 0x0c, 0x0e, 0x00, 0x73, 0x65, 0x78, 0x04, 0x6d, 0x61, 0x6c, 0x65, 0x01, + 0x01, 0x00, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0e, 0x62, 0x6f, 0x62, 0x40, 0x64, 0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x2e, 0x63, 0x6f, + 0x6d, 0x12, 0x00, 0x00, 0x00, 0x01, 0x06, 0x24, 0x2e, 0x72, 0x6f, 0x6c, 0x65, 0x09, 0x0c, 0x07, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x00, + 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x12, 0x63, 0x68, 0x61, 0x72, 0x6c, 0x69, 0x65, 0x40, 0x64, 0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x2e, + 0x63, 0x6f, 0x6d, 0x14, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x13, 0x00, 0x0b, 0x00, 0x03, 0x00, 0x0c, 0x0e, 0x00, 0x73, 0x65, 0x78, 0x04, 0x6d, + 0x61, 0x6c, 0x65, 0x01, 0x01, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x12, 0x63, 0x68, 0x61, 0x72, 0x6c, 0x69, 0x65, 0x40, 0x64, + 0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x2e, 0x63, 0x6f, 0x6d, 0x12, 0x00, 0x00, 0x00, 0x01, 0x06, 0x24, 0x2e, 0x72, 0x6f, 0x6c, 0x65, 0x09, 0x0c, 0x07, + 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0e, 0x64, 0x61, 0x6e, 0x40, 0x64, 0x6f, 0x6d, + 0x61, 0x69, 0x6e, 0x2e, 0x63, 0x6f, 0x6d, 0x14, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x13, 0x00, 0x0b, 0x00, 0x03, 0x00, 0x0c, 0x0e, 0x00, 0x73, + 0x65, 0x78, 0x04, 0x6d, 0x61, 0x6c, 0x65, 0x01, 0x01, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0e, 0x64, 0x61, 0x6e, 0x40, 0x64, + 0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x2e, 0x63, 0x6f, 0x6d, 0x12, 0x00, 0x00, 0x00, 0x01, 0x06, 0x24, 0x2e, 0x72, 0x6f, 0x6c, 0x65, 0x09, 0x0c, 0x07, + 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x00, 0x05, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0e, 0x65, 0x76, 0x65, 0x40, 0x64, 0x6f, 0x6d, + 0x61, 0x69, 0x6e, 0x2e, 0x63, 0x6f, 0x6d, 0x16, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x15, 0x00, 0x0b, 0x00, 0x03, 0x00, 0x0c, 0x0e, 0x00, 0x73, + 0x65, 0x78, 0x06, 0x66, 0x65, 0x6d, 0x61, 0x6c, 0x65, 0x01, 0x01, 0x00, 0x05, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0e, 0x65, 0x76, 0x65, + 0x40, 0x64, 0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x2e, 0x63, 0x6f, 0x6d, 0x12, 0x00, 0x00, 0x00, 0x01, 0x06, 0x24, 0x2e, 0x72, 0x6f, 0x6c, 0x65, 0x09, + 0x0c, 0x07, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72}) + + require.True(t, mysql56PartialUpdateRowEvent.IsPartialUpdateRows()) + + format := BinlogFormat{ + HeaderSizes: []byte{ + 0, 13, 0, 8, 0, 0, 0, 0, 4, 0, 4, 0, 0, 0, 98, 0, 4, 26, 8, 0, 0, 0, 8, 8, 8, 2, 0, 0, 0, 10, 10, 10, 42, 42, 0, 18, 52, 0, 10, 40, 0, + }, + ServerVersion: "8.0.40", + FormatVersion: 4, + HeaderLength: 19, + ChecksumAlgorithm: 1, + } + tm := &TableMap{ + Flags: 1, + Database: "vt_commerce", + Name: "customer", + Types: []byte{8, 15, 245}, + CanBeNull: Bitmap{ + data: []byte{6}, + count: 3, + }, + Metadata: []uint16{0, 128, 4}, + ColumnCollationIDs: []collations.ID{63}, + } + ev, err := mysql56PartialUpdateRowEvent.Rows(format, tm) + require.NoError(t, err) + + assert.Equal(t, 5, len(ev.Rows)) + require.NoError(t, err) + for i := range ev.Rows { + vals, err := ev.StringValuesForTests(tm, i) + require.NoError(t, err) + // The third column is the JSON column. + require.Equal(t, vals[2], `JSON_INSERT(%s, '$.role', "manager")`) + t.Logf("Rows: %v", vals) + } +} diff --git a/go/mysql/binlog_event_rbr.go b/go/mysql/binlog_event_rbr.go index d77b7bcb9a0..ea59a2a2247 100644 --- a/go/mysql/binlog_event_rbr.go +++ b/go/mysql/binlog_event_rbr.go @@ -21,6 +21,7 @@ import ( "vitess.io/vitess/go/mysql/binlog" "vitess.io/vitess/go/mysql/collations" + "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/vterrors" querypb "vitess.io/vitess/go/vt/proto/query" @@ -283,10 +284,10 @@ func readColumnCollationIDs(data []byte, pos, count int) ([]collations.ID, error func (ev binlogEvent) Rows(f BinlogFormat, tm *TableMap) (Rows, error) { typ := ev.Type() data := ev.Bytes()[f.HeaderLength:] - hasIdentify := typ == eUpdateRowsEventV1 || typ == eUpdateRowsEventV2 || + hasIdentify := typ == eUpdateRowsEventV1 || typ == eUpdateRowsEventV2 || typ == ePartialUpdateRowsEvent || typ == eDeleteRowsEventV1 || typ == eDeleteRowsEventV2 hasData := typ == eWriteRowsEventV1 || typ == eWriteRowsEventV2 || - typ == eUpdateRowsEventV1 || typ == eUpdateRowsEventV2 + typ == eUpdateRowsEventV1 || typ == ePartialUpdateRowsEvent || typ == eUpdateRowsEventV2 result := Rows{} pos := 6 @@ -297,7 +298,7 @@ func (ev binlogEvent) Rows(f BinlogFormat, tm *TableMap) (Rows, error) { pos += 2 // version=2 have extra data here. - if typ == eWriteRowsEventV2 || typ == eUpdateRowsEventV2 || typ == eDeleteRowsEventV2 { + if typ == eWriteRowsEventV2 || typ == eUpdateRowsEventV2 || typ == ePartialUpdateRowsEvent || typ == eDeleteRowsEventV2 { // This extraDataLength contains the 2 bytes length. extraDataLength := binary.LittleEndian.Uint16(data[pos : pos+2]) pos += int(extraDataLength) @@ -311,6 +312,7 @@ func (ev binlogEvent) Rows(f BinlogFormat, tm *TableMap) (Rows, error) { numIdentifyColumns := 0 numDataColumns := 0 + numJSONColumns := 0 if hasIdentify { // Bitmap of the columns used for identify. @@ -324,6 +326,15 @@ func (ev binlogEvent) Rows(f BinlogFormat, tm *TableMap) (Rows, error) { numDataColumns = result.DataColumns.BitCount() } + // For PartialUpdateRowsEvents, we need to know how many JSON columns there are. + if ev.Type() == ePartialUpdateRowsEvent { + for c := 0; c < int(columnCount); c++ { + if tm.Types[c] == binlog.TypeJSON { + numJSONColumns++ + } + } + } + // One row at a time. for pos < len(data) { row := Row{} @@ -358,6 +369,19 @@ func (ev binlogEvent) Rows(f BinlogFormat, tm *TableMap) (Rows, error) { row.Identify = data[startPos:pos] } + if ev.Type() == ePartialUpdateRowsEvent { + log.Errorf("DEBUG: PartialUpdateRowsEvent found with %d JSON columns", numJSONColumns) + // The first byte indicates whether or not any JSON values are partial. + // If it's 0 then there's nothing else to do. + partialJSON := uint8(data[pos]) + log.Errorf("DEBUG: PartialJSON: %d", partialJSON) + pos++ + if partialJSON == 1 { + row.JSONPartialValues, pos = newBitmap(data, pos, numJSONColumns) + log.Errorf("DEBUG: PartialUpdateRowsEvent: JSONPartialValues: %08b", row.JSONPartialValues) + } + } + if hasData { // Bitmap of columns that are null (amongst the ones that are present). row.NullColumns, pos = newBitmap(data, pos, numDataColumns) @@ -402,6 +426,7 @@ func (rs *Rows) StringValuesForTests(tm *TableMap, rowIndex int) ([]string, erro var result []string valueIndex := 0 + jsonIndex := 0 data := rs.Rows[rowIndex].Data pos := 0 for c := 0; c < rs.DataColumns.Count(); c++ { @@ -416,12 +441,18 @@ func (rs *Rows) StringValuesForTests(tm *TableMap, rowIndex int) ([]string, erro continue } - // We have real data - value, l, err := binlog.CellValue(data, pos, tm.Types[c], tm.Metadata[c], &querypb.Field{Type: querypb.Type_UINT64}) + partialJSON := false + if rs.Rows[rowIndex].JSONPartialValues.Count() > 0 && tm.Types[c] == binlog.TypeJSON { + partialJSON = rs.Rows[rowIndex].JSONPartialValues.Bit(jsonIndex) + jsonIndex++ + } + + // We have real data. + value, l, err := binlog.CellValue(data, pos, tm.Types[c], tm.Metadata[c], &querypb.Field{Type: querypb.Type_UINT64}, partialJSON) if err != nil { return nil, err } - result = append(result, value.ToString()) + result = append(result, value.RawStr()) pos += l valueIndex++ } @@ -452,7 +483,7 @@ func (rs *Rows) StringIdentifiesForTests(tm *TableMap, rowIndex int) ([]string, } // We have real data - value, l, err := binlog.CellValue(data, pos, tm.Types[c], tm.Metadata[c], &querypb.Field{Type: querypb.Type_UINT64}) + value, l, err := binlog.CellValue(data, pos, tm.Types[c], tm.Metadata[c], &querypb.Field{Type: querypb.Type_UINT64}, false) if err != nil { return nil, err } diff --git a/go/mysql/replication_constants.go b/go/mysql/replication_constants.go index 6b6e34b2333..27d0bd331ce 100644 --- a/go/mysql/replication_constants.go +++ b/go/mysql/replication_constants.go @@ -110,6 +110,9 @@ const ( //eViewChangeEvent = 37 //eXAPrepareLogEvent = 38 + // PartialUpdateRowsEvent when binlog_row_value_options=PARTIAL_JSON. + ePartialUpdateRowsEvent = 39 + // Transaction_payload_event when binlog_transaction_compression=ON. eTransactionPayloadEvent = 40 diff --git a/go/vt/binlog/binlog_streamer.go b/go/vt/binlog/binlog_streamer.go index d62fcc3a915..08e06ec803c 100644 --- a/go/vt/binlog/binlog_streamer.go +++ b/go/vt/binlog/binlog_streamer.go @@ -760,7 +760,7 @@ func writeValuesAsSQL(sql *sqlparser.TrackedBuffer, tce *tableCacheEntry, rs *my } // We have real data. - value, l, err := binlog.CellValue(data, pos, tce.tm.Types[c], tce.tm.Metadata[c], &querypb.Field{Type: tce.ti.Fields[c].Type}) + value, l, err := binlog.CellValue(data, pos, tce.tm.Types[c], tce.tm.Metadata[c], &querypb.Field{Type: tce.ti.Fields[c].Type}, false) if err != nil { return keyspaceIDCell, nil, err } @@ -825,7 +825,7 @@ func writeIdentifiersAsSQL(sql *sqlparser.TrackedBuffer, tce *tableCacheEntry, r sql.WriteByte('=') // We have real data. - value, l, err := binlog.CellValue(data, pos, tce.tm.Types[c], tce.tm.Metadata[c], &querypb.Field{Type: tce.ti.Fields[c].Type}) + value, l, err := binlog.CellValue(data, pos, tce.tm.Types[c], tce.tm.Metadata[c], &querypb.Field{Type: tce.ti.Fields[c].Type}, false) if err != nil { return keyspaceIDCell, nil, err } diff --git a/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go b/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go index 62d6166b5ca..39cfacd8c76 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go +++ b/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go @@ -28,8 +28,10 @@ import ( "vitess.io/vitess/go/mysql/collations/colldata" vjson "vitess.io/vitess/go/mysql/json" "vitess.io/vitess/go/mysql/sqlerror" + "vitess.io/vitess/go/ptr" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/binlog/binlogplayer" + "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate/evalengine" @@ -384,14 +386,23 @@ func (tp *TablePlan) applyChange(rowChange *binlogdatapb.RowChange, executor fun var newVal *sqltypes.Value var err error if field.Type == querypb.Type_JSON { + log.Errorf("DEBUG: vplayer applyChange: field.Type == querypb.Type_JSON, val type: %v, vals[i]: %+v", vals[i].Type(), vals[i].RawStr()) if vals[i].IsNull() { // An SQL NULL and not an actual JSON value newVal = &sqltypes.NULL + } else if strings.HasPrefix(vals[i].RawStr(), `JSON_`) { // TODO: find a better / less hacky way to do this + // An SQL expression that can be converted to a JSON value such + // as JSON_INSERT(). + // This occurs e.g. when using partial JSON values as a result of + // mysqld using binlog-row-value-options=PARTIAL_JSON. + s := fmt.Sprintf(vals[i].RawStr(), field.Name) + newVal = ptr.Of(sqltypes.MakeTrusted(querypb.Type_EXPRESSION, []byte(s))) } else { // A JSON value (which may be a JSON null literal value) newVal, err = vjson.MarshalSQLValue(vals[i].Raw()) if err != nil { return nil, err } } + log.Errorf("DEBUG: vplayer applyChange: field.Type == querypb.Type_JSON, newVal: %+v", newVal) bindVar, err = tp.bindFieldVal(field, newVal) } else { bindVar, err = tp.bindFieldVal(field, &vals[i]) diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index 59db723ff2b..16b6db65b2d 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -623,7 +623,7 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent, bufferAndTransmit func(vev if vevent != nil { vevents = append(vevents, vevent) } - case ev.IsWriteRows() || ev.IsDeleteRows() || ev.IsUpdateRows(): + case ev.IsWriteRows() || ev.IsDeleteRows() || ev.IsUpdateRows() || ev.IsPartialUpdateRows(): // The existence of before and after images can be used to // identify statement types. It's also possible that the // before and after images end up going to different shards. @@ -973,7 +973,7 @@ func (vs *vstreamer) processJournalEvent(vevents []*binlogdatapb.VEvent, plan *s } nextrow: for _, row := range rows.Rows { - afterOK, afterValues, _, err := vs.extractRowAndFilter(plan, row.Data, rows.DataColumns, row.NullColumns) + afterOK, afterValues, _, err := vs.extractRowAndFilter(plan, row.Data, rows.DataColumns, row.NullColumns, row.JSONPartialValues) if err != nil { return nil, err } @@ -1011,11 +1011,14 @@ nextrow: func (vs *vstreamer) processRowEvent(vevents []*binlogdatapb.VEvent, plan *streamerPlan, rows mysql.Rows) ([]*binlogdatapb.VEvent, error) { rowChanges := make([]*binlogdatapb.RowChange, 0, len(rows.Rows)) for _, row := range rows.Rows { - beforeOK, beforeValues, _, err := vs.extractRowAndFilter(plan, row.Identify, rows.IdentifyColumns, row.NullIdentifyColumns) + // The BEFORE image does not have partial JSON values so we pass an empty bitmap. + beforeOK, beforeValues, _, err := vs.extractRowAndFilter(plan, row.Identify, rows.IdentifyColumns, row.NullIdentifyColumns, mysql.Bitmap{}) if err != nil { return nil, err } - afterOK, afterValues, partial, err := vs.extractRowAndFilter(plan, row.Data, rows.DataColumns, row.NullColumns) + // The AFTER image is where we may have partial JSON values, as reflected in the + // row's JSONPartialValues bitmap. + afterOK, afterValues, partial, err := vs.extractRowAndFilter(plan, row.Data, rows.DataColumns, row.NullColumns, row.JSONPartialValues) if err != nil { return nil, err } @@ -1081,13 +1084,14 @@ func (vs *vstreamer) rebuildPlans() error { // - true, if row needs to be skipped because of workflow filter rules // - data values, array of one value per column // - true, if the row image was partial (i.e. binlog_row_image=noblob and dml doesn't update one or more blob/text columns) -func (vs *vstreamer) extractRowAndFilter(plan *streamerPlan, data []byte, dataColumns, nullColumns mysql.Bitmap) (bool, []sqltypes.Value, bool, error) { +func (vs *vstreamer) extractRowAndFilter(plan *streamerPlan, data []byte, dataColumns, nullColumns mysql.Bitmap, jsonPartialValues mysql.Bitmap) (bool, []sqltypes.Value, bool, error) { if len(data) == 0 { return false, nil, false, nil } values := make([]sqltypes.Value, dataColumns.Count()) charsets := make([]collations.ID, len(values)) valueIndex := 0 + jsonIndex := 0 pos := 0 partial := false for colNum := 0; colNum < dataColumns.Count(); colNum++ { @@ -1103,12 +1107,20 @@ func (vs *vstreamer) extractRowAndFilter(plan *streamerPlan, data []byte, dataCo valueIndex++ continue } - value, l, err := mysqlbinlog.CellValue(data, pos, plan.TableMap.Types[colNum], plan.TableMap.Metadata[colNum], plan.Table.Fields[colNum]) + partialJSON := false + if jsonPartialValues.Count() > 0 && plan.Table.Fields[colNum].Type == querypb.Type_JSON { + partialJSON = jsonPartialValues.Bit(jsonIndex) + log.Errorf("DEBUG: extractRowAndFilter: table: %s, column: %v, partialJSON: %v", plan.Table.Name, plan.Table.Fields[colNum], partialJSON) + jsonIndex++ + } + value, l, err := mysqlbinlog.CellValue(data, pos, plan.TableMap.Types[colNum], plan.TableMap.Metadata[colNum], plan.Table.Fields[colNum], partialJSON) if err != nil { log.Errorf("extractRowAndFilter: %s, table: %s, colNum: %d, fields: %+v, current values: %+v", err, plan.Table.Name, colNum, plan.Table.Fields, values) return false, nil, false, err } + log.Errorf("DEBUG: extractRowAndFilter: table: %s, column: %v, type: %v, value: %v", + plan.Table.Name, plan.Table.Fields[colNum], plan.TableMap.Types[colNum], value) pos += l if !value.IsNull() { // ENUMs and SETs require no special handling if they are NULL