Skip to content

Commit

Permalink
Fixes
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Feb 21, 2024
1 parent 9963933 commit cc08784
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 41 deletions.
3 changes: 2 additions & 1 deletion go/mysql/binlog_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package mysql
import (
"fmt"

"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/mysql/replication"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)
Expand Down Expand Up @@ -222,7 +223,7 @@ type TableMap struct {
// override for character based columns ONLY. This means that the
// array position needs to be mapped to the ordered list of
// character based columns in the table.
ColumnCollationIDs []uint64
ColumnCollationIDs []collations.ID
}

// Rows contains data from a {WRITE,UPDATE,DELETE}_ROWS_EVENT.
Expand Down
46 changes: 25 additions & 21 deletions go/mysql/binlog_event_rbr.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,11 @@ import (
"encoding/binary"

"vitess.io/vitess/go/mysql/binlog"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/proto/vtrpc"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/vt/vterrors"

querypb "vitess.io/vitess/go/vt/proto/query"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

// These are the TABLE_MAP_EVENT's optional metadata field types from: libbinlogevents/include/rows_event.h
Expand Down Expand Up @@ -84,7 +83,7 @@ func (ev binlogEvent) TableMap(f BinlogFormat) (*TableMap, error) {

columnCount, read, ok := readLenEncInt(data, pos)
if !ok {
return nil, vterrors.Errorf(vtrpc.Code_INTERNAL, "expected column count at position %v (data=%v)", pos, data)
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "expected column count at position %v (data=%v)", pos, data)
}
pos = read

Expand All @@ -93,7 +92,7 @@ func (ev binlogEvent) TableMap(f BinlogFormat) (*TableMap, error) {

metaLen, read, ok := readLenEncInt(data, pos)
if !ok {
return nil, vterrors.Errorf(vtrpc.Code_INTERNAL, "expected metadata length at position %v (data=%v)", pos, data)
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "expected metadata length at position %v (data=%v)", pos, data)
}
pos = read

Expand All @@ -108,21 +107,21 @@ func (ev binlogEvent) TableMap(f BinlogFormat) (*TableMap, error) {
}
}
if pos != expectedEnd {
return nil, vterrors.Errorf(vtrpc.Code_INTERNAL, "unexpected metadata end: got %v was expecting %v (data=%v)", pos, expectedEnd, data)
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected metadata end: got %v was expecting %v (data=%v)", pos, expectedEnd, data)
}

// A bit array that says if each column can be NULL.
result.CanBeNull, _ = newBitmap(data, pos, int(columnCount))

//log.Errorf("DEBUG: Remining bytes for %s: %v", result.Name, data[pos:])
//log.Errorf("DEBUG: Remaining optional metadata bytes for %s: %v", result.Name, data[pos:])

// Read any CHAR based column collation overrides in the optional metadata.
// Read any text based column collation values provided in the optional metadata.
var err error
if result.ColumnCollationIDs, err = readColumnCollationIDs(data, pos); err != nil {
return nil, err
}

log.Errorf("DEBUG: table %s; ColumnCollationIDs: %+v", result.Name, result.ColumnCollationIDs)
//log.Errorf("DEBUG: table %s; ColumnCollationIDs: %+v", result.Name, result.ColumnCollationIDs)

return result, nil
}
Expand All @@ -148,7 +147,7 @@ func metadataLength(typ byte) int {

default:
// Unknown type. This is used in tests only, so panic.
panic(vterrors.Errorf(vtrpc.Code_INTERNAL, "metadataLength: unhandled data type: %v", typ))
panic(vterrors.Errorf(vtrpcpb.Code_INTERNAL, "metadataLength: unhandled data type: %v", typ))
}
}

Expand Down Expand Up @@ -184,7 +183,7 @@ func metadataRead(data []byte, pos int, typ byte) (uint16, int, error) {

default:
// Unknown types, we can't go on.
return 0, 0, vterrors.Errorf(vtrpc.Code_INTERNAL, "metadataRead: unhandled data type: %v", typ)
return 0, 0, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "metadataRead: unhandled data type: %v", typ)
}
}

Expand Down Expand Up @@ -215,7 +214,7 @@ func metadataWrite(data []byte, pos int, typ byte, value uint16) int {

default:
// Unknown type. This is used in tests only, so panic.
panic(vterrors.Errorf(vtrpc.Code_INTERNAL, "metadataRead: unhandled data type: %v", typ))
panic(vterrors.Errorf(vtrpcpb.Code_INTERNAL, "metadataRead: unhandled data type: %v", typ))
}
}

Expand All @@ -226,18 +225,23 @@ func metadataWrite(data []byte, pos int, typ byte, value uint16) int {
// and the table definition.
// We only care about the collation IDs of the character based columns and
// this info is provided in all binlog_row_metadata formats.
func readColumnCollationIDs(data []byte, pos int) ([]uint64, error) {
collationIDs := make([]uint64, 0)
func readColumnCollationIDs(data []byte, pos int) ([]collations.ID, error) {
// Heurestic allocation of the slice's backing array.
collationIDs := make([]collations.ID, 0, len(data[pos:])-3)
for pos < len(data) {
fieldType := uint64(data[pos])
pos++

if fieldType == 254 { // I don't know WTFudge this is yet... but the payload then seems invalid
return nil, nil
}

if fieldType == 0 { // Null byte separator
continue
}

fieldLen, read, ok := readLenEncInt(data, pos)
if !ok {
if !ok || read+int(fieldLen) > len(data) {
return nil, vterrors.New(vtrpcpb.Code_INTERNAL, "error reading optional metadata field length")
}
pos = read
Expand All @@ -247,14 +251,14 @@ func readColumnCollationIDs(data []byte, pos int) ([]uint64, error) {

//log.Errorf("DEBUG: Optional Metadata Field Type: %v, Length: %v, Value: %v", fieldType, fieldLen, fieldVal)
if fieldType == TableMapDefaultCharset || fieldType == TableMapColumnCharset { // It's one or the other
//log.Errorf("DEBUG: Number of charset bytes : %d", fieldLen)
for i := uint64(0); i < fieldLen; i++ {
v := uint64(fieldVal[i])
if v == 0 || v == 252 { // Ignore Null or unimplemented, respectively
continue
v := uint16(fieldVal[i])
if v == 252 { // The ID is the subsequent 2 bytes
v = binary.LittleEndian.Uint16(fieldVal[i+1 : i+3])
i += 2
}
collationIDs = append(collationIDs, collations.ID(v))
//log.Errorf("DEBUG: charset idx %d: %v", i, v)
collationIDs = append(collationIDs, v)
}
}
}
Expand Down Expand Up @@ -307,7 +311,7 @@ func (ev binlogEvent) Rows(f BinlogFormat, tm *TableMap) (Rows, error) {

columnCount, read, ok := readLenEncInt(data, pos)
if !ok {
return result, vterrors.Errorf(vtrpc.Code_INTERNAL, "expected column count at position %v (data=%v)", pos, data)
return result, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "expected column count at position %v (data=%v)", pos, data)
}
pos = read

Expand Down
3 changes: 1 addition & 2 deletions go/vt/vttablet/tabletserver/repltracker/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,12 @@ import (
"time"

"vitess.io/vitess/go/constants/sidecar"
"vitess.io/vitess/go/vt/vterrors"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/timer"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tabletserver/connpool"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"

Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletserver/schema/historian.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ import (
"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vttablet/tabletserver/connpool"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"

"vitess.io/vitess/go/vt/sqlparser"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)

const getInitialSchemaVersions = "select id, pos, ddl, time_updated, schemax from %s.schema_version where time_updated > %d order by id asc"
Expand Down
10 changes: 5 additions & 5 deletions go/vt/vttablet/tabletserver/schema/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,18 @@ import (

"vitess.io/vitess/go/constants/sidecar"
"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"
)

// VStreamer defines the functions of VStreamer
// VStreamer defines the functions of VStreamer
// that the replicationWatcher needs.
type VStreamer interface {
Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, throttlerApp throttlerapp.Name, send func([]*binlogdatapb.VEvent) error) error
Expand Down
4 changes: 1 addition & 3 deletions go/vt/vttablet/tabletserver/vstreamer/helper_event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ type TestColumn struct {
len int64
dataTypeLowered string
skip bool
collationName string
}

// TestFieldEvent has all the attributes of a table required for creating a field event.
Expand Down Expand Up @@ -377,6 +376,7 @@ func (ts *TestSpec) getFieldEvent(table *schemadiff.CreateTableEntity) *TestFiel
tc.collationID = testenv.DefaultCollationID
} else {
tc.collationID = collations.MySQL8().LookupByName(collationName)
log.Errorf("DEBUG: SchemaDiff provided collation for %s.%s: %s:%d", tfe.table, tc.name, collationName, tc.collationID)
}
collation := colldata.Lookup(tc.collationID)
switch tc.dataTypeLowered {
Expand All @@ -395,7 +395,6 @@ func (ts *TestSpec) getFieldEvent(table *schemadiff.CreateTableEntity) *TestFiel
if tc.dataTypeLowered == "char" && collation.IsBinary() {
tc.dataType = "BINARY"
}
tc.collationID = testenv.DefaultCollationID
}
tc.colType = fmt.Sprintf("%s(%d)", tc.dataTypeLowered, l)
case "blob":
Expand All @@ -405,7 +404,6 @@ func (ts *TestSpec) getFieldEvent(table *schemadiff.CreateTableEntity) *TestFiel
case "text":
tc.len = lengthText
tc.colType = "text"
tc.collationID = testenv.DefaultCollationID
case "set":
tc.len = lengthSet
tc.colType = fmt.Sprintf("%s(%s)", tc.dataTypeLowered, strings.Join(col.Type.EnumValues, ","))
Expand Down
20 changes: 18 additions & 2 deletions go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -733,7 +733,6 @@ func (vs *vstreamer) buildTablePlan(id uint64, tm *mysql.TableMap) (*binlogdatap
if err != nil {
return nil, err
}

table := &Table{
Name: tm.Name,
Fields: cols,
Expand Down Expand Up @@ -763,12 +762,21 @@ func (vs *vstreamer) buildTablePlan(id uint64, tm *mysql.TableMap) (*binlogdatap

func (vs *vstreamer) buildTableColumns(tm *mysql.TableMap) ([]*querypb.Field, error) {
var fields []*querypb.Field
var charFieldIdx int
for i, typ := range tm.Types {
t, err := sqltypes.MySQLToType(typ, 0)
if err != nil {
return nil, fmt.Errorf("unsupported type: %d, position: %d", typ, i)
}
coll := collations.CollationForType(t, vs.se.Environment().CollationEnv().DefaultConnectionCharset())
// Use the the collation inherited or the one specified specifically
// for the column if one was provided in the event's metadata.
var coll collations.ID
if sqltypes.IsText(t) && len(tm.ColumnCollationIDs) > charFieldIdx {
coll = tm.ColumnCollationIDs[charFieldIdx]
charFieldIdx++
} else { // Use the server defined default for the column's type
coll = collations.CollationForType(t, vs.se.Environment().CollationEnv().DefaultConnectionCharset())
}
fields = append(fields, &querypb.Field{
Name: fmt.Sprintf("@%d", i+1),
Type: t,
Expand Down Expand Up @@ -805,6 +813,14 @@ func (vs *vstreamer) buildTableColumns(tm *mysql.TableMap) ([]*querypb.Field, er
if err != nil {
return nil, err
}
// This uses the historian which queries the columns in the table and uses the
// generated fields. This means that the fields are using collations for the
// column types based on the *connection collation* and not the actual
// *column collation*.
// So we copy the collation information from the actual TableMap here.
for i := range fieldsCopy {
fieldsCopy[i].Charset = fields[i].Charset
}
return fieldsCopy, nil
}

Expand Down
9 changes: 4 additions & 5 deletions go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func TestCellValuePadding(t *testing.T) {
ddls: []string{
"create table t1(id int, val binary(4), primary key(val))",
"create table t2(id int, val char(4), primary key(val))",
"create table t3(id int, val char(4) collate utf8mb4_bin, primary key(val))"},
"create table t3(id int, txt text, val char(4) collate utf8mb4_bin, val2 varchar(64) collate utf8mb4_general_ci, val3 varchar(255) collate utf8mb4_ja_0900_as_cs, primary key(val))"},
}
defer ts.Close()
require.NoError(t, ts.Init())
Expand All @@ -172,10 +172,10 @@ func TestCellValuePadding(t *testing.T) {
{"update t2 set id = 11 where val = 'aaa'", []TestRowEvent{
{spec: &TestRowEventSpec{table: "t2", changes: []TestRowChange{{before: []string{"1", "aaa"}, after: []string{"11", "aaa"}}}}},
}},
{"insert into t3 values (1, 'aaa')", nil},
{"insert into t3 values (2, 'bb')", nil},
{"insert into t3 values (1, 'aaa', 'aaa', 'aaa', 'aaa')", nil},
{"insert into t3 values (2, 'bb', 'bb', 'bb', 'bb')", nil},
{"update t3 set id = 11 where val = 'aaa'", []TestRowEvent{
{spec: &TestRowEventSpec{table: "t3", changes: []TestRowChange{{before: []string{"1", "aaa"}, after: []string{"11", "aaa"}}}}},
{spec: &TestRowEventSpec{table: "t3", changes: []TestRowChange{{before: []string{"1", "aaa", "aaa", "aaa", "aaa"}, after: []string{"11", "aaa", "aaa", "aaa", "aaa"}}}}},
}},
{"commit", nil},
}}
Expand Down Expand Up @@ -2097,7 +2097,6 @@ func TestGeneratedInvisiblePrimaryKey(t *testing.T) {
}

func runCases(t *testing.T, filter *binlogdatapb.Filter, testcases []testcase, position string, tablePK []*binlogdatapb.TableLastPK) {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
wg, ch := startStream(ctx, t, filter, position, tablePK)
Expand Down

0 comments on commit cc08784

Please sign in to comment.