From 69f88f4a5a2a85a80884d5e9d28f8e4fa2892bad Mon Sep 17 00:00:00 2001 From: "vitess-bot[bot]" <108069721+vitess-bot[bot]@users.noreply.github.com> Date: Mon, 18 Sep 2023 09:45:17 -0400 Subject: [PATCH 1/2] Enable failures in `tools/e2e_test_race.sh` and fix races (#13654) Signed-off-by: Florent Poinsard --- go/vt/vttablet/endtoend/framework/client.go | 6 ++++++ go/vt/vttablet/endtoend/transaction_test.go | 12 ++++++------ go/vt/vttablet/tabletserver/throttle/client.go | 5 ++++- tools/e2e_test_race.sh | 4 +++- 4 files changed, 19 insertions(+), 8 deletions(-) diff --git a/go/vt/vttablet/endtoend/framework/client.go b/go/vt/vttablet/endtoend/framework/client.go index 78ace7504a2..7998425a38c 100644 --- a/go/vt/vttablet/endtoend/framework/client.go +++ b/go/vt/vttablet/endtoend/framework/client.go @@ -19,6 +19,7 @@ package framework import ( "context" "errors" + "sync" "time" "google.golang.org/protobuf/proto" @@ -40,6 +41,7 @@ type QueryClient struct { target *querypb.Target server *tabletserver.TabletServer transactionID int64 + reservedIDMu sync.Mutex reservedID int64 sessionStateChanges string } @@ -114,6 +116,8 @@ func (client *QueryClient) Commit() error { func (client *QueryClient) Rollback() error { defer func() { client.transactionID = 0 }() rID, err := client.server.Rollback(client.ctx, client.target, client.transactionID) + client.reservedIDMu.Lock() + defer client.reservedIDMu.Unlock() client.reservedID = rID if err != nil { return err @@ -293,6 +297,8 @@ func (client *QueryClient) MessageAck(name string, ids []string) (int64, error) // ReserveExecute performs a ReserveExecute. func (client *QueryClient) ReserveExecute(query string, preQueries []string, bindvars map[string]*querypb.BindVariable) (*sqltypes.Result, error) { + client.reservedIDMu.Lock() + defer client.reservedIDMu.Unlock() if client.reservedID != 0 { return nil, errors.New("already reserved a connection") } diff --git a/go/vt/vttablet/endtoend/transaction_test.go b/go/vt/vttablet/endtoend/transaction_test.go index 6751e60f9ad..8f6546df5f1 100644 --- a/go/vt/vttablet/endtoend/transaction_test.go +++ b/go/vt/vttablet/endtoend/transaction_test.go @@ -321,7 +321,7 @@ func TestShutdownGracePeriod(t *testing.T) { err := client.Begin(false) require.NoError(t, err) go func() { - _, err = client.Execute("select sleep(10) from dual", nil) + _, err := client.Execute("select sleep(10) from dual", nil) assert.Error(t, err) }() @@ -346,7 +346,7 @@ func TestShutdownGracePeriod(t *testing.T) { err = client.Begin(false) require.NoError(t, err) go func() { - _, err = client.Execute("select sleep(11) from dual", nil) + _, err := client.Execute("select sleep(11) from dual", nil) assert.Error(t, err) }() @@ -373,7 +373,7 @@ func TestShutdownGracePeriodWithStreamExecute(t *testing.T) { err := client.Begin(false) require.NoError(t, err) go func() { - _, err = client.StreamExecute("select sleep(10) from dual", nil) + _, err := client.StreamExecute("select sleep(10) from dual", nil) assert.Error(t, err) }() @@ -398,7 +398,7 @@ func TestShutdownGracePeriodWithStreamExecute(t *testing.T) { err = client.Begin(false) require.NoError(t, err) go func() { - _, err = client.StreamExecute("select sleep(11) from dual", nil) + _, err := client.StreamExecute("select sleep(11) from dual", nil) assert.Error(t, err) }() @@ -425,7 +425,7 @@ func TestShutdownGracePeriodWithReserveExecute(t *testing.T) { err := client.Begin(false) require.NoError(t, err) go func() { - _, err = client.ReserveExecute("select sleep(10) from dual", nil, nil) + _, err := client.ReserveExecute("select sleep(10) from dual", nil, nil) assert.Error(t, err) }() @@ -450,7 +450,7 @@ func TestShutdownGracePeriodWithReserveExecute(t *testing.T) { err = client.Begin(false) require.NoError(t, err) go func() { - _, err = client.ReserveExecute("select sleep(11) from dual", nil, nil) + _, err := client.ReserveExecute("select sleep(11) from dual", nil, nil) assert.Error(t, err) }() diff --git a/go/vt/vttablet/tabletserver/throttle/client.go b/go/vt/vttablet/tabletserver/throttle/client.go index 10fe910d264..240ac7a0260 100644 --- a/go/vt/vttablet/tabletserver/throttle/client.go +++ b/go/vt/vttablet/tabletserver/throttle/client.go @@ -50,7 +50,8 @@ type Client struct { checkType ThrottleCheckType flags CheckFlags - lastSuccessfulThrottle int64 + lastSuccessfulThrottleMu sync.Mutex + lastSuccessfulThrottle int64 } // NewProductionClient creates a client suitable for foreground/production jobs, which have normal priority. @@ -94,6 +95,8 @@ func (c *Client) ThrottleCheckOK(ctx context.Context, overrideAppName string) (t // no throttler return true } + c.lastSuccessfulThrottleMu.Lock() + defer c.lastSuccessfulThrottleMu.Unlock() if c.lastSuccessfulThrottle >= atomic.LoadInt64(&throttleTicks) { // if last check was OK just very recently there is no need to check again return true diff --git a/tools/e2e_test_race.sh b/tools/e2e_test_race.sh index 7dad5b259a3..b072e1261e2 100755 --- a/tools/e2e_test_race.sh +++ b/tools/e2e_test_race.sh @@ -38,9 +38,11 @@ packages_with_tests=$(echo "$packages_with_tests" | grep -vE "go/test/endtoend" # endtoend tests should be in a directory called endtoend all_e2e_tests=$(echo "$packages_with_tests" | cut -d" " -f1) +set -exo pipefail + # Run all endtoend tests. echo "$all_e2e_tests" | xargs go test $VT_GO_PARALLEL -race 2>&1 | tee $temp_log_file -if [ ${PIPESTATUS[0]} -ne 0 ]; then +if [ ${PIPESTATUS[1]} -ne 0 ]; then if grep "WARNING: DATA RACE" -q $temp_log_file; then echo echo "ERROR: go test -race found a data race. See log above." From df54722e23dc22ce422bd8151f3aaf1f30c22f5b Mon Sep 17 00:00:00 2001 From: Brendan Dougherty Date: Mon, 19 Jun 2023 19:59:45 -0400 Subject: [PATCH 2/2] Vttablet schema tracking: Fix _vt.schema_version corruption (#13045) * Copy fields from schema.Engine before modifying This fixes a race condition which caused protobuf marshaled schema data in _vt.schema_version rows to become corrupted when ColumnType was modified between the time when Field message sizes were calculated and when Field message data was written to the buffer. Signed-off-by: Brendan Dougherty * Acquire schema engine mutex while marshaling schema This change is purely defensive, but it may help avoid future race conditions caused by modifying shared schema structs while they are being marshaled to protobuf. Signed-off-by: Brendan Dougherty * reorganize imports and allocate slices with make Signed-off-by: Austen Lacy * gofmt Signed-off-by: Austen Lacy * flakey unit test Signed-off-by: Austen Lacy * Use common code to update column_type for both vstreamer and rowstream events in a thread-safe fashion. Fix related tests Signed-off-by: Rohit Nayak * Use DBName and not keyspace name while getting extended field info! Signed-off-by: Rohit Nayak * Fix TestVStreamSharded Signed-off-by: Rohit Nayak --------- Signed-off-by: Brendan Dougherty Signed-off-by: Austen Lacy Signed-off-by: Rohit Nayak Co-authored-by: Austen Lacy Co-authored-by: Rohit Nayak --- go/vt/vtgate/endtoend/vstream_test.go | 18 +++++- go/vt/vttablet/tabletserver/schema/engine.go | 26 ++++++++ go/vt/vttablet/tabletserver/schema/tracker.go | 25 +------- .../tabletserver/vstreamer/rowstreamer.go | 9 ++- .../vstreamer/rowstreamer_test.go | 36 ++++++----- .../tabletserver/vstreamer/vstreamer.go | 63 +++++++++++-------- .../vstreamer/vstreamer_flaky_test.go | 15 +++-- 7 files changed, 115 insertions(+), 77 deletions(-) diff --git a/go/vt/vtgate/endtoend/vstream_test.go b/go/vt/vtgate/endtoend/vstream_test.go index 477bb2518b5..350cd5d8251 100644 --- a/go/vt/vtgate/endtoend/vstream_test.go +++ b/go/vt/vtgate/endtoend/vstream_test.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "io" + "regexp" "sync" "testing" @@ -330,9 +331,9 @@ func TestVStreamSharded(t *testing.T) { received bool } expectedEvents := []*expectedEvent{ - {`type:FIELD field_event:{table_name:"ks.t1_sharded" fields:{name:"id1" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_-80" org_name:"id1" column_length:20 charset:63 flags:53251} fields:{name:"id2" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_-80" org_name:"id2" column_length:20 charset:63 flags:32768} keyspace:"ks" shard:"-80"}`, false}, + {`type:FIELD field_event:{table_name:"ks.t1_sharded" fields:{name:"id1" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_-80" org_name:"id1" column_length:20 charset:63 flags:53251 column_type:"bigint(20)"} fields:{name:"id2" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_-80" org_name:"id2" column_length:20 charset:63 flags:32768 column_type:"bigint(20)"} keyspace:"ks" shard:"-80"}`, false}, {`type:ROW row_event:{table_name:"ks.t1_sharded" row_changes:{after:{lengths:1 lengths:1 values:"11"}} keyspace:"ks" shard:"-80"}`, false}, - {`type:FIELD field_event:{table_name:"ks.t1_sharded" fields:{name:"id1" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_80-" org_name:"id1" column_length:20 charset:63 flags:53251} fields:{name:"id2" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_80-" org_name:"id2" column_length:20 charset:63 flags:32768} keyspace:"ks" shard:"80-"}`, false}, + {`type:FIELD field_event:{table_name:"ks.t1_sharded" fields:{name:"id1" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_80-" org_name:"id1" column_length:20 charset:63 flags:53251 column_type:"bigint(20)"} fields:{name:"id2" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_80-" org_name:"id2" column_length:20 charset:63 flags:32768 column_type:"bigint(20)"} keyspace:"ks" shard:"80-"}`, false}, {`type:ROW row_event:{table_name:"ks.t1_sharded" row_changes:{after:{lengths:1 lengths:1 values:"44"}} keyspace:"ks" shard:"80-"}`, false}, } for { @@ -357,7 +358,7 @@ func TestVStreamSharded(t *testing.T) { for _, ev := range evs { s := fmt.Sprintf("%v", ev) for _, expectedEv := range expectedEvents { - if expectedEv.ev == s { + if removeAnyDeprecatedDisplayWidths(expectedEv.ev) == removeAnyDeprecatedDisplayWidths(s) { expectedEv.received = true break } @@ -381,6 +382,17 @@ func TestVStreamSharded(t *testing.T) { } +func removeAnyDeprecatedDisplayWidths(orig string) string { + var adjusted string + baseIntType := "int" + intRE := regexp.MustCompile(`(?i)int\(([0-9]*)?\)`) + adjusted = intRE.ReplaceAllString(orig, baseIntType) + baseYearType := "year" + yearRE := regexp.MustCompile(`(?i)year\(([0-9]*)?\)`) + adjusted = yearRE.ReplaceAllString(adjusted, baseYearType) + return adjusted +} + var printMu sync.Mutex func printEvents(evs []*binlogdatapb.VEvent) { diff --git a/go/vt/vttablet/tabletserver/schema/engine.go b/go/vt/vttablet/tabletserver/schema/engine.go index 576a804debb..a23dbfe1277 100644 --- a/go/vt/vttablet/tabletserver/schema/engine.go +++ b/go/vt/vttablet/tabletserver/schema/engine.go @@ -593,6 +593,32 @@ func (se *Engine) GetSchema() map[string]*Table { return tables } +// MarshalMinimalSchema returns a protobuf encoded binlogdata.MinimalSchema +func (se *Engine) MarshalMinimalSchema() ([]byte, error) { + se.mu.Lock() + defer se.mu.Unlock() + dbSchema := &binlogdatapb.MinimalSchema{ + Tables: make([]*binlogdatapb.MinimalTable, 0, len(se.tables)), + } + for _, table := range se.tables { + dbSchema.Tables = append(dbSchema.Tables, newMinimalTable(table)) + } + return dbSchema.MarshalVT() +} + +func newMinimalTable(st *Table) *binlogdatapb.MinimalTable { + table := &binlogdatapb.MinimalTable{ + Name: st.Name.String(), + Fields: st.Fields, + } + pkc := make([]int64, len(st.PKColumns)) + for i, pk := range st.PKColumns { + pkc[i] = int64(pk) + } + table.PKColumns = pkc + return table +} + // GetConnection returns a connection from the pool func (se *Engine) GetConnection(ctx context.Context) (*connpool.DBConn, error) { return se.conns.Get(ctx, nil) diff --git a/go/vt/vttablet/tabletserver/schema/tracker.go b/go/vt/vttablet/tabletserver/schema/tracker.go index 51204e4db3f..3bfa19a36d0 100644 --- a/go/vt/vttablet/tabletserver/schema/tracker.go +++ b/go/vt/vttablet/tabletserver/schema/tracker.go @@ -23,8 +23,6 @@ import ( "sync" "time" - "google.golang.org/protobuf/proto" - "vitess.io/vitess/go/vt/schema" "vitess.io/vitess/go/mysql" @@ -240,14 +238,10 @@ func (tr *Tracker) schemaUpdated(gtid string, ddl string, timestamp int64) error } func (tr *Tracker) saveCurrentSchemaToDb(ctx context.Context, gtid, ddl string, timestamp int64) error { - tables := tr.engine.GetSchema() - dbSchema := &binlogdatapb.MinimalSchema{ - Tables: []*binlogdatapb.MinimalTable{}, - } - for _, table := range tables { - dbSchema.Tables = append(dbSchema.Tables, newMinimalTable(table)) + blob, err := tr.engine.MarshalMinimalSchema() + if err != nil { + return err } - blob, _ := proto.Marshal(dbSchema) conn, err := tr.engine.GetConnection(ctx) if err != nil { @@ -265,19 +259,6 @@ func (tr *Tracker) saveCurrentSchemaToDb(ctx context.Context, gtid, ddl string, return nil } -func newMinimalTable(st *Table) *binlogdatapb.MinimalTable { - table := &binlogdatapb.MinimalTable{ - Name: st.Name.String(), - Fields: st.Fields, - } - var pkc []int64 - for _, pk := range st.PKColumns { - pkc = append(pkc, int64(pk)) - } - table.PKColumns = pkc - return table -} - func encodeString(in string) string { buf := bytes.NewBuffer(nil) sqltypes.NewVarChar(in).EncodeSQL(buf) diff --git a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go index 67be0513e67..560546d7c04 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go @@ -153,9 +153,14 @@ func (rs *rowStreamer) buildPlan() error { return err } ti := &Table{ - Name: st.Name, - Fields: st.Fields, + Name: st.Name, } + + ti.Fields, err = getFields(rs.ctx, rs.cp, st.Name, rs.cp.DBName(), st.Fields) + if err != nil { + return err + } + // The plan we build is identical to the one for vstreamer. // This is because the row format of a read is identical // to the row format of a binlog event. So, the same diff --git a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go index 2a9ac5a47ff..4115a006c37 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go @@ -72,7 +72,7 @@ func TestStreamRowsScan(t *testing.T) { // t1: simulates rollup, with non-pk column wantStream = []string{ - `fields:{name:"1" type:INT64} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63} pkfields:{name:"id" type:INT32}`, + `fields:{name:"1" type:INT64} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id" type:INT32}`, `rows:{lengths:1 lengths:3 values:"1aaa"} rows:{lengths:1 lengths:3 values:"1bbb"} lastpk:{lengths:1 values:"2"}`, } wantQuery = "select id, val from t1 order by id" @@ -80,7 +80,7 @@ func TestStreamRowsScan(t *testing.T) { // t1: simulates rollup, with pk and non-pk column wantStream = []string{ - `fields:{name:"1" type:INT64} fields:{name:"id" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id" column_length:11 charset:63} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63} pkfields:{name:"id" type:INT32}`, + `fields:{name:"1" type:INT64} fields:{name:"id" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id" type:INT32}`, `rows:{lengths:1 lengths:1 lengths:3 values:"11aaa"} rows:{lengths:1 lengths:1 lengths:3 values:"12bbb"} lastpk:{lengths:1 values:"2"}`, } wantQuery = "select id, val from t1 order by id" @@ -88,7 +88,7 @@ func TestStreamRowsScan(t *testing.T) { // t1: no pk in select list wantStream = []string{ - `fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63} pkfields:{name:"id" type:INT32}`, + `fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id" type:INT32}`, `rows:{lengths:3 values:"aaa"} rows:{lengths:3 values:"bbb"} lastpk:{lengths:1 values:"2"}`, } wantQuery = "select id, val from t1 order by id" @@ -96,7 +96,7 @@ func TestStreamRowsScan(t *testing.T) { // t1: all rows wantStream = []string{ - `fields:{name:"id" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id" column_length:11 charset:63} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63} pkfields:{name:"id" type:INT32}`, + `fields:{name:"id" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id" type:INT32}`, `rows:{lengths:1 lengths:3 values:"1aaa"} rows:{lengths:1 lengths:3 values:"2bbb"} lastpk:{lengths:1 values:"2"}`, } wantQuery = "select id, val from t1 order by id" @@ -104,7 +104,7 @@ func TestStreamRowsScan(t *testing.T) { // t1: lastpk=1 wantStream = []string{ - `fields:{name:"id" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id" column_length:11 charset:63} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63} pkfields:{name:"id" type:INT32}`, + `fields:{name:"id" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id" type:INT32}`, `rows:{lengths:1 lengths:3 values:"2bbb"} lastpk:{lengths:1 values:"2"}`, } wantQuery = "select id, val from t1 where (id > 1) order by id" @@ -112,7 +112,7 @@ func TestStreamRowsScan(t *testing.T) { // t1: different column ordering wantStream = []string{ - `fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63} fields:{name:"id" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id" column_length:11 charset:63} pkfields:{name:"id" type:INT32}`, + `fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} fields:{name:"id" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id" column_length:11 charset:63 column_type:"int(11)"} pkfields:{name:"id" type:INT32}`, `rows:{lengths:3 lengths:1 values:"aaa1"} rows:{lengths:3 lengths:1 values:"bbb2"} lastpk:{lengths:1 values:"2"}`, } wantQuery = "select id, val from t1 order by id" @@ -120,7 +120,7 @@ func TestStreamRowsScan(t *testing.T) { // t2: all rows wantStream = []string{ - `fields:{name:"id1" type:INT32 table:"t2" org_table:"t2" database:"vttest" org_name:"id1" column_length:11 charset:63} fields:{name:"id2" type:INT32 table:"t2" org_table:"t2" database:"vttest" org_name:"id2" column_length:11 charset:63} fields:{name:"val" type:VARBINARY table:"t2" org_table:"t2" database:"vttest" org_name:"val" column_length:128 charset:63} pkfields:{name:"id1" type:INT32} pkfields:{name:"id2" type:INT32}`, + `fields:{name:"id1" type:INT32 table:"t2" org_table:"t2" database:"vttest" org_name:"id1" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"id2" type:INT32 table:"t2" org_table:"t2" database:"vttest" org_name:"id2" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t2" org_table:"t2" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id1" type:INT32} pkfields:{name:"id2" type:INT32}`, `rows:{lengths:1 lengths:1 lengths:3 values:"12aaa"} rows:{lengths:1 lengths:1 lengths:3 values:"13bbb"} lastpk:{lengths:1 lengths:1 values:"13"}`, } wantQuery = "select id1, id2, val from t2 order by id1, id2" @@ -128,7 +128,7 @@ func TestStreamRowsScan(t *testing.T) { // t2: lastpk=1,2 wantStream = []string{ - `fields:{name:"id1" type:INT32 table:"t2" org_table:"t2" database:"vttest" org_name:"id1" column_length:11 charset:63} fields:{name:"id2" type:INT32 table:"t2" org_table:"t2" database:"vttest" org_name:"id2" column_length:11 charset:63} fields:{name:"val" type:VARBINARY table:"t2" org_table:"t2" database:"vttest" org_name:"val" column_length:128 charset:63} pkfields:{name:"id1" type:INT32} pkfields:{name:"id2" type:INT32}`, + `fields:{name:"id1" type:INT32 table:"t2" org_table:"t2" database:"vttest" org_name:"id1" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"id2" type:INT32 table:"t2" org_table:"t2" database:"vttest" org_name:"id2" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t2" org_table:"t2" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id1" type:INT32} pkfields:{name:"id2" type:INT32}`, `rows:{lengths:1 lengths:1 lengths:3 values:"13bbb"} lastpk:{lengths:1 lengths:1 values:"13"}`, } wantQuery = "select id1, id2, val from t2 where (id1 = 1 and id2 > 2) or (id1 > 1) order by id1, id2" @@ -136,7 +136,7 @@ func TestStreamRowsScan(t *testing.T) { // t3: all rows wantStream = []string{ - `fields:{name:"id" type:INT32 table:"t3" org_table:"t3" database:"vttest" org_name:"id" column_length:11 charset:63} fields:{name:"val" type:VARBINARY table:"t3" org_table:"t3" database:"vttest" org_name:"val" column_length:128 charset:63} pkfields:{name:"id" type:INT32} pkfields:{name:"val" type:VARBINARY}`, + `fields:{name:"id" type:INT32 table:"t3" org_table:"t3" database:"vttest" org_name:"id" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t3" org_table:"t3" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id" type:INT32} pkfields:{name:"val" type:VARBINARY}`, `rows:{lengths:1 lengths:3 values:"1aaa"} rows:{lengths:1 lengths:3 values:"2bbb"} lastpk:{lengths:1 lengths:3 values:"2bbb"}`, } wantQuery = "select id, val from t3 order by id, val" @@ -144,7 +144,7 @@ func TestStreamRowsScan(t *testing.T) { // t3: lastpk: 1,'aaa' wantStream = []string{ - `fields:{name:"id" type:INT32 table:"t3" org_table:"t3" database:"vttest" org_name:"id" column_length:11 charset:63} fields:{name:"val" type:VARBINARY table:"t3" org_table:"t3" database:"vttest" org_name:"val" column_length:128 charset:63} pkfields:{name:"id" type:INT32} pkfields:{name:"val" type:VARBINARY}`, + `fields:{name:"id" type:INT32 table:"t3" org_table:"t3" database:"vttest" org_name:"id" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t3" org_table:"t3" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id" type:INT32} pkfields:{name:"val" type:VARBINARY}`, `rows:{lengths:1 lengths:3 values:"2bbb"} lastpk:{lengths:1 lengths:3 values:"2bbb"}`, } wantQuery = "select id, val from t3 where (id = 1 and val > 'aaa') or (id > 1) order by id, val" @@ -152,7 +152,7 @@ func TestStreamRowsScan(t *testing.T) { // t4: all rows wantStream = []string{ - `fields:{name:"id1" type:INT32 table:"t4" org_table:"t4" database:"vttest" org_name:"id1" column_length:11 charset:63} fields:{name:"id2" type:INT32 table:"t4" org_table:"t4" database:"vttest" org_name:"id2" column_length:11 charset:63} fields:{name:"id3" type:INT32 table:"t4" org_table:"t4" database:"vttest" org_name:"id3" column_length:11 charset:63} fields:{name:"val" type:VARBINARY table:"t4" org_table:"t4" database:"vttest" org_name:"val" column_length:128 charset:63} pkfields:{name:"id1" type:INT32} pkfields:{name:"id2" type:INT32} pkfields:{name:"id3" type:INT32}`, + `fields:{name:"id1" type:INT32 table:"t4" org_table:"t4" database:"vttest" org_name:"id1" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"id2" type:INT32 table:"t4" org_table:"t4" database:"vttest" org_name:"id2" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"id3" type:INT32 table:"t4" org_table:"t4" database:"vttest" org_name:"id3" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t4" org_table:"t4" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id1" type:INT32} pkfields:{name:"id2" type:INT32} pkfields:{name:"id3" type:INT32}`, `rows:{lengths:1 lengths:1 lengths:1 lengths:3 values:"123aaa"} rows:{lengths:1 lengths:1 lengths:1 lengths:3 values:"234bbb"} lastpk:{lengths:1 lengths:1 lengths:1 values:"234"}`, } wantQuery = "select id1, id2, id3, val from t4 order by id1, id2, id3" @@ -160,7 +160,7 @@ func TestStreamRowsScan(t *testing.T) { // t4: lastpk: 1,2,3 wantStream = []string{ - `fields:{name:"id1" type:INT32 table:"t4" org_table:"t4" database:"vttest" org_name:"id1" column_length:11 charset:63} fields:{name:"id2" type:INT32 table:"t4" org_table:"t4" database:"vttest" org_name:"id2" column_length:11 charset:63} fields:{name:"id3" type:INT32 table:"t4" org_table:"t4" database:"vttest" org_name:"id3" column_length:11 charset:63} fields:{name:"val" type:VARBINARY table:"t4" org_table:"t4" database:"vttest" org_name:"val" column_length:128 charset:63} pkfields:{name:"id1" type:INT32} pkfields:{name:"id2" type:INT32} pkfields:{name:"id3" type:INT32}`, + `fields:{name:"id1" type:INT32 table:"t4" org_table:"t4" database:"vttest" org_name:"id1" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"id2" type:INT32 table:"t4" org_table:"t4" database:"vttest" org_name:"id2" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"id3" type:INT32 table:"t4" org_table:"t4" database:"vttest" org_name:"id3" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t4" org_table:"t4" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id1" type:INT32} pkfields:{name:"id2" type:INT32} pkfields:{name:"id3" type:INT32}`, `rows:{lengths:1 lengths:1 lengths:1 lengths:3 values:"234bbb"} lastpk:{lengths:1 lengths:1 lengths:1 values:"234"}`, } wantQuery = "select id1, id2, id3, val from t4 where (id1 = 1 and id2 = 2 and id3 > 3) or (id1 = 1 and id2 > 2) or (id1 > 1) order by id1, id2, id3" @@ -255,7 +255,7 @@ func TestStreamRowsKeyRange(t *testing.T) { // Only the first row should be returned, but lastpk should be 6. wantStream := []string{ - `fields:{name:"id1" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id1" column_length:11 charset:63} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63} pkfields:{name:"id1" type:INT32}`, + `fields:{name:"id1" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id1" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id1" type:INT32}`, `rows:{lengths:1 lengths:3 values:"1aaa"} lastpk:{lengths:1 values:"6"}`, } wantQuery := "select id1, val from t1 order by id1" @@ -287,7 +287,7 @@ func TestStreamRowsFilterInt(t *testing.T) { time.Sleep(1 * time.Second) wantStream := []string{ - `fields:{name:"id1" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id1" column_length:11 charset:63} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63} pkfields:{name:"id1" type:INT32}`, + `fields:{name:"id1" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id1" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id1" type:INT32}`, `rows:{lengths:1 lengths:3 values:"1aaa"} rows:{lengths:1 lengths:3 values:"4ddd"} lastpk:{lengths:1 values:"5"}`, } wantQuery := "select id1, id2, val from t1 order by id1" @@ -320,7 +320,7 @@ func TestStreamRowsFilterVarBinary(t *testing.T) { time.Sleep(1 * time.Second) wantStream := []string{ - `fields:{name:"id1" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id1" column_length:11 charset:63} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63} pkfields:{name:"id1" type:INT32}`, + `fields:{name:"id1" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id1" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id1" type:INT32}`, `rows:{lengths:1 lengths:6 values:"2newton"} rows:{lengths:1 lengths:6 values:"3newton"} rows:{lengths:1 lengths:6 values:"5newton"} lastpk:{lengths:1 values:"6"}`, } wantQuery := "select id1, val from t1 order by id1" @@ -346,7 +346,7 @@ func TestStreamRowsMultiPacket(t *testing.T) { engine.se.Reload(context.Background()) wantStream := []string{ - `fields:{name:"id" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id" column_length:11 charset:63} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63} pkfields:{name:"id" type:INT32}`, + `fields:{name:"id" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id" type:INT32}`, `rows:{lengths:1 lengths:3 values:"1234"} rows:{lengths:1 lengths:4 values:"26789"} rows:{lengths:1 lengths:1 values:"31"} lastpk:{lengths:1 values:"3"}`, `rows:{lengths:1 lengths:10 values:"42345678901"} lastpk:{lengths:1 values:"4"}`, `rows:{lengths:1 lengths:1 values:"52"} lastpk:{lengths:1 values:"5"}`, @@ -415,7 +415,9 @@ func checkStream(t *testing.T, query string, lastpk []sqltypes.Value, wantQuery re, _ := regexp.Compile(` flags:[\d]+`) srows = re.ReplaceAllString(srows, "") - if srows != wantStream[i] { + want := env.RemoveAnyDeprecatedDisplayWidths(wantStream[i]) + + if srows != want { ch <- fmt.Errorf("stream %d:\n%s, want\n%s", i, srows, wantStream[i]) } i++ diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index da61163a6ca..ffbed6e3c42 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -24,6 +24,7 @@ import ( "time" "google.golang.org/protobuf/encoding/prototext" + "google.golang.org/protobuf/proto" "vitess.io/vitess/go/mysql/collations" "vitess.io/vitess/go/timer" @@ -778,39 +779,16 @@ func (vs *vstreamer) buildTableColumns(tm *mysql.TableMap) ([]*querypb.Field, er } // Columns should be truncated to match those in tm. - fields = st.Fields[:len(tm.Types)] - extColInfos, err := vs.getExtColInfos(tm.Name, tm.Database) + fieldsCopy, err := getFields(vs.ctx, vs.cp, tm.Name, tm.Database, st.Fields[:len(tm.Types)]) if err != nil { return nil, err } - for _, field := range fields { - // we want the MySQL column type info so that we can properly handle - // ambiguous binlog events and other cases where the internal types - // don't match the MySQL column type. One example being that in binlog - // events CHAR columns with a binary collation are indistinguishable - // from BINARY columns. - if extColInfo, ok := extColInfos[field.Name]; ok { - field.ColumnType = extColInfo.columnType - } - } - return fields, nil -} - -// additional column attributes from information_schema.columns. Currently only column_type is used, but -// we expect to add more in the future -type extColInfo struct { - columnType string -} - -func encodeString(in string) string { - buf := bytes.NewBuffer(nil) - sqltypes.NewVarChar(in).EncodeSQL(buf) - return buf.String() + return fieldsCopy, nil } -func (vs *vstreamer) getExtColInfos(table, database string) (map[string]*extColInfo, error) { +func getExtColInfos(ctx context.Context, cp dbconfigs.Connector, table, database string) (map[string]*extColInfo, error) { extColInfos := make(map[string]*extColInfo) - conn, err := vs.cp.Connect(vs.ctx) + conn, err := cp.Connect(ctx) if err != nil { return nil, err } @@ -830,6 +808,37 @@ func (vs *vstreamer) getExtColInfos(table, database string) (map[string]*extColI return extColInfos, nil } +func getFields(ctx context.Context, cp dbconfigs.Connector, table, database string, fields []*querypb.Field) ([]*querypb.Field, error) { + // Make a deep copy of the schema.Engine fields as they are pointers and + // will be modified by adding ColumnType below + fieldsCopy := make([]*querypb.Field, len(fields)) + for i, field := range fields { + fieldsCopy[i] = proto.Clone(field).(*querypb.Field) + } + extColInfos, err := getExtColInfos(ctx, cp, table, database) + if err != nil { + return nil, err + } + for _, field := range fieldsCopy { + if colInfo, ok := extColInfos[field.Name]; ok { + field.ColumnType = colInfo.columnType + } + } + return fieldsCopy, nil +} + +// additional column attributes from information_schema.columns. Currently only column_type is used, but +// we expect to add more in the future +type extColInfo struct { + columnType string +} + +func encodeString(in string) string { + buf := bytes.NewBuffer(nil) + sqltypes.NewVarChar(in).EncodeSQL(buf) + return buf.String() +} + func (vs *vstreamer) processJournalEvent(vevents []*binlogdatapb.VEvent, plan *streamerPlan, rows mysql.Rows) ([]*binlogdatapb.VEvent, error) { // Get DbName params, err := vs.cp.MysqlParams() diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go index 191ba408f97..cc47daf8b21 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go @@ -418,8 +418,8 @@ func TestVStreamCopySimpleFlow(t *testing.T) { tablePKs = append(tablePKs, getTablePK("t1", 1)) tablePKs = append(tablePKs, getTablePK("t2", 2)) - t1FieldEvent := []string{"begin", "type:FIELD field_event:{table_name:\"t1\" fields:{name:\"id11\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id11\" column_length:11 charset:63} fields:{name:\"id12\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id12\" column_length:11 charset:63}}"} - t2FieldEvent := []string{"begin", "type:FIELD field_event:{table_name:\"t2\" fields:{name:\"id21\" type:INT32 table:\"t2\" org_table:\"t2\" database:\"vttest\" org_name:\"id21\" column_length:11 charset:63} fields:{name:\"id22\" type:INT32 table:\"t2\" org_table:\"t2\" database:\"vttest\" org_name:\"id22\" column_length:11 charset:63}}"} + t1FieldEvent := []string{"begin", "type:FIELD field_event:{table_name:\"t1\" fields:{name:\"id11\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id11\" column_length:11 charset:63 column_type:\"int(11)\"} fields:{name:\"id12\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id12\" column_length:11 charset:63 column_type:\"int(11)\"}}"} + t2FieldEvent := []string{"begin", "type:FIELD field_event:{table_name:\"t2\" fields:{name:\"id21\" type:INT32 table:\"t2\" org_table:\"t2\" database:\"vttest\" org_name:\"id21\" column_length:11 charset:63 column_type:\"int(11)\"} fields:{name:\"id22\" type:INT32 table:\"t2\" org_table:\"t2\" database:\"vttest\" org_name:\"id22\" column_length:11 charset:63 column_type:\"int(11)\"}}"} t1Events := []string{} t2Events := []string{} for i := 1; i <= 10; i++ { @@ -503,7 +503,7 @@ func TestVStreamCopyWithDifferentFilters(t *testing.T) { var expectedEvents = []string{ "type:BEGIN", - "type:FIELD field_event:{table_name:\"t1\" fields:{name:\"id1\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id1\" column_length:11 charset:63} fields:{name:\"id2\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id2\" column_length:11 charset:63}}", + "type:FIELD field_event:{table_name:\"t1\" fields:{name:\"id1\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id1\" column_length:11 charset:63 column_type:\"int(11)\"} fields:{name:\"id2\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id2\" column_length:11 charset:63 column_type:\"int(11)\"}}", "type:GTID", "type:ROW row_event:{table_name:\"t1\" row_changes:{after:{lengths:1 lengths:1 values:\"12\"}}}", "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t1\" lastpk:{rows:{lengths:1 values:\"1\"}}}}", @@ -512,7 +512,7 @@ func TestVStreamCopyWithDifferentFilters(t *testing.T) { "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t1\"} completed:true}", "type:COMMIT", "type:BEGIN", - "type:FIELD field_event:{table_name:\"t2a\" fields:{name:\"id1\" type:INT32 table:\"t2a\" org_table:\"t2a\" database:\"vttest\" org_name:\"id1\" column_length:11 charset:63} fields:{name:\"id2\" type:INT32 table:\"t2a\" org_table:\"t2a\" database:\"vttest\" org_name:\"id2\" column_length:11 charset:63}}", + "type:FIELD field_event:{table_name:\"t2a\" fields:{name:\"id1\" type:INT32 table:\"t2a\" org_table:\"t2a\" database:\"vttest\" org_name:\"id1\" column_length:11 charset:63 column_type:\"int(11)\"} fields:{name:\"id2\" type:INT32 table:\"t2a\" org_table:\"t2a\" database:\"vttest\" org_name:\"id2\" column_length:11 charset:63 column_type:\"int(11)\"}}", "type:ROW row_event:{table_name:\"t2a\" row_changes:{after:{lengths:1 lengths:1 values:\"14\"}}}", "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t2a\" lastpk:{rows:{lengths:1 values:\"1\"}}}}", "type:COMMIT", @@ -520,7 +520,7 @@ func TestVStreamCopyWithDifferentFilters(t *testing.T) { "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t2a\"} completed:true}", "type:COMMIT", "type:BEGIN", - "type:FIELD field_event:{table_name:\"t2b\" fields:{name:\"id1\" type:VARCHAR table:\"t2b\" org_table:\"t2b\" database:\"vttest\" org_name:\"id1\" column_length:80 charset:45} fields:{name:\"id2\" type:INT32 table:\"t2b\" org_table:\"t2b\" database:\"vttest\" org_name:\"id2\" column_length:11 charset:63}}", + "type:FIELD field_event:{table_name:\"t2b\" fields:{name:\"id1\" type:VARCHAR table:\"t2b\" org_table:\"t2b\" database:\"vttest\" org_name:\"id1\" column_length:80 charset:45 column_type:\"varchar(20)\"} fields:{name:\"id2\" type:INT32 table:\"t2b\" org_table:\"t2b\" database:\"vttest\" org_name:\"id2\" column_length:11 charset:63 column_type:\"int(11)\"}}", "type:ROW row_event:{table_name:\"t2b\" row_changes:{after:{lengths:1 lengths:1 values:\"a5\"}}}", "type:ROW row_event:{table_name:\"t2b\" row_changes:{after:{lengths:1 lengths:1 values:\"b6\"}}}", "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t2b\" lastpk:{rows:{lengths:1 values:\"b\"}}}}", @@ -566,8 +566,11 @@ func TestVStreamCopyWithDifferentFilters(t *testing.T) { } got := ev.String() want := expectedEvents[i] + + want = env.RemoveAnyDeprecatedDisplayWidths(want) + if !strings.HasPrefix(got, want) { - errGoroutine = fmt.Errorf("Event %d did not match, want %s, got %s", i, want, got) + errGoroutine = fmt.Errorf("event %d did not match, want %s, got %s", i, want, got) return errGoroutine } }