diff --git a/go/vt/sidecardb/schema/vdiff/vdiff.sql b/go/vt/sidecardb/schema/vdiff/vdiff.sql index 24f5cf6e7ab..7bd6b81e770 100644 --- a/go/vt/sidecardb/schema/vdiff/vdiff.sql +++ b/go/vt/sidecardb/schema/vdiff/vdiff.sql @@ -28,7 +28,7 @@ CREATE TABLE IF NOT EXISTS _vt.vdiff `started_at` timestamp NULL DEFAULT NULL, `liveness_timestamp` timestamp NULL DEFAULT NULL, `completed_at` timestamp NULL DEFAULT NULL, - `last_error` varbinary(512) DEFAULT NULL, + `last_error` varbinary(1024) DEFAULT NULL, PRIMARY KEY (`id`), UNIQUE KEY `uuid_idx` (`vdiff_uuid`), KEY `state` (`state`), diff --git a/go/vt/vttablet/tabletmanager/vdiff/engine_test.go b/go/vt/vttablet/tabletmanager/vdiff/engine_test.go index 55a211475d2..71a28bfd063 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/engine_test.go +++ b/go/vt/vttablet/tabletmanager/vdiff/engine_test.go @@ -84,7 +84,7 @@ func TestEngineOpen(t *testing.T) { ), nil) // Now let's short circuit the vdiff as we know that the open has worked as expected. - shortCircuitTestAfterQuery("update _vt.vdiff set state = 'started', last_error = '' , started_at = utc_timestamp() where id = 1", vdiffenv.dbClient) + shortCircuitTestAfterQuery("update _vt.vdiff set state = 'started', last_error = left('', 1024) , started_at = utc_timestamp() where id = 1", vdiffenv.dbClient) vdenv.vde.Open(context.Background(), vdiffenv.vre) defer vdenv.vde.Close() @@ -131,7 +131,7 @@ func TestVDiff(t *testing.T) { ), fmt.Sprintf("1|%s|%s|%s||9223372036854775807|9223372036854775807||PRIMARY,REPLICA|1669511347|0|Running||%s|200||1669511347|1|0||1", vdiffenv.workflow, vreplSource, vdiffSourceGtid, vdiffDBName), ), nil) - vdenv.dbClient.ExpectRequest("update _vt.vdiff set state = 'started', last_error = '' , started_at = utc_timestamp() where id = 1", singleRowAffected, nil) + vdenv.dbClient.ExpectRequest("update _vt.vdiff set state = 'started', last_error = left('', 1024) , started_at = utc_timestamp() where id = 1", singleRowAffected, nil) vdenv.dbClient.ExpectRequest("insert into _vt.vdiff_log(vdiff_id, message) values (1, 'State changed to: started')", singleRowAffected, nil) vdenv.dbClient.ExpectRequest(`select vdt.lastpk as lastpk, vdt.mismatch as mismatch, vdt.report as report from _vt.vdiff as vd inner join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id) @@ -187,7 +187,7 @@ func TestVDiff(t *testing.T) { vdenv.dbClient.ExpectRequest("update _vt.vdiff_table set state = 'completed' where vdiff_id = 1 and table_name = 't1'", singleRowAffected, nil) vdenv.dbClient.ExpectRequest(`insert into _vt.vdiff_log(vdiff_id, message) values (1, 'completed: table \'t1\'')`, singleRowAffected, nil) vdenv.dbClient.ExpectRequest("select table_name as table_name from _vt.vdiff_table where vdiff_id = 1 and state != 'completed'", singleRowAffected, nil) - vdenv.dbClient.ExpectRequest("update _vt.vdiff set state = 'completed', last_error = '' , completed_at = utc_timestamp() where id = 1", singleRowAffected, nil) + vdenv.dbClient.ExpectRequest("update _vt.vdiff set state = 'completed', last_error = left('', 1024) , completed_at = utc_timestamp() where id = 1", singleRowAffected, nil) vdenv.dbClient.ExpectRequest("insert into _vt.vdiff_log(vdiff_id, message) values (1, 'State changed to: completed')", singleRowAffected, nil) vdenv.vde.mu.Lock() @@ -264,7 +264,7 @@ func TestEngineRetryErroredVDiffs(t *testing.T) { ), nil) // At this point we know that we kicked off the expected retry so we can short circit the vdiff. - shortCircuitTestAfterQuery(fmt.Sprintf("update _vt.vdiff set state = 'started', last_error = '' , started_at = utc_timestamp() where id = %s", id), vdiffenv.dbClient) + shortCircuitTestAfterQuery(fmt.Sprintf("update _vt.vdiff set state = 'started', last_error = left('', 1024) , started_at = utc_timestamp() where id = %s", id), vdiffenv.dbClient) expectedControllerCnt++ } diff --git a/go/vt/vttablet/tabletmanager/vdiff/framework_test.go b/go/vt/vttablet/tabletmanager/vdiff/framework_test.go index 5889720648b..5973326acbf 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vdiff/framework_test.go @@ -194,7 +194,7 @@ func resetBinlogClient() { // has verified the necessary behavior. func shortCircuitTestAfterQuery(query string, dbClient *binlogplayer.MockDBClient) { dbClient.ExpectRequest(query, singleRowAffected, fmt.Errorf("Short circuiting test")) - dbClient.ExpectRequest("update _vt.vdiff set state = 'error', last_error = 'Short circuiting test' where id = 1", singleRowAffected, nil) + dbClient.ExpectRequest("update _vt.vdiff set state = 'error', last_error = left('Short circuiting test', 1024) where id = 1", singleRowAffected, nil) dbClient.ExpectRequest("insert into _vt.vdiff_log(vdiff_id, message) values (1, 'State changed to: error')", singleRowAffected, nil) dbClient.ExpectRequest("insert into _vt.vdiff_log(vdiff_id, message) values (1, 'Error: Short circuiting test')", singleRowAffected, nil) } diff --git a/go/vt/vttablet/tabletmanager/vdiff/schema.go b/go/vt/vttablet/tabletmanager/vdiff/schema.go index 619c0e5cc79..6a89c0bcc12 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/schema.go +++ b/go/vt/vttablet/tabletmanager/vdiff/schema.go @@ -36,9 +36,16 @@ const ( vd.started_at as started_at, vdt.rows_compared as rows_compared, vd.completed_at as completed_at, IF(vdt.mismatch = 1, 1, 0) as has_mismatch, vdt.report as report from _vt.vdiff as vd left join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id) +<<<<<<< HEAD where vd.id = %d` // sqlUpdateVDiffState has a penultimate placeholder for any additional columns you want to update, e.g. `, foo = 1` sqlUpdateVDiffState = "update _vt.vdiff set state = %s, last_error = %s %s where id = %d" +======= + where vd.id = %a` + // sqlUpdateVDiffState has a penultimate placeholder for any additional columns you want to update, e.g. `, foo = 1`. + // It also truncates the error if needed to ensure that we can save the state when the error text is very long. + sqlUpdateVDiffState = "update _vt.vdiff set state = %s, last_error = left(%s, 1024) %s where id = %d" +>>>>>>> 54f2daf05d (VDiff: wait for shard streams of one table diff to complete for before starting that of the next table (#14345)) sqlUpdateVDiffStopped = `update _vt.vdiff as vd, _vt.vdiff_table as vdt set vd.state = 'stopped', vdt.state = 'stopped', vd.last_error = '' where vd.id = vdt.vdiff_id and vd.id = %d and vd.state != 'completed'` sqlGetVReplicationEntry = "select * from _vt.vreplication %s" diff --git a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go index 58757bdd672..0aeb383d414 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go @@ -39,14 +39,25 @@ import ( "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/discovery" "vitess.io/vitess/go/vt/log" +<<<<<<< HEAD binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" querypb "vitess.io/vitess/go/vt/proto/query" tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" +======= + "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/topo" +>>>>>>> 54f2daf05d (VDiff: wait for shard streams of one table diff to complete for before starting that of the next table (#14345)) "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate/engine" "vitess.io/vitess/go/vt/vtgate/evalengine" "vitess.io/vitess/go/vt/vttablet/tabletconn" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + querypb "vitess.io/vitess/go/vt/proto/query" + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) // how long to wait for background operations to complete @@ -73,6 +84,12 @@ type tableDiffer struct { sourceQuery string table *tabletmanagerdatapb.TableDefinition lastPK *querypb.QueryResult + + // wgShardStreamers is used, with a cancellable context, to wait for all shard streamers + // to finish after each diff is complete. + wgShardStreamers sync.WaitGroup + shardStreamsCtx context.Context + shardStreamsCancel context.CancelFunc } func newTableDiffer(wd *workflowDiffer, table *tabletmanagerdatapb.TableDefinition, sourceQuery string) *tableDiffer { @@ -122,19 +139,21 @@ func (td *tableDiffer) initialize(ctx context.Context) error { } }() + td.shardStreamsCtx, td.shardStreamsCancel = context.WithCancel(ctx) + if err := td.selectTablets(ctx); err != nil { return err } if err := td.syncSourceStreams(ctx); err != nil { return err } - if err := td.startSourceDataStreams(ctx); err != nil { + if err := td.startSourceDataStreams(td.shardStreamsCtx); err != nil { return err } if err := td.syncTargetStreams(ctx); err != nil { return err } - if err := td.startTargetDataStream(ctx); err != nil { + if err := td.startTargetDataStream(td.shardStreamsCtx); err != nil { return err } td.setupRowSorters() @@ -204,7 +223,7 @@ func (td *tableDiffer) selectTablets(ctx context.Context) error { var ( wg sync.WaitGroup sourceErr, targetErr error - targetTablet *topodata.Tablet + targetTablet *topodatapb.Tablet ) // The cells from the vdiff record are a comma separated list. @@ -255,8 +274,13 @@ func (td *tableDiffer) selectTablets(ctx context.Context) error { return targetErr } +<<<<<<< HEAD func pickTablet(ctx context.Context, ts *topo.Server, cells []string, keyspace, shard, tabletTypes string) (*topodata.Tablet, error) { tp, err := discovery.NewTabletPicker(ts, cells, keyspace, shard, tabletTypes) +======= +func pickTablet(ctx context.Context, ts *topo.Server, cells []string, localCell, keyspace, shard, tabletTypes string) (*topodatapb.Tablet, error) { + tp, err := discovery.NewTabletPicker(ctx, ts, cells, localCell, keyspace, shard, tabletTypes, discovery.TabletPickerOptions{}) +>>>>>>> 54f2daf05d (VDiff: wait for shard streams of one table diff to complete for before starting that of the next table (#14345)) if err != nil { return nil, err } @@ -355,10 +379,12 @@ func (td *tableDiffer) restartTargetVReplicationStreams(ctx context.Context) err func (td *tableDiffer) streamOneShard(ctx context.Context, participant *shardStreamer, query string, lastPK *querypb.QueryResult, gtidch chan string) { log.Infof("streamOneShard Start on %s using query: %s", participant.tablet.Alias.String(), query) + td.wgShardStreamers.Add(1) defer func() { log.Infof("streamOneShard End on %s", participant.tablet.Alias.String()) close(participant.result) close(gtidch) + td.wgShardStreamers.Done() }() participant.err = func() error { conn, err := tabletconn.GetDialer()(participant.tablet, false) diff --git a/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go b/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go index 0671d676c7f..a5b3a89df56 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go @@ -94,6 +94,14 @@ func (wd *workflowDiffer) reconcileExtraRows(dr *DiffReport, maxExtraRowsToCompa } func (wd *workflowDiffer) diffTable(ctx context.Context, dbClient binlogplayer.DBClient, td *tableDiffer) error { + defer func() { + if td.shardStreamsCancel != nil { + td.shardStreamsCancel() + } + // Wait for all the shard streams to finish before returning. + td.wgShardStreamers.Wait() + }() + select { case <-ctx.Done(): return vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired")