Skip to content

Commit

Permalink
VDiff: Cleanup the controller for a VDiff before deleting it (#14107)
Browse files Browse the repository at this point in the history
  • Loading branch information
mattlord authored Sep 28, 2023
1 parent 9991dad commit 04834c4
Show file tree
Hide file tree
Showing 10 changed files with 169 additions and 62 deletions.
4 changes: 2 additions & 2 deletions go/test/endtoend/vreplication/vdiff2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,11 +190,11 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, tks *Keyspace,
vdiff(t, tc.targetKs, tc.workflow, allCellNames, true, true, nil)

if tc.autoRetryError {
testAutoRetryError(t, tc, cells[0].Name)
testAutoRetryError(t, tc, allCellNames)
}

if tc.resume {
testResume(t, tc, cells[0].Name)
testResume(t, tc, allCellNames)
}

// These are done here so that we have a valid workflow to test the commands against
Expand Down
17 changes: 6 additions & 11 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1986,18 +1986,13 @@ func (s *Server) checkIfPreviousJournalExists(ctx context.Context, mz *materiali
// deleteWorkflowVDiffData cleans up any potential VDiff related data associated
// with the workflow on the given tablet.
func (s *Server) deleteWorkflowVDiffData(ctx context.Context, tablet *topodatapb.Tablet, workflow string) {
sqlDeleteVDiffs := `delete from vd, vdt, vdl using _vt.vdiff as vd inner join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id)
inner join _vt.vdiff_log as vdl on (vd.id = vdl.vdiff_id)
where vd.keyspace = %s and vd.workflow = %s`
query := fmt.Sprintf(sqlDeleteVDiffs, encodeString(tablet.Keyspace), encodeString(workflow))
rows := -1
if _, err := s.tmc.ExecuteFetchAsAllPrivs(ctx, tablet, &tabletmanagerdatapb.ExecuteFetchAsAllPrivsRequest{
Query: []byte(query),
MaxRows: uint64(rows),
if _, err := s.tmc.VDiff(ctx, tablet, &tabletmanagerdatapb.VDiffRequest{
Keyspace: tablet.Keyspace,
Workflow: workflow,
Action: string(vdiff.DeleteAction),
ActionArg: vdiff.AllActionArg,
}); err != nil {
if sqlErr, ok := err.(*sqlerror.SQLError); ok && sqlErr.Num != sqlerror.ERNoSuchTable { // the tables may not exist if no vdiffs have been run
log.Errorf("Error deleting vdiff data for %s.%s workflow: %v", tablet.Keyspace, workflow, err)
}
log.Errorf("Error deleting vdiff data for %s.%s workflow: %v", tablet.Keyspace, workflow, err)
}
}

Expand Down
10 changes: 10 additions & 0 deletions go/vt/vttablet/tabletmanager/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,3 +480,13 @@ func (tmc *fakeTMClient) ExecuteFetchAsAllPrivs(ctx context.Context, tablet *top
RowsAffected: 1,
}, nil
}

func (tmc *fakeTMClient) VDiff(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.VDiffRequest) (*tabletmanagerdatapb.VDiffResponse, error) {
return &tabletmanagerdatapb.VDiffResponse{
Id: 1,
VdiffUuid: req.VdiffUuid,
Output: &querypb.QueryResult{
RowsAffected: 1,
},
}, nil
}
55 changes: 50 additions & 5 deletions go/vt/vttablet/tabletmanager/vdiff/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,12 +352,36 @@ func (vde *Engine) handleStopAction(ctx context.Context, dbClient binlogplayer.D
}

func (vde *Engine) handleDeleteAction(ctx context.Context, dbClient binlogplayer.DBClient, action VDiffAction, req *tabletmanagerdatapb.VDiffRequest, resp *tabletmanagerdatapb.VDiffResponse) error {
var err error
query := ""
vde.mu.Lock()
defer vde.mu.Unlock()
var deleteQuery string
cleanupController := func(controller *controller) {
if controller == nil {
return
}
controller.Stop()
delete(vde.controllers, controller.id)
}

switch req.ActionArg {
case AllActionArg:
query, err = sqlparser.ParseAndBind(sqlDeleteVDiffs,
// We need to stop any running controllers before we delete
// the vdiff records.
query, err := sqlparser.ParseAndBind(sqlGetVDiffIDsByKeyspaceWorkflow,
sqltypes.StringBindVariable(req.Keyspace),
sqltypes.StringBindVariable(req.Workflow),
)
if err != nil {
return err
}
res, err := dbClient.ExecuteFetch(query, -1)
if err != nil {
return err
}
for _, row := range res.Named().Rows {
cleanupController(vde.controllers[row.AsInt64("id", -1)])
}
deleteQuery, err = sqlparser.ParseAndBind(sqlDeleteVDiffs,
sqltypes.StringBindVariable(req.Keyspace),
sqltypes.StringBindVariable(req.Workflow),
)
Expand All @@ -369,12 +393,33 @@ func (vde *Engine) handleDeleteAction(ctx context.Context, dbClient binlogplayer
if err != nil {
return fmt.Errorf("action argument %s not supported", req.ActionArg)
}
query, err = sqlparser.ParseAndBind(sqlDeleteVDiffByUUID, sqltypes.StringBindVariable(uuid.String()))
// We need to be sure that the controller is stopped, if
// it's still running, before we delete the vdiff record.
query, err := sqlparser.ParseAndBind(sqlGetVDiffID,
sqltypes.StringBindVariable(uuid.String()),
)
if err != nil {
return err
}
res, err := dbClient.ExecuteFetch(query, 1)
if err != nil {
return err
}
row := res.Named().Row() // Must only be one
if row == nil {
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "no vdiff found for UUID %s on tablet %v",
uuid, vde.thisTablet.Alias)
}
cleanupController(vde.controllers[row.AsInt64("id", -1)])
deleteQuery, err = sqlparser.ParseAndBind(sqlDeleteVDiffByUUID,
sqltypes.StringBindVariable(uuid.String()),
)
if err != nil {
return err
}
}
if _, err = dbClient.ExecuteFetch(query, 1); err != nil {
// Execute the query which deletes the vdiff record(s).
if _, err := dbClient.ExecuteFetch(deleteQuery, 1); err != nil {
return err
}

Expand Down
70 changes: 57 additions & 13 deletions go/vt/vttablet/tabletmanager/vdiff/action_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,18 @@ func TestPerformVDiffAction(t *testing.T) {
keyspace := "ks"
workflow := "wf"
uuid := uuid.New().String()
type queryAndResult struct {
query string
result *sqltypes.Result // Optional if you need a non-empty result
}
tests := []struct {
name string
vde *Engine
req *tabletmanagerdatapb.VDiffRequest
preFunc func() error
postFunc func() error
want *tabletmanagerdatapb.VDiffResponse
expectQueries []string
expectQueries []queryAndResult
wantErr error
}{
{
Expand All @@ -72,9 +76,13 @@ func TestPerformVDiffAction(t *testing.T) {
preFunc: func() error {
return tstenv.TopoServ.CreateCellInfo(ctx, "zone100_test", &topodatapb.CellInfo{})
},
expectQueries: []string{
fmt.Sprintf("select id as id from _vt.vdiff where vdiff_uuid = %s", encodeString(uuid)),
fmt.Sprintf(`insert into _vt.vdiff(keyspace, workflow, state, options, shard, db_name, vdiff_uuid) values('', '', 'pending', '{\"picker_options\":{\"source_cell\":\"cell1,zone100_test\",\"target_cell\":\"cell1,zone100_test\"}}', '0', 'vt_vttest', %s)`, encodeString(uuid)),
expectQueries: []queryAndResult{
{
query: fmt.Sprintf("select id as id from _vt.vdiff where vdiff_uuid = %s", encodeString(uuid)),
},
{
query: fmt.Sprintf(`insert into _vt.vdiff(keyspace, workflow, state, options, shard, db_name, vdiff_uuid) values('', '', 'pending', '{\"picker_options\":{\"source_cell\":\"cell1,zone100_test\",\"target_cell\":\"cell1,zone100_test\"}}', '0', 'vt_vttest', %s)`, encodeString(uuid)),
},
},
postFunc: func() error {
return tstenv.TopoServ.DeleteCellInfo(ctx, "zone100_test", true)
Expand Down Expand Up @@ -102,9 +110,13 @@ func TestPerformVDiffAction(t *testing.T) {
Cells: cells,
})
},
expectQueries: []string{
fmt.Sprintf("select id as id from _vt.vdiff where vdiff_uuid = %s", encodeString(uuid)),
fmt.Sprintf(`insert into _vt.vdiff(keyspace, workflow, state, options, shard, db_name, vdiff_uuid) values('', '', 'pending', '{\"picker_options\":{\"source_cell\":\"all\",\"target_cell\":\"all\"}}', '0', 'vt_vttest', %s)`, encodeString(uuid)),
expectQueries: []queryAndResult{
{
query: fmt.Sprintf("select id as id from _vt.vdiff where vdiff_uuid = %s", encodeString(uuid)),
},
{
query: fmt.Sprintf(`insert into _vt.vdiff(keyspace, workflow, state, options, shard, db_name, vdiff_uuid) values('', '', 'pending', '{\"picker_options\":{\"source_cell\":\"all\",\"target_cell\":\"all\"}}', '0', 'vt_vttest', %s)`, encodeString(uuid)),
},
},
postFunc: func() error {
if err := tstenv.TopoServ.DeleteCellInfo(ctx, "zone100_test", true); err != nil {
Expand All @@ -119,9 +131,21 @@ func TestPerformVDiffAction(t *testing.T) {
Action: string(DeleteAction),
ActionArg: uuid,
},
expectQueries: []string{
fmt.Sprintf(`delete from vd, vdt using _vt.vdiff as vd left join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id)
expectQueries: []queryAndResult{
{
query: fmt.Sprintf("select id as id from _vt.vdiff where vdiff_uuid = %s", encodeString(uuid)),
result: sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"id",
"int64",
),
"1",
),
},
{
query: fmt.Sprintf(`delete from vd, vdt using _vt.vdiff as vd left join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id)
where vd.vdiff_uuid = %s`, encodeString(uuid)),
},
},
},
{
Expand All @@ -132,10 +156,23 @@ func TestPerformVDiffAction(t *testing.T) {
Keyspace: keyspace,
Workflow: workflow,
},
expectQueries: []string{
fmt.Sprintf(`delete from vd, vdt, vdl using _vt.vdiff as vd left join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id)
expectQueries: []queryAndResult{
{
query: fmt.Sprintf("select id as id from _vt.vdiff where keyspace = %s and workflow = %s", encodeString(keyspace), encodeString(workflow)),
result: sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"id",
"int64",
),
"1",
"2",
),
},
{
query: fmt.Sprintf(`delete from vd, vdt, vdl using _vt.vdiff as vd left join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id)
left join _vt.vdiff_log as vdl on (vd.id = vdl.vdiff_id)
where vd.keyspace = %s and vd.workflow = %s`, encodeString(keyspace), encodeString(workflow)),
},
},
},
}
Expand All @@ -148,10 +185,14 @@ func TestPerformVDiffAction(t *testing.T) {
if tt.vde == nil {
tt.vde = vdiffenv.vde
}
for _, query := range tt.expectQueries {
vdiffenv.dbClient.ExpectRequest(query, &sqltypes.Result{}, nil)
for _, queryResult := range tt.expectQueries {
if queryResult.result == nil {
queryResult.result = &sqltypes.Result{}
}
vdiffenv.dbClient.ExpectRequest(queryResult.query, queryResult.result, nil)
}
got, err := tt.vde.PerformVDiffAction(ctx, tt.req)
vdiffenv.dbClient.Wait()
if tt.wantErr != nil && !vterrors.Equals(err, tt.wantErr) {
t.Errorf("Engine.PerformVDiffAction() error = %v, wantErr %v", err, tt.wantErr)
return
Expand All @@ -163,6 +204,9 @@ func TestPerformVDiffAction(t *testing.T) {
err := tt.postFunc()
require.NoError(t, err, "post function failed: %v", err)
}
// No VDiffs should be running anymore.
require.Equal(t, 0, len(vdiffenv.vde.controllers), "expected no controllers to be running, but found %d",
len(vdiffenv.vde.controllers))
})
}
}
22 changes: 13 additions & 9 deletions go/vt/vttablet/tabletmanager/vdiff/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,20 @@ import (
"strings"
"time"

"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/vt/proto/tabletmanagerdata"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"

"google.golang.org/protobuf/encoding/prototext"

"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/proto/tabletmanagerdata"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tmclient"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

/*
Expand Down Expand Up @@ -183,6 +183,8 @@ func (ct *controller) start(ctx context.Context, dbClient binlogplayer.DBClient)
select {
case <-ctx.Done():
return vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired")
case <-ct.done:
return vterrors.Errorf(vtrpcpb.Code_CANCELED, "vdiff was stopped")
default:
}
ct.workflowFilter = fmt.Sprintf("where workflow = %s and db_name = %s", encodeString(ct.workflow),
Expand All @@ -197,6 +199,8 @@ func (ct *controller) start(ctx context.Context, dbClient binlogplayer.DBClient)
select {
case <-ctx.Done():
return vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired")
case <-ct.done:
return vterrors.Errorf(vtrpcpb.Code_CANCELED, "vdiff was stopped")
default:
}
source := newMigrationSource()
Expand Down Expand Up @@ -315,9 +319,9 @@ func (ct *controller) saveErrorState(ctx context.Context, saveErr error) error {
log.Warningf("Failed to persist vdiff error state: %v. Will retry in %s", err, retryDelay.String())
select {
case <-ctx.Done():
return fmt.Errorf("engine is shutting down")
return vterrors.Errorf(vtrpcpb.Code_CANCELED, "engine is shutting down")
case <-ct.done:
return fmt.Errorf("vdiff was stopped")
return vterrors.Errorf(vtrpcpb.Code_CANCELED, "vdiff was stopped")
case <-time.After(retryDelay):
if retryDelay < maxRetryDelay {
retryDelay = time.Duration(float64(retryDelay) * 1.5)
Expand Down
15 changes: 8 additions & 7 deletions go/vt/vttablet/tabletmanager/vdiff/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,14 @@ const (
sqlUpdateVDiffState = "update _vt.vdiff set state = %s, last_error = %s %s where id = %d"
sqlUpdateVDiffStopped = `update _vt.vdiff as vd, _vt.vdiff_table as vdt set vd.state = 'stopped', vdt.state = 'stopped', vd.last_error = ''
where vd.id = vdt.vdiff_id and vd.id = %a and vd.state != 'completed'`
sqlGetVReplicationEntry = "select * from _vt.vreplication %s"
sqlGetVDiffsToRun = "select * from _vt.vdiff where state in ('started','pending')" // what VDiffs have not been stopped or completed
sqlGetVDiffsToRetry = "select * from _vt.vdiff where state = 'error' and json_unquote(json_extract(options, '$.core_options.auto_retry')) = 'true'"
sqlGetVDiffID = "select id as id from _vt.vdiff where vdiff_uuid = %a"
sqlGetAllVDiffs = "select * from _vt.vdiff order by id desc"
sqlGetTableRows = "select table_rows as table_rows from INFORMATION_SCHEMA.TABLES where table_schema = %a and table_name = %a"
sqlGetAllTableRows = "select table_name as table_name, table_rows as table_rows from INFORMATION_SCHEMA.TABLES where table_schema = %s and table_name in (%s)"
sqlGetVReplicationEntry = "select * from _vt.vreplication %s"
sqlGetVDiffsToRun = "select * from _vt.vdiff where state in ('started','pending')" // what VDiffs have not been stopped or completed
sqlGetVDiffsToRetry = "select * from _vt.vdiff where state = 'error' and json_unquote(json_extract(options, '$.core_options.auto_retry')) = 'true'"
sqlGetVDiffID = "select id as id from _vt.vdiff where vdiff_uuid = %a"
sqlGetVDiffIDsByKeyspaceWorkflow = "select id as id from _vt.vdiff where keyspace = %a and workflow = %a"
sqlGetAllVDiffs = "select * from _vt.vdiff order by id desc"
sqlGetTableRows = "select table_rows as table_rows from INFORMATION_SCHEMA.TABLES where table_schema = %a and table_name = %a"
sqlGetAllTableRows = "select table_name as table_name, table_rows as table_rows from INFORMATION_SCHEMA.TABLES where table_schema = %s and table_name in (%s)"

sqlNewVDiffTable = "insert into _vt.vdiff_table(vdiff_id, table_name, state, table_rows) values(%a, %a, 'pending', %a)"
sqlGetVDiffTable = `select vdt.lastpk as lastpk, vdt.mismatch as mismatch, vdt.report as report
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vttablet/tabletmanager/vdiff/table_differ.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,8 @@ func (td *tableDiffer) streamOneShard(ctx context.Context, participant *shardStr
case participant.result <- result:
case <-ctx.Done():
return vterrors.Wrap(ctx.Err(), "VStreamRows")
case <-td.wd.ct.done:
return vterrors.Errorf(vtrpcpb.Code_CANCELED, "vdiff was stopped")
}
return nil
})
Expand Down Expand Up @@ -494,6 +496,8 @@ func (td *tableDiffer) diff(ctx context.Context, rowsToCompare int64, debug, onl
select {
case <-ctx.Done():
return nil, vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired")
case <-td.wd.ct.done:
return nil, vterrors.Errorf(vtrpcpb.Code_CANCELED, "vdiff was stopped")
default:
}

Expand Down
6 changes: 6 additions & 0 deletions go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ func (wd *workflowDiffer) diffTable(ctx context.Context, dbClient binlogplayer.D
select {
case <-ctx.Done():
return vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired")
case <-wd.ct.done:
return vterrors.Errorf(vtrpcpb.Code_CANCELED, "vdiff was stopped")
default:
}

Expand Down Expand Up @@ -184,6 +186,8 @@ func (wd *workflowDiffer) diff(ctx context.Context) error {
select {
case <-ctx.Done():
return vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired")
case <-wd.ct.done:
return vterrors.Errorf(vtrpcpb.Code_CANCELED, "vdiff was stopped")
default:
}

Expand All @@ -203,6 +207,8 @@ func (wd *workflowDiffer) diff(ctx context.Context) error {
select {
case <-ctx.Done():
return vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired")
case <-wd.ct.done:
return vterrors.Errorf(vtrpcpb.Code_CANCELED, "vdiff was stopped")
default:
}
query, err := sqlparser.ParseAndBind(sqlGetVDiffTable,
Expand Down
Loading

0 comments on commit 04834c4

Please sign in to comment.