From be7b6706c95ccfa2b455053f06caf2a732c25b8c Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 19 Dec 2023 21:20:44 -0500 Subject: [PATCH] VDiff: Support diffing tables without a defined Primary Key (#14794) Signed-off-by: Matt Lord --- go/test/endtoend/vreplication/config_test.go | 10 ++ go/test/endtoend/vreplication/vdiff2_test.go | 10 +- go/vt/mysqlctl/mysql_daemon.go | 1 - go/vt/mysqlctl/schema.go | 10 +- .../tabletmanager/vdiff/framework_test.go | 22 ++- .../tabletmanager/vdiff/table_plan.go | 51 ++++++- .../vdiff/workflow_differ_test.go | 133 +++++++++++++----- .../vreplication/framework_test.go | 3 + .../tabletmanager/vreplication/vreplicator.go | 6 +- .../vreplication/vreplicator_test.go | 5 +- .../vttablet/tabletserver/vstreamer/engine.go | 10 +- .../tabletserver/vstreamer/rowstreamer.go | 2 +- 12 files changed, 200 insertions(+), 63 deletions(-) diff --git a/go/test/endtoend/vreplication/config_test.go b/go/test/endtoend/vreplication/config_test.go index 0e430548a13..62f20f36e80 100644 --- a/go/test/endtoend/vreplication/config_test.go +++ b/go/test/endtoend/vreplication/config_test.go @@ -60,6 +60,7 @@ create table geom_tbl (id int, g geometry, p point, ls linestring, pg polygon, m create table ` + "`blüb_tbl`" + ` (id int, val1 varchar(20), ` + "`blöb1`" + ` blob, val2 varbinary(20), ` + "`bl@b2`" + ` longblob, txt1 text, blb3 tinyblob, txt2 longtext, blb4 mediumblob, primary key(id)); create table reftable (id int, val1 varchar(20), primary key(id), key(val1)); create table loadtest (id int, name varchar(256), primary key(id), key(name)); +create table nopk (name varchar(128), age int unsigned); ` // These should always be ignored in vreplication internalSchema = ` @@ -94,6 +95,7 @@ create table loadtest (id int, name varchar(256), primary key(id), key(name)); "db_order_test": {}, "vdiff_order": {}, "datze": {}, + "nopk": {}, "reftable": { "type": "reference" } @@ -216,6 +218,14 @@ create table loadtest (id int, name varchar(256), primary key(id), key(name)); } ] }, + "nopk": { + "column_vindexes": [ + { + "columns": ["name"], + "name": "unicode_loose_md5" + } + ] + }, "reftable": { "type": "reference" } diff --git a/go/test/endtoend/vreplication/vdiff2_test.go b/go/test/endtoend/vreplication/vdiff2_test.go index b4753750871..d7b8cb6a47e 100644 --- a/go/test/endtoend/vreplication/vdiff2_test.go +++ b/go/test/endtoend/vreplication/vdiff2_test.go @@ -70,7 +70,7 @@ var testCases = []*testCase{ sourceShards: "0", targetShards: "-80,80-", tabletBaseID: 200, - tables: "customer,Lead,Lead-1", + tables: "customer,Lead,Lead-1,nopk", autoRetryError: true, retryInsert: `insert into customer(cid, name, typ) values(1991234, 'Testy McTester', 'soho')`, resume: true, @@ -117,7 +117,7 @@ func TestVDiff2(t *testing.T) { sourceShards := []string{"0"} targetKs := "customer" targetShards := []string{"-80", "80-"} - // This forces us to use multiple vstream packets even with small test tables + // This forces us to use multiple vstream packets even with small test tables. extraVTTabletArgs = []string{"--vstream_packet_size=1"} vc = NewVitessCluster(t, "TestVDiff2", strings.Split(allCellNames, ","), mainClusterConfig) @@ -150,7 +150,11 @@ func TestVDiff2(t *testing.T) { query := `insert into customer(cid, name, typ, sport) values(1001, null, 'soho','')` execVtgateQuery(t, vtgateConn, fmt.Sprintf("%s:%s", sourceKs, sourceShards[0]), query) - generateMoreCustomers(t, sourceKs, 100) + generateMoreCustomers(t, sourceKs, 1000) + + // Create rows in the nopk table using the customer names and random ages between 20 and 100. + _, err = vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s.nopk(name, age) select name, floor(rand()*80)+20 from %s.customer", sourceKs, sourceKs), -1, false) + require.NoError(t, err, "failed to insert rows into nopk table: %v", err) // The primary tablet is only added in the first cell. // We ONLY add primary tablets in this test. diff --git a/go/vt/mysqlctl/mysql_daemon.go b/go/vt/mysqlctl/mysql_daemon.go index 66454e8b8a8..9e8baebefd6 100644 --- a/go/vt/mysqlctl/mysql_daemon.go +++ b/go/vt/mysqlctl/mysql_daemon.go @@ -94,7 +94,6 @@ type MysqlDaemon interface { GetSchema(ctx context.Context, dbName string, request *tabletmanagerdatapb.GetSchemaRequest) (*tabletmanagerdatapb.SchemaDefinition, error) GetColumns(ctx context.Context, dbName, table string) ([]*querypb.Field, []string, error) GetPrimaryKeyColumns(ctx context.Context, dbName, table string) ([]string, error) - GetPrimaryKeyEquivalentColumns(ctx context.Context, dbName, table string) ([]string, string, error) PreflightSchemaChange(ctx context.Context, dbName string, changes []string) ([]*tabletmanagerdatapb.SchemaChangeResult, error) ApplySchemaChange(ctx context.Context, dbName string, change *tmutils.SchemaChange) (*tabletmanagerdatapb.SchemaChangeResult, error) diff --git a/go/vt/mysqlctl/schema.go b/go/vt/mysqlctl/schema.go index 6f1c7c19570..f3325827ab9 100644 --- a/go/vt/mysqlctl/schema.go +++ b/go/vt/mysqlctl/schema.go @@ -579,13 +579,7 @@ func (mysqld *Mysqld) ApplySchemaChange(ctx context.Context, dbName string, chan // defined PRIMARY KEY then it may return the columns for // that index if it is likely the most efficient one amongst // the available PKE indexes on the table. -func (mysqld *Mysqld) GetPrimaryKeyEquivalentColumns(ctx context.Context, dbName, table string) ([]string, string, error) { - conn, err := getPoolReconnect(ctx, mysqld.dbaPool) - if err != nil { - return nil, "", err - } - defer conn.Recycle() - +func GetPrimaryKeyEquivalentColumns(ctx context.Context, exec func(string, int, bool) (*sqltypes.Result, error), dbName, table string) ([]string, string, error) { // We use column name aliases to guarantee lower case for our named results. sql := ` SELECT index_cols.COLUMN_NAME AS column_name, index_cols.INDEX_NAME as index_name FROM information_schema.STATISTICS AS index_cols INNER JOIN @@ -629,7 +623,7 @@ func (mysqld *Mysqld) GetPrimaryKeyEquivalentColumns(ctx context.Context, dbName encodedDbName := encodeEntityName(dbName) encodedTable := encodeEntityName(table) sql = fmt.Sprintf(sql, encodedDbName, encodedTable, encodedDbName, encodedTable, encodedDbName, encodedTable) - qr, err := conn.Conn.ExecuteFetch(sql, 1000, true) + qr, err := exec(sql, 1000, true) if err != nil { return nil, "", err } diff --git a/go/vt/vttablet/tabletmanager/vdiff/framework_test.go b/go/vt/vttablet/tabletmanager/vdiff/framework_test.go index a75349817e1..0676c5204be 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vdiff/framework_test.go @@ -100,16 +100,26 @@ var ( Columns: []string{"id", "dt"}, PrimaryKeyColumns: []string{"id"}, Fields: sqltypes.MakeTestFields("id|dt", "int64|datetime"), + }, { + Name: "nopk", + Columns: []string{"c1", "c2", "c3"}, + Fields: sqltypes.MakeTestFields("c1|c2|c3", "int64|int64|int64"), + }, { + Name: "nopkwithpke", + Columns: []string{"c1", "c2", "c3"}, + Fields: sqltypes.MakeTestFields("c1|c2|c3", "int64|int64|int64"), }, }, } tableDefMap = map[string]int{ - "t1": 0, - "nonpktext": 1, - "pktext": 2, - "multipk": 3, - "aggr": 4, - "datze": 5, + "t1": 0, + "nonpktext": 1, + "pktext": 2, + "multipk": 3, + "aggr": 4, + "datze": 5, + "nopk": 6, + "nopkwithpke": 7, } ) diff --git a/go/vt/vttablet/tabletmanager/vdiff/table_plan.go b/go/vt/vttablet/tabletmanager/vdiff/table_plan.go index 2211f8fbc45..5e43a8ce29d 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/table_plan.go +++ b/go/vt/vttablet/tabletmanager/vdiff/table_plan.go @@ -17,21 +17,23 @@ limitations under the License. package vdiff import ( + "context" "fmt" "strings" "vitess.io/vitess/go/mysql/collations" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/binlog/binlogplayer" - tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" - vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" - "vitess.io/vitess/go/vt/vterrors" - "vitess.io/vitess/go/vt/log" - querypb "vitess.io/vitess/go/vt/proto/query" + "vitess.io/vitess/go/vt/mysqlctl" "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate/engine" "vitess.io/vitess/go/vt/vtgate/engine/opcode" + + querypb "vitess.io/vitess/go/vt/proto/query" + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) const sqlSelectColumnCollations = "select column_name as column_name, collation_name as collation_name from information_schema.columns where table_schema=%a and table_name=%a and column_name in %a" @@ -75,7 +77,7 @@ func (td *tableDiffer) buildTablePlan(dbClient binlogplayer.DBClient, dbName str sourceSelect := &sqlparser.Select{} targetSelect := &sqlparser.Select{} - // aggregates is the list of Aggregate functions, if any. + // Aggregates is the list of Aggregate functions, if any. var aggregates []*engine.AggregateParams for _, selExpr := range sel.SelectExprs { switch selExpr := selExpr.(type) { @@ -153,10 +155,25 @@ func (td *tableDiffer) buildTablePlan(dbClient binlogplayer.DBClient, dbName str }, } + if len(tp.table.PrimaryKeyColumns) == 0 { + // We use the columns from a PKE if there is one. + pkeCols, err := tp.getPKEquivalentColumns(dbClient) + if err != nil { + return nil, vterrors.Wrapf(err, "error getting PK equivalent columns for table %s", tp.table.Name) + } + if len(pkeCols) > 0 { + tp.table.PrimaryKeyColumns = append(tp.table.PrimaryKeyColumns, pkeCols...) + } else { + // We use every column together as a substitute PK. + tp.table.PrimaryKeyColumns = append(tp.table.PrimaryKeyColumns, tp.table.Columns...) + } + } + err = tp.findPKs(dbClient, targetSelect, collationEnv) if err != nil { return nil, err } + // Remove in_keyrange. It's not understood by mysql. sourceSelect.Where = sel.Where // removeKeyrange(sel.Where) // The source should also perform the group by. @@ -178,6 +195,9 @@ func (td *tableDiffer) buildTablePlan(dbClient binlogplayer.DBClient, dbName str // findPKs identifies PKs and removes them from the columns to do data comparison. func (tp *tablePlan) findPKs(dbClient binlogplayer.DBClient, targetSelect *sqlparser.Select, collationEnv *collations.Environment) error { + if len(tp.table.PrimaryKeyColumns) == 0 { + return nil + } var orderby sqlparser.OrderBy for _, pk := range tp.table.PrimaryKeyColumns { found := false @@ -196,7 +216,7 @@ func (tp *tablePlan) findPKs(dbClient binlogplayer.DBClient, targetSelect *sqlpa tp.compareCols[i].isPK = true tp.comparePKs = append(tp.comparePKs, tp.compareCols[i]) tp.selectPks = append(tp.selectPks, i) - // We'll be comparing pks separately. So, remove them from compareCols. + // We'll be comparing PKs separately. So, remove them from compareCols. tp.pkCols = append(tp.pkCols, i) found = true break @@ -224,6 +244,9 @@ func (tp *tablePlan) findPKs(dbClient binlogplayer.DBClient, targetSelect *sqlpa // saves the collations in the tablePlan's comparePKs column info // structs for those subsequent operations. func (tp *tablePlan) getPKColumnCollations(dbClient binlogplayer.DBClient, collationEnv *collations.Environment) error { + if len(tp.comparePKs) == 0 { + return nil + } columnList := make([]string, len(tp.comparePKs)) for i := range tp.comparePKs { columnList[i] = tp.comparePKs[i].colName @@ -259,3 +282,17 @@ func (tp *tablePlan) getPKColumnCollations(dbClient binlogplayer.DBClient, colla } return nil } + +func (tp *tablePlan) getPKEquivalentColumns(dbClient binlogplayer.DBClient) ([]string, error) { + ctx, cancel := context.WithTimeout(context.Background(), BackgroundOperationTimeout/2) + defer cancel() + executeFetch := func(query string, maxrows int, wantfields bool) (*sqltypes.Result, error) { + // This sets wantfields to true. + return dbClient.ExecuteFetch(query, maxrows) + } + pkeCols, _, err := mysqlctl.GetPrimaryKeyEquivalentColumns(ctx, executeFetch, tp.dbName, tp.table.Name) + if err != nil { + return nil, err + } + return pkeCols, nil +} diff --git a/go/vt/vttablet/tabletmanager/vdiff/workflow_differ_test.go b/go/vt/vttablet/tabletmanager/vdiff/workflow_differ_test.go index 29c019750f1..a460b87a4f6 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/workflow_differ_test.go +++ b/go/vt/vttablet/tabletmanager/vdiff/workflow_differ_test.go @@ -157,7 +157,7 @@ func TestBuildPlanSuccess(t *testing.T) { }}, }, }, { - // non-pk text column. + // Non-PK text column. input: &binlogdatapb.Rule{ Match: "nonpktext", Filter: "select c1, textcol from nonpktext", @@ -178,7 +178,7 @@ func TestBuildPlanSuccess(t *testing.T) { }}, }, }, { - // non-pk text column, different order. + // Non-PK text column, different order. input: &binlogdatapb.Rule{ Match: "nonpktext", Filter: "select textcol, c1 from nonpktext", @@ -199,7 +199,7 @@ func TestBuildPlanSuccess(t *testing.T) { }}, }, }, { - // pk text column. + // PK text column. input: &binlogdatapb.Rule{ Match: "pktext", Filter: "select textcol, c2 from pktext", @@ -220,7 +220,7 @@ func TestBuildPlanSuccess(t *testing.T) { }}, }, }, { - // pk text column, different order. + // PK text column, different order. input: &binlogdatapb.Rule{ Match: "pktext", Filter: "select c2, textcol from pktext", @@ -241,7 +241,61 @@ func TestBuildPlanSuccess(t *testing.T) { }}, }, }, { - // text column as expression. + // No PK. Use all columns as a substitute. + input: &binlogdatapb.Rule{ + Match: "nopk", + Filter: "select * from nopk", + }, + table: "nopk", + tablePlan: &tablePlan{ + dbName: vdiffDBName, + table: testSchema.TableDefinitions[tableDefMap["nopk"]], + sourceQuery: "select c1, c2, c3 from nopk order by c1 asc, c2 asc, c3 asc", + targetQuery: "select c1, c2, c3 from nopk order by c1 asc, c2 asc, c3 asc", + compareCols: []compareColInfo{{0, collations.MySQL8().LookupByName(sqltypes.NULL.String()), true, "c1"}, {1, collations.MySQL8().LookupByName(sqltypes.NULL.String()), true, "c2"}, {2, collations.MySQL8().LookupByName(sqltypes.NULL.String()), true, "c3"}}, + comparePKs: []compareColInfo{{0, collations.MySQL8().LookupByName(sqltypes.NULL.String()), true, "c1"}, {1, collations.MySQL8().LookupByName(sqltypes.NULL.String()), true, "c2"}, {2, collations.MySQL8().LookupByName(sqltypes.NULL.String()), true, "c3"}}, + pkCols: []int{0, 1, 2}, + selectPks: []int{0, 1, 2}, + orderBy: sqlparser.OrderBy{ + &sqlparser.Order{ + Expr: &sqlparser.ColName{Name: sqlparser.NewIdentifierCI("c1")}, + Direction: sqlparser.AscOrder, + }, + &sqlparser.Order{ + Expr: &sqlparser.ColName{Name: sqlparser.NewIdentifierCI("c2")}, + Direction: sqlparser.AscOrder, + }, + &sqlparser.Order{ + Expr: &sqlparser.ColName{Name: sqlparser.NewIdentifierCI("c3")}, + Direction: sqlparser.AscOrder, + }, + }, + }, + }, { + // No PK, but a PKE on c3. + input: &binlogdatapb.Rule{ + Match: "nopkwithpke", + Filter: "select * from nopkwithpke", + }, + table: "nopkwithpke", + tablePlan: &tablePlan{ + dbName: vdiffDBName, + table: testSchema.TableDefinitions[tableDefMap["nopkwithpke"]], + sourceQuery: "select c1, c2, c3 from nopkwithpke order by c3 asc", + targetQuery: "select c1, c2, c3 from nopkwithpke order by c3 asc", + compareCols: []compareColInfo{{0, collations.MySQL8().LookupByName(sqltypes.NULL.String()), false, "c1"}, {1, collations.MySQL8().LookupByName(sqltypes.NULL.String()), false, "c2"}, {2, collations.MySQL8().LookupByName(sqltypes.NULL.String()), true, "c3"}}, + comparePKs: []compareColInfo{{2, collations.MySQL8().LookupByName(sqltypes.NULL.String()), true, "c3"}}, + pkCols: []int{2}, + selectPks: []int{2}, + orderBy: sqlparser.OrderBy{ + &sqlparser.Order{ + Expr: &sqlparser.ColName{Name: sqlparser.NewIdentifierCI("c3")}, + Direction: sqlparser.AscOrder, + }, + }, + }, + }, { + // Text column as expression. input: &binlogdatapb.Rule{ Match: "pktext", Filter: "select c2, a+b as textcol from pktext", @@ -262,7 +316,7 @@ func TestBuildPlanSuccess(t *testing.T) { }}, }, }, { - // multiple pk columns. + // Multiple PK columns. input: &binlogdatapb.Rule{ Match: "multipk", }, @@ -397,7 +451,7 @@ func TestBuildPlanSuccess(t *testing.T) { }}, }, }, { - // group by + // Group by. input: &binlogdatapb.Rule{ Match: "t1", Filter: "select * from t1 group by c1", @@ -418,7 +472,7 @@ func TestBuildPlanSuccess(t *testing.T) { }}, }, }, { - // aggregations + // Aggregations. input: &binlogdatapb.Rule{ Match: "aggr", Filter: "select c1, c2, count(*) as c3, sum(c4) as c4 from t1 group by c1", @@ -443,7 +497,7 @@ func TestBuildPlanSuccess(t *testing.T) { }, }, }, { - // date conversion on import. + // Date conversion on import. input: &binlogdatapb.Rule{ Match: "datze", }, @@ -481,31 +535,46 @@ func TestBuildPlanSuccess(t *testing.T) { wd, err := newWorkflowDiffer(ct, vdiffenv.opts, collations.MySQL8()) require.NoError(t, err) dbc.ExpectRequestRE("select vdt.lastpk as lastpk, vdt.mismatch as mismatch, vdt.report as report", noResults, nil) - columnList := make([]string, len(tcase.tablePlan.comparePKs)) - collationList := make([]string, len(tcase.tablePlan.comparePKs)) - env := collations.MySQL8() - for i := range tcase.tablePlan.comparePKs { - columnList[i] = tcase.tablePlan.comparePKs[i].colName - if tcase.tablePlan.comparePKs[i].collation != collations.Unknown { - collationList[i] = env.LookupName(tcase.tablePlan.comparePKs[i].collation) - } else { - collationList[i] = sqltypes.NULL.String() + if len(tcase.tablePlan.table.PrimaryKeyColumns) == 0 { + result := noResults + if tcase.table == "nopkwithpke" { // This has a PKE column: c3 + result = sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "column_name|index_name", + "varchar|varchar", + ), + "c3|c3", + ) } + dbc.ExpectRequestRE("SELECT index_cols.COLUMN_NAME AS column_name, index_cols.INDEX_NAME as index_name FROM information_schema.STATISTICS", result, nil) + } + if len(tcase.tablePlan.comparePKs) > 0 { + columnList := make([]string, len(tcase.tablePlan.comparePKs)) + collationList := make([]string, len(tcase.tablePlan.comparePKs)) + env := collations.MySQL8() + for i := range tcase.tablePlan.comparePKs { + columnList[i] = tcase.tablePlan.comparePKs[i].colName + if tcase.tablePlan.comparePKs[i].collation != collations.Unknown { + collationList[i] = env.LookupName(tcase.tablePlan.comparePKs[i].collation) + } else { + collationList[i] = sqltypes.NULL.String() + } + } + columnBV, err := sqltypes.BuildBindVariable(columnList) + require.NoError(t, err) + query, err := sqlparser.ParseAndBind(sqlSelectColumnCollations, + sqltypes.StringBindVariable(vdiffDBName), + sqltypes.StringBindVariable(tcase.tablePlan.table.Name), + columnBV, + ) + require.NoError(t, err) + dbc.ExpectRequest(query, sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "collation_name", + "varchar", + ), + collationList..., + ), nil) } - columnBV, err := sqltypes.BuildBindVariable(columnList) - require.NoError(t, err) - query, err := sqlparser.ParseAndBind(sqlSelectColumnCollations, - sqltypes.StringBindVariable(vdiffDBName), - sqltypes.StringBindVariable(tcase.tablePlan.table.Name), - columnBV, - ) - require.NoError(t, err) - dbc.ExpectRequest(query, sqltypes.MakeTestResult(sqltypes.MakeTestFields( - "collation_name", - "varchar", - ), - collationList..., - ), nil) err = wd.buildPlan(dbc, filter, testSchema) require.NoError(t, err, tcase.input) require.Equal(t, 1, len(wd.tableDiffers), tcase.input) diff --git a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go index 3d811c65914..6f0a9a5c6b5 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go @@ -568,6 +568,9 @@ func shouldIgnoreQuery(query string) bool { ", component_throttled=", // update of last throttle time, can happen out-of-band, so can't test for it "context cancel", "SELECT rows_copied FROM _vt.vreplication WHERE id=", + // This is only executed if the table has no defined Primary Key, which we don't know in the lower level + // code. + "SELECT index_cols.COLUMN_NAME AS column_name, index_cols.INDEX_NAME as index_name FROM information_schema.STATISTICS", } if sidecardb.MatchesInitQuery(query) { return true diff --git a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go index 20d2ab4a59d..575b398c3df 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go @@ -350,7 +350,11 @@ func (vr *vreplicator) buildColInfoMap(ctx context.Context) (map[string][]*Colum pks = td.PrimaryKeyColumns } else { // Use a PK equivalent if one exists. - if pks, _, err = vr.mysqld.GetPrimaryKeyEquivalentColumns(ctx, vr.dbClient.DBName(), td.Name); err != nil { + executeFetch := func(query string, maxrows int, wantfields bool) (*sqltypes.Result, error) { + // This sets wantfields to true. + return vr.dbClient.ExecuteFetch(query, maxrows) + } + if pks, _, err = mysqlctl.GetPrimaryKeyEquivalentColumns(ctx, executeFetch, vr.dbClient.DBName(), td.Name); err != nil { return nil, err } // Fall back to using every column in the table if there's no PK or PKE. diff --git a/go/vt/vttablet/tabletmanager/vreplication/vreplicator_test.go b/go/vt/vttablet/tabletmanager/vreplication/vreplicator_test.go index 3a5c0578661..c38402dfd22 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vreplicator_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vreplicator_test.go @@ -184,7 +184,10 @@ func TestPrimaryKeyEquivalentColumns(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { require.NoError(t, env.Mysqld.ExecuteSuperQuery(ctx, tt.ddl)) - cols, indexName, err := env.Mysqld.GetPrimaryKeyEquivalentColumns(ctx, env.Dbcfgs.DBName, tt.table) + conn, err := env.Mysqld.GetDbaConnection(ctx) + require.NoError(t, err, "could not connect to mysqld: %v", err) + defer conn.Close() + cols, indexName, err := mysqlctl.GetPrimaryKeyEquivalentColumns(ctx, conn.ExecuteFetch, env.Dbcfgs.DBName, tt.table) if (err != nil) != tt.wantErr { t.Errorf("Mysqld.GetPrimaryKeyEquivalentColumns() error = %v, wantErr %v", err, tt.wantErr) return diff --git a/go/vt/vttablet/tabletserver/vstreamer/engine.go b/go/vt/vttablet/tabletserver/vstreamer/engine.go index d81ee7c2ce8..977d0453513 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/engine.go +++ b/go/vt/vttablet/tabletserver/vstreamer/engine.go @@ -590,9 +590,13 @@ func (vse *Engine) getMySQLEndpoint(ctx context.Context, db dbconfigs.Connector) // mapPKEquivalentCols gets a PK equivalent from mysqld for the table // and maps the column names to field indexes in the MinimalTable struct. -func (vse *Engine) mapPKEquivalentCols(ctx context.Context, table *binlogdatapb.MinimalTable) ([]int, error) { - mysqld := mysqlctl.NewMysqld(vse.env.Config().DB) - pkeColNames, indexName, err := mysqld.GetPrimaryKeyEquivalentColumns(ctx, vse.env.Config().DB.DBName, table.Name) +func (vse *Engine) mapPKEquivalentCols(ctx context.Context, db dbconfigs.Connector, table *binlogdatapb.MinimalTable) ([]int, error) { + conn, err := db.Connect(ctx) + if err != nil { + return nil, err + } + defer conn.Close() + pkeColNames, indexName, err := mysqlctl.GetPrimaryKeyEquivalentColumns(ctx, conn.ExecuteFetch, vse.env.Config().DB.DBName, table.Name) if err != nil { return nil, err } diff --git a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go index 395e152dfb0..b287aa08287 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go @@ -223,7 +223,7 @@ func (rs *rowStreamer) buildPKColumns(st *binlogdatapb.MinimalTable) ([]int, err var pkColumns = make([]int, 0) if len(st.PKColumns) == 0 { // Use a PK equivalent if one exists. - pkColumns, err := rs.vse.mapPKEquivalentCols(rs.ctx, st) + pkColumns, err := rs.vse.mapPKEquivalentCols(rs.ctx, rs.cp, st) if err == nil && len(pkColumns) != 0 { return pkColumns, nil }