Skip to content

Commit

Permalink
WiP
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Dec 9, 2024
1 parent 747a61c commit a44e7e5
Show file tree
Hide file tree
Showing 12 changed files with 349 additions and 44 deletions.
115 changes: 113 additions & 2 deletions go/mysql/binlog/binlog_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,22 @@ limitations under the License.
package binlog

import (
"bytes"
"encoding/binary"
"encoding/hex"
"fmt"
"math"
"strconv"

"vitess.io/vitess/go/hack"
"vitess.io/vitess/go/mysql/format"
"vitess.io/vitess/go/mysql/json"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/vterrors"

querypb "vitess.io/vitess/go/vt/proto/query"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
)

/*
Expand All @@ -44,6 +49,14 @@ https://github.com/shyiko/mysql-binlog-connector-java/pull/119/files
https://github.com/noplay/python-mysql-replication/blob/175df28cc8b536a68522ff9b09dc5440adad6094/pymysqlreplication/packet.py
*/

type jsonDiffOp uint8

const (
jsonDiffOpReplace = jsonDiffOp(0)
jsonDiffOpInsert = jsonDiffOp(1)
jsonDiffOpRemove = jsonDiffOp(2)
)

// ParseBinaryJSON provides the parsing function from the mysql binary json
// representation to a JSON value instance.
func ParseBinaryJSON(data []byte) (*json.Value, error) {
Expand All @@ -60,6 +73,62 @@ func ParseBinaryJSON(data []byte) (*json.Value, error) {
return node, nil
}

// ParseBinaryJSONDiff provides the parsing function from the MySQL JSON
// diff representation to an SQL expression.
func ParseBinaryJSONDiff(data []byte) (sqltypes.Value, error) {
log.Errorf("DEBUG: json diff data hex: %s, string: %s", hex.EncodeToString(data), string(data))
pos := 0
opType := jsonDiffOp(data[pos])
pos++
diff := bytes.Buffer{}

switch opType {
case jsonDiffOpReplace:
diff.WriteString("JSON_REPLACE(")
case jsonDiffOpInsert:
diff.WriteString("JSON_INSERT(")
case jsonDiffOpRemove:
diff.WriteString("JSON_REMOVE(")
}
diff.WriteString("%s, ") // This will later be replaced by the field name

log.Errorf("DEBUG: json diff opType: %d", opType)

pathLen, readTo, ok := readLenEncInt(data, pos)
if !ok {
return sqltypes.Value{}, fmt.Errorf("cannot read JSON diff path length")
}
pos = readTo

log.Errorf("DEBUG: json diff path length: %d", pathLen)

path := data[pos : uint64(pos)+pathLen]
pos += int(pathLen)
log.Errorf("DEBUG: json diff path: %s", string(path))
diff.WriteString(fmt.Sprintf("'%s', ", path))

if opType == jsonDiffOpRemove { // No value for remove
diff.WriteString(")")
return sqltypes.MakeTrusted(sqltypes.Expression, diff.Bytes()), nil
}

valueLen, readTo, ok := readLenEncInt(data, pos)
if !ok {
return sqltypes.Value{}, fmt.Errorf("cannot read JSON diff path length")
}
pos = readTo
log.Errorf("DEBUG: json diff value length: %d", valueLen)

value, err := ParseBinaryJSON(data[pos : uint64(pos)+valueLen])
if err != nil {
return sqltypes.Value{}, fmt.Errorf("cannot read JSON diff value for path %s: %w", path, err)
}
log.Errorf("DEBUG: json diff value: %v", value)
diff.WriteString(fmt.Sprintf("%s)", value))

return sqltypes.MakeTrusted(sqltypes.Expression, diff.Bytes()), nil
}

// jsonDataType has the values used in the mysql json binary representation to denote types.
// We have string, literal(true/false/null), number, object or array types.
// large object => doc size > 64K: you get pointers instead of inline values.
Expand Down Expand Up @@ -315,7 +384,7 @@ func binparserOpaque(_ jsonDataType, data []byte, pos int) (node *json.Value, er
precision := decimalData[0]
scale := decimalData[1]
metadata := (uint16(precision) << 8) + uint16(scale)
val, _, err := CellValue(decimalData, 2, TypeNewDecimal, metadata, &querypb.Field{Type: querypb.Type_DECIMAL})
val, _, err := CellValue(decimalData, 2, TypeNewDecimal, metadata, &querypb.Field{Type: querypb.Type_DECIMAL}, false)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -394,3 +463,45 @@ func binparserObject(typ jsonDataType, data []byte, pos int) (node *json.Value,

return json.NewObject(object), nil
}

func readLenEncInt(data []byte, pos int) (uint64, int, bool) {
if pos >= len(data) {
return 0, 0, false
}

// reslice to avoid arithmetic below
data = data[pos:]

switch data[0] {
case 0xfc:
// Encoded in the next 2 bytes.
if 2 >= len(data) {
return 0, 0, false
}
return uint64(data[1]) |
uint64(data[2])<<8, pos + 3, true
case 0xfd:
// Encoded in the next 3 bytes.
if 3 >= len(data) {
return 0, 0, false
}
return uint64(data[1]) |
uint64(data[2])<<8 |
uint64(data[3])<<16, pos + 4, true
case 0xfe:
// Encoded in the next 8 bytes.
if 8 >= len(data) {
return 0, 0, false
}
return uint64(data[1]) |
uint64(data[2])<<8 |
uint64(data[3])<<16 |
uint64(data[4])<<24 |
uint64(data[5])<<32 |
uint64(data[6])<<40 |
uint64(data[7])<<48 |
uint64(data[8])<<56, pos + 9, true
default:
return uint64(data[0]), pos + 1, true
}
}
44 changes: 27 additions & 17 deletions go/mysql/binlog/rbr.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ import (
"time"

"vitess.io/vitess/go/sqltypes"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/vterrors"

querypb "vitess.io/vitess/go/vt/proto/query"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

// ZeroTimestamp is the special value 0 for a timestamp.
Expand Down Expand Up @@ -130,7 +132,7 @@ func CellLength(data []byte, pos int, typ byte, metadata uint16) (int, error) {
uint32(data[pos+2])<<16|
uint32(data[pos+3])<<24), nil
default:
return 0, vterrors.Errorf(vtrpc.Code_INTERNAL, "unsupported blob/geometry metadata value %v (data: %v pos: %v)", metadata, data, pos)
return 0, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unsupported blob/geometry metadata value %v (data: %v pos: %v)", metadata, data, pos)
}
case TypeString:
// This may do String, Enum, and Set. The type is in
Expand All @@ -151,7 +153,7 @@ func CellLength(data []byte, pos int, typ byte, metadata uint16) (int, error) {
return l + 1, nil

default:
return 0, vterrors.Errorf(vtrpc.Code_INTERNAL, "unsupported type %v (data: %v pos: %v)", typ, data, pos)
return 0, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unsupported type %v (data: %v pos: %v)", typ, data, pos)
}
}

Expand All @@ -176,7 +178,7 @@ func printTimestamp(v uint32) *bytes.Buffer {
// byte to determine general shared aspects of types and the querypb.Field to
// determine other info specifically about its underlying column (SQL column
// type, column length, charset, etc)
func CellValue(data []byte, pos int, typ byte, metadata uint16, field *querypb.Field) (sqltypes.Value, int, error) {
func CellValue(data []byte, pos int, typ byte, metadata uint16, field *querypb.Field, partialJSON bool) (sqltypes.Value, int, error) {
switch typ {
case TypeTiny:
if sqltypes.IsSigned(field.Type) {
Expand Down Expand Up @@ -644,7 +646,7 @@ func CellValue(data []byte, pos int, typ byte, metadata uint16, field *querypb.F
return sqltypes.MakeTrusted(querypb.Type_ENUM,
strconv.AppendUint(nil, uint64(val), 10)), 2, nil
default:
return sqltypes.NULL, 0, vterrors.Errorf(vtrpc.Code_INTERNAL, "unexpected enum size: %v", metadata&0xff)
return sqltypes.NULL, 0, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected enum size: %v", metadata&0xff)
}

case TypeSet:
Expand Down Expand Up @@ -672,21 +674,29 @@ func CellValue(data []byte, pos int, typ byte, metadata uint16, field *querypb.F
uint32(data[pos+2])<<16 |
uint32(data[pos+3])<<24)
default:
return sqltypes.NULL, 0, vterrors.Errorf(vtrpc.Code_INTERNAL, "unsupported blob metadata value %v (data: %v pos: %v)", metadata, data, pos)
return sqltypes.NULL, 0, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unsupported blob metadata value %v (data: %v pos: %v)", metadata, data, pos)
}
pos += int(metadata)

// For JSON, we parse the data, and emit SQL.
if typ == TypeJSON {
var err error
jsonData := data[pos : pos+l]
jsonVal, err := ParseBinaryJSON(jsonData)
if err != nil {
panic(err)
if partialJSON {
log.Errorf("DEBUG: partialJSON cell value: %s", string(jsonData))
val, err := ParseBinaryJSONDiff(jsonData)
if err != nil {
panic(err)
}
log.Errorf("DEBUG: decoded partialJSON cell value: %v", val)
return val, l + int(metadata), nil
} else {
jsonVal, err := ParseBinaryJSON(jsonData)
if err != nil {
panic(err)
}
jd := jsonVal.MarshalTo(nil)
return sqltypes.MakeTrusted(sqltypes.Expression, jd), l + int(metadata), nil
}
d := jsonVal.MarshalTo(nil)
return sqltypes.MakeTrusted(sqltypes.Expression,
d), l + int(metadata), nil
}

return sqltypes.MakeTrusted(querypb.Type_VARBINARY,
Expand All @@ -710,7 +720,7 @@ func CellValue(data []byte, pos int, typ byte, metadata uint16, field *querypb.F
return sqltypes.MakeTrusted(querypb.Type_UINT16,
strconv.AppendUint(nil, uint64(val), 10)), 2, nil
default:
return sqltypes.NULL, 0, vterrors.Errorf(vtrpc.Code_INTERNAL, "unexpected enum size: %v", metadata&0xff)
return sqltypes.NULL, 0, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected enum size: %v", metadata&0xff)
}
}
if t == TypeSet {
Expand Down Expand Up @@ -776,13 +786,13 @@ func CellValue(data []byte, pos int, typ byte, metadata uint16, field *querypb.F
uint32(data[pos+2])<<16 |
uint32(data[pos+3])<<24)
default:
return sqltypes.NULL, 0, vterrors.Errorf(vtrpc.Code_INTERNAL, "unsupported geometry metadata value %v (data: %v pos: %v)", metadata, data, pos)
return sqltypes.NULL, 0, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unsupported geometry metadata value %v (data: %v pos: %v)", metadata, data, pos)
}
pos += int(metadata)
return sqltypes.MakeTrusted(querypb.Type_GEOMETRY,
data[pos:pos+l]), l + int(metadata), nil

default:
return sqltypes.NULL, 0, vterrors.Errorf(vtrpc.Code_INTERNAL, "unsupported type %v", typ)
return sqltypes.NULL, 0, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unsupported type %v", typ)
}
}
2 changes: 1 addition & 1 deletion go/mysql/binlog/rbr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ func TestCellLengthAndData(t *testing.T) {
}

// Test CellValue.
out, l, err := CellValue(padded, 1, tcase.typ, tcase.metadata, &querypb.Field{Type: tcase.styp})
out, l, err := CellValue(padded, 1, tcase.typ, tcase.metadata, &querypb.Field{Type: tcase.styp}, false)
if err != nil || l != len(tcase.data) || out.Type() != tcase.out.Type() || !bytes.Equal(out.Raw(), tcase.out.Raw()) {
t.Errorf("testcase cellData(%v,%v) returned unexpected result: %v %v %v, was expecting %v %v <nil>\nwant: %s\ngot: %s",
tcase.typ, tcase.data, out, l, err, tcase.out, len(tcase.data), tcase.out.Raw(), out.Raw())
Expand Down
27 changes: 18 additions & 9 deletions go/mysql/binlog_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,20 @@ type BinlogEvent interface {
IsWriteRows() bool
// IsUpdateRows returns true if this is a UPDATE_ROWS_EVENT.
IsUpdateRows() bool
// IsPartialUpdateRows returs true if a partial JSON update event
// is found. These events are only seen in MySQL 8.0 if the mysqld
// instance has binlog_row_value_options=PARTIAL_JSON set.
IsPartialUpdateRows() bool
// IsDeleteRows returns true if this is a DELETE_ROWS_EVENT.
IsDeleteRows() bool

// IsPseudo is for custom implementations of GTID.
IsPseudo() bool

// IsTransactionPayload returns true if a compressed transaction
// payload event is found (binlog_transaction_compression=ON).
IsTransactionPayload() bool

// Timestamp returns the timestamp from the event header.
Timestamp() uint32
// ServerID returns the server ID from the event header.
Expand Down Expand Up @@ -123,8 +134,8 @@ type BinlogEvent interface {
TableMap(BinlogFormat) (*TableMap, error)
// Rows returns a Rows struct representing data from a
// {WRITE,UPDATE,DELETE}_ROWS_EVENT. This is only valid if
// IsWriteRows(), IsUpdateRows(), or IsDeleteRows() returns
// true.
// IsWriteRows(), IsUpdateRows(), IsPartialUpdateRows(), or
// IsDeleteRows() returns true.
Rows(BinlogFormat, *TableMap) (Rows, error)
// TransactionPayload returns a TransactionPayload type which provides
// a GetNextEvent() method to iterate over the events contained within
Expand All @@ -141,13 +152,6 @@ type BinlogEvent interface {
// the same event and a nil checksum.
StripChecksum(BinlogFormat) (ev BinlogEvent, checksum []byte, err error)

// IsPseudo is for custom implementations of GTID.
IsPseudo() bool

// IsTransactionPayload returns true if a compressed transaction
// payload event is found (binlog_transaction_compression=ON).
IsTransactionPayload() bool

// Bytes returns the binary representation of the event
Bytes() []byte
}
Expand Down Expand Up @@ -266,6 +270,11 @@ type Row struct {
// It is only set for UPDATE and DELETE events.
Identify []byte

// If this row represents a PartialUpdateRow event there will be a
// shared-image that sits between the before and after images that
// defines how the JSON column values are represented.
JSONPartialValues Bitmap

// Data is the raw data.
// It is only set for WRITE and UPDATE events.
Data []byte
Expand Down
5 changes: 5 additions & 0 deletions go/mysql/binlog_event_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,11 @@ func (ev binlogEvent) IsUpdateRows() bool {
ev.Type() == eUpdateRowsEventV2
}

// IsPartialUpdateRows implements BinlogEvent.IsPartialUpdateRows().
func (ev binlogEvent) IsPartialUpdateRows() bool {
return ev.Type() == ePartialUpdateRowsEvent
}

// IsDeleteRows implements BinlogEvent.IsDeleteRows().
// We do not support v0.
func (ev binlogEvent) IsDeleteRows() bool {
Expand Down
4 changes: 4 additions & 0 deletions go/mysql/binlog_event_filepos.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,10 @@ func (ev filePosFakeEvent) IsUpdateRows() bool {
return false
}

func (ev filePosFakeEvent) IsPartialUpdateRows() bool {
return false
}

func (ev filePosFakeEvent) IsDeleteRows() bool {
return false
}
Expand Down
Loading

0 comments on commit a44e7e5

Please sign in to comment.