From c1887b706b66b69703211762cbf6427c760c02c6 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 15 Dec 2023 17:12:46 -0500 Subject: [PATCH 1/5] Don't try to get PK col info from I_S when there are no PK cols Signed-off-by: Matt Lord --- go/vt/vttablet/tabletmanager/vdiff/table_plan.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/go/vt/vttablet/tabletmanager/vdiff/table_plan.go b/go/vt/vttablet/tabletmanager/vdiff/table_plan.go index f636eea5cae..21efb097169 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/table_plan.go +++ b/go/vt/vttablet/tabletmanager/vdiff/table_plan.go @@ -223,6 +223,9 @@ func (tp *tablePlan) findPKs(dbClient binlogplayer.DBClient, targetSelect *sqlpa // saves the collations in the tablePlan's comparePKs column info // structs for those subsequent operations. func (tp *tablePlan) getPKColumnCollations(dbClient binlogplayer.DBClient) error { + if len(tp.comparePKs) == 0 { + return nil + } columnList := make([]string, len(tp.comparePKs)) for i := range tp.comparePKs { columnList[i] = tp.comparePKs[i].colName From 4910b0743f669f166eb60a8819191420ba96f73e Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 15 Dec 2023 20:15:34 -0500 Subject: [PATCH 2/5] Add unit test case This also more generally adds support for diffing tables without PK columns. Signed-off-by: Matt Lord --- .../tabletmanager/vdiff/framework_test.go | 5 ++ .../tabletmanager/vdiff/table_plan.go | 35 ++++++--- .../vdiff/workflow_differ_test.go | 72 +++++++++++++------ 3 files changed, 79 insertions(+), 33 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vdiff/framework_test.go b/go/vt/vttablet/tabletmanager/vdiff/framework_test.go index d0b81179f0f..656a904c89d 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vdiff/framework_test.go @@ -100,6 +100,10 @@ var ( Columns: []string{"id", "dt"}, PrimaryKeyColumns: []string{"id"}, Fields: sqltypes.MakeTestFields("id|dt", "int64|datetime"), + }, { + Name: "nopk", + Columns: []string{"c1", "c2"}, + Fields: sqltypes.MakeTestFields("c1|c2", "int64|int64"), }, }, } @@ -110,6 +114,7 @@ var ( "multipk": 3, "aggr": 4, "datze": 5, + "nopk": 6, } ) diff --git a/go/vt/vttablet/tabletmanager/vdiff/table_plan.go b/go/vt/vttablet/tabletmanager/vdiff/table_plan.go index 21efb097169..d3fdc9c0aec 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/table_plan.go +++ b/go/vt/vttablet/tabletmanager/vdiff/table_plan.go @@ -23,15 +23,15 @@ import ( "vitess.io/vitess/go/mysql/collations" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/binlog/binlogplayer" - tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" - vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" - "vitess.io/vitess/go/vt/vterrors" - "vitess.io/vitess/go/vt/log" - querypb "vitess.io/vitess/go/vt/proto/query" "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate/engine" "vitess.io/vitess/go/vt/vtgate/engine/opcode" + + querypb "vitess.io/vitess/go/vt/proto/query" + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) const sqlSelectColumnCollations = "select column_name as column_name, collation_name as collation_name from information_schema.columns where table_schema=%a and table_name=%a and column_name in %a" @@ -75,7 +75,7 @@ func (td *tableDiffer) buildTablePlan(dbClient binlogplayer.DBClient, dbName str sourceSelect := &sqlparser.Select{} targetSelect := &sqlparser.Select{} - // aggregates is the list of Aggregate functions, if any. + // Aggregates is the list of Aggregate functions, if any. var aggregates []*engine.AggregateParams for _, selExpr := range sel.SelectExprs { switch selExpr := selExpr.(type) { @@ -152,10 +152,22 @@ func (td *tableDiffer) buildTablePlan(dbClient binlogplayer.DBClient, dbName str }, } - err = tp.findPKs(dbClient, targetSelect) - if err != nil { - return nil, err + if len(tp.table.PrimaryKeyColumns) == 0 { // Then we need to ORDER BY every column in the table. + orderByAll := make(sqlparser.OrderBy, len(tp.compareCols)) + for i := range tp.compareCols { + orderByAll[i] = &sqlparser.Order{ + Expr: &sqlparser.ColName{Name: sqlparser.NewIdentifierCI(tp.compareCols[i].colName)}, + Direction: sqlparser.AscOrder, + } + } + tp.orderBy = orderByAll + } else { + err = tp.findPKs(dbClient, targetSelect) + if err != nil { + return nil, err + } } + // Remove in_keyrange. It's not understood by mysql. sourceSelect.Where = sel.Where // removeKeyrange(sel.Where) // The source should also perform the group by. @@ -177,6 +189,9 @@ func (td *tableDiffer) buildTablePlan(dbClient binlogplayer.DBClient, dbName str // findPKs identifies PKs and removes them from the columns to do data comparison. func (tp *tablePlan) findPKs(dbClient binlogplayer.DBClient, targetSelect *sqlparser.Select) error { + if len(tp.table.PrimaryKeyColumns) == 0 { + return nil + } var orderby sqlparser.OrderBy for _, pk := range tp.table.PrimaryKeyColumns { found := false @@ -195,7 +210,7 @@ func (tp *tablePlan) findPKs(dbClient binlogplayer.DBClient, targetSelect *sqlpa tp.compareCols[i].isPK = true tp.comparePKs = append(tp.comparePKs, tp.compareCols[i]) tp.selectPks = append(tp.selectPks, i) - // We'll be comparing pks separately. So, remove them from compareCols. + // We'll be comparing PKs separately. So, remove them from compareCols. tp.pkCols = append(tp.pkCols, i) found = true break diff --git a/go/vt/vttablet/tabletmanager/vdiff/workflow_differ_test.go b/go/vt/vttablet/tabletmanager/vdiff/workflow_differ_test.go index 10c6406f046..1c7b2e833b8 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/workflow_differ_test.go +++ b/go/vt/vttablet/tabletmanager/vdiff/workflow_differ_test.go @@ -157,7 +157,7 @@ func TestBuildPlanSuccess(t *testing.T) { }}, }, }, { - // non-pk text column. + // Non-PK text column. input: &binlogdatapb.Rule{ Match: "nonpktext", Filter: "select c1, textcol from nonpktext", @@ -178,7 +178,7 @@ func TestBuildPlanSuccess(t *testing.T) { }}, }, }, { - // non-pk text column, different order. + // Non-PK text column, different order. input: &binlogdatapb.Rule{ Match: "nonpktext", Filter: "select textcol, c1 from nonpktext", @@ -199,7 +199,7 @@ func TestBuildPlanSuccess(t *testing.T) { }}, }, }, { - // pk text column. + // PK text column. input: &binlogdatapb.Rule{ Match: "pktext", Filter: "select textcol, c2 from pktext", @@ -220,7 +220,7 @@ func TestBuildPlanSuccess(t *testing.T) { }}, }, }, { - // pk text column, different order. + // PK text column, different order. input: &binlogdatapb.Rule{ Match: "pktext", Filter: "select c2, textcol from pktext", @@ -241,7 +241,31 @@ func TestBuildPlanSuccess(t *testing.T) { }}, }, }, { - // text column as expression. + // No PK. + input: &binlogdatapb.Rule{ + Match: "nopk", + Filter: "select c1, c2 from nopk", + }, + table: "nopk", + tablePlan: &tablePlan{ + dbName: vdiffDBName, + table: testSchema.TableDefinitions[tableDefMap["nopk"]], + sourceQuery: "select c1, c2 from nopk order by c1 asc, c2 asc", + targetQuery: "select c1, c2 from nopk order by c1 asc, c2 asc", + compareCols: []compareColInfo{{0, collations.Local().LookupByName(sqltypes.NULL.String()), false, "c1"}, {1, collations.Local().LookupByName(sqltypes.NULL.String()), false, "c2"}}, + orderBy: sqlparser.OrderBy{ + &sqlparser.Order{ + Expr: &sqlparser.ColName{Name: sqlparser.NewIdentifierCI("c1")}, + Direction: sqlparser.AscOrder, + }, + &sqlparser.Order{ + Expr: &sqlparser.ColName{Name: sqlparser.NewIdentifierCI("c2")}, + Direction: sqlparser.AscOrder, + }, + }, + }, + }, { + // Text column as expression. input: &binlogdatapb.Rule{ Match: "pktext", Filter: "select c2, a+b as textcol from pktext", @@ -262,7 +286,7 @@ func TestBuildPlanSuccess(t *testing.T) { }}, }, }, { - // multiple pk columns. + // Multiple PK columns. input: &binlogdatapb.Rule{ Match: "multipk", }, @@ -397,7 +421,7 @@ func TestBuildPlanSuccess(t *testing.T) { }}, }, }, { - // group by + // Group by. input: &binlogdatapb.Rule{ Match: "t1", Filter: "select * from t1 group by c1", @@ -418,7 +442,7 @@ func TestBuildPlanSuccess(t *testing.T) { }}, }, }, { - // aggregations + // Aggregations. input: &binlogdatapb.Rule{ Match: "aggr", Filter: "select c1, c2, count(*) as c3, sum(c4) as c4 from t1 group by c1", @@ -443,7 +467,7 @@ func TestBuildPlanSuccess(t *testing.T) { }, }, }, { - // date conversion on import. + // Date conversion on import. input: &binlogdatapb.Rule{ Match: "datze", }, @@ -492,20 +516,22 @@ func TestBuildPlanSuccess(t *testing.T) { collationList[i] = sqltypes.NULL.String() } } - columnBV, err := sqltypes.BuildBindVariable(columnList) - require.NoError(t, err) - query, err := sqlparser.ParseAndBind(sqlSelectColumnCollations, - sqltypes.StringBindVariable(vdiffDBName), - sqltypes.StringBindVariable(tcase.tablePlan.table.Name), - columnBV, - ) - require.NoError(t, err) - dbc.ExpectRequest(query, sqltypes.MakeTestResult(sqltypes.MakeTestFields( - "collation_name", - "varchar", - ), - collationList..., - ), nil) + if len(columnList) > 0 { + columnBV, err := sqltypes.BuildBindVariable(columnList) + require.NoError(t, err) + query, err := sqlparser.ParseAndBind(sqlSelectColumnCollations, + sqltypes.StringBindVariable(vdiffDBName), + sqltypes.StringBindVariable(tcase.tablePlan.table.Name), + columnBV, + ) + require.NoError(t, err) + dbc.ExpectRequest(query, sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "collation_name", + "varchar", + ), + collationList..., + ), nil) + } err = wd.buildPlan(dbc, filter, testSchema) require.NoError(t, err, tcase.input) require.Equal(t, 1, len(wd.tableDiffers), tcase.input) From c44ba67eb5189015634cf29d6f4c7b160dfe9920 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sat, 16 Dec 2023 00:44:01 -0500 Subject: [PATCH 3/5] Add e2e test Signed-off-by: Matt Lord --- go/test/endtoend/vreplication/config_test.go | 10 +++++ go/test/endtoend/vreplication/vdiff2_test.go | 10 +++-- .../tabletmanager/vdiff/framework_test.go | 4 +- .../tabletmanager/vdiff/table_plan.go | 22 ++++------- .../vdiff/workflow_differ_test.go | 39 +++++++++++-------- 5 files changed, 50 insertions(+), 35 deletions(-) diff --git a/go/test/endtoend/vreplication/config_test.go b/go/test/endtoend/vreplication/config_test.go index 0e430548a13..62f20f36e80 100644 --- a/go/test/endtoend/vreplication/config_test.go +++ b/go/test/endtoend/vreplication/config_test.go @@ -60,6 +60,7 @@ create table geom_tbl (id int, g geometry, p point, ls linestring, pg polygon, m create table ` + "`blüb_tbl`" + ` (id int, val1 varchar(20), ` + "`blöb1`" + ` blob, val2 varbinary(20), ` + "`bl@b2`" + ` longblob, txt1 text, blb3 tinyblob, txt2 longtext, blb4 mediumblob, primary key(id)); create table reftable (id int, val1 varchar(20), primary key(id), key(val1)); create table loadtest (id int, name varchar(256), primary key(id), key(name)); +create table nopk (name varchar(128), age int unsigned); ` // These should always be ignored in vreplication internalSchema = ` @@ -94,6 +95,7 @@ create table loadtest (id int, name varchar(256), primary key(id), key(name)); "db_order_test": {}, "vdiff_order": {}, "datze": {}, + "nopk": {}, "reftable": { "type": "reference" } @@ -216,6 +218,14 @@ create table loadtest (id int, name varchar(256), primary key(id), key(name)); } ] }, + "nopk": { + "column_vindexes": [ + { + "columns": ["name"], + "name": "unicode_loose_md5" + } + ] + }, "reftable": { "type": "reference" } diff --git a/go/test/endtoend/vreplication/vdiff2_test.go b/go/test/endtoend/vreplication/vdiff2_test.go index b4753750871..d7b8cb6a47e 100644 --- a/go/test/endtoend/vreplication/vdiff2_test.go +++ b/go/test/endtoend/vreplication/vdiff2_test.go @@ -70,7 +70,7 @@ var testCases = []*testCase{ sourceShards: "0", targetShards: "-80,80-", tabletBaseID: 200, - tables: "customer,Lead,Lead-1", + tables: "customer,Lead,Lead-1,nopk", autoRetryError: true, retryInsert: `insert into customer(cid, name, typ) values(1991234, 'Testy McTester', 'soho')`, resume: true, @@ -117,7 +117,7 @@ func TestVDiff2(t *testing.T) { sourceShards := []string{"0"} targetKs := "customer" targetShards := []string{"-80", "80-"} - // This forces us to use multiple vstream packets even with small test tables + // This forces us to use multiple vstream packets even with small test tables. extraVTTabletArgs = []string{"--vstream_packet_size=1"} vc = NewVitessCluster(t, "TestVDiff2", strings.Split(allCellNames, ","), mainClusterConfig) @@ -150,7 +150,11 @@ func TestVDiff2(t *testing.T) { query := `insert into customer(cid, name, typ, sport) values(1001, null, 'soho','')` execVtgateQuery(t, vtgateConn, fmt.Sprintf("%s:%s", sourceKs, sourceShards[0]), query) - generateMoreCustomers(t, sourceKs, 100) + generateMoreCustomers(t, sourceKs, 1000) + + // Create rows in the nopk table using the customer names and random ages between 20 and 100. + _, err = vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s.nopk(name, age) select name, floor(rand()*80)+20 from %s.customer", sourceKs, sourceKs), -1, false) + require.NoError(t, err, "failed to insert rows into nopk table: %v", err) // The primary tablet is only added in the first cell. // We ONLY add primary tablets in this test. diff --git a/go/vt/vttablet/tabletmanager/vdiff/framework_test.go b/go/vt/vttablet/tabletmanager/vdiff/framework_test.go index 656a904c89d..08018099573 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vdiff/framework_test.go @@ -102,8 +102,8 @@ var ( Fields: sqltypes.MakeTestFields("id|dt", "int64|datetime"), }, { Name: "nopk", - Columns: []string{"c1", "c2"}, - Fields: sqltypes.MakeTestFields("c1|c2", "int64|int64"), + Columns: []string{"c1", "c2", "c3"}, + Fields: sqltypes.MakeTestFields("c1|c2|c3", "int64|int64|int64"), }, }, } diff --git a/go/vt/vttablet/tabletmanager/vdiff/table_plan.go b/go/vt/vttablet/tabletmanager/vdiff/table_plan.go index d3fdc9c0aec..ca3296f79b7 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/table_plan.go +++ b/go/vt/vttablet/tabletmanager/vdiff/table_plan.go @@ -152,20 +152,14 @@ func (td *tableDiffer) buildTablePlan(dbClient binlogplayer.DBClient, dbName str }, } - if len(tp.table.PrimaryKeyColumns) == 0 { // Then we need to ORDER BY every column in the table. - orderByAll := make(sqlparser.OrderBy, len(tp.compareCols)) - for i := range tp.compareCols { - orderByAll[i] = &sqlparser.Order{ - Expr: &sqlparser.ColName{Name: sqlparser.NewIdentifierCI(tp.compareCols[i].colName)}, - Direction: sqlparser.AscOrder, - } - } - tp.orderBy = orderByAll - } else { - err = tp.findPKs(dbClient, targetSelect) - if err != nil { - return nil, err - } + if len(tp.table.PrimaryKeyColumns) == 0 { + // We use every column together as a substitute PK. + tp.table.PrimaryKeyColumns = append(tp.table.PrimaryKeyColumns, tp.table.Columns...) + } + + err = tp.findPKs(dbClient, targetSelect) + if err != nil { + return nil, err } // Remove in_keyrange. It's not understood by mysql. diff --git a/go/vt/vttablet/tabletmanager/vdiff/workflow_differ_test.go b/go/vt/vttablet/tabletmanager/vdiff/workflow_differ_test.go index 1c7b2e833b8..f940ccfc364 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/workflow_differ_test.go +++ b/go/vt/vttablet/tabletmanager/vdiff/workflow_differ_test.go @@ -241,18 +241,21 @@ func TestBuildPlanSuccess(t *testing.T) { }}, }, }, { - // No PK. + // No PK. Use all columns as a substitute. input: &binlogdatapb.Rule{ Match: "nopk", - Filter: "select c1, c2 from nopk", + Filter: "select * from nopk", }, table: "nopk", tablePlan: &tablePlan{ dbName: vdiffDBName, table: testSchema.TableDefinitions[tableDefMap["nopk"]], - sourceQuery: "select c1, c2 from nopk order by c1 asc, c2 asc", - targetQuery: "select c1, c2 from nopk order by c1 asc, c2 asc", - compareCols: []compareColInfo{{0, collations.Local().LookupByName(sqltypes.NULL.String()), false, "c1"}, {1, collations.Local().LookupByName(sqltypes.NULL.String()), false, "c2"}}, + sourceQuery: "select c1, c2, c3 from nopk order by c1 asc, c2 asc, c3 asc", + targetQuery: "select c1, c2, c3 from nopk order by c1 asc, c2 asc, c3 asc", + compareCols: []compareColInfo{{0, collations.Local().LookupByName(sqltypes.NULL.String()), true, "c1"}, {1, collations.Local().LookupByName(sqltypes.NULL.String()), true, "c2"}, {2, collations.Local().LookupByName(sqltypes.NULL.String()), true, "c3"}}, + comparePKs: []compareColInfo{{0, collations.Local().LookupByName(sqltypes.NULL.String()), true, "c1"}, {1, collations.Local().LookupByName(sqltypes.NULL.String()), true, "c2"}, {2, collations.Local().LookupByName(sqltypes.NULL.String()), true, "c3"}}, + pkCols: []int{0, 1, 2}, + selectPks: []int{0, 1, 2}, orderBy: sqlparser.OrderBy{ &sqlparser.Order{ Expr: &sqlparser.ColName{Name: sqlparser.NewIdentifierCI("c1")}, @@ -262,6 +265,10 @@ func TestBuildPlanSuccess(t *testing.T) { Expr: &sqlparser.ColName{Name: sqlparser.NewIdentifierCI("c2")}, Direction: sqlparser.AscOrder, }, + &sqlparser.Order{ + Expr: &sqlparser.ColName{Name: sqlparser.NewIdentifierCI("c3")}, + Direction: sqlparser.AscOrder, + }, }, }, }, { @@ -505,18 +512,18 @@ func TestBuildPlanSuccess(t *testing.T) { wd, err := newWorkflowDiffer(ct, vdiffenv.opts) require.NoError(t, err) dbc.ExpectRequestRE("select vdt.lastpk as lastpk, vdt.mismatch as mismatch, vdt.report as report", noResults, nil) - columnList := make([]string, len(tcase.tablePlan.comparePKs)) - collationList := make([]string, len(tcase.tablePlan.comparePKs)) - env := collations.Local() - for i := range tcase.tablePlan.comparePKs { - columnList[i] = tcase.tablePlan.comparePKs[i].colName - if tcase.tablePlan.comparePKs[i].collation != collations.Unknown { - collationList[i] = env.LookupName(tcase.tablePlan.comparePKs[i].collation) - } else { - collationList[i] = sqltypes.NULL.String() + if len(tcase.tablePlan.comparePKs) > 0 { + columnList := make([]string, len(tcase.tablePlan.comparePKs)) + collationList := make([]string, len(tcase.tablePlan.comparePKs)) + env := collations.Local() + for i := range tcase.tablePlan.comparePKs { + columnList[i] = tcase.tablePlan.comparePKs[i].colName + if tcase.tablePlan.comparePKs[i].collation != collations.Unknown { + collationList[i] = env.LookupName(tcase.tablePlan.comparePKs[i].collation) + } else { + collationList[i] = sqltypes.NULL.String() + } } - } - if len(columnList) > 0 { columnBV, err := sqltypes.BuildBindVariable(columnList) require.NoError(t, err) query, err := sqlparser.ParseAndBind(sqlSelectColumnCollations, From 9c252d546e330e993db2f89756d0a1b8b4c5594b Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sat, 16 Dec 2023 14:02:38 -0500 Subject: [PATCH 4/5] Add same PKE support that vreplication has Signed-off-by: Matt Lord --- go/vt/mysqlctl/mysql_daemon.go | 1 - go/vt/mysqlctl/schema.go | 10 ++---- .../tabletmanager/vdiff/framework_test.go | 19 ++++++---- .../tabletmanager/vdiff/table_plan.go | 29 +++++++++++++-- .../vdiff/workflow_differ_test.go | 36 +++++++++++++++++++ .../tabletmanager/vreplication/vreplicator.go | 6 +++- .../vreplication/vreplicator_test.go | 5 ++- .../vttablet/tabletserver/vstreamer/engine.go | 10 ++++-- .../tabletserver/vstreamer/rowstreamer.go | 2 +- 9 files changed, 94 insertions(+), 24 deletions(-) diff --git a/go/vt/mysqlctl/mysql_daemon.go b/go/vt/mysqlctl/mysql_daemon.go index 66454e8b8a8..9e8baebefd6 100644 --- a/go/vt/mysqlctl/mysql_daemon.go +++ b/go/vt/mysqlctl/mysql_daemon.go @@ -94,7 +94,6 @@ type MysqlDaemon interface { GetSchema(ctx context.Context, dbName string, request *tabletmanagerdatapb.GetSchemaRequest) (*tabletmanagerdatapb.SchemaDefinition, error) GetColumns(ctx context.Context, dbName, table string) ([]*querypb.Field, []string, error) GetPrimaryKeyColumns(ctx context.Context, dbName, table string) ([]string, error) - GetPrimaryKeyEquivalentColumns(ctx context.Context, dbName, table string) ([]string, string, error) PreflightSchemaChange(ctx context.Context, dbName string, changes []string) ([]*tabletmanagerdatapb.SchemaChangeResult, error) ApplySchemaChange(ctx context.Context, dbName string, change *tmutils.SchemaChange) (*tabletmanagerdatapb.SchemaChangeResult, error) diff --git a/go/vt/mysqlctl/schema.go b/go/vt/mysqlctl/schema.go index 6f1c7c19570..f3325827ab9 100644 --- a/go/vt/mysqlctl/schema.go +++ b/go/vt/mysqlctl/schema.go @@ -579,13 +579,7 @@ func (mysqld *Mysqld) ApplySchemaChange(ctx context.Context, dbName string, chan // defined PRIMARY KEY then it may return the columns for // that index if it is likely the most efficient one amongst // the available PKE indexes on the table. -func (mysqld *Mysqld) GetPrimaryKeyEquivalentColumns(ctx context.Context, dbName, table string) ([]string, string, error) { - conn, err := getPoolReconnect(ctx, mysqld.dbaPool) - if err != nil { - return nil, "", err - } - defer conn.Recycle() - +func GetPrimaryKeyEquivalentColumns(ctx context.Context, exec func(string, int, bool) (*sqltypes.Result, error), dbName, table string) ([]string, string, error) { // We use column name aliases to guarantee lower case for our named results. sql := ` SELECT index_cols.COLUMN_NAME AS column_name, index_cols.INDEX_NAME as index_name FROM information_schema.STATISTICS AS index_cols INNER JOIN @@ -629,7 +623,7 @@ func (mysqld *Mysqld) GetPrimaryKeyEquivalentColumns(ctx context.Context, dbName encodedDbName := encodeEntityName(dbName) encodedTable := encodeEntityName(table) sql = fmt.Sprintf(sql, encodedDbName, encodedTable, encodedDbName, encodedTable, encodedDbName, encodedTable) - qr, err := conn.Conn.ExecuteFetch(sql, 1000, true) + qr, err := exec(sql, 1000, true) if err != nil { return nil, "", err } diff --git a/go/vt/vttablet/tabletmanager/vdiff/framework_test.go b/go/vt/vttablet/tabletmanager/vdiff/framework_test.go index 08018099573..f7ff1a7c5cc 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vdiff/framework_test.go @@ -104,17 +104,22 @@ var ( Name: "nopk", Columns: []string{"c1", "c2", "c3"}, Fields: sqltypes.MakeTestFields("c1|c2|c3", "int64|int64|int64"), + }, { + Name: "nopkwithpke", + Columns: []string{"c1", "c2", "c3"}, + Fields: sqltypes.MakeTestFields("c1|c2|c3", "int64|int64|int64"), }, }, } tableDefMap = map[string]int{ - "t1": 0, - "nonpktext": 1, - "pktext": 2, - "multipk": 3, - "aggr": 4, - "datze": 5, - "nopk": 6, + "t1": 0, + "nonpktext": 1, + "pktext": 2, + "multipk": 3, + "aggr": 4, + "datze": 5, + "nopk": 6, + "nopkwithpke": 7, } ) diff --git a/go/vt/vttablet/tabletmanager/vdiff/table_plan.go b/go/vt/vttablet/tabletmanager/vdiff/table_plan.go index ca3296f79b7..9aca9da2423 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/table_plan.go +++ b/go/vt/vttablet/tabletmanager/vdiff/table_plan.go @@ -17,6 +17,7 @@ limitations under the License. package vdiff import ( + "context" "fmt" "strings" @@ -24,6 +25,7 @@ import ( "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/mysqlctl" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate/engine" @@ -153,8 +155,17 @@ func (td *tableDiffer) buildTablePlan(dbClient binlogplayer.DBClient, dbName str } if len(tp.table.PrimaryKeyColumns) == 0 { - // We use every column together as a substitute PK. - tp.table.PrimaryKeyColumns = append(tp.table.PrimaryKeyColumns, tp.table.Columns...) + // We use the columns from a PKE if there is one. + pkeCols, err := tp.getPKEquivalentColumns(dbClient) + if err != nil { + return nil, vterrors.Wrapf(err, "error getting PK equivalent columns for table %s", tp.table.Name) + } + if len(pkeCols) > 0 { + tp.table.PrimaryKeyColumns = append(tp.table.PrimaryKeyColumns, pkeCols...) + } else { + // We use every column together as a substitute PK. + tp.table.PrimaryKeyColumns = append(tp.table.PrimaryKeyColumns, tp.table.Columns...) + } } err = tp.findPKs(dbClient, targetSelect) @@ -271,3 +282,17 @@ func (tp *tablePlan) getPKColumnCollations(dbClient binlogplayer.DBClient) error } return nil } + +func (tp *tablePlan) getPKEquivalentColumns(dbClient binlogplayer.DBClient) ([]string, error) { + ctx, cancel := context.WithTimeout(context.Background(), BackgroundOperationTimeout/2) + defer cancel() + executeFetch := func(query string, maxrows int, wantfields bool) (*sqltypes.Result, error) { + // This sets wantfields to true. + return dbClient.ExecuteFetch(query, maxrows) + } + pkeCols, _, err := mysqlctl.GetPrimaryKeyEquivalentColumns(ctx, executeFetch, tp.dbName, tp.table.Name) + if err != nil { + return nil, err + } + return pkeCols, nil +} diff --git a/go/vt/vttablet/tabletmanager/vdiff/workflow_differ_test.go b/go/vt/vttablet/tabletmanager/vdiff/workflow_differ_test.go index f940ccfc364..2eb820838f3 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/workflow_differ_test.go +++ b/go/vt/vttablet/tabletmanager/vdiff/workflow_differ_test.go @@ -271,6 +271,29 @@ func TestBuildPlanSuccess(t *testing.T) { }, }, }, + }, { + // No PK, but a PKE on c3. + input: &binlogdatapb.Rule{ + Match: "nopkwithpke", + Filter: "select * from nopkwithpke", + }, + table: "nopkwithpke", + tablePlan: &tablePlan{ + dbName: vdiffDBName, + table: testSchema.TableDefinitions[tableDefMap["nopkwithpke"]], + sourceQuery: "select c1, c2, c3 from nopkwithpke order by c3 asc", + targetQuery: "select c1, c2, c3 from nopkwithpke order by c3 asc", + compareCols: []compareColInfo{{0, collations.Local().LookupByName(sqltypes.NULL.String()), false, "c1"}, {1, collations.Local().LookupByName(sqltypes.NULL.String()), false, "c2"}, {2, collations.Local().LookupByName(sqltypes.NULL.String()), true, "c3"}}, + comparePKs: []compareColInfo{{2, collations.Local().LookupByName(sqltypes.NULL.String()), true, "c3"}}, + pkCols: []int{2}, + selectPks: []int{2}, + orderBy: sqlparser.OrderBy{ + &sqlparser.Order{ + Expr: &sqlparser.ColName{Name: sqlparser.NewIdentifierCI("c3")}, + Direction: sqlparser.AscOrder, + }, + }, + }, }, { // Text column as expression. input: &binlogdatapb.Rule{ @@ -512,6 +535,19 @@ func TestBuildPlanSuccess(t *testing.T) { wd, err := newWorkflowDiffer(ct, vdiffenv.opts) require.NoError(t, err) dbc.ExpectRequestRE("select vdt.lastpk as lastpk, vdt.mismatch as mismatch, vdt.report as report", noResults, nil) + if len(tcase.tablePlan.table.PrimaryKeyColumns) == 0 { + result := noResults + if tcase.table == "nopkwithpke" { // This has a PKE column: c3 + result = sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "column_name|index_name", + "varchar|varchar", + ), + "c3|c3", + ) + } + dbc.ExpectRequestRE("SELECT index_cols.COLUMN_NAME AS column_name, index_cols.INDEX_NAME as index_name FROM information_schema.STATISTICS", result, nil) + } if len(tcase.tablePlan.comparePKs) > 0 { columnList := make([]string, len(tcase.tablePlan.comparePKs)) collationList := make([]string, len(tcase.tablePlan.comparePKs)) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go index 9c065866c15..07e1c5e477f 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go @@ -350,7 +350,11 @@ func (vr *vreplicator) buildColInfoMap(ctx context.Context) (map[string][]*Colum pks = td.PrimaryKeyColumns } else { // Use a PK equivalent if one exists. - if pks, _, err = vr.mysqld.GetPrimaryKeyEquivalentColumns(ctx, vr.dbClient.DBName(), td.Name); err != nil { + executeFetch := func(query string, maxrows int, wantfields bool) (*sqltypes.Result, error) { + // This sets wantfields to true. + return vr.dbClient.ExecuteFetch(query, maxrows) + } + if pks, _, err = mysqlctl.GetPrimaryKeyEquivalentColumns(ctx, executeFetch, vr.dbClient.DBName(), td.Name); err != nil { return nil, err } // Fall back to using every column in the table if there's no PK or PKE. diff --git a/go/vt/vttablet/tabletmanager/vreplication/vreplicator_test.go b/go/vt/vttablet/tabletmanager/vreplication/vreplicator_test.go index dd4b9dc70f8..7a7449c116c 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vreplicator_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vreplicator_test.go @@ -183,7 +183,10 @@ func TestPrimaryKeyEquivalentColumns(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { require.NoError(t, env.Mysqld.ExecuteSuperQuery(ctx, tt.ddl)) - cols, indexName, err := env.Mysqld.GetPrimaryKeyEquivalentColumns(ctx, env.Dbcfgs.DBName, tt.table) + conn, err := env.Mysqld.GetDbaConnection(ctx) + require.NoError(t, err, "could not connect to mysqld: %v", err) + defer conn.Close() + cols, indexName, err := mysqlctl.GetPrimaryKeyEquivalentColumns(ctx, conn.ExecuteFetch, env.Dbcfgs.DBName, tt.table) if (err != nil) != tt.wantErr { t.Errorf("Mysqld.GetPrimaryKeyEquivalentColumns() error = %v, wantErr %v", err, tt.wantErr) return diff --git a/go/vt/vttablet/tabletserver/vstreamer/engine.go b/go/vt/vttablet/tabletserver/vstreamer/engine.go index 2862601bf1b..5cfd6fac9d2 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/engine.go +++ b/go/vt/vttablet/tabletserver/vstreamer/engine.go @@ -590,9 +590,13 @@ func (vse *Engine) getMySQLEndpoint(ctx context.Context, db dbconfigs.Connector) // mapPKEquivalentCols gets a PK equivalent from mysqld for the table // and maps the column names to field indexes in the MinimalTable struct. -func (vse *Engine) mapPKEquivalentCols(ctx context.Context, table *binlogdatapb.MinimalTable) ([]int, error) { - mysqld := mysqlctl.NewMysqld(vse.env.Config().DB) - pkeColNames, indexName, err := mysqld.GetPrimaryKeyEquivalentColumns(ctx, vse.env.Config().DB.DBName, table.Name) +func (vse *Engine) mapPKEquivalentCols(ctx context.Context, db dbconfigs.Connector, table *binlogdatapb.MinimalTable) ([]int, error) { + conn, err := db.Connect(ctx) + if err != nil { + return nil, err + } + defer conn.Close() + pkeColNames, indexName, err := mysqlctl.GetPrimaryKeyEquivalentColumns(ctx, conn.ExecuteFetch, vse.env.Config().DB.DBName, table.Name) if err != nil { return nil, err } diff --git a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go index 99ebbbdfaa5..412255a6027 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go @@ -235,7 +235,7 @@ func (rs *rowStreamer) buildPKColumns(st *binlogdatapb.MinimalTable) ([]int, err var pkColumns = make([]int, 0) if len(st.PKColumns) == 0 { // Use a PK equivalent if one exists. - pkColumns, err := rs.vse.mapPKEquivalentCols(rs.ctx, st) + pkColumns, err := rs.vse.mapPKEquivalentCols(rs.ctx, rs.cp, st) if err == nil && len(pkColumns) != 0 { return pkColumns, nil } From a4eb27183ca54ffa43afa8838d27301d5d58212e Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sat, 16 Dec 2023 15:40:45 -0500 Subject: [PATCH 5/5] Fix unit tests Signed-off-by: Matt Lord --- go/vt/vttablet/tabletmanager/vreplication/framework_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go index 64a924f28d3..eb16a2e12dc 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go @@ -567,6 +567,9 @@ func shouldIgnoreQuery(query string) bool { ", component_throttled=", // update of last throttle time, can happen out-of-band, so can't test for it "context cancel", "SELECT rows_copied FROM _vt.vreplication WHERE id=", + // This is only executed if the table has no defined Primary Key, which we don't know in the lower level + // code. + "SELECT index_cols.COLUMN_NAME AS column_name, index_cols.INDEX_NAME as index_name FROM information_schema.STATISTICS", } if sidecardb.MatchesInitQuery(query) { return true