Skip to content

Commit

Permalink
Minor changes from self review
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Dec 14, 2024
1 parent 1980b89 commit 734d56a
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 37 deletions.
33 changes: 16 additions & 17 deletions go/mysql/binlog/binlog_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func ParseBinaryJSONDiff(data []byte) (sqltypes.Value, error) {
} else { // Only the inner most function has the field name
diff.WriteString("%s, ") // This will later be replaced by the field name
}
outer = true

pathLen, readTo := readVariableLength(data, pos)
pos = readTo
Expand All @@ -125,27 +126,25 @@ func ParseBinaryJSONDiff(data []byte) (sqltypes.Value, error) {
diff.WriteByte('\'')
diff.Write(path)
diff.WriteByte('\'')

if opType == jsonDiffOpRemove { // No value for remove
diff.WriteByte(')')
} else {
diff.WriteString(", ")
valueLen, readTo := readVariableLength(data, pos)
pos = readTo
value, err := ParseBinaryJSON(data[pos : pos+valueLen])
if err != nil {
return sqltypes.Value{}, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT,
"cannot read JSON diff value for path %s: %v", path, err)
}
pos += valueLen
if value.Type() == json.TypeString {
diff.WriteString(sqlparser.Utf8mb4Str)
}
diff.Write(value.MarshalTo(nil))
diff.WriteByte(')')
continue
}

outer = true
diff.WriteString(", ")
valueLen, readTo := readVariableLength(data, pos)
pos = readTo
value, err := ParseBinaryJSON(data[pos : pos+valueLen])
if err != nil {
return sqltypes.Value{}, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT,
"cannot read JSON diff value for path %s: %v", path, err)
}
pos += valueLen
if value.Type() == json.TypeString {
diff.WriteString(sqlparser.Utf8mb4Str)
}
diff.Write(value.MarshalTo(nil))
diff.WriteByte(')')
}

return sqltypes.MakeTrusted(sqltypes.Expression, diff.Bytes()), nil
Expand Down
7 changes: 3 additions & 4 deletions go/test/utils/binlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,9 @@ func SetBinlogRowImageMode(mode string, cnfDir string, includePartialJSON bool)
// variable is empty -- meaning we're not running in the CI and we assume
// MySQL8.0 or later is used, and you can understand the failures and make
// adjustments as necessary -- or it's set to reflect usage of MySQL 8.0 or
// later. This relies on the current standard values used such as mysql5.7,
// mysql8.0, mysql8.4, mariadb10.7, etc. This can be used when the CI test
// behavior needs to be altered based on the specific database platform
// we're testing against.
// later. This relies on the current standard values used such as mysql57,
// mysql80, mysql84, etc. This can be used when the CI test behavior needs
// to be altered based on the specific database platform we're testing against.
func CIDBPlatformIsMySQL8orLater() bool {
dbPlatform := strings.ToLower(os.Getenv("CI_DB_PLATFORM"))
if dbPlatform == "" {
Expand Down
10 changes: 4 additions & 6 deletions go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,12 +480,10 @@ func (tp *TablePlan) applyChange(rowChange *binlogdatapb.RowChange, executor fun
jsonIndex := 0
for i, field := range tp.Fields {
if field.Type == querypb.Type_JSON && rowChange.JsonPartialValues != nil {
if !isBitSet(rowChange.JsonPartialValues.Cols, jsonIndex) {
switch {
case !isBitSet(rowChange.JsonPartialValues.Cols, jsonIndex):
// We use the full AFTER value which we already have.
jsonIndex++
continue
}
if len(afterVals[i].Raw()) == 0 {
case len(afterVals[i].Raw()) == 0:
// When using BOTH binlog_row_image=NOBLOB AND
// binlog_row_value_options=PARTIAL_JSON then the JSON column has the data bit
// set and the diff is empty when it's not present. So we want to use the
Expand All @@ -500,7 +498,7 @@ func (tp *TablePlan) applyChange(rowChange *binlogdatapb.RowChange, executor fun
return nil, vterrors.Wrapf(err, "failed to bind field value for %s.%s when building insert query",
tp.TargetName, field.Name)
}
} else {
default:
// For JSON columns when binlog-row-value-options=PARTIAL_JSON is used, we
// need to wrap the JSON diff function(s) around the BEFORE value.
diff := bindvars["a_"+field.Name].Value
Expand Down
32 changes: 22 additions & 10 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1519,14 +1519,15 @@ func TestPlayerRowMove(t *testing.T) {
validateQueryCountStat(t, "replicate", 3)
}

// TestPlayerPartialImagesUpdatePK tests the behavior of the vplayer when we
// have partial binlog images, meaning that binlog-row-image=NOBLOB and
// binlog-row-value-options=PARTIAL_JSON. These are both set when running the
// unit tests with runNoBlobTest=true and runPartialJSONTest=true.
// TestPlayerPartialImagesUpdatePK tests the behavior of the vplayer when
// updating the Primary Key for a row when we have partial binlog
// images, meaning that binlog-row-image=NOBLOB and
// binlog-row-value-options=PARTIAL_JSON. These are both set when running
// the unit tests with runNoBlobTest=true and runPartialJSONTest=true.
// If the test is running in the CI and the database platform is not MySQL
// 8.0 or later (which you can control using the CI_DB_PLATFORM env variable),
// then runPartialJSONTest will be false and the test will be skipped.
// the test if it's not set.
// 8.0 or later (which you can control using the CI_DB_PLATFORM env
// variable), then runPartialJSONTest will be false and the test will be
// skipped.
func TestPlayerPartialImagesUpdatePK(t *testing.T) {
if !runNoBlobTest {
t.Skip("Skipping test as binlog_row_image=NOBLOB is not set")
Expand Down Expand Up @@ -1586,15 +1587,26 @@ func TestPlayerPartialImagesUpdatePK(t *testing.T) {
},
},
{
input: `update src set id = id+10, bd = 'new blob data' where id = 2`,
input: `update src set bd = 'new blob data' where id = 2`,
output: []string{
"update dst set bd=_binary'new blob data' where id=2",
},
data: [][]string{
{"1", "{\"key1\": \"val1\", \"color\": \"red\"}", "blob data"},
{"2", "{\"key2\": \"val2\"}", "new blob data"},
{"3", "{\"key3\": \"val3\"}", "blob data3"},
},
},
{
input: `update src set id = id+10, bd = 'newest blob data' where id = 2`,
output: []string{
"delete from dst where id=2",
"insert into dst(id,jd,bd) values (12,JSON_OBJECT(_utf8mb4'key2', _utf8mb4'val2'),_binary'new blob data')",
"insert into dst(id,jd,bd) values (12,JSON_OBJECT(_utf8mb4'key2', _utf8mb4'val2'),_binary'newest blob data')",
},
data: [][]string{
{"1", "{\"key1\": \"val1\", \"color\": \"red\"}", "blob data"},
{"3", "{\"key3\": \"val3\"}", "blob data3"},
{"12", "{\"key2\": \"val2\"}", "new blob data"},
{"12", "{\"key2\": \"val2\"}", "newest blob data"},
},
},
{
Expand Down

0 comments on commit 734d56a

Please sign in to comment.