diff --git a/go/mysql/binlog/binlog_json.go b/go/mysql/binlog/binlog_json.go index 73146abd06b..2a096728724 100644 --- a/go/mysql/binlog/binlog_json.go +++ b/go/mysql/binlog/binlog_json.go @@ -105,7 +105,10 @@ func ParseBinaryJSONDiff(data []byte) (sqltypes.Value, error) { path := data[pos : uint64(pos)+pathLen] pos += int(pathLen) log.Errorf("DEBUG: json diff path: %s", string(path)) - diff.WriteString(fmt.Sprintf("'%s', ", path)) + // We have to specify the unicode character set for the strings we + // use in the expression as the connection can be using a different + // character set (e.g. vreplication always uses set names binary). + diff.WriteString(fmt.Sprintf("_utf8mb4'%s', ", path)) if opType == jsonDiffOpRemove { // No value for remove diff.WriteString(")") @@ -124,6 +127,9 @@ func ParseBinaryJSONDiff(data []byte) (sqltypes.Value, error) { return sqltypes.Value{}, fmt.Errorf("cannot read JSON diff value for path %s: %w", path, err) } log.Errorf("DEBUG: json diff value: %v", value) + if value.Type() == json.TypeString { + diff.WriteString("_utf8mb4") + } diff.WriteString(fmt.Sprintf("%s)", value)) return sqltypes.MakeTrusted(sqltypes.Expression, diff.Bytes()), nil diff --git a/go/mysql/binlog_event_mysql56_test.go b/go/mysql/binlog_event_mysql56_test.go index be9ae071ded..4a61da04a20 100644 --- a/go/mysql/binlog_event_mysql56_test.go +++ b/go/mysql/binlog_event_mysql56_test.go @@ -353,7 +353,7 @@ func TestMySQL56PartialUpdateRowsEvent(t *testing.T) { vals, err := ev.StringValuesForTests(tm, i) require.NoError(t, err) // The third column is the JSON column. - require.Equal(t, vals[2], `JSON_INSERT(%s, '$.role', "manager")`) + require.Equal(t, `JSON_INSERT(%s, _utf8mb4'$.role', _utf8mb4"manager")`, vals[2]) t.Logf("Rows: %v", vals) } } diff --git a/go/vt/proto/binlogdata/binlogdata.pb.go b/go/vt/proto/binlogdata/binlogdata.pb.go index 51736c606db..0cc9d5b17cf 100644 --- a/go/vt/proto/binlogdata/binlogdata.pb.go +++ b/go/vt/proto/binlogdata/binlogdata.pb.go @@ -1340,7 +1340,7 @@ type RowChange struct { // set if the value in the after image is a partial JSON value that // is represented as an expression of // JSON_[INSERT|REPLACE|REMOVE](%s, '$.path', value) which then is - // used to add/update/replace a path in the JSON document. When the + // used to add/update/remove a path in the JSON document. When the // value is used the fmt directive must be replaced by the actual // column name of the JSON field. JsonPartialValues *RowChange_Bitmap `protobuf:"bytes,4,opt,name=json_partial_values,json=jsonPartialValues,proto3" json:"json_partial_values,omitempty"` diff --git a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go index 80870de23ce..0a5bd9f26fd 100644 --- a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go +++ b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go @@ -1753,7 +1753,7 @@ func addInvariants(dbClient *binlogplayer.MockDBClient, vreplID, sourceTabletUID fmt.Sprintf("%s||0|0|Stopped|1|%s|0|0", position, workflow), )) dbClient.AddInvariant(setSessionTZ, &sqltypes.Result{}) - //dbClient.AddInvariant(setNames, &sqltypes.Result{}) + dbClient.AddInvariant(setNames, &sqltypes.Result{}) dbClient.AddInvariant(setNetReadTimeout, &sqltypes.Result{}) dbClient.AddInvariant(setNetWriteTimeout, &sqltypes.Result{}) diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go index 51a37b09284..7067211ff10 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go @@ -215,6 +215,11 @@ func setDBClientSettings(dbClient binlogplayer.DBClient, workflowConfig *vttable if _, err := dbClient.ExecuteFetch("set @@session.time_zone = '+00:00'", maxRows); err != nil { return err } + // Tables may have varying character sets. To ship the bits without interpreting them + // we set the character set to be binary. + if _, err := dbClient.ExecuteFetch("set names 'binary'", maxRows); err != nil { + return err + } if _, err := dbClient.ExecuteFetch(fmt.Sprintf("set @@session.net_read_timeout = %v", workflowConfig.NetReadTimeout), maxRows); err != nil { return err diff --git a/proto/binlogdata.proto b/proto/binlogdata.proto index 0fdf69b0773..4351e48d12a 100644 --- a/proto/binlogdata.proto +++ b/proto/binlogdata.proto @@ -340,7 +340,7 @@ message RowChange { // set if the value in the after image is a partial JSON value that // is represented as an expression of // JSON_[INSERT|REPLACE|REMOVE](%s, '$.path', value) which then is - // used to add/update/replace a path in the JSON document. When the + // used to add/update/remove a path in the JSON document. When the // value is used the fmt directive must be replaced by the actual // column name of the JSON field. Bitmap json_partial_values = 4;