Skip to content

Commit

Permalink
Get edge cases working
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Dec 11, 2024
1 parent 6e288df commit f2a5445
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 22 deletions.
9 changes: 8 additions & 1 deletion go/mysql/binlog/binlog_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,19 @@ func ParseBinaryJSONDiff(data []byte) (sqltypes.Value, error) {
diff.WriteString("JSON_INSERT(")
case jsonDiffOpRemove:
diff.WriteString("JSON_REMOVE(")
default:
// Can be a literal JSON null.
js, err := ParseBinaryJSON(data)
if err == nil && js.Type() == json.TypeNull {
return sqltypes.MakeTrusted(sqltypes.Expression, js.MarshalTo(nil)), nil
}
return sqltypes.Value{}, fmt.Errorf("invalid JSON diff operation: %d", opType)
}
if outer {
diff.WriteString(innerStr)
diff.WriteString(", ")
} else { // Only the inner most function has the field name
diff.WriteString("%s, ") // This will later be replaced by the field name
diff.WriteString("`%s`, ") // This will later be replaced by the field name
}

pathLen, readTo := readVariableLength(data, pos)
Expand Down
5 changes: 3 additions & 2 deletions go/mysql/binlog/rbr.go
Original file line number Diff line number Diff line change
Expand Up @@ -691,8 +691,9 @@ func CellValue(data []byte, pos int, typ byte, metadata uint16, field *querypb.F
if err != nil {
panic(err)
}
jd := jsonVal.MarshalTo(nil)
return sqltypes.MakeTrusted(sqltypes.Expression, jd), l + int(metadata), nil
d := jsonVal.MarshalTo(nil)
return sqltypes.MakeTrusted(sqltypes.Expression,
d), l + int(metadata), nil
}

return sqltypes.MakeTrusted(querypb.Type_VARBINARY,
Expand Down
37 changes: 30 additions & 7 deletions go/mysql/binlog_event_mysql56_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ func TestMySQL56PartialUpdateRowsEvent(t *testing.T) {
97, 110, 97, 103, 101, 114,
},
numRows: 5,
want: "JSON_INSERT(%s, _utf8mb4'$.role', _utf8mb4\"manager\")",
want: "JSON_INSERT(`%s`, _utf8mb4'$.role', _utf8mb4\"manager\")",
},
{
// The mysqlbinlog -vvv --base64-output=decode-rows output for the following event:
Expand All @@ -373,7 +373,7 @@ func TestMySQL56PartialUpdateRowsEvent(t *testing.T) {
},
name: "REPLACE",
numRows: 1,
want: "JSON_REPLACE(%s, _utf8mb4'$.role', _utf8mb4\"IC\")",
want: "JSON_REPLACE(`%s`, _utf8mb4'$.role', _utf8mb4\"IC\")",
},
{
name: "REMOVE",
Expand All @@ -394,7 +394,7 @@ func TestMySQL56PartialUpdateRowsEvent(t *testing.T) {
111, 108, 101, 115, 97, 108, 97, 114, 121, 7, 109, 97, 110, 97, 103, 101, 114, 1, 1, 0, 2, 0, 0, 0, 0, 0, 0, 0, 14, 98, 111, 98, 64, 100, 111, 109,
97, 105, 110, 46, 99, 111, 109, 10, 0, 0, 0, 2, 8, 36, 46, 115, 97, 108, 97, 114, 121,
},
want: "JSON_REMOVE(%s, _utf8mb4'$.salary')",
want: "JSON_REMOVE(`%s`, _utf8mb4'$.salary')",
},
{
name: "REMOVE and REPLACE",
Expand Down Expand Up @@ -486,7 +486,7 @@ func TestMySQL56PartialUpdateRowsEvent(t *testing.T) {
100, 97, 121, 2, 16, 36, 46, 102, 97, 118, 111, 114, 105, 116, 101, 95, 99, 111, 108, 111, 114,
},
numRows: 5,
want: "JSON_REMOVE(JSON_REPLACE(%s, _utf8mb4'$.day', _utf8mb4\"monday\"), _utf8mb4'$.favorite_color')",
want: "JSON_REMOVE(JSON_REPLACE(`%s`, _utf8mb4'$.day', _utf8mb4\"monday\"), _utf8mb4'$.favorite_color')",
},
{
name: "INSERT and REMOVE and REPLACE",
Expand Down Expand Up @@ -514,7 +514,7 @@ func TestMySQL56PartialUpdateRowsEvent(t *testing.T) {
1, 7, 36, 46, 104, 111, 98, 98, 121, 8, 12, 6, 115, 107, 105, 105, 110, 103,
},
numRows: 1,
want: "JSON_INSERT(JSON_REMOVE(JSON_REPLACE(%s, _utf8mb4'$.day', _utf8mb4\"tuesday\"), _utf8mb4'$.favorite_color'), _utf8mb4'$.hobby', _utf8mb4\"skiing\")",
want: "JSON_INSERT(JSON_REMOVE(JSON_REPLACE(`%s`, _utf8mb4'$.day', _utf8mb4\"tuesday\"), _utf8mb4'$.favorite_color'), _utf8mb4'$.hobby', _utf8mb4\"skiing\")",
},
{
name: "REPLACE with null",
Expand All @@ -535,7 +535,7 @@ func TestMySQL56PartialUpdateRowsEvent(t *testing.T) {
109, 97, 105, 110, 46, 99, 111, 109, 13, 0, 0, 0, 0, 8, 36, 46, 115, 97, 108, 97, 114, 121, 2, 4, 0,
},
numRows: 1,
want: "JSON_REPLACE(%s, _utf8mb4'$.salary', null)",
want: "JSON_REPLACE(`%s`, _utf8mb4'$.salary', null)",
},
{
name: "REPLACE 2 paths",
Expand All @@ -557,7 +557,30 @@ func TestMySQL56PartialUpdateRowsEvent(t *testing.T) {
105, 110, 46, 99, 111, 109, 27, 0, 0, 0, 0, 8, 36, 46, 115, 97, 108, 97, 114, 121, 3, 5, 110, 0, 0, 6, 36, 46, 114, 111, 108, 101, 4, 12, 2, 73, 67,
},
numRows: 1,
want: "JSON_REPLACE(JSON_REPLACE(%s, _utf8mb4'$.salary', 110), _utf8mb4'$.role', _utf8mb4\"IC\")",
want: "JSON_REPLACE(JSON_REPLACE(`%s`, _utf8mb4'$.salary', 110), _utf8mb4'$.role', _utf8mb4\"IC\")",
},
{
name: "JSON null",
// The mysqlbinlog -vvv --base64-output=decode-rows output for the following event:
// ### UPDATE `vt_commerce`.`customer`
// ### WHERE
// ### @1=5 /* LONGINT meta=0 nullable=0 is_null=0 */
// ### @2='[email protected]' /* VARSTRING(128) meta=128 nullable=1 is_null=0 */
// ### @3='{"day": "friday", "role": "manager", "color": "red", "salary": 100, "favorite_color": "black"}' /* JSON meta=4 nullable=1 is_null=0 */
// ### SET
// ### @1=5 /* LONGINT meta=0 nullable=0 is_null=0 */
// ### @2='[email protected]' /* VARSTRING(128) meta=128 nullable=1 is_null=0 */
// ### @3='null' /* JSON meta=4 nullable=1 is_null=0 */
rawEvent: []byte{
109, 200, 88, 103, 39, 57, 91, 186, 0, 194, 0, 0, 0, 0, 0, 0, 0, 0, 0, 178, 0, 0, 0, 0, 0, 1, 0, 2, 0, 3, 7, 7, 0, 5, 0, 0, 0, 0, 0, 0, 0, 17, 110,
101, 119, 101, 118, 101, 64, 100, 111, 109, 97, 105, 110, 46, 99, 111, 109, 97, 0, 0, 0, 0, 5, 0, 96, 0, 39, 0, 3, 0, 42, 0, 4, 0, 46, 0, 5, 0, 51,
0, 6, 0, 57, 0, 14, 0, 12, 71, 0, 12, 78, 0, 12, 86, 0, 5, 100, 0, 12, 90, 0, 100, 97, 121, 114, 111, 108, 101, 99, 111, 108, 111, 114, 115, 97,
108, 97, 114, 121, 102, 97, 118, 111, 114, 105, 116, 101, 95, 99, 111, 108, 111, 114, 6, 102, 114, 105, 100, 97, 121, 7, 109, 97, 110, 97, 103, 101,
114, 3, 114, 101, 100, 5, 98, 108, 97, 99, 107, 0, 0, 5, 0, 0, 0, 0, 0, 0, 0, 17, 110, 101, 119, 101, 118, 101, 64, 100, 111, 109, 97, 105, 110, 46,
99, 111, 109, 2, 0, 0, 0, 4, 0,
},
numRows: 1,
want: "null",
},
}

Expand Down
9 changes: 7 additions & 2 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -721,8 +721,13 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
// Confirm that the 0 scale decimal field, dec80, is replicated correctly
execVtgateQuery(t, vtgateConn, sourceKs, "update customer set dec80 = 0")
execVtgateQuery(t, vtgateConn, sourceKs, "update customer set blb = \"new blob data\" where cid=3")
execVtgateQuery(t, vtgateConn, sourceKs, "update json_tbl set j1 = null, j2 = 'null', j3 = '\"null\"'")
execVtgateQuery(t, vtgateConn, sourceKs, "insert into json_tbl(id, j1, j2, j3) values (7, null, 'null', '\"null\"')")
// TODO: file a MySQL bug for this. The following query results in the quoted string literal "null"
// stored in the j3 column. But with and without the literal quotes, the value in the PARTIAL_JSON
// diff is the unquoted literal null which is a JSON null type. This leads to a vdiff mismatch.
//execVtgateQuery(t, vtgateConn, sourceKs, "update json_tbl set j1 = null, j2 = 'null', j3 = '\"null\"'")
//execVtgateQuery(t, vtgateConn, sourceKs, "insert into json_tbl(id, j1, j2, j3) values (7, null, 'null', '\"null\"')")
execVtgateQuery(t, vtgateConn, sourceKs, "update json_tbl set j1 = null, j2 = 'null', j3 = 'null'")
execVtgateQuery(t, vtgateConn, sourceKs, "insert into json_tbl(id, j1, j2, j3) values (7, null, 'null', 'null')")
waitForNoWorkflowLag(t, vc, targetKs, workflow)
dec80Replicated := false
for _, tablet := range []*cluster.VttabletProcess{customerTab1, customerTab2} {
Expand Down
14 changes: 8 additions & 6 deletions go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package vreplication
import (
"encoding/json"
"fmt"
"slices"
"sort"
"strings"

Expand Down Expand Up @@ -386,11 +387,11 @@ func (tp *TablePlan) applyChange(rowChange *binlogdatapb.RowChange, executor fun
var newVal *sqltypes.Value
var err error
if field.Type == querypb.Type_JSON {
//log.Errorf("DEBUG: JSON field %v, value: %v", field.Name, vals[i].RawStr())
switch {
case vals[i].IsNull(): // An SQL NULL and not an actual JSON value
newVal = &sqltypes.NULL
case rowChange.JsonPartialValues != nil && isBitSet(rowChange.JsonPartialValues.Cols, jsonIndex):
case rowChange.JsonPartialValues != nil && isBitSet(rowChange.JsonPartialValues.Cols, jsonIndex) &&
!slices.Equal(vals[i].Raw(), sqltypes.NullBytes):
// An SQL expression that can be converted to a JSON value such
// as JSON_INSERT().
// This occurs e.g. when using partial JSON values as a result of
Expand All @@ -400,9 +401,12 @@ func (tp *TablePlan) applyChange(rowChange *binlogdatapb.RowChange, executor fun
// binlog_row_value_options=PARTIAL_JSON then the JSON column
// has the data bit set and the diff is empty.
setBit(rowChange.DataColumns.Cols, i, false)
newVal = ptr.Of(sqltypes.MakeTrusted(querypb.Type_EXPRESSION, nil))
} else {
newVal = ptr.Of(sqltypes.MakeTrusted(querypb.Type_EXPRESSION, []byte(
fmt.Sprintf(vals[i].RawStr(), field.Name),
)))
}
s := fmt.Sprintf(vals[i].RawStr(), field.Name)
newVal = ptr.Of(sqltypes.MakeTrusted(querypb.Type_EXPRESSION, []byte(s)))
default: // A JSON value (which may be a JSON null literal value)
newVal, err = vjson.MarshalSQLValue(vals[i].Raw())
if err != nil {
Expand Down Expand Up @@ -444,7 +448,6 @@ func (tp *TablePlan) applyChange(rowChange *binlogdatapb.RowChange, executor fun
case before && after:
if !tp.pkChanged(bindvars) && !tp.HasExtraSourcePkColumns {
if tp.isPartial(rowChange) {
//log.Errorf("DEBUG: building partial update query using DataColumns: %08b", rowChange.DataColumns.Cols)
upd, err := tp.getPartialUpdateQuery(rowChange.DataColumns)
if err != nil {
return nil, err
Expand Down Expand Up @@ -607,7 +610,6 @@ func execParsedQuery(pq *sqlparser.ParsedQuery, bindvars map[string]*querypb.Bin
if err != nil {
return nil, err
}
//log.Errorf("DEBUG: execParsedQuery: %s", query)
return executor(query)
}

Expand Down
4 changes: 0 additions & 4 deletions go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1031,23 +1031,19 @@ func (vs *vstreamer) processRowEvent(vevents []*binlogdatapb.VEvent, plan *strea
}
if afterOK {
rowChange.After = sqltypes.RowToProto3(afterValues)
//log.Errorf("DEBUG: partial = %v", partial)
//log.Errorf("DEBUG: rowChange: After: %+v", rowChange.After)
if (vs.config.ExperimentalFlags /**/ & /**/ vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage != 0) &&
(partial || row.JSONPartialValues.Count() > 0) {

rowChange.DataColumns = &binlogdatapb.RowChange_Bitmap{
Count: int64(rows.DataColumns.Count()),
Cols: rows.DataColumns.Bits(),
}
//log.Errorf("DEBUG: rowChange: DataColumns: %08b", rowChange.DataColumns.Cols)
}
if row.JSONPartialValues.Count() > 0 {
rowChange.JsonPartialValues = &binlogdatapb.RowChange_Bitmap{
Count: int64(row.JSONPartialValues.Count()),
Cols: row.JSONPartialValues.Bits(),
}
//log.Errorf("DEBUG: rowChange: JSONPartialColumns: %08b", rowChange.JsonPartialValues.Cols)
}
}
rowChanges = append(rowChanges, rowChange)
Expand Down

0 comments on commit f2a5445

Please sign in to comment.