Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VDiff: Support diffing tables without a defined Primary Key #14794

Merged
merged 6 commits into from
Dec 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions go/test/endtoend/vreplication/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ create table geom_tbl (id int, g geometry, p point, ls linestring, pg polygon, m
create table ` + "`blüb_tbl`" + ` (id int, val1 varchar(20), ` + "`blöb1`" + ` blob, val2 varbinary(20), ` + "`bl@b2`" + ` longblob, txt1 text, blb3 tinyblob, txt2 longtext, blb4 mediumblob, primary key(id));
create table reftable (id int, val1 varchar(20), primary key(id), key(val1));
create table loadtest (id int, name varchar(256), primary key(id), key(name));
create table nopk (name varchar(128), age int unsigned);
`
// These should always be ignored in vreplication
internalSchema = `
Expand Down Expand Up @@ -94,6 +95,7 @@ create table loadtest (id int, name varchar(256), primary key(id), key(name));
"db_order_test": {},
"vdiff_order": {},
"datze": {},
"nopk": {},
"reftable": {
"type": "reference"
}
Expand Down Expand Up @@ -216,6 +218,14 @@ create table loadtest (id int, name varchar(256), primary key(id), key(name));
}
]
},
"nopk": {
"column_vindexes": [
{
"columns": ["name"],
"name": "unicode_loose_md5"
}
]
},
"reftable": {
"type": "reference"
}
Expand Down
10 changes: 7 additions & 3 deletions go/test/endtoend/vreplication/vdiff2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ var testCases = []*testCase{
sourceShards: "0",
targetShards: "-80,80-",
tabletBaseID: 200,
tables: "customer,Lead,Lead-1",
tables: "customer,Lead,Lead-1,nopk",
autoRetryError: true,
retryInsert: `insert into customer(cid, name, typ) values(1991234, 'Testy McTester', 'soho')`,
resume: true,
Expand Down Expand Up @@ -117,7 +117,7 @@ func TestVDiff2(t *testing.T) {
sourceShards := []string{"0"}
targetKs := "customer"
targetShards := []string{"-80", "80-"}
// This forces us to use multiple vstream packets even with small test tables
// This forces us to use multiple vstream packets even with small test tables.
extraVTTabletArgs = []string{"--vstream_packet_size=1"}

vc = NewVitessCluster(t, "TestVDiff2", strings.Split(allCellNames, ","), mainClusterConfig)
Expand Down Expand Up @@ -150,7 +150,11 @@ func TestVDiff2(t *testing.T) {
query := `insert into customer(cid, name, typ, sport) values(1001, null, 'soho','')`
execVtgateQuery(t, vtgateConn, fmt.Sprintf("%s:%s", sourceKs, sourceShards[0]), query)

generateMoreCustomers(t, sourceKs, 100)
generateMoreCustomers(t, sourceKs, 1000)

// Create rows in the nopk table using the customer names and random ages between 20 and 100.
_, err = vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s.nopk(name, age) select name, floor(rand()*80)+20 from %s.customer", sourceKs, sourceKs), -1, false)
require.NoError(t, err, "failed to insert rows into nopk table: %v", err)

// The primary tablet is only added in the first cell.
// We ONLY add primary tablets in this test.
Expand Down
1 change: 0 additions & 1 deletion go/vt/mysqlctl/mysql_daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ type MysqlDaemon interface {
GetSchema(ctx context.Context, dbName string, request *tabletmanagerdatapb.GetSchemaRequest) (*tabletmanagerdatapb.SchemaDefinition, error)
GetColumns(ctx context.Context, dbName, table string) ([]*querypb.Field, []string, error)
GetPrimaryKeyColumns(ctx context.Context, dbName, table string) ([]string, error)
GetPrimaryKeyEquivalentColumns(ctx context.Context, dbName, table string) ([]string, string, error)
PreflightSchemaChange(ctx context.Context, dbName string, changes []string) ([]*tabletmanagerdatapb.SchemaChangeResult, error)
ApplySchemaChange(ctx context.Context, dbName string, change *tmutils.SchemaChange) (*tabletmanagerdatapb.SchemaChangeResult, error)

Expand Down
10 changes: 2 additions & 8 deletions go/vt/mysqlctl/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -579,13 +579,7 @@ func (mysqld *Mysqld) ApplySchemaChange(ctx context.Context, dbName string, chan
// defined PRIMARY KEY then it may return the columns for
// that index if it is likely the most efficient one amongst
// the available PKE indexes on the table.
func (mysqld *Mysqld) GetPrimaryKeyEquivalentColumns(ctx context.Context, dbName, table string) ([]string, string, error) {
conn, err := getPoolReconnect(ctx, mysqld.dbaPool)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did the functionality change for this PR require a change in how we acquire a connection here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I first created this I had a mysqlctl.Mysqld instance readily available in the callsite. In working on this, I realized that I should have changed it when using it in vstreamer as I created a new instance there, which is heavy as it creates conn pools etc (and I wasn't properly calling close in a defer 🤦‍♂️), so I changed it here to use a callback to talk to the DB instead. It's much lighter and is far better (and is an already established pattern in the mysqlctl package).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we adopt this pattern, we will find ourselves passing exec functions into a large number of functions. I'm not saying it's wrong; I'm just thinking about the impact on the code and its signatures.

Copy link
Contributor Author

@mattlord mattlord Dec 19, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's already a pattern in that file/package and elsewhere. I agree that it's not one which should be used without good reason.

if err != nil {
return nil, "", err
}
defer conn.Recycle()

func GetPrimaryKeyEquivalentColumns(ctx context.Context, exec func(string, int, bool) (*sqltypes.Result, error), dbName, table string) ([]string, string, error) {
// We use column name aliases to guarantee lower case for our named results.
sql := `
SELECT index_cols.COLUMN_NAME AS column_name, index_cols.INDEX_NAME as index_name FROM information_schema.STATISTICS AS index_cols INNER JOIN
Expand Down Expand Up @@ -629,7 +623,7 @@ func (mysqld *Mysqld) GetPrimaryKeyEquivalentColumns(ctx context.Context, dbName
encodedDbName := encodeEntityName(dbName)
encodedTable := encodeEntityName(table)
sql = fmt.Sprintf(sql, encodedDbName, encodedTable, encodedDbName, encodedTable, encodedDbName, encodedTable)
qr, err := conn.Conn.ExecuteFetch(sql, 1000, true)
qr, err := exec(sql, 1000, true)
if err != nil {
return nil, "", err
}
Expand Down
22 changes: 16 additions & 6 deletions go/vt/vttablet/tabletmanager/vdiff/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,16 +100,26 @@ var (
Columns: []string{"id", "dt"},
PrimaryKeyColumns: []string{"id"},
Fields: sqltypes.MakeTestFields("id|dt", "int64|datetime"),
}, {
Name: "nopk",
Columns: []string{"c1", "c2", "c3"},
Fields: sqltypes.MakeTestFields("c1|c2|c3", "int64|int64|int64"),
}, {
Name: "nopkwithpke",
Columns: []string{"c1", "c2", "c3"},
Fields: sqltypes.MakeTestFields("c1|c2|c3", "int64|int64|int64"),
},
},
}
tableDefMap = map[string]int{
"t1": 0,
"nonpktext": 1,
"pktext": 2,
"multipk": 3,
"aggr": 4,
"datze": 5,
"t1": 0,
"nonpktext": 1,
"pktext": 2,
"multipk": 3,
"aggr": 4,
"datze": 5,
"nopk": 6,
"nopkwithpke": 7,
}
)

Expand Down
51 changes: 44 additions & 7 deletions go/vt/vttablet/tabletmanager/vdiff/table_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,23 @@ limitations under the License.
package vdiff

import (
"context"
"fmt"
"strings"

"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"

"vitess.io/vitess/go/vt/log"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/mysqlctl"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/engine"
"vitess.io/vitess/go/vt/vtgate/engine/opcode"

querypb "vitess.io/vitess/go/vt/proto/query"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

const sqlSelectColumnCollations = "select column_name as column_name, collation_name as collation_name from information_schema.columns where table_schema=%a and table_name=%a and column_name in %a"
Expand Down Expand Up @@ -75,7 +77,7 @@ func (td *tableDiffer) buildTablePlan(dbClient binlogplayer.DBClient, dbName str

sourceSelect := &sqlparser.Select{}
targetSelect := &sqlparser.Select{}
// aggregates is the list of Aggregate functions, if any.
// Aggregates is the list of Aggregate functions, if any.
var aggregates []*engine.AggregateParams
for _, selExpr := range sel.SelectExprs {
switch selExpr := selExpr.(type) {
Expand Down Expand Up @@ -153,10 +155,25 @@ func (td *tableDiffer) buildTablePlan(dbClient binlogplayer.DBClient, dbName str
},
}

if len(tp.table.PrimaryKeyColumns) == 0 {
// We use the columns from a PKE if there is one.
pkeCols, err := tp.getPKEquivalentColumns(dbClient)
if err != nil {
return nil, vterrors.Wrapf(err, "error getting PK equivalent columns for table %s", tp.table.Name)
}
if len(pkeCols) > 0 {
tp.table.PrimaryKeyColumns = append(tp.table.PrimaryKeyColumns, pkeCols...)
} else {
// We use every column together as a substitute PK.
tp.table.PrimaryKeyColumns = append(tp.table.PrimaryKeyColumns, tp.table.Columns...)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it make sense to allow full scan in VDiff? Should we instead say this table isn't supported? I'm imagining a VDiff running over weeks - as a user of VDiff I'd not want it to run so long.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, yes. We allow it in VReplication as well. This is up to the user -- it's not uncommon to have small tables without a primary key or PKE. In some cases all queries against the table will be a full scan. It's not our call to say that you shouldn't be able to move those tables and diff them after doing so.

}
}

err = tp.findPKs(dbClient, targetSelect, collationEnv)
if err != nil {
return nil, err
}

// Remove in_keyrange. It's not understood by mysql.
sourceSelect.Where = sel.Where // removeKeyrange(sel.Where)
// The source should also perform the group by.
Expand All @@ -178,6 +195,9 @@ func (td *tableDiffer) buildTablePlan(dbClient binlogplayer.DBClient, dbName str

// findPKs identifies PKs and removes them from the columns to do data comparison.
func (tp *tablePlan) findPKs(dbClient binlogplayer.DBClient, targetSelect *sqlparser.Select, collationEnv *collations.Environment) error {
if len(tp.table.PrimaryKeyColumns) == 0 {
return nil
}
var orderby sqlparser.OrderBy
for _, pk := range tp.table.PrimaryKeyColumns {
found := false
Expand All @@ -196,7 +216,7 @@ func (tp *tablePlan) findPKs(dbClient binlogplayer.DBClient, targetSelect *sqlpa
tp.compareCols[i].isPK = true
tp.comparePKs = append(tp.comparePKs, tp.compareCols[i])
tp.selectPks = append(tp.selectPks, i)
// We'll be comparing pks separately. So, remove them from compareCols.
// We'll be comparing PKs separately. So, remove them from compareCols.
tp.pkCols = append(tp.pkCols, i)
found = true
break
Expand Down Expand Up @@ -224,6 +244,9 @@ func (tp *tablePlan) findPKs(dbClient binlogplayer.DBClient, targetSelect *sqlpa
// saves the collations in the tablePlan's comparePKs column info
// structs for those subsequent operations.
func (tp *tablePlan) getPKColumnCollations(dbClient binlogplayer.DBClient, collationEnv *collations.Environment) error {
if len(tp.comparePKs) == 0 {
return nil
}
columnList := make([]string, len(tp.comparePKs))
for i := range tp.comparePKs {
columnList[i] = tp.comparePKs[i].colName
Expand Down Expand Up @@ -259,3 +282,17 @@ func (tp *tablePlan) getPKColumnCollations(dbClient binlogplayer.DBClient, colla
}
return nil
}

// getPKEquivalentColumns asks mysqlctl.GetPrimaryKeyEquivalentColumns for the
// columns of a primary key equivalent (PKE) index on the plan's table, running
// the lookup query through the supplied dbClient. Only the column list is
// returned; the second value reported by the mysqlctl helper is discarded.
//
// The lookup is bounded by a context that expires after half of
// BackgroundOperationTimeout. On error, a nil slice and the error are returned.
func (tp *tablePlan) getPKEquivalentColumns(dbClient binlogplayer.DBClient) ([]string, error) {
	lookupCtx, stop := context.WithTimeout(context.Background(), BackgroundOperationTimeout/2)
	defer stop()

	// Adapt dbClient to the callback signature that mysqlctl expects. The
	// wantfields argument is intentionally ignored because the two-argument
	// ExecuteFetch on dbClient has no such parameter (the original note here
	// states that it behaves as if wantfields were true).
	runQuery := func(sql string, rowLimit int, _ bool) (*sqltypes.Result, error) {
		return dbClient.ExecuteFetch(sql, rowLimit)
	}

	cols, _, err := mysqlctl.GetPrimaryKeyEquivalentColumns(lookupCtx, runQuery, tp.dbName, tp.table.Name)
	if err != nil {
		return nil, err
	}
	return cols, nil
}
Loading
Loading