Skip to content

Commit

Permalink
Get binlog_row_image = noblob and binlog-row-value-options = PARTIAL_…
Browse files Browse the repository at this point in the history
…JSON working together

Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Dec 10, 2024
1 parent 42d5db4 commit 6e288df
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 2 deletions.
3 changes: 2 additions & 1 deletion go/mysql/binlog_event_rbr.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,8 @@ func (ev binlogEvent) Rows(f BinlogFormat, tm *TableMap) (Rows, error) {

if ev.Type() == ePartialUpdateRowsEvent {
// The first byte indicates whether or not any JSON values are partial.
// If it's 0 then there's nothing else to do.
// If it's not 1 then there's nothing else to do for the row as any
// columns use the full value.
partialJSON := uint8(data[pos])
pos++
if partialJSON == 1 {
Expand Down
15 changes: 15 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ func (tp *TablePlan) applyChange(rowChange *binlogdatapb.RowChange, executor fun
var newVal *sqltypes.Value
var err error
if field.Type == querypb.Type_JSON {
//log.Errorf("DEBUG: JSON field %v, value: %v", field.Name, vals[i].RawStr())
switch {
case vals[i].IsNull(): // An SQL NULL and not an actual JSON value
newVal = &sqltypes.NULL
Expand All @@ -394,6 +395,12 @@ func (tp *TablePlan) applyChange(rowChange *binlogdatapb.RowChange, executor fun
// as JSON_INSERT().
// This occurs e.g. when using partial JSON values as a result of
// mysqld using binlog-row-value-options=PARTIAL_JSON.
if len(vals[i].Raw()) == 0 {
// When using BOTH binlog_row_image=NOBLOB AND
// binlog_row_value_options=PARTIAL_JSON then the JSON column
// has the data bit set and the diff is empty.
setBit(rowChange.DataColumns.Cols, i, false)
}
s := fmt.Sprintf(vals[i].RawStr(), field.Name)
newVal = ptr.Of(sqltypes.MakeTrusted(querypb.Type_EXPRESSION, []byte(s)))
default: // A JSON value (which may be a JSON null literal value)
Expand Down Expand Up @@ -437,6 +444,7 @@ func (tp *TablePlan) applyChange(rowChange *binlogdatapb.RowChange, executor fun
case before && after:
if !tp.pkChanged(bindvars) && !tp.HasExtraSourcePkColumns {
if tp.isPartial(rowChange) {
//log.Errorf("DEBUG: building partial update query using DataColumns: %08b", rowChange.DataColumns.Cols)
upd, err := tp.getPartialUpdateQuery(rowChange.DataColumns)
if err != nil {
return nil, err
Expand All @@ -452,6 +460,12 @@ func (tp *TablePlan) applyChange(rowChange *binlogdatapb.RowChange, executor fun
return nil, err
}
}
// TODO: the INSERTs done here after deleting the row with the original PK
// need to use the values from the BEFORE image for the columns NOT present
// in the AFTER image due to being a partial image due to the source's usage
// of binlog-row-image=NOBLOB.
// For JSON columns when binlog-row-value-options=PARTIAL_JSON is used, we
// need to wrap the JSON diff function(s) around the BEFORE value.
if tp.isOutsidePKRange(bindvars, before, after, "insert") {
return nil, nil
}
Expand Down Expand Up @@ -593,6 +607,7 @@ func execParsedQuery(pq *sqlparser.ParsedQuery, bindvars map[string]*querypb.Bin
if err != nil {
return nil, err
}
//log.Errorf("DEBUG: execParsedQuery: %s", query)
return executor(query)
}

Expand Down
10 changes: 10 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/table_plan_partial.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,16 @@ func isBitSet(data []byte, index int) bool {
return data[byteIndex]&bitMask > 0
}

func setBit(data []byte, index int, value bool) {
byteIndex := index / 8
bitMask := byte(1 << (uint(index) & 0x7))
if value {
data[byteIndex] |= bitMask
} else {
data[byteIndex] &= 0xff - bitMask
}
}

func (tp *TablePlan) isPartial(rowChange *binlogdatapb.RowChange) bool {
if (tp.WorkflowConfig.ExperimentalFlags /**/ & /**/ vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage) == 0 ||
rowChange.DataColumns == nil ||
Expand Down
6 changes: 5 additions & 1 deletion go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1031,19 +1031,23 @@ func (vs *vstreamer) processRowEvent(vevents []*binlogdatapb.VEvent, plan *strea
}
if afterOK {
rowChange.After = sqltypes.RowToProto3(afterValues)
//log.Errorf("DEBUG: partial = %v", partial)
//log.Errorf("DEBUG: rowChange: After: %+v", rowChange.After)
if (vs.config.ExperimentalFlags /**/ & /**/ vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage != 0) &&
partial {
(partial || row.JSONPartialValues.Count() > 0) {

rowChange.DataColumns = &binlogdatapb.RowChange_Bitmap{
Count: int64(rows.DataColumns.Count()),
Cols: rows.DataColumns.Bits(),
}
//log.Errorf("DEBUG: rowChange: DataColumns: %08b", rowChange.DataColumns.Cols)
}
if row.JSONPartialValues.Count() > 0 {
rowChange.JsonPartialValues = &binlogdatapb.RowChange_Bitmap{
Count: int64(row.JSONPartialValues.Count()),
Cols: row.JSONPartialValues.Bits(),
}
//log.Errorf("DEBUG: rowChange: JSONPartialColumns: %08b", rowChange.JsonPartialValues.Cols)
}
}
rowChanges = append(rowChanges, rowChange)
Expand Down

0 comments on commit 6e288df

Please sign in to comment.