Skip to content

Commit

Permalink
More tweaks and fixes
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Dec 15, 2023
1 parent 828a4d3 commit 2ce1231
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 25 deletions.
1 change: 1 addition & 0 deletions go/cmd/vtctldclient/command/vreplication/vdiff/vdiff.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ func commandCreate(cmd *cobra.Command, args []string) error {
WaitUpdateInterval: protoutil.DurationToProto(createOptions.WaitUpdateInterval),
AutoRetry: createOptions.AutoRetry,
MaxReportSampleRows: createOptions.MaxReportSampleRows,
MaxDiffDuration: protoutil.DurationToProto(createOptions.MaxDiffDuration),
})

if err != nil {
Expand Down
16 changes: 10 additions & 6 deletions go/vt/vttablet/tabletmanager/vdiff/table_differ.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
var BackgroundOperationTimeout = topo.RemoteOperationTimeout * 4

var ErrMaxDiffDurationExceeded = vterrors.Errorf(vtrpcpb.Code_DEADLINE_EXCEEDED, "table diff was stopped due to exceeding the max-diff-duration time")
var ErrVDiffStoppedByUser = vterrors.Errorf(vtrpcpb.Code_CANCELED, "vdiff was stopped by user")

// compareColInfo contains the metadata for a column of the table being diffed
type compareColInfo struct {
Expand Down Expand Up @@ -522,7 +523,7 @@ func (td *tableDiffer) diff(ctx context.Context, rowsToCompare int64, debug, onl
case <-ctx.Done():
return nil, vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired")
case <-td.wd.ct.done:
return nil, vterrors.Errorf(vtrpcpb.Code_CANCELED, "vdiff was stopped by user")
return nil, ErrVDiffStoppedByUser
case <-stop:
return nil, ErrMaxDiffDurationExceeded
default:
Expand Down Expand Up @@ -703,13 +704,16 @@ func (td *tableDiffer) updateTableProgress(dbClient binlogplayer.DBClient, dr *D
if err != nil {
return err
}

// Update the in-memory lastPK as well so that we can restart the table
// diff if --max-diff-time was specified.
lastpkpb := &querypb.QueryResult{}
if err := prototext.Unmarshal(lastPK, lastpkpb); err != nil {
return err
// diff if --max-diff-duration was specified.
if td.wd.opts.CoreOptions.MaxDiffSeconds > 0 {
lastpkpb := &querypb.QueryResult{}
if err := prototext.Unmarshal(lastPK, lastpkpb); err != nil {
return err
}
td.lastPK = lastpkpb
}
td.lastPK = lastpkpb

query, err = sqlparser.ParseAndBind(sqlUpdateTableProgress,
sqltypes.Int64BindVariable(dr.ProcessedRows),
Expand Down
43 changes: 24 additions & 19 deletions go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,22 +136,25 @@ func (wd *workflowDiffer) reconcileExtraRows(dr *DiffReport, maxExtraRowsToCompa
}

func (wd *workflowDiffer) diffTable(ctx context.Context, dbClient binlogplayer.DBClient, td *tableDiffer) error {
defer func() {
cancelShardStreams := func() {
if td.shardStreamsCancel != nil {
td.shardStreamsCancel()
}
// Wait for all the shard streams to finish before returning.
td.wgShardStreamers.Wait()
}
defer func() {
cancelShardStreams()
}()

var (
timer *time.Timer
dr *DiffReport
diffErr error
diffTimer *time.Timer
diffReport *DiffReport
diffErr error
)
defer func() {
if timer != nil {
timer.Stop()
if diffTimer != nil {
diffTimer.Stop()
}
}()

Expand All @@ -171,23 +174,25 @@ func (wd *workflowDiffer) diffTable(ctx context.Context, dbClient binlogplayer.D
case <-ctx.Done():
return vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired")
case <-wd.ct.done:
return vterrors.Errorf(vtrpcpb.Code_CANCELED, "vdiff was stopped")
return ErrVDiffStoppedByUser
default:
}

if timer != nil { // We're restarting the diff
timer.Stop()
timer = nil
// Give the underlying resources (mainly MySQL) a moment to catch up.
if diffTimer != nil { // We're restarting the diff
diffTimer.Stop()
diffTimer = nil
cancelShardStreams()
// Give the underlying resources (mainly MySQL) a moment to catch up
// before we pick up where we left off (but with a new database snapshot).
time.Sleep(30 * time.Second)
}
timer = time.NewTimer(maxDiffRuntime)
diffTimer = time.NewTimer(maxDiffRuntime)

if err := td.initialize(ctx); err != nil {
return err
}
log.Infof("Table initialization done on table %s for vdiff %s", td.table.Name, wd.ct.uuid)
dr, diffErr = td.diff(ctx, wd.opts.CoreOptions.MaxRows, wd.opts.ReportOptions.DebugQuery, wd.opts.ReportOptions.OnlyPks, wd.opts.CoreOptions.MaxExtraRowsToCompare, wd.opts.ReportOptions.MaxSampleRows, timer.C)
diffReport, diffErr = td.diff(ctx, wd.opts.CoreOptions.MaxRows, wd.opts.ReportOptions.DebugQuery, wd.opts.ReportOptions.OnlyPks, wd.opts.CoreOptions.MaxExtraRowsToCompare, wd.opts.ReportOptions.MaxSampleRows, diffTimer.C)
log.Errorf("Encountered an error diffing table %s for vdiff %s: %v", td.table.Name, wd.ct.uuid, diffErr)
if diffErr == nil { // We finished the diff successfully
break
Expand All @@ -196,23 +201,23 @@ func (wd *workflowDiffer) diffTable(ctx context.Context, dbClient binlogplayer.D
return diffErr
}
}
log.Infof("Table diff done on table %s for vdiff %s with report: %+v", td.table.Name, wd.ct.uuid, dr)
log.Infof("Table diff done on table %s for vdiff %s with report: %+v", td.table.Name, wd.ct.uuid, diffReport)

if dr.ExtraRowsSource > 0 || dr.ExtraRowsTarget > 0 {
if err := wd.reconcileExtraRows(dr, wd.opts.CoreOptions.MaxExtraRowsToCompare, wd.opts.ReportOptions.MaxSampleRows); err != nil {
if diffReport.ExtraRowsSource > 0 || diffReport.ExtraRowsTarget > 0 {
if err := wd.reconcileExtraRows(diffReport, wd.opts.CoreOptions.MaxExtraRowsToCompare, wd.opts.ReportOptions.MaxSampleRows); err != nil {
log.Errorf("Encountered an error reconciling extra rows found for table %s for vdiff %s: %v", td.table.Name, wd.ct.uuid, err)
return vterrors.Wrap(err, "failed to reconcile extra rows")
}
}

if dr.MismatchedRows > 0 || dr.ExtraRowsTarget > 0 || dr.ExtraRowsSource > 0 {
if diffReport.MismatchedRows > 0 || diffReport.ExtraRowsTarget > 0 || diffReport.ExtraRowsSource > 0 {
if err := updateTableMismatch(dbClient, wd.ct.id, td.table.Name); err != nil {
return err
}
}

log.Infof("Completed reconciliation on table %s for vdiff %s with updated report: %+v", td.table.Name, wd.ct.uuid, dr)
if err := td.updateTableStateAndReport(ctx, dbClient, CompletedState, dr); err != nil {
log.Infof("Completed reconciliation on table %s for vdiff %s with updated report: %+v", td.table.Name, wd.ct.uuid, diffReport)
if err := td.updateTableStateAndReport(ctx, dbClient, CompletedState, diffReport); err != nil {
return err
}
return nil
Expand Down

0 comments on commit 2ce1231

Please sign in to comment.