Skip to content

Commit

Permalink
Add PKE handling for source as well
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Jan 9, 2025
1 parent a325187 commit 02b7f06
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 12 deletions.
35 changes: 24 additions & 11 deletions go/vt/vttablet/tabletmanager/vdiff/table_differ.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"context"
"encoding/json"
"fmt"
"slices"
"strings"
"sync"
"time"
Expand All @@ -36,6 +35,7 @@ import (
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/mysqlctl"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
Expand Down Expand Up @@ -746,7 +746,6 @@ func (td *tableDiffer) updateTableProgress(dbClient binlogplayer.DBClient, dr *D
if err != nil {
return err
}
log.Errorf("DEBUG: lastpk for table %s to DB: %s", td.table.Name, string(lastPKTxt))
query, err = sqlparser.ParseAndBind(sqlUpdateTableProgress,
sqltypes.Int64BindVariable(dr.ProcessedRows),
sqltypes.StringBindVariable(string(lastPKTxt)),
Expand Down Expand Up @@ -826,7 +825,6 @@ func updateTableMismatch(dbClient binlogplayer.DBClient, vdiffID int64, table st
}

func (td *tableDiffer) lastPKFromRow(row []sqltypes.Value) *tabletmanagerdatapb.VDiffTableLastPK {
var source, target *querypb.QueryResult
buildQR := func(pkCols []int) *querypb.QueryResult {
pkColCnt := len(pkCols)
pkFields := make([]*querypb.Field, pkColCnt)
Expand All @@ -840,15 +838,9 @@ func (td *tableDiffer) lastPKFromRow(row []sqltypes.Value) *tabletmanagerdatapb.
Rows: []*querypb.Row{sqltypes.RowToProto3(pkVals)},
}
}
target = buildQR(td.tablePlan.pkCols)
if len(td.tablePlan.sourcePkCols) == 0 || slices.Equal(td.tablePlan.sourcePkCols, td.tablePlan.pkCols) {
source = target
} else {
source = buildQR(td.tablePlan.sourcePkCols)
}
return &tabletmanagerdatapb.VDiffTableLastPK{
Source: source,
Target: target,
Source: buildQR(td.tablePlan.sourcePkCols),
Target: buildQR(td.tablePlan.pkCols),
}
}

Expand Down Expand Up @@ -905,6 +897,9 @@ func (td *tableDiffer) getSourcePKCols() error {
ctx, cancel := context.WithTimeout(td.wd.ct.vde.ctx, topo.RemoteOperationTimeout*3)
defer cancel()
// We use the first sourceShard as all of them should have the same schema.
if len(td.wd.ct.sources) == 0 {
return fmt.Errorf("no source shards found")
}
sourceShardName := maps.Keys(td.wd.ct.sources)[0]
sourceTS, err := td.wd.getSourceTopoServer()
if err != nil {
Expand All @@ -928,6 +923,24 @@ func (td *tableDiffer) getSourcePKCols() error {
return err
}
sourceTable := sourceSchema.TableDefinitions[0]
if len(sourceTable.PrimaryKeyColumns) == 0 {
// We use the columns from a PKE if there is one.
executeFetch := func(query string, maxrows int, wantfields bool) (*sqltypes.Result, error) {
res, err := td.wd.ct.tmc.ExecuteFetchAsApp(ctx, sourceTablet.Tablet, false, &tabletmanagerdatapb.ExecuteFetchAsAppRequest{
Query: []byte(query),
MaxRows: 1,
})
if err != nil {
return nil, err
}
return sqltypes.Proto3ToResult(res), nil
}
pkeCols, _, err := mysqlctl.GetPrimaryKeyEquivalentColumns(ctx, executeFetch, td.wd.ct.sourceKeyspace, td.table.Name)
if err != nil {
return err
}
sourceTable.PrimaryKeyColumns = pkeCols
}
sourcePKColumns := make(map[string]struct{}, len(sourceTable.PrimaryKeyColumns))
td.tablePlan.sourcePkCols = make([]int, 0, len(sourceTable.PrimaryKeyColumns))
for _, pkc := range sourceTable.PrimaryKeyColumns {
Expand Down
1 change: 0 additions & 1 deletion go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,6 @@ func (wd *workflowDiffer) getTableLastPK(dbClient binlogplayer.DBClient, tableNa
if lastpk, err = qr.Named().Row().ToBytes("lastpk"); err != nil {
return nil, err
}
log.Errorf("DEBUG: lastpk for table %s from DB: %s", tableName, string(lastpk))
if len(lastpk) != 0 {
lastPK := &tabletmanagerdatapb.VDiffTableLastPK{}
if err := prototext.Unmarshal(lastpk, lastPK); err != nil {
Expand Down

0 comments on commit 02b7f06

Please sign in to comment.