diff --git a/go/vt/vtgate/endtoend/vstream_test.go b/go/vt/vtgate/endtoend/vstream_test.go index 477bb2518b5..350cd5d8251 100644 --- a/go/vt/vtgate/endtoend/vstream_test.go +++ b/go/vt/vtgate/endtoend/vstream_test.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "io" + "regexp" "sync" "testing" @@ -330,9 +331,9 @@ func TestVStreamSharded(t *testing.T) { received bool } expectedEvents := []*expectedEvent{ - {`type:FIELD field_event:{table_name:"ks.t1_sharded" fields:{name:"id1" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_-80" org_name:"id1" column_length:20 charset:63 flags:53251} fields:{name:"id2" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_-80" org_name:"id2" column_length:20 charset:63 flags:32768} keyspace:"ks" shard:"-80"}`, false}, + {`type:FIELD field_event:{table_name:"ks.t1_sharded" fields:{name:"id1" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_-80" org_name:"id1" column_length:20 charset:63 flags:53251 column_type:"bigint(20)"} fields:{name:"id2" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_-80" org_name:"id2" column_length:20 charset:63 flags:32768 column_type:"bigint(20)"} keyspace:"ks" shard:"-80"}`, false}, {`type:ROW row_event:{table_name:"ks.t1_sharded" row_changes:{after:{lengths:1 lengths:1 values:"11"}} keyspace:"ks" shard:"-80"}`, false}, - {`type:FIELD field_event:{table_name:"ks.t1_sharded" fields:{name:"id1" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_80-" org_name:"id1" column_length:20 charset:63 flags:53251} fields:{name:"id2" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_80-" org_name:"id2" column_length:20 charset:63 flags:32768} keyspace:"ks" shard:"80-"}`, false}, + {`type:FIELD field_event:{table_name:"ks.t1_sharded" fields:{name:"id1" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_80-" org_name:"id1" column_length:20 charset:63 flags:53251 column_type:"bigint(20)"} fields:{name:"id2" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_80-" org_name:"id2" column_length:20 charset:63 flags:32768 column_type:"bigint(20)"} keyspace:"ks" shard:"80-"}`, false}, {`type:ROW row_event:{table_name:"ks.t1_sharded" row_changes:{after:{lengths:1 lengths:1 values:"44"}} keyspace:"ks" shard:"80-"}`, false}, } for { @@ -357,7 +358,7 @@ func TestVStreamSharded(t *testing.T) { for _, ev := range evs { s := fmt.Sprintf("%v", ev) for _, expectedEv := range expectedEvents { - if expectedEv.ev == s { + if removeAnyDeprecatedDisplayWidths(expectedEv.ev) == removeAnyDeprecatedDisplayWidths(s) { expectedEv.received = true break } @@ -381,6 +382,17 @@ func TestVStreamSharded(t *testing.T) { } +func removeAnyDeprecatedDisplayWidths(orig string) string { + var adjusted string + baseIntType := "int" + intRE := regexp.MustCompile(`(?i)int\(([0-9]*)?\)`) + adjusted = intRE.ReplaceAllString(orig, baseIntType) + baseYearType := "year" + yearRE := regexp.MustCompile(`(?i)year\(([0-9]*)?\)`) + adjusted = yearRE.ReplaceAllString(adjusted, baseYearType) + return adjusted +} + var printMu sync.Mutex func printEvents(evs []*binlogdatapb.VEvent) { diff --git a/go/vt/vttablet/tabletserver/schema/engine.go b/go/vt/vttablet/tabletserver/schema/engine.go index 3da2c970298..a2bc29d1379 100644 --- a/go/vt/vttablet/tabletserver/schema/engine.go +++ b/go/vt/vttablet/tabletserver/schema/engine.go @@ -24,6 +24,8 @@ import ( "sync" "time" + "google.golang.org/protobuf/proto" + "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/dbconnpool" "vitess.io/vitess/go/vt/schema" @@ -587,6 +589,32 @@ func (se *Engine) GetSchema() map[string]*Table { return tables } +// MarshalMinimalSchema returns a protobuf encoded binlogdata.MinimalSchema +func (se *Engine) MarshalMinimalSchema() ([]byte, error) { + se.mu.Lock() + defer se.mu.Unlock() + dbSchema := &binlogdatapb.MinimalSchema{ + Tables: make([]*binlogdatapb.MinimalTable, 0, len(se.tables)), + } + for _, table := range se.tables { + dbSchema.Tables = append(dbSchema.Tables, newMinimalTable(table)) + } + return proto.Marshal(dbSchema) +} + +func newMinimalTable(st *Table) *binlogdatapb.MinimalTable { + table := &binlogdatapb.MinimalTable{ + Name: st.Name.String(), + Fields: st.Fields, + } + pkc := make([]int64, len(st.PKColumns)) + for i, pk := range st.PKColumns { + pkc[i] = int64(pk) + } + table.PKColumns = pkc + return table +} + // GetConnection returns a connection from the pool func (se *Engine) GetConnection(ctx context.Context) (*connpool.DBConn, error) { return se.conns.Get(ctx, nil) diff --git a/go/vt/vttablet/tabletserver/schema/tracker.go b/go/vt/vttablet/tabletserver/schema/tracker.go index 51204e4db3f..3bfa19a36d0 100644 --- a/go/vt/vttablet/tabletserver/schema/tracker.go +++ b/go/vt/vttablet/tabletserver/schema/tracker.go @@ -23,8 +23,6 @@ import ( "sync" "time" - "google.golang.org/protobuf/proto" - "vitess.io/vitess/go/vt/schema" "vitess.io/vitess/go/mysql" @@ -240,14 +238,10 @@ func (tr *Tracker) schemaUpdated(gtid string, ddl string, timestamp int64) error } func (tr *Tracker) saveCurrentSchemaToDb(ctx context.Context, gtid, ddl string, timestamp int64) error { - tables := tr.engine.GetSchema() - dbSchema := &binlogdatapb.MinimalSchema{ - Tables: []*binlogdatapb.MinimalTable{}, - } - for _, table := range tables { - dbSchema.Tables = append(dbSchema.Tables, newMinimalTable(table)) + blob, err := tr.engine.MarshalMinimalSchema() + if err != nil { + return err } - blob, _ := proto.Marshal(dbSchema) conn, err := tr.engine.GetConnection(ctx) if err != nil { @@ -265,19 +259,6 @@ func (tr *Tracker) saveCurrentSchemaToDb(ctx context.Context, gtid, ddl string, return nil } -func newMinimalTable(st *Table) *binlogdatapb.MinimalTable { - table := &binlogdatapb.MinimalTable{ - Name: st.Name.String(), - Fields: st.Fields, - } - var pkc []int64 - for _, pk := range st.PKColumns { - pkc = append(pkc, int64(pk)) - } - table.PKColumns = pkc - return table -} - func encodeString(in string) string { buf := bytes.NewBuffer(nil) sqltypes.NewVarChar(in).EncodeSQL(buf) diff --git a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go index 67be0513e67..560546d7c04 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go @@ -153,9 +153,14 @@ func (rs *rowStreamer) buildPlan() error { return err } ti := &Table{ - Name: st.Name, - Fields: st.Fields, + Name: st.Name, } + + ti.Fields, err = getFields(rs.ctx, rs.cp, st.Name, rs.cp.DBName(), st.Fields) + if err != nil { + return err + } + // The plan we build is identical to the one for vstreamer. // This is because the row format of a read is identical // to the row format of a binlog event. So, the same diff --git a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go index 2a9ac5a47ff..4115a006c37 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go @@ -72,7 +72,7 @@ func TestStreamRowsScan(t *testing.T) { // t1: simulates rollup, with non-pk column wantStream = []string{ - `fields:{name:"1" type:INT64} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63} pkfields:{name:"id" type:INT32}`, + `fields:{name:"1" type:INT64} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id" type:INT32}`, `rows:{lengths:1 lengths:3 values:"1aaa"} rows:{lengths:1 lengths:3 values:"1bbb"} lastpk:{lengths:1 values:"2"}`, } wantQuery = "select id, val from t1 order by id" @@ -80,7 +80,7 @@ func TestStreamRowsScan(t *testing.T) { // t1: simulates rollup, with pk and non-pk column wantStream = []string{ - `fields:{name:"1" type:INT64} fields:{name:"id" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id" column_length:11 charset:63} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63} pkfields:{name:"id" type:INT32}`, + `fields:{name:"1" type:INT64} fields:{name:"id" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id" type:INT32}`, `rows:{lengths:1 lengths:1 lengths:3 values:"11aaa"} rows:{lengths:1 lengths:1 lengths:3 values:"12bbb"} lastpk:{lengths:1 values:"2"}`, } wantQuery = "select id, val from t1 order by id" @@ -88,7 +88,7 @@ func TestStreamRowsScan(t *testing.T) { // t1: no pk in select list wantStream = []string{ - `fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63} pkfields:{name:"id" type:INT32}`, + `fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id" type:INT32}`, `rows:{lengths:3 values:"aaa"} rows:{lengths:3 values:"bbb"} lastpk:{lengths:1 values:"2"}`, } wantQuery = "select id, val from t1 order by id" @@ -96,7 +96,7 @@ func TestStreamRowsScan(t *testing.T) { // t1: all rows wantStream = []string{ - `fields:{name:"id" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id" column_length:11 charset:63} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63} pkfields:{name:"id" type:INT32}`, + `fields:{name:"id" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id" type:INT32}`, `rows:{lengths:1 lengths:3 values:"1aaa"} rows:{lengths:1 lengths:3 values:"2bbb"} lastpk:{lengths:1 values:"2"}`, } wantQuery = "select id, val from t1 order by id" @@ -104,7 +104,7 @@ func TestStreamRowsScan(t *testing.T) { // t1: lastpk=1 wantStream = []string{ - `fields:{name:"id" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id" column_length:11 charset:63} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63} pkfields:{name:"id" type:INT32}`, + `fields:{name:"id" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id" type:INT32}`, `rows:{lengths:1 lengths:3 values:"2bbb"} lastpk:{lengths:1 values:"2"}`, } wantQuery = "select id, val from t1 where (id > 1) order by id" @@ -112,7 +112,7 @@ func TestStreamRowsScan(t *testing.T) { // t1: different column ordering wantStream = []string{ - `fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63} fields:{name:"id" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id" column_length:11 charset:63} pkfields:{name:"id" type:INT32}`, + `fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} fields:{name:"id" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id" column_length:11 charset:63 column_type:"int(11)"} pkfields:{name:"id" type:INT32}`, `rows:{lengths:3 lengths:1 values:"aaa1"} rows:{lengths:3 lengths:1 values:"bbb2"} lastpk:{lengths:1 values:"2"}`, } wantQuery = "select id, val from t1 order by id" @@ -120,7 +120,7 @@ func TestStreamRowsScan(t *testing.T) { // t2: all rows wantStream = []string{ - `fields:{name:"id1" type:INT32 table:"t2" org_table:"t2" database:"vttest" org_name:"id1" column_length:11 charset:63} fields:{name:"id2" type:INT32 table:"t2" org_table:"t2" database:"vttest" org_name:"id2" column_length:11 charset:63} fields:{name:"val" type:VARBINARY table:"t2" org_table:"t2" database:"vttest" org_name:"val" column_length:128 charset:63} pkfields:{name:"id1" type:INT32} pkfields:{name:"id2" type:INT32}`, + `fields:{name:"id1" type:INT32 table:"t2" org_table:"t2" database:"vttest" org_name:"id1" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"id2" type:INT32 table:"t2" org_table:"t2" database:"vttest" org_name:"id2" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t2" org_table:"t2" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id1" type:INT32} pkfields:{name:"id2" type:INT32}`, `rows:{lengths:1 lengths:1 lengths:3 values:"12aaa"} rows:{lengths:1 lengths:1 lengths:3 values:"13bbb"} lastpk:{lengths:1 lengths:1 values:"13"}`, } wantQuery = "select id1, id2, val from t2 order by id1, id2" @@ -128,7 +128,7 @@ func TestStreamRowsScan(t *testing.T) { // t2: lastpk=1,2 wantStream = []string{ - `fields:{name:"id1" type:INT32 table:"t2" org_table:"t2" database:"vttest" org_name:"id1" column_length:11 charset:63} fields:{name:"id2" type:INT32 table:"t2" org_table:"t2" database:"vttest" org_name:"id2" column_length:11 charset:63} fields:{name:"val" type:VARBINARY table:"t2" org_table:"t2" database:"vttest" org_name:"val" column_length:128 charset:63} pkfields:{name:"id1" type:INT32} pkfields:{name:"id2" type:INT32}`, + `fields:{name:"id1" type:INT32 table:"t2" org_table:"t2" database:"vttest" org_name:"id1" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"id2" type:INT32 table:"t2" org_table:"t2" database:"vttest" org_name:"id2" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t2" org_table:"t2" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id1" type:INT32} pkfields:{name:"id2" type:INT32}`, `rows:{lengths:1 lengths:1 lengths:3 values:"13bbb"} lastpk:{lengths:1 lengths:1 values:"13"}`, } wantQuery = "select id1, id2, val from t2 where (id1 = 1 and id2 > 2) or (id1 > 1) order by id1, id2" @@ -136,7 +136,7 @@ func TestStreamRowsScan(t *testing.T) { // t3: all rows wantStream = []string{ - `fields:{name:"id" type:INT32 table:"t3" org_table:"t3" database:"vttest" org_name:"id" column_length:11 charset:63} fields:{name:"val" type:VARBINARY table:"t3" org_table:"t3" database:"vttest" org_name:"val" column_length:128 charset:63} pkfields:{name:"id" type:INT32} pkfields:{name:"val" type:VARBINARY}`, + `fields:{name:"id" type:INT32 table:"t3" org_table:"t3" database:"vttest" org_name:"id" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t3" org_table:"t3" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id" type:INT32} pkfields:{name:"val" type:VARBINARY}`, `rows:{lengths:1 lengths:3 values:"1aaa"} rows:{lengths:1 lengths:3 values:"2bbb"} lastpk:{lengths:1 lengths:3 values:"2bbb"}`, } wantQuery = "select id, val from t3 order by id, val" @@ -144,7 +144,7 @@ func TestStreamRowsScan(t *testing.T) { // t3: lastpk: 1,'aaa' wantStream = []string{ - `fields:{name:"id" type:INT32 table:"t3" org_table:"t3" database:"vttest" org_name:"id" column_length:11 charset:63} fields:{name:"val" type:VARBINARY table:"t3" org_table:"t3" database:"vttest" org_name:"val" column_length:128 charset:63} pkfields:{name:"id" type:INT32} pkfields:{name:"val" type:VARBINARY}`, + `fields:{name:"id" type:INT32 table:"t3" org_table:"t3" database:"vttest" org_name:"id" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t3" org_table:"t3" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id" type:INT32} pkfields:{name:"val" type:VARBINARY}`, `rows:{lengths:1 lengths:3 values:"2bbb"} lastpk:{lengths:1 lengths:3 values:"2bbb"}`, } wantQuery = "select id, val from t3 where (id = 1 and val > 'aaa') or (id > 1) order by id, val" @@ -152,7 +152,7 @@ func TestStreamRowsScan(t *testing.T) { // t4: all rows wantStream = []string{ - `fields:{name:"id1" type:INT32 table:"t4" org_table:"t4" database:"vttest" org_name:"id1" column_length:11 charset:63} fields:{name:"id2" type:INT32 table:"t4" org_table:"t4" database:"vttest" org_name:"id2" column_length:11 charset:63} fields:{name:"id3" type:INT32 table:"t4" org_table:"t4" database:"vttest" org_name:"id3" column_length:11 charset:63} fields:{name:"val" type:VARBINARY table:"t4" org_table:"t4" database:"vttest" org_name:"val" column_length:128 charset:63} pkfields:{name:"id1" type:INT32} pkfields:{name:"id2" type:INT32} pkfields:{name:"id3" type:INT32}`, + `fields:{name:"id1" type:INT32 table:"t4" org_table:"t4" database:"vttest" org_name:"id1" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"id2" type:INT32 table:"t4" org_table:"t4" database:"vttest" org_name:"id2" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"id3" type:INT32 table:"t4" org_table:"t4" database:"vttest" org_name:"id3" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t4" org_table:"t4" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id1" type:INT32} pkfields:{name:"id2" type:INT32} pkfields:{name:"id3" type:INT32}`, `rows:{lengths:1 lengths:1 lengths:1 lengths:3 values:"123aaa"} rows:{lengths:1 lengths:1 lengths:1 lengths:3 values:"234bbb"} lastpk:{lengths:1 lengths:1 lengths:1 values:"234"}`, } wantQuery = "select id1, id2, id3, val from t4 order by id1, id2, id3" @@ -160,7 +160,7 @@ func TestStreamRowsScan(t *testing.T) { // t4: lastpk: 1,2,3 wantStream = []string{ - `fields:{name:"id1" type:INT32 table:"t4" org_table:"t4" database:"vttest" org_name:"id1" column_length:11 charset:63} fields:{name:"id2" type:INT32 table:"t4" org_table:"t4" database:"vttest" org_name:"id2" column_length:11 charset:63} fields:{name:"id3" type:INT32 table:"t4" org_table:"t4" database:"vttest" org_name:"id3" column_length:11 charset:63} fields:{name:"val" type:VARBINARY table:"t4" org_table:"t4" database:"vttest" org_name:"val" column_length:128 charset:63} pkfields:{name:"id1" type:INT32} pkfields:{name:"id2" type:INT32} pkfields:{name:"id3" type:INT32}`, + `fields:{name:"id1" type:INT32 table:"t4" org_table:"t4" database:"vttest" org_name:"id1" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"id2" type:INT32 table:"t4" org_table:"t4" database:"vttest" org_name:"id2" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"id3" type:INT32 table:"t4" org_table:"t4" database:"vttest" org_name:"id3" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t4" org_table:"t4" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id1" type:INT32} pkfields:{name:"id2" type:INT32} pkfields:{name:"id3" type:INT32}`, `rows:{lengths:1 lengths:1 lengths:1 lengths:3 values:"234bbb"} lastpk:{lengths:1 lengths:1 lengths:1 values:"234"}`, } wantQuery = "select id1, id2, id3, val from t4 where (id1 = 1 and id2 = 2 and id3 > 3) or (id1 = 1 and id2 > 2) or (id1 > 1) order by id1, id2, id3" @@ -255,7 +255,7 @@ func TestStreamRowsKeyRange(t *testing.T) { // Only the first row should be returned, but lastpk should be 6. wantStream := []string{ - `fields:{name:"id1" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id1" column_length:11 charset:63} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63} pkfields:{name:"id1" type:INT32}`, + `fields:{name:"id1" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id1" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id1" type:INT32}`, `rows:{lengths:1 lengths:3 values:"1aaa"} lastpk:{lengths:1 values:"6"}`, } wantQuery := "select id1, val from t1 order by id1" @@ -287,7 +287,7 @@ func TestStreamRowsFilterInt(t *testing.T) { time.Sleep(1 * time.Second) wantStream := []string{ - `fields:{name:"id1" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id1" column_length:11 charset:63} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63} pkfields:{name:"id1" type:INT32}`, + `fields:{name:"id1" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id1" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id1" type:INT32}`, `rows:{lengths:1 lengths:3 values:"1aaa"} rows:{lengths:1 lengths:3 values:"4ddd"} lastpk:{lengths:1 values:"5"}`, } wantQuery := "select id1, id2, val from t1 order by id1" @@ -320,7 +320,7 @@ func TestStreamRowsFilterVarBinary(t *testing.T) { time.Sleep(1 * time.Second) wantStream := []string{ - `fields:{name:"id1" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id1" column_length:11 charset:63} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63} pkfields:{name:"id1" type:INT32}`, + `fields:{name:"id1" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id1" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id1" type:INT32}`, `rows:{lengths:1 lengths:6 values:"2newton"} rows:{lengths:1 lengths:6 values:"3newton"} rows:{lengths:1 lengths:6 values:"5newton"} lastpk:{lengths:1 values:"6"}`, } wantQuery := "select id1, val from t1 order by id1" @@ -346,7 +346,7 @@ func TestStreamRowsMultiPacket(t *testing.T) { engine.se.Reload(context.Background()) wantStream := []string{ - `fields:{name:"id" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id" column_length:11 charset:63} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63} pkfields:{name:"id" type:INT32}`, + `fields:{name:"id" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id" type:INT32}`, `rows:{lengths:1 lengths:3 values:"1234"} rows:{lengths:1 lengths:4 values:"26789"} rows:{lengths:1 lengths:1 values:"31"} lastpk:{lengths:1 values:"3"}`, `rows:{lengths:1 lengths:10 values:"42345678901"} lastpk:{lengths:1 values:"4"}`, `rows:{lengths:1 lengths:1 values:"52"} lastpk:{lengths:1 values:"5"}`, @@ -415,7 +415,9 @@ func checkStream(t *testing.T, query string, lastpk []sqltypes.Value, wantQuery re, _ := regexp.Compile(` flags:[\d]+`) srows = re.ReplaceAllString(srows, "") - if srows != wantStream[i] { + want := env.RemoveAnyDeprecatedDisplayWidths(wantStream[i]) + + if srows != want { ch <- fmt.Errorf("stream %d:\n%s, want\n%s", i, srows, wantStream[i]) } i++ diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index da61163a6ca..ffbed6e3c42 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -24,6 +24,7 @@ import ( "time" "google.golang.org/protobuf/encoding/prototext" + "google.golang.org/protobuf/proto" "vitess.io/vitess/go/mysql/collations" "vitess.io/vitess/go/timer" @@ -778,39 +779,16 @@ func (vs *vstreamer) buildTableColumns(tm *mysql.TableMap) ([]*querypb.Field, er } // Columns should be truncated to match those in tm. - fields = st.Fields[:len(tm.Types)] - extColInfos, err := vs.getExtColInfos(tm.Name, tm.Database) + fieldsCopy, err := getFields(vs.ctx, vs.cp, tm.Name, tm.Database, st.Fields[:len(tm.Types)]) if err != nil { return nil, err } - for _, field := range fields { - // we want the MySQL column type info so that we can properly handle - // ambiguous binlog events and other cases where the internal types - // don't match the MySQL column type. One example being that in binlog - // events CHAR columns with a binary collation are indistinguishable - // from BINARY columns. - if extColInfo, ok := extColInfos[field.Name]; ok { - field.ColumnType = extColInfo.columnType - } - } - return fields, nil -} - -// additional column attributes from information_schema.columns. Currently only column_type is used, but -// we expect to add more in the future -type extColInfo struct { - columnType string -} - -func encodeString(in string) string { - buf := bytes.NewBuffer(nil) - sqltypes.NewVarChar(in).EncodeSQL(buf) - return buf.String() + return fieldsCopy, nil } -func (vs *vstreamer) getExtColInfos(table, database string) (map[string]*extColInfo, error) { +func getExtColInfos(ctx context.Context, cp dbconfigs.Connector, table, database string) (map[string]*extColInfo, error) { extColInfos := make(map[string]*extColInfo) - conn, err := vs.cp.Connect(vs.ctx) + conn, err := cp.Connect(ctx) if err != nil { return nil, err } @@ -830,6 +808,37 @@ func (vs *vstreamer) getExtColInfos(table, database string) (map[string]*extColI return extColInfos, nil } +func getFields(ctx context.Context, cp dbconfigs.Connector, table, database string, fields []*querypb.Field) ([]*querypb.Field, error) { + // Make a deep copy of the schema.Engine fields as they are pointers and + // will be modified by adding ColumnType below + fieldsCopy := make([]*querypb.Field, len(fields)) + for i, field := range fields { + fieldsCopy[i] = proto.Clone(field).(*querypb.Field) + } + extColInfos, err := getExtColInfos(ctx, cp, table, database) + if err != nil { + return nil, err + } + for _, field := range fieldsCopy { + if colInfo, ok := extColInfos[field.Name]; ok { + field.ColumnType = colInfo.columnType + } + } + return fieldsCopy, nil +} + +// additional column attributes from information_schema.columns. Currently only column_type is used, but +// we expect to add more in the future +type extColInfo struct { + columnType string +} + +func encodeString(in string) string { + buf := bytes.NewBuffer(nil) + sqltypes.NewVarChar(in).EncodeSQL(buf) + return buf.String() +} + func (vs *vstreamer) processJournalEvent(vevents []*binlogdatapb.VEvent, plan *streamerPlan, rows mysql.Rows) ([]*binlogdatapb.VEvent, error) { // Get DbName params, err := vs.cp.MysqlParams() diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go index 191ba408f97..cc47daf8b21 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go @@ -418,8 +418,8 @@ func TestVStreamCopySimpleFlow(t *testing.T) { tablePKs = append(tablePKs, getTablePK("t1", 1)) tablePKs = append(tablePKs, getTablePK("t2", 2)) - t1FieldEvent := []string{"begin", "type:FIELD field_event:{table_name:\"t1\" fields:{name:\"id11\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id11\" column_length:11 charset:63} fields:{name:\"id12\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id12\" column_length:11 charset:63}}"} - t2FieldEvent := []string{"begin", "type:FIELD field_event:{table_name:\"t2\" fields:{name:\"id21\" type:INT32 table:\"t2\" org_table:\"t2\" database:\"vttest\" org_name:\"id21\" column_length:11 charset:63} fields:{name:\"id22\" type:INT32 table:\"t2\" org_table:\"t2\" database:\"vttest\" org_name:\"id22\" column_length:11 charset:63}}"} + t1FieldEvent := []string{"begin", "type:FIELD field_event:{table_name:\"t1\" fields:{name:\"id11\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id11\" column_length:11 charset:63 column_type:\"int(11)\"} fields:{name:\"id12\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id12\" column_length:11 charset:63 column_type:\"int(11)\"}}"} + t2FieldEvent := []string{"begin", "type:FIELD field_event:{table_name:\"t2\" fields:{name:\"id21\" type:INT32 table:\"t2\" org_table:\"t2\" database:\"vttest\" org_name:\"id21\" column_length:11 charset:63 column_type:\"int(11)\"} fields:{name:\"id22\" type:INT32 table:\"t2\" org_table:\"t2\" database:\"vttest\" org_name:\"id22\" column_length:11 charset:63 column_type:\"int(11)\"}}"} t1Events := []string{} t2Events := []string{} for i := 1; i <= 10; i++ { @@ -503,7 +503,7 @@ func TestVStreamCopyWithDifferentFilters(t *testing.T) { var expectedEvents = []string{ "type:BEGIN", - "type:FIELD field_event:{table_name:\"t1\" fields:{name:\"id1\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id1\" column_length:11 charset:63} fields:{name:\"id2\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id2\" column_length:11 charset:63}}", + "type:FIELD field_event:{table_name:\"t1\" fields:{name:\"id1\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id1\" column_length:11 charset:63 column_type:\"int(11)\"} fields:{name:\"id2\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id2\" column_length:11 charset:63 column_type:\"int(11)\"}}", "type:GTID", "type:ROW row_event:{table_name:\"t1\" row_changes:{after:{lengths:1 lengths:1 values:\"12\"}}}", "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t1\" lastpk:{rows:{lengths:1 values:\"1\"}}}}", @@ -512,7 +512,7 @@ func TestVStreamCopyWithDifferentFilters(t *testing.T) { "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t1\"} completed:true}", "type:COMMIT", "type:BEGIN", - "type:FIELD field_event:{table_name:\"t2a\" fields:{name:\"id1\" type:INT32 table:\"t2a\" org_table:\"t2a\" database:\"vttest\" org_name:\"id1\" column_length:11 charset:63} fields:{name:\"id2\" type:INT32 table:\"t2a\" org_table:\"t2a\" database:\"vttest\" org_name:\"id2\" column_length:11 charset:63}}", + "type:FIELD field_event:{table_name:\"t2a\" fields:{name:\"id1\" type:INT32 table:\"t2a\" org_table:\"t2a\" database:\"vttest\" org_name:\"id1\" column_length:11 charset:63 column_type:\"int(11)\"} fields:{name:\"id2\" type:INT32 table:\"t2a\" org_table:\"t2a\" database:\"vttest\" org_name:\"id2\" column_length:11 charset:63 column_type:\"int(11)\"}}", "type:ROW row_event:{table_name:\"t2a\" row_changes:{after:{lengths:1 lengths:1 values:\"14\"}}}", "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t2a\" lastpk:{rows:{lengths:1 values:\"1\"}}}}", "type:COMMIT", @@ -520,7 +520,7 @@ func TestVStreamCopyWithDifferentFilters(t *testing.T) { "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t2a\"} completed:true}", "type:COMMIT", "type:BEGIN", - "type:FIELD field_event:{table_name:\"t2b\" fields:{name:\"id1\" type:VARCHAR table:\"t2b\" org_table:\"t2b\" database:\"vttest\" org_name:\"id1\" column_length:80 charset:45} fields:{name:\"id2\" type:INT32 table:\"t2b\" org_table:\"t2b\" database:\"vttest\" org_name:\"id2\" column_length:11 charset:63}}", + "type:FIELD field_event:{table_name:\"t2b\" fields:{name:\"id1\" type:VARCHAR table:\"t2b\" org_table:\"t2b\" database:\"vttest\" org_name:\"id1\" column_length:80 charset:45 column_type:\"varchar(20)\"} fields:{name:\"id2\" type:INT32 table:\"t2b\" org_table:\"t2b\" database:\"vttest\" org_name:\"id2\" column_length:11 charset:63 column_type:\"int(11)\"}}", "type:ROW row_event:{table_name:\"t2b\" row_changes:{after:{lengths:1 lengths:1 values:\"a5\"}}}", "type:ROW row_event:{table_name:\"t2b\" row_changes:{after:{lengths:1 lengths:1 values:\"b6\"}}}", "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t2b\" lastpk:{rows:{lengths:1 values:\"b\"}}}}", @@ -566,8 +566,11 @@ func TestVStreamCopyWithDifferentFilters(t *testing.T) { } got := ev.String() want := expectedEvents[i] + + want = env.RemoveAnyDeprecatedDisplayWidths(want) + if !strings.HasPrefix(got, want) { - errGoroutine = fmt.Errorf("Event %d did not match, want %s, got %s", i, want, got) + errGoroutine = fmt.Errorf("event %d did not match, want %s, got %s", i, want, got) return errGoroutine } }