Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VReplication: use proper column collations in vstreamer #15313

Merged
merged 12 commits into from
Feb 26, 2024
9 changes: 9 additions & 0 deletions go/mysql/binlog_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ package mysql
import (
"fmt"

"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/mysql/replication"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)

Expand Down Expand Up @@ -216,6 +218,13 @@ type TableMap struct {
// - If the metadata is one byte, only the lower 8 bits are used.
// - If the metadata is two bytes, all 16 bits are used.
Metadata []uint16

// ColumnCollationIDs contains information about the inherited
// or implied column default collation and any explicit per-column
// override for text based columns ONLY. This means that the
// array position needs to be mapped to the ordered list of
// text based columns in the table.
ColumnCollationIDs []collations.ID
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a lot of the code is going to be easier to write and easier to follow if this slice has one entry for each column and you simply fill the non-text columns with a Binary collation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Soooo... I agree! But unfortunately I'd have to perform the same logic to fill in those gaps as the binlog row metadata does not include the column's index or name. I'll think about it some more though.

Copy link
Contributor Author

@mattlord mattlord Feb 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I explored this path, but we don't have access to the collation ENV in order to properly fill in the gaps here. So leaving it as-is for now, where users of this (vstreamer) have access to both the collation ENV and mysqld.

}

// Rows contains data from a {WRITE,UPDATE,DELETE}_ROWS_EVENT.
Expand Down
15 changes: 9 additions & 6 deletions go/mysql/binlog_event_make_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/mysql/replication"

"vitess.io/vitess/go/mysql/binlog"
Expand Down Expand Up @@ -222,6 +223,7 @@ func TestTableMapEvent(t *testing.T) {
0,
384, // Length of the varchar field.
},
ColumnCollationIDs: []collations.ID{},
}
tm.CanBeNull.Set(1, true)
tm.CanBeNull.Set(2, true)
Expand Down Expand Up @@ -258,12 +260,13 @@ func TestLargeTableMapEvent(t *testing.T) {
}

tm := &TableMap{
Flags: 0x8090,
Database: "my_database",
Name: "my_table",
Types: types,
CanBeNull: NewServerBitmap(colLen),
Metadata: metadata,
Flags: 0x8090,
Database: "my_database",
Name: "my_table",
Types: types,
CanBeNull: NewServerBitmap(colLen),
Metadata: metadata,
ColumnCollationIDs: []collations.ID{},
}
tm.CanBeNull.Set(1, true)
tm.CanBeNull.Set(2, true)
Expand Down
84 changes: 75 additions & 9 deletions go/mysql/binlog_event_rbr.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,28 @@
"encoding/binary"

"vitess.io/vitess/go/mysql/binlog"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/vt/vterrors"

querypb "vitess.io/vitess/go/vt/proto/query"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

// These are the TABLE_MAP_EVENT's optional metadata field types from: libbinlogevents/include/rows_event.h
// See: https://dev.mysql.com/doc/dev/mysql-server/8.0.34/structbinary__log_1_1Table__map__event_1_1Optional__metadata__fields.html
const (
TableMapSignedness uint8 = iota + 1
TableMapDefaultCharset
TableMapColumnCharset
TableMapColumnName
TableMapSetStrValue
TableMapEnumStrValue
TableMapGeometryType
TableMapSimplePrimaryKey
TableMapPrimaryKeyWithPrefix
TableMapEnumAndSetDefaultCharset
TableMapEnumAndSetColumnCharset
TableMapColumnVisibility
)

// TableMap implements BinlogEvent.TableMap().
Expand All @@ -43,6 +61,7 @@
// cc column-def, one byte per column
// <var> column-meta-def (var-len encoded string)
// n NULL-bitmask, length: (cc + 7) / 8
// n Optional Metadata
func (ev binlogEvent) TableMap(f BinlogFormat) (*TableMap, error) {
data := ev.Bytes()[f.HeaderLength:]

Expand All @@ -64,7 +83,7 @@

columnCount, read, ok := readLenEncInt(data, pos)
if !ok {
return nil, vterrors.Errorf(vtrpc.Code_INTERNAL, "expected column count at position %v (data=%v)", pos, data)
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "expected column count at position %v (data=%v)", pos, data)

Check warning on line 86 in go/mysql/binlog_event_rbr.go

View check run for this annotation

Codecov / codecov/patch

go/mysql/binlog_event_rbr.go#L86

Added line #L86 was not covered by tests
}
pos = read

Expand All @@ -73,7 +92,7 @@

metaLen, read, ok := readLenEncInt(data, pos)
if !ok {
return nil, vterrors.Errorf(vtrpc.Code_INTERNAL, "expected metadata length at position %v (data=%v)", pos, data)
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "expected metadata length at position %v (data=%v)", pos, data)

Check warning on line 95 in go/mysql/binlog_event_rbr.go

View check run for this annotation

Codecov / codecov/patch

go/mysql/binlog_event_rbr.go#L95

Added line #L95 was not covered by tests
}
pos = read

Expand All @@ -88,11 +107,20 @@
}
}
if pos != expectedEnd {
return nil, vterrors.Errorf(vtrpc.Code_INTERNAL, "unexpected metadata end: got %v was expecting %v (data=%v)", pos, expectedEnd, data)
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected metadata end: got %v was expecting %v (data=%v)", pos, expectedEnd, data)

Check warning on line 110 in go/mysql/binlog_event_rbr.go

View check run for this annotation

Codecov / codecov/patch

go/mysql/binlog_event_rbr.go#L110

Added line #L110 was not covered by tests
}

// A bit array that says if each column can be NULL.
result.CanBeNull, _ = newBitmap(data, pos, int(columnCount))
result.CanBeNull, read = newBitmap(data, pos, int(columnCount))
pos = read

// Read any text based column collation values provided in the optional metadata.
//log.Errorf("DEBUG: Remaining optional metadata bytes for %s: %v", result.Name, data[pos:])
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All of these DEBUG logging statements will be removed before merging.

var err error
if result.ColumnCollationIDs, err = readColumnCollationIDs(data, pos, int(columnCount)); err != nil {
return nil, err

Check warning on line 121 in go/mysql/binlog_event_rbr.go

View check run for this annotation

Codecov / codecov/patch

go/mysql/binlog_event_rbr.go#L121

Added line #L121 was not covered by tests
}
//log.Errorf("DEBUG: table %s; ColumnCollationIDs: %+v", result.Name, result.ColumnCollationIDs)

return result, nil
}
Expand All @@ -118,7 +146,7 @@

default:
// Unknown type. This is used in tests only, so panic.
panic(vterrors.Errorf(vtrpc.Code_INTERNAL, "metadataLength: unhandled data type: %v", typ))
panic(vterrors.Errorf(vtrpcpb.Code_INTERNAL, "metadataLength: unhandled data type: %v", typ))

Check warning on line 149 in go/mysql/binlog_event_rbr.go

View check run for this annotation

Codecov / codecov/patch

go/mysql/binlog_event_rbr.go#L149

Added line #L149 was not covered by tests
}
}

Expand Down Expand Up @@ -154,7 +182,7 @@

default:
// Unknown types, we can't go on.
return 0, 0, vterrors.Errorf(vtrpc.Code_INTERNAL, "metadataRead: unhandled data type: %v", typ)
return 0, 0, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "metadataRead: unhandled data type: %v", typ)

Check warning on line 185 in go/mysql/binlog_event_rbr.go

View check run for this annotation

Codecov / codecov/patch

go/mysql/binlog_event_rbr.go#L185

Added line #L185 was not covered by tests
}
}

Expand Down Expand Up @@ -185,8 +213,46 @@

default:
// Unknown type. This is used in tests only, so panic.
panic(vterrors.Errorf(vtrpc.Code_INTERNAL, "metadataRead: unhandled data type: %v", typ))
panic(vterrors.Errorf(vtrpcpb.Code_INTERNAL, "metadataRead: unhandled data type: %v", typ))

Check warning on line 216 in go/mysql/binlog_event_rbr.go

View check run for this annotation

Codecov / codecov/patch

go/mysql/binlog_event_rbr.go#L216

Added line #L216 was not covered by tests
}
}

// readColumnCollationIDs reads from the optional metadata that exists.
// See: https://github.com/mysql/mysql-server/blob/8.0/libbinlogevents/include/rows_event.h
// What's included depends on the server configuration:
// https://dev.mysql.com/doc/refman/en/replication-options-binary-log.html#sysvar_binlog_row_metadata
// and the table definition.
// We only care about the collation IDs of the text based columns and
// this info is provided in all binlog_row_metadata formats.
func readColumnCollationIDs(data []byte, pos, count int) ([]collations.ID, error) {
collationIDs := make([]collations.ID, 0, count)
for pos < len(data) {
fieldType := uint8(data[pos])
pos++

fieldLen, read, ok := readLenEncInt(data, pos)
if !ok || read+int(fieldLen) > len(data) {
return nil, vterrors.New(vtrpcpb.Code_INTERNAL, "error reading optional metadata field length")

Check warning on line 235 in go/mysql/binlog_event_rbr.go

View check run for this annotation

Codecov / codecov/patch

go/mysql/binlog_event_rbr.go#L235

Added line #L235 was not covered by tests
}
pos = read

fieldVal := data[pos : pos+int(fieldLen)]
pos += int(fieldLen)

//log.Errorf("DEBUG: Optional Metadata Field Type: %v, Length: %v, Value: %v", fieldType, fieldLen, fieldVal)
if fieldType == TableMapDefaultCharset || fieldType == TableMapColumnCharset { // It's one or the other
for i := uint64(0); i < fieldLen; i++ {
v := uint16(fieldVal[i])
if v == 252 { // The ID is the subsequent 2 bytes
v = binary.LittleEndian.Uint16(fieldVal[i+1 : i+3])
i += 2
}
collationIDs = append(collationIDs, collations.ID(v))
//log.Errorf("DEBUG: charset idx %d: %v", i, v)
}
}
}
return collationIDs, nil
}

// Rows implements BinlogEvent.TableMap().
Expand Down Expand Up @@ -235,7 +301,7 @@

columnCount, read, ok := readLenEncInt(data, pos)
if !ok {
return result, vterrors.Errorf(vtrpc.Code_INTERNAL, "expected column count at position %v (data=%v)", pos, data)
return result, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "expected column count at position %v (data=%v)", pos, data)

Check warning on line 304 in go/mysql/binlog_event_rbr.go

View check run for this annotation

Codecov / codecov/patch

go/mysql/binlog_event_rbr.go#L304

Added line #L304 was not covered by tests
}
pos = read

Expand Down
10 changes: 5 additions & 5 deletions go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func testPlayerCopyCharPK(t *testing.T) {
defer func() { waitRetryTime = savedWaitRetryTime }()

execStatements(t, []string{
"create table src(idc binary(2) , val int, primary key(idc))",
"create table src(idc binary(2), val int, primary key(idc))",
"insert into src values('a', 1), ('c', 2)",
fmt.Sprintf("create table %s.dst(idc binary(2), val int, primary key(idc))", vrepldb),
})
Expand Down Expand Up @@ -215,7 +215,7 @@ func testPlayerCopyVarcharPKCaseInsensitive(t *testing.T) {
defer func() { waitRetryTime = savedWaitRetryTime }()

execStatements(t, []string{
"create table src(idc varchar(20), val int, primary key(idc))",
"create table src(idc varchar(20), val int, primary key(idc)) character set utf8mb3", // Use utf8mb3 to get a consistent collation across MySQL versions
"insert into src values('a', 1), ('c', 2)",
fmt.Sprintf("create table %s.dst(idc varchar(20), val int, primary key(idc))", vrepldb),
})
Expand Down Expand Up @@ -285,7 +285,7 @@ func testPlayerCopyVarcharPKCaseInsensitive(t *testing.T) {
"/update _vt.vreplication set state='Copying'",
// Copy mode.
"insert into dst(idc,val) values ('a',1)",
`/insert into _vt.copy_state \(lastpk, vrepl_id, table_name\) values \('fields:{name:\\"idc\\" type:VARCHAR charset:45 flags:20483} rows:{lengths:1 values:\\"a\\"}'.*`,
`/insert into _vt.copy_state \(lastpk, vrepl_id, table_name\) values \('fields:{name:\\"idc\\" type:VARCHAR charset:33 flags:20483} rows:{lengths:1 values:\\"a\\"}'.*`,
// Copy-catchup mode.
`/insert into dst\(idc,val\) select 'B', 3 from dual where \( .* 'B' COLLATE .* \) <= \( .* 'a' COLLATE .* \)`,
).Then(func(expect qh.ExpectationSequencer) qh.ExpectationSequencer {
Expand All @@ -295,11 +295,11 @@ func testPlayerCopyVarcharPKCaseInsensitive(t *testing.T) {
//upd1 := expect.
upd1 := expect.Then(qh.Eventually(
"insert into dst(idc,val) values ('B',3)",
`/insert into _vt.copy_state \(lastpk, vrepl_id, table_name\) values \('fields:{name:\\"idc\\" type:VARCHAR charset:45 flags:20483} rows:{lengths:1 values:\\"B\\"}'.*`,
`/insert into _vt.copy_state \(lastpk, vrepl_id, table_name\) values \('fields:{name:\\"idc\\" type:VARCHAR charset:33 flags:20483} rows:{lengths:1 values:\\"B\\"}'.*`,
))
upd2 := expect.Then(qh.Eventually(
"insert into dst(idc,val) values ('c',2)",
`/insert into _vt.copy_state \(lastpk, vrepl_id, table_name\) values \('fields:{name:\\"idc\\" type:VARCHAR charset:45 flags:20483} rows:{lengths:1 values:\\"c\\"}'.*`,
`/insert into _vt.copy_state \(lastpk, vrepl_id, table_name\) values \('fields:{name:\\"idc\\" type:VARCHAR charset:33 flags:20483} rows:{lengths:1 values:\\"c\\"}'.*`,
))
upd1.Then(upd2.Eventually())
return upd2
Expand Down
3 changes: 1 addition & 2 deletions go/vt/vttablet/tabletserver/repltracker/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,12 @@ import (
"time"

"vitess.io/vitess/go/constants/sidecar"
"vitess.io/vitess/go/vt/vterrors"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/timer"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tabletserver/connpool"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"

Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletserver/schema/historian.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ import (
"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vttablet/tabletserver/connpool"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"

"vitess.io/vitess/go/vt/sqlparser"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)

const getInitialSchemaVersions = "select id, pos, ddl, time_updated, schemax from %s.schema_version where time_updated > %d order by id asc"
Expand Down
10 changes: 5 additions & 5 deletions go/vt/vttablet/tabletserver/schema/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,18 @@ import (

"vitess.io/vitess/go/constants/sidecar"
"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"
)

// VStreamer defines the functions of VStreamer
// VStreamer defines the functions of VStreamer
// that the replicationWatcher needs.
type VStreamer interface {
Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, throttlerApp throttlerapp.Name, send func([]*binlogdatapb.VEvent) error) error
Expand Down
Loading
Loading