Skip to content

Commit

Permalink
Minor changes after self review
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Feb 23, 2024
1 parent f1971b2 commit 8f6f957
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 37 deletions.
41 changes: 23 additions & 18 deletions go/mysql/binlog_event_rbr.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,28 @@ import (
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

// These are the TABLE_MAP_EVENT's optional metadata field types from: libbinlogevents/include/rows_event.h
// See: https://dev.mysql.com/doc/dev/mysql-server/8.0.34/structbinary__log_1_1Table__map__event_1_1Optional__metadata__fields.html
// These are the TABLE_MAP_EVENT's optional metadata field types from
// MySQL's libbinlogevents/include/rows_event.h.
// See also: https://dev.mysql.com/doc/dev/mysql-server/8.0.34/structbinary__log_1_1Table__map__event_1_1Optional__metadata__fields.html
const (
TableMapSignedness uint8 = iota + 1
TableMapDefaultCharset
TableMapColumnCharset
TableMapColumnName
TableMapSetStrValue
TableMapEnumStrValue
TableMapGeometryType
TableMapSimplePrimaryKey
TableMapPrimaryKeyWithPrefix
TableMapEnumAndSetDefaultCharset
TableMapEnumAndSetColumnCharset
TableMapColumnVisibility
tableMapSignedness uint8 = iota + 1
tableMapDefaultCharset
tableMapColumnCharset
tableMapColumnName
tableMapSetStrValue
tableMapEnumStrValue
tableMapGeometryType
tableMapSimplePrimaryKey
tableMapPrimaryKeyWithPrefix
tableMapEnumAndSetDefaultCharset
tableMapEnumAndSetColumnCharset
tableMapColumnVisibility
)

// This byte in the optional metadata indicates that we should
// read the next 2 bytes as a collation ID.
const readTwoByteCollationID = 252

// TableMap implements BinlogEvent.TableMap().
//
// Expected format (L = total length of event data):
Expand Down Expand Up @@ -222,7 +227,7 @@ func metadataWrite(data []byte, pos int, typ byte, value uint16) int {
// What's included depends on the server configuration:
// https://dev.mysql.com/doc/refman/en/replication-options-binary-log.html#sysvar_binlog_row_metadata
// and the table definition.
// We only care about the collation IDs of the text based columns and
// We only care about any collation IDs in the optional metadata and
// this info is provided in all binlog_row_metadata formats.
func readColumnCollationIDs(data []byte, pos, count int) ([]collations.ID, error) {
collationIDs := make([]collations.ID, 0, count)
Expand All @@ -231,7 +236,7 @@ func readColumnCollationIDs(data []byte, pos, count int) ([]collations.ID, error
pos++

fieldLen, read, ok := readLenEncInt(data, pos)
if !ok || read+int(fieldLen) > len(data) {
if !ok {
return nil, vterrors.New(vtrpcpb.Code_INTERNAL, "error reading optional metadata field length")

Check warning on line 240 in go/mysql/binlog_event_rbr.go

View check run for this annotation

Codecov / codecov/patch

go/mysql/binlog_event_rbr.go#L240

Added line #L240 was not covered by tests
}
pos = read
Expand All @@ -240,10 +245,10 @@ func readColumnCollationIDs(data []byte, pos, count int) ([]collations.ID, error
pos += int(fieldLen)

//log.Errorf("DEBUG: Optional Metadata Field Type: %v, Length: %v, Value: %v", fieldType, fieldLen, fieldVal)
if fieldType == TableMapDefaultCharset || fieldType == TableMapColumnCharset { // It's one or the other
if fieldType == tableMapDefaultCharset || fieldType == tableMapColumnCharset { // It's one or the other
for i := uint64(0); i < fieldLen; i++ {
v := uint16(fieldVal[i])
if v == 252 { // The ID is the subsequent 2 bytes
if v == readTwoByteCollationID { // The ID is the subsequent 2 bytes
v = binary.LittleEndian.Uint16(fieldVal[i+1 : i+3])
i += 2
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func testPlayerCopyVarcharPKCaseInsensitive(t *testing.T) {
defer func() { waitRetryTime = savedWaitRetryTime }()

execStatements(t, []string{
"create table src(idc varchar(20), val int, primary key(idc)) character set utf8mb3", // Use utf8mb3 to get a consistent collation across MySQL versions
"create table src(idc varchar(20), val int, primary key(idc)) character set utf8mb3", // Use utf8mb3 to get a consistent default collation across MySQL versions
"insert into src values('a', 1), ('c', 2)",
fmt.Sprintf("create table %s.dst(idc varchar(20), val int, primary key(idc))", vrepldb),
})
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletserver/vstreamer/testenv/testenv.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,12 @@ func init() {
panic("could not parse MySQL version: " + err.Error())

Check warning on line 65 in go/vt/vttablet/tabletserver/vstreamer/testenv/testenv.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vttablet/tabletserver/vstreamer/testenv/testenv.go#L65

Added line #L65 was not covered by tests
}
MySQLVersion = fmt.Sprintf("%d.%d.%d", mv.Major, mv.Minor, mv.Patch)
log.Errorf("MySQL version: %s", MySQLVersion)
log.Infof("MySQL version: %s", MySQLVersion)
CollationEnv = collations.NewEnvironment(MySQLVersion)
// utf8mb4_general_ci is the default for MySQL 5.7 and
// utf8mb4_0900_ai_ci is the default for MySQL 8.0.
DefaultCollationID = CollationEnv.DefaultConnectionCharset()
log.Errorf("Default collation ID: %d", DefaultCollationID)
log.Infof("Default collation ID: %d", DefaultCollationID)
}

// Env contains all the env vars for a test against a mysql instance.
Expand Down
29 changes: 14 additions & 15 deletions go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -768,22 +768,22 @@ func (vs *vstreamer) buildTableColumns(tm *mysql.TableMap) ([]*querypb.Field, er
if err != nil {
return nil, fmt.Errorf("unsupported type: %d, position: %d", typ, i)
}
// Use the the collation inherited or the one specified specifically
// for the column if one was provided in the event's metadata.
// Use the the collation inherited or the one specified explicitly for the
// column if one was provided in the event's optional metadata.
var coll collations.ID
if sqltypes.IsText(t) && len(tm.ColumnCollationIDs) > charFieldIdx {
switch {
case sqltypes.IsText(t) && len(tm.ColumnCollationIDs) > charFieldIdx:
coll = tm.ColumnCollationIDs[charFieldIdx]
charFieldIdx++
} else { // Use the server defined default for the column's type
if t == sqltypes.TypeJSON {
// JSON is a blob at this layer and we should NOT use utf8mb4.
// The collation in MySQL for a JSON column is NULL, meaning there
// is not one (same as for int) and we should use binary.
// I'm not sure why CollationForType treats it differently...
coll = collations.CollationBinaryID
} else {
coll = collations.CollationForType(t, vs.se.Environment().CollationEnv().DefaultConnectionCharset())
}
case t == sqltypes.TypeJSON:
// JSON is a blob at this (storage) layer -- vs the connection/query serving
// layer which CollationForType seems primarily concerned about and JSON at
// the response layer should be using utf-8 as that's the standard -- so we
// should NOT use utf8mb4 as the collation in MySQL for a JSON column is
// NULL, meaning there is not one (same as for int) and we should use binary.
coll = collations.CollationBinaryID
default: // Use the server defined default for the column's type
coll = collations.CollationForType(t, vs.se.Environment().CollationEnv().DefaultConnectionCharset())
}
fields = append(fields, &querypb.Field{
Name: fmt.Sprintf("@%d", i+1),
Expand Down Expand Up @@ -884,8 +884,7 @@ func getFields(ctx context.Context, cp dbconfigs.Connector, se *schema.Engine, t
return fieldsCopy, nil
}

// additional column attributes from information_schema.columns. Currently only column_type is used, but
// we expect to add more in the future
// Additional column attributes to get from information_schema.columns.
type extColInfo struct {
columnType string
collationID collations.ID
Expand Down
3 changes: 2 additions & 1 deletion go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,8 @@ func TestCellValuePadding(t *testing.T) {
}

// TestColumnCollationHandling confirms that we handle column collations
// properly in vstreams now that we parse the values in binlog_row_metadata.
// properly in vstreams now that we parse any optional collation ID values
// in binlog_row_metadata AND we query mysqld for the collation when possible.
func TestColumnCollationHandling(t *testing.T) {
extraCollation := "utf8mb4_ja_0900_as_cs" // Test 2 byte collation ID handling
if strings.HasPrefix(testenv.MySQLVersion, "5.7") { // 5.7 does not support 2 byte collation IDs
Expand Down

0 comments on commit 8f6f957

Please sign in to comment.