Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VDiff: wait for shard streams of one table diff to complete for before starting that of the next table #14345

Merged
merged 4 commits into from
Oct 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/vt/sidecardb/schema/vdiff/vdiff.sql
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ CREATE TABLE IF NOT EXISTS vdiff
`started_at` timestamp NULL DEFAULT NULL,
`liveness_timestamp` timestamp NULL DEFAULT NULL,
`completed_at` timestamp NULL DEFAULT NULL,
`last_error` varbinary(512) DEFAULT NULL,
`last_error` varbinary(1024) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `uuid_idx` (`vdiff_uuid`),
KEY `state` (`state`),
Expand Down
8 changes: 4 additions & 4 deletions go/vt/vttablet/tabletmanager/vdiff/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func TestEngineOpen(t *testing.T) {
), nil)

// Now let's short circuit the vdiff as we know that the open has worked as expected.
shortCircuitTestAfterQuery("update _vt.vdiff set state = 'started', last_error = '' , started_at = utc_timestamp() where id = 1", vdiffenv.dbClient)
shortCircuitTestAfterQuery("update _vt.vdiff set state = 'started', last_error = left('', 1024) , started_at = utc_timestamp() where id = 1", vdiffenv.dbClient)

vdenv.vde.Open(context.Background(), vdiffenv.vre)
defer vdenv.vde.Close()
Expand Down Expand Up @@ -132,7 +132,7 @@ func TestVDiff(t *testing.T) {
),
fmt.Sprintf("1|%s|%s|%s||9223372036854775807|9223372036854775807||PRIMARY,REPLICA|1669511347|0|Running||%s|200||1669511347|1|0||1", vdiffenv.workflow, vreplSource, vdiffSourceGtid, vdiffDBName),
), nil)
vdenv.dbClient.ExpectRequest("update _vt.vdiff set state = 'started', last_error = '' , started_at = utc_timestamp() where id = 1", singleRowAffected, nil)
vdenv.dbClient.ExpectRequest("update _vt.vdiff set state = 'started', last_error = left('', 1024) , started_at = utc_timestamp() where id = 1", singleRowAffected, nil)
vdenv.dbClient.ExpectRequest("insert into _vt.vdiff_log(vdiff_id, message) values (1, 'State changed to: started')", singleRowAffected, nil)
vdenv.dbClient.ExpectRequest(`select vdt.lastpk as lastpk, vdt.mismatch as mismatch, vdt.report as report
from _vt.vdiff as vd inner join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id)
Expand Down Expand Up @@ -194,7 +194,7 @@ func TestVDiff(t *testing.T) {
vdenv.dbClient.ExpectRequest("update _vt.vdiff_table set state = 'completed' where vdiff_id = 1 and table_name = 't1'", singleRowAffected, nil)
vdenv.dbClient.ExpectRequest(`insert into _vt.vdiff_log(vdiff_id, message) values (1, 'completed: table \'t1\'')`, singleRowAffected, nil)
vdenv.dbClient.ExpectRequest("select table_name as table_name from _vt.vdiff_table where vdiff_id = 1 and state != 'completed'", singleRowAffected, nil)
vdenv.dbClient.ExpectRequest("update _vt.vdiff set state = 'completed', last_error = '' , completed_at = utc_timestamp() where id = 1", singleRowAffected, nil)
vdenv.dbClient.ExpectRequest("update _vt.vdiff set state = 'completed', last_error = left('', 1024) , completed_at = utc_timestamp() where id = 1", singleRowAffected, nil)
vdenv.dbClient.ExpectRequest("insert into _vt.vdiff_log(vdiff_id, message) values (1, 'State changed to: completed')", singleRowAffected, nil)

vdenv.vde.mu.Lock()
Expand Down Expand Up @@ -271,7 +271,7 @@ func TestEngineRetryErroredVDiffs(t *testing.T) {
), nil)

// At this point we know that we kicked off the expected retry so we can short circit the vdiff.
shortCircuitTestAfterQuery(fmt.Sprintf("update _vt.vdiff set state = 'started', last_error = '' , started_at = utc_timestamp() where id = %s", id), vdiffenv.dbClient)
shortCircuitTestAfterQuery(fmt.Sprintf("update _vt.vdiff set state = 'started', last_error = left('', 1024) , started_at = utc_timestamp() where id = %s", id), vdiffenv.dbClient)

expectedControllerCnt++
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/vdiff/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func resetBinlogClient() {
// has verified the necessary behavior.
func shortCircuitTestAfterQuery(query string, dbClient *binlogplayer.MockDBClient) {
dbClient.ExpectRequest(query, singleRowAffected, fmt.Errorf("Short circuiting test"))
dbClient.ExpectRequest("update _vt.vdiff set state = 'error', last_error = 'Short circuiting test' where id = 1", singleRowAffected, nil)
dbClient.ExpectRequest("update _vt.vdiff set state = 'error', last_error = left('Short circuiting test', 1024) where id = 1", singleRowAffected, nil)
dbClient.ExpectRequest("insert into _vt.vdiff_log(vdiff_id, message) values (1, 'State changed to: error')", singleRowAffected, nil)
dbClient.ExpectRequest("insert into _vt.vdiff_log(vdiff_id, message) values (1, 'Error: Short circuiting test')", singleRowAffected, nil)
}
Expand Down
5 changes: 3 additions & 2 deletions go/vt/vttablet/tabletmanager/vdiff/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ const (
IF(vdt.mismatch = 1, 1, 0) as has_mismatch, vdt.report as report
from _vt.vdiff as vd left join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id)
where vd.id = %a`
// sqlUpdateVDiffState has a penultimate placeholder for any additional columns you want to update, e.g. `, foo = 1`
sqlUpdateVDiffState = "update _vt.vdiff set state = %s, last_error = %s %s where id = %d"
// sqlUpdateVDiffState has a penultimate placeholder for any additional columns you want to update, e.g. `, foo = 1`.
// It also truncates the error if needed to ensure that we can save the state when the error text is very long.
sqlUpdateVDiffState = "update _vt.vdiff set state = %s, last_error = left(%s, 1024) %s where id = %d"
sqlUpdateVDiffStopped = `update _vt.vdiff as vd, _vt.vdiff_table as vdt set vd.state = 'stopped', vdt.state = 'stopped', vd.last_error = ''
where vd.id = vdt.vdiff_id and vd.id = %a and vd.state != 'completed'`
sqlGetVReplicationEntry = "select * from _vt.vreplication %s"
Expand Down
29 changes: 20 additions & 9 deletions go/vt/vttablet/tabletmanager/vdiff/table_differ.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,19 @@ import (
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
"vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/engine"
"vitess.io/vitess/go/vt/vtgate/evalengine"
"vitess.io/vitess/go/vt/vttablet/tabletconn"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

// how long to wait for background operations to complete
Expand All @@ -72,6 +73,12 @@ type tableDiffer struct {
sourceQuery string
table *tabletmanagerdatapb.TableDefinition
lastPK *querypb.QueryResult

// wgShardStreamers is used, with a cancellable context, to wait for all shard streamers
// to finish after each diff is complete.
wgShardStreamers sync.WaitGroup
shardStreamsCtx context.Context
shardStreamsCancel context.CancelFunc
}

func newTableDiffer(wd *workflowDiffer, table *tabletmanagerdatapb.TableDefinition, sourceQuery string) *tableDiffer {
Expand Down Expand Up @@ -121,19 +128,21 @@ func (td *tableDiffer) initialize(ctx context.Context) error {
}
}()

td.shardStreamsCtx, td.shardStreamsCancel = context.WithCancel(ctx)

if err := td.selectTablets(ctx); err != nil {
return err
}
if err := td.syncSourceStreams(ctx); err != nil {
return err
}
if err := td.startSourceDataStreams(ctx); err != nil {
if err := td.startSourceDataStreams(td.shardStreamsCtx); err != nil {
return err
}
if err := td.syncTargetStreams(ctx); err != nil {
return err
}
if err := td.startTargetDataStream(ctx); err != nil {
if err := td.startTargetDataStream(td.shardStreamsCtx); err != nil {
return err
}
td.setupRowSorters()
Expand Down Expand Up @@ -203,7 +212,7 @@ func (td *tableDiffer) selectTablets(ctx context.Context) error {
var (
wg sync.WaitGroup
sourceErr, targetErr error
targetTablet *topodata.Tablet
targetTablet *topodatapb.Tablet
)

// The cells from the vdiff record are a comma separated list.
Expand Down Expand Up @@ -254,7 +263,7 @@ func (td *tableDiffer) selectTablets(ctx context.Context) error {
return targetErr
}

func pickTablet(ctx context.Context, ts *topo.Server, cells []string, localCell, keyspace, shard, tabletTypes string) (*topodata.Tablet, error) {
func pickTablet(ctx context.Context, ts *topo.Server, cells []string, localCell, keyspace, shard, tabletTypes string) (*topodatapb.Tablet, error) {
tp, err := discovery.NewTabletPicker(ctx, ts, cells, localCell, keyspace, shard, tabletTypes, discovery.TabletPickerOptions{})
if err != nil {
return nil, err
Expand Down Expand Up @@ -354,10 +363,12 @@ func (td *tableDiffer) restartTargetVReplicationStreams(ctx context.Context) err

func (td *tableDiffer) streamOneShard(ctx context.Context, participant *shardStreamer, query string, lastPK *querypb.QueryResult, gtidch chan string) {
log.Infof("streamOneShard Start on %s using query: %s", participant.tablet.Alias.String(), query)
td.wgShardStreamers.Add(1)
defer func() {
log.Infof("streamOneShard End on %s", participant.tablet.Alias.String())
close(participant.result)
close(gtidch)
td.wgShardStreamers.Done()
}()
participant.err = func() error {
conn, err := tabletconn.GetDialer()(participant.tablet, false)
Expand Down
8 changes: 8 additions & 0 deletions go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,14 @@ func (wd *workflowDiffer) reconcileExtraRows(dr *DiffReport, maxExtraRowsToCompa
}

func (wd *workflowDiffer) diffTable(ctx context.Context, dbClient binlogplayer.DBClient, td *tableDiffer) error {
defer func() {
if td.shardStreamsCancel != nil {
td.shardStreamsCancel()
}
// Wait for all the shard streams to finish before returning.
td.wgShardStreamers.Wait()
}()

select {
case <-ctx.Done():
return vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired")
Expand Down
Loading