diff --git a/go/vt/vtctl/workflow/framework_test.go b/go/vt/vtctl/workflow/framework_test.go index 56feeee0860..16bacc5f266 100644 --- a/go/vt/vtctl/workflow/framework_test.go +++ b/go/vt/vtctl/workflow/framework_test.go @@ -229,6 +229,23 @@ func (env *testEnv) deleteTablet(tablet *topodatapb.Tablet) { delete(env.tablets[tablet.Keyspace], int(tablet.Alias.Uid)) } +func (env *testEnv) confirmRoutingAllTablesToTarget(t *testing.T) { + t.Helper() + env.tmc.mu.Lock() + defer env.tmc.mu.Unlock() + wantRR := make(map[string][]string) + for _, sd := range env.tmc.schema { + for _, td := range sd.TableDefinitions { + for _, tt := range []string{"", "@rdonly", "@replica"} { + wantRR[td.Name+tt] = []string{fmt.Sprintf("%s.%s", env.targetKeyspace.KeyspaceName, td.Name)} + wantRR[fmt.Sprintf("%s.%s", env.sourceKeyspace.KeyspaceName, td.Name+tt)] = []string{fmt.Sprintf("%s.%s", env.targetKeyspace.KeyspaceName, td.Name)} + wantRR[fmt.Sprintf("%s.%s", env.targetKeyspace.KeyspaceName, td.Name+tt)] = []string{fmt.Sprintf("%s.%s", env.targetKeyspace.KeyspaceName, td.Name)} + } + } + } + checkRouting(t, env.ws, wantRR) +} + type testTMClient struct { tmclient.TabletManagerClient schema map[string]*tabletmanagerdatapb.SchemaDefinition @@ -240,6 +257,7 @@ type testTMClient struct { env *testEnv // For access to the env config from tmc methods. reverse atomic.Bool // Are we reversing traffic? + frozen atomic.Bool // Are the workflows frozen? } func newTestTMClient(env *testEnv) *testTMClient { @@ -306,6 +324,9 @@ func (tmc *testTMClient) ReadVReplicationWorkflow(ctx context.Context, tablet *t }, }, } + if tmc.frozen.Load() { + stream.Message = Frozen + } res.Streams = append(res.Streams, stream) } @@ -503,3 +524,90 @@ func (tmc *testTMClient) WaitForPosition(ctx context.Context, tablet *topodatapb func (tmc *testTMClient) VReplicationWaitForPos(ctx context.Context, tablet *topodatapb.Tablet, id int32, pos string) error { return nil } + +// +// Utility / helper functions. +// + +func checkRouting(t *testing.T, ws *Server, want map[string][]string) { + t.Helper() + ctx := context.Background() + got, err := topotools.GetRoutingRules(ctx, ws.ts) + require.NoError(t, err) + require.EqualValues(t, got, want, "routing rules don't match: got: %v, want: %v", got, want) + cells, err := ws.ts.GetCellInfoNames(ctx) + require.NoError(t, err) + for _, cell := range cells { + checkCellRouting(t, ws, cell, want) + } +} + +func checkCellRouting(t *testing.T, ws *Server, cell string, want map[string][]string) { + t.Helper() + ctx := context.Background() + svs, err := ws.ts.GetSrvVSchema(ctx, cell) + require.NoError(t, err) + got := make(map[string][]string, len(svs.RoutingRules.Rules)) + for _, rr := range svs.RoutingRules.Rules { + got[rr.FromTable] = append(got[rr.FromTable], rr.ToTables...) + } + require.EqualValues(t, got, want, "routing rules don't match for cell %s: got: %v, want: %v", cell, got, want) +} + +func checkDenyList(t *testing.T, ts *topo.Server, keyspace, shard string, want []string) { + t.Helper() + ctx := context.Background() + si, err := ts.GetShard(ctx, keyspace, shard) + require.NoError(t, err) + tc := si.GetTabletControl(topodatapb.TabletType_PRIMARY) + var got []string + if tc != nil { + got = tc.DeniedTables + } + require.EqualValues(t, got, want, "denied tables for %s/%s: got: %v, want: %v", keyspace, shard, got, want) +} + +func checkServedTypes(t *testing.T, ts *topo.Server, keyspace, shard string, want int) { + t.Helper() + ctx := context.Background() + si, err := ts.GetShard(ctx, keyspace, shard) + require.NoError(t, err) + servedTypes, err := ts.GetShardServingTypes(ctx, si) + require.NoError(t, err) + require.Equal(t, want, len(servedTypes), "shard %s/%s has wrong served types: got: %v, want: %v", + keyspace, shard, len(servedTypes), want) +} + +func checkCellServedTypes(t *testing.T, ts *topo.Server, keyspace, shard, cell string, want int) { + t.Helper() + ctx := context.Background() + srvKeyspace, err := ts.GetSrvKeyspace(ctx, cell, keyspace) + require.NoError(t, err) + count := 0 +outer: + for _, partition := range srvKeyspace.GetPartitions() { + for _, ref := range partition.ShardReferences { + if ref.Name == shard { + count++ + continue outer + } + } + } + require.Equal(t, want, count, "serving types for %s/%s in cell %s: got: %d, want: %d", keyspace, shard, cell, count, want) +} + +func checkIfPrimaryServing(t *testing.T, ts *topo.Server, keyspace, shard string, want bool) { + t.Helper() + ctx := context.Background() + si, err := ts.GetShard(ctx, keyspace, shard) + require.NoError(t, err) + require.Equal(t, want, si.IsPrimaryServing, "primary serving for %s/%s: got: %v, want: %v", keyspace, shard, si.IsPrimaryServing, want) +} + +func checkIfTableExistInVSchema(ctx context.Context, t *testing.T, ts *topo.Server, keyspace, table string) bool { + vschema, err := ts.GetVSchema(ctx, keyspace) + require.NoError(t, err) + require.NotNil(t, vschema) + _, ok := vschema.Tables[table] + return ok +} diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 3601ed2d1a1..22ae49883dc 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -2581,20 +2581,18 @@ func (s *Server) DropTargets(ctx context.Context, ts *trafficSwitcher, keepData, lockName := fmt.Sprintf("%s/%s", ts.TargetKeyspaceName(), ts.WorkflowName()) ctx, workflowUnlock, lockErr := s.ts.LockName(ctx, lockName, "DropTargets") if lockErr != nil { - ts.Logger().Errorf("Locking the workflow %s failed: %v", lockName, lockErr) + return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to lock the %s workflow", lockName), lockErr) } defer workflowUnlock(&err) ctx, sourceUnlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "DropTargets") if lockErr != nil { - ts.Logger().Errorf("Source LockKeyspace failed: %v", lockErr) - return nil, lockErr + return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to lock the %s keyspace", ts.SourceKeyspaceName()), lockErr) } defer sourceUnlock(&err) if ts.TargetKeyspaceName() != ts.SourceKeyspaceName() { lockCtx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "DropTargets") if lockErr != nil { - ts.Logger().Errorf("Target LockKeyspace failed: %v", lockErr) - return nil, lockErr + return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to lock the %s keyspace", ts.TargetKeyspaceName()), lockErr) } defer targetUnlock(&err) ctx = lockCtx @@ -2779,20 +2777,18 @@ func (s *Server) dropSources(ctx context.Context, ts *trafficSwitcher, removalTy lockName := fmt.Sprintf("%s/%s", ts.TargetKeyspaceName(), ts.WorkflowName()) ctx, workflowUnlock, lockErr := s.ts.LockName(ctx, lockName, "DropSources") if lockErr != nil { - ts.Logger().Errorf("Locking the workflow %s failed: %v", lockName, lockErr) + return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to lock the %s workflow", lockName), lockErr) } defer workflowUnlock(&err) ctx, sourceUnlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "DropSources") if lockErr != nil { - ts.Logger().Errorf("Source LockKeyspace failed: %v", lockErr) - return nil, lockErr + return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to lock the %s keyspace", ts.SourceKeyspaceName()), lockErr) } defer sourceUnlock(&err) if ts.TargetKeyspaceName() != ts.SourceKeyspaceName() { lockCtx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "DropSources") if lockErr != nil { - ts.Logger().Errorf("Target LockKeyspace failed: %v", lockErr) - return nil, lockErr + return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to lock the %s keyspace", ts.TargetKeyspaceName()), lockErr) } defer targetUnlock(&err) ctx = lockCtx @@ -3020,13 +3016,12 @@ func (s *Server) finalizeMigrateWorkflow(ctx context.Context, ts *trafficSwitche lockName := fmt.Sprintf("%s/%s", ts.TargetKeyspaceName(), ts.WorkflowName()) ctx, workflowUnlock, lockErr := s.ts.LockName(ctx, lockName, "completeMigrateWorkflow") if lockErr != nil { - ts.Logger().Errorf("Locking the workflow %s failed: %v", lockName, lockErr) + return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to lock the %s workflow", lockName), lockErr) } defer workflowUnlock(&err) ctx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "completeMigrateWorkflow") if lockErr != nil { - ts.Logger().Errorf("Target LockKeyspace failed: %v", lockErr) - return nil, lockErr + return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to lock the %s keyspace", ts.TargetKeyspaceName()), lockErr) } defer targetUnlock(&err) @@ -3193,16 +3188,10 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc cellsStr := strings.Join(req.Cells, ",") - // Consistently handle errors by logging and returning them. - handleError := func(message string, err error) (*[]string, error) { - werr := vterrors.Wrapf(err, message) - ts.Logger().Error(werr) - return nil, werr - } - log.Infof("Switching reads: %s.%s tablet types: %s, cells: %s, workflow state: %s", ts.targetKeyspace, ts.workflow, roTypesToSwitchStr, cellsStr, state.String()) if !switchReplica && !switchRdonly { - return handleError("invalid tablet types", vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "tablet types must be REPLICA or RDONLY: %s", roTypesToSwitchStr)) + return defaultErrorHandler(ts.Logger(), "invalid tablet types", + vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "tablet types must be REPLICA or RDONLY: %s", roTypesToSwitchStr)) } // For partial (shard-by-shard migrations) or multi-tenant migrations, traffic for all tablet types // is expected to be switched at once. For other MoveTables migrations where we use table routing rules @@ -3214,7 +3203,8 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc trafficSwitchingIsAllOrNothing = true case ts.MigrationType() == binlogdatapb.MigrationType_TABLES && ts.IsMultiTenantMigration(): if direction == DirectionBackward { - return handleError("invalid request", vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "requesting reversal of read traffic for multi-tenant migrations is not supported")) + return defaultErrorHandler(ts.Logger(), "invalid request", vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, + "requesting reversal of read traffic for multi-tenant migrations is not supported")) } // For multi-tenant migrations, we only support switching traffic to all cells at once allCells, err := ts.TopoServer().GetCellInfoNames(ctx) @@ -3222,16 +3212,19 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc return nil, err } if len(req.GetCells()) != 0 && len(req.GetCells()) != len(allCells) { - return handleError("invalid request", vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "requesting read traffic for multi-tenant migrations must include all cells")) + return defaultErrorHandler(ts.Logger(), "invalid request", vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, + "requesting read traffic for multi-tenant migrations must include all cells")) } } if !trafficSwitchingIsAllOrNothing { if direction == DirectionBackward && switchReplica && len(state.ReplicaCellsSwitched) == 0 { - return handleError("invalid request", vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "requesting reversal of read traffic for REPLICAs but REPLICA reads have not been switched")) + return defaultErrorHandler(ts.Logger(), "invalid request", vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, + "requesting reversal of read traffic for REPLICAs but REPLICA reads have not been switched")) } if direction == DirectionBackward && switchRdonly && len(state.RdonlyCellsSwitched) == 0 { - return handleError("invalid request", vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "requesting reversal of SwitchReads for RDONLYs but RDONLY reads have not been switched")) + return defaultErrorHandler(ts.Logger(), "invalid request", vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, + "requesting reversal of SwitchReads for RDONLYs but RDONLY reads have not been switched")) } } @@ -3253,7 +3246,7 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc // If journals exist notify user and fail. journalsExist, _, err := ts.checkJournals(ctx) if err != nil { - return handleError(fmt.Sprintf("failed to read journal in the %s keyspace", ts.SourceKeyspaceName()), err) + return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to read journal in the %s keyspace", ts.SourceKeyspaceName()), err) } if journalsExist { log.Infof("Found a previous journal entry for %d", ts.id) @@ -3266,7 +3259,7 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc } if err := ts.validate(ctx); err != nil { - return handleError("workflow validation failed", err) + return defaultErrorHandler(ts.Logger(), "workflow validation failed", err) } // For switching reads, locking the source keyspace is sufficient. @@ -3282,7 +3275,7 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc // For reads, locking the source keyspace is sufficient. ctx, unlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "SwitchReads", topo.WithTTL(ksLockTTL)) if lockErr != nil { - return handleError(fmt.Sprintf("failed to lock the %s keyspace", ts.SourceKeyspaceName()), lockErr) + return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to lock the %s keyspace", ts.SourceKeyspaceName()), lockErr) } defer unlock(&err) confirmKeyspaceLocksHeld := func() error { @@ -3297,7 +3290,7 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc // Remove mirror rules for the specified tablet types. if err := sw.mirrorTableTraffic(ctx, roTabletTypes, 0); err != nil { - return handleError(fmt.Sprintf("failed to remove mirror rules from source keyspace %s to target keyspace %s, workflow %s, for read-only tablet types", + return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to remove mirror rules from source keyspace %s to target keyspace %s, workflow %s, for read-only tablet types", ts.SourceKeyspaceName(), ts.TargetKeyspaceName(), ts.WorkflowName()), err) } @@ -3306,7 +3299,7 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc case ts.IsMultiTenantMigration(): err := sw.switchKeyspaceReads(ctx, roTabletTypes) if err != nil { - return handleError(fmt.Sprintf("failed to switch read traffic, from source keyspace %s to target keyspace %s, workflow %s", + return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to switch read traffic, from source keyspace %s to target keyspace %s, workflow %s", ts.SourceKeyspaceName(), ts.TargetKeyspaceName(), ts.WorkflowName()), err) } case ts.isPartialMigration: @@ -3314,28 +3307,28 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc default: err := sw.switchTableReads(ctx, req.Cells, roTabletTypes, rebuildSrvVSchema, direction) if err != nil { - return handleError("failed to switch read traffic for the tables", err) + return defaultErrorHandler(ts.Logger(), "failed to switch read traffic for the tables", err) } } return sw.logs(), nil } if err := confirmKeyspaceLocksHeld(); err != nil { - return handleError("locks were lost", err) + return defaultErrorHandler(ts.Logger(), "locks were lost", err) } ts.Logger().Infof("About to switchShardReads: cells: %s, tablet types: %s, direction: %d", cellsStr, roTypesToSwitchStr, direction) if err := sw.switchShardReads(ctx, req.Cells, roTabletTypes, direction); err != nil { - return handleError("failed to switch read traffic for the shards", err) + return defaultErrorHandler(ts.Logger(), "failed to switch read traffic for the shards", err) } if err := confirmKeyspaceLocksHeld(); err != nil { - return handleError("locks were lost", err) + return defaultErrorHandler(ts.Logger(), "locks were lost", err) } ts.Logger().Infof("switchShardReads Completed: cells: %s, tablet types: %s, direction: %d", cellsStr, roTypesToSwitchStr, direction) if err := s.ts.ValidateSrvKeyspace(ctx, ts.targetKeyspace, cellsStr); err != nil { err2 := vterrors.Wrapf(err, "after switching shard reads, found SrvKeyspace for %s is corrupt in cell %s", ts.targetKeyspace, cellsStr) - return handleError("failed to validate SrvKeyspace record", err2) + return defaultErrorHandler(ts.Logger(), "failed to validate SrvKeyspace record", err2) } return sw.logs(), nil } diff --git a/go/vt/vtctl/workflow/server_test.go b/go/vt/vtctl/workflow/server_test.go index 542361a1571..be78b2ae4a9 100644 --- a/go/vt/vtctl/workflow/server_test.go +++ b/go/vt/vtctl/workflow/server_test.go @@ -208,6 +208,269 @@ func TestVDiffCreate(t *testing.T) { } } +func TestMoveTablesComplete(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + workflowName := "wf1" + table1Name := "t1" + table2Name := "t1_2" + table3Name := "t1_3" + tableTemplate := "CREATE TABLE %s (id BIGINT, name VARCHAR(64), PRIMARY KEY (id))" + sourceKeyspaceName := "sourceks" + targetKeyspaceName := "targetks" + tabletTypes := []topodatapb.TabletType{ + topodatapb.TabletType_PRIMARY, + topodatapb.TabletType_REPLICA, + topodatapb.TabletType_RDONLY, + } + lockName := fmt.Sprintf("%s/%s", targetKeyspaceName, workflowName) + schema := map[string]*tabletmanagerdatapb.SchemaDefinition{ + table1Name: { + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{ + { + Name: table1Name, + Schema: fmt.Sprintf(tableTemplate, table1Name), + }, + }, + }, + table2Name: { + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{ + { + Name: table2Name, + Schema: fmt.Sprintf(tableTemplate, table2Name), + }, + }, + }, + table3Name: { + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{ + { + Name: table3Name, + Schema: fmt.Sprintf(tableTemplate, table3Name), + }, + }, + }, + } + + testcases := []struct { + name string + sourceKeyspace, targetKeyspace *testKeyspace + preFunc func(t *testing.T, env *testEnv) + req *vtctldatapb.MoveTablesCompleteRequest + expectedSourceQueries []*queryResult + expectedTargetQueries []*queryResult + want *vtctldatapb.MoveTablesCompleteResponse + wantErr string + postFunc func(t *testing.T, env *testEnv) + }{ + { + name: "basic", + sourceKeyspace: &testKeyspace{ + KeyspaceName: sourceKeyspaceName, + ShardNames: []string{"0"}, + }, + targetKeyspace: &testKeyspace{ + KeyspaceName: targetKeyspaceName, + ShardNames: []string{"-80", "80-"}, + }, + req: &vtctldatapb.MoveTablesCompleteRequest{ + TargetKeyspace: targetKeyspaceName, + Workflow: workflowName, + }, + expectedSourceQueries: []*queryResult{ + { + query: fmt.Sprintf("drop table `vt_%s`.`%s`", sourceKeyspaceName, table1Name), + result: &querypb.QueryResult{}, + }, + { + query: fmt.Sprintf("drop table `vt_%s`.`%s`", sourceKeyspaceName, table2Name), + result: &querypb.QueryResult{}, + }, + { + query: fmt.Sprintf("drop table `vt_%s`.`%s`", sourceKeyspaceName, table3Name), + result: &querypb.QueryResult{}, + }, + { + query: fmt.Sprintf("delete from _vt.vreplication where db_name = 'vt_%s' and workflow = '%s'", + sourceKeyspaceName, ReverseWorkflowName(workflowName)), + result: &querypb.QueryResult{}, + }, + }, + expectedTargetQueries: []*queryResult{ + { + query: fmt.Sprintf("delete from _vt.vreplication where db_name = 'vt_%s' and workflow = '%s'", + targetKeyspaceName, workflowName), + result: &querypb.QueryResult{}, + }, + }, + want: &vtctldatapb.MoveTablesCompleteResponse{ + Summary: fmt.Sprintf("Successfully completed the %s workflow in the %s keyspace", + workflowName, targetKeyspaceName), + }, + }, + { + name: "keep routing rules and data", + sourceKeyspace: &testKeyspace{ + KeyspaceName: sourceKeyspaceName, + ShardNames: []string{"0"}, + }, + targetKeyspace: &testKeyspace{ + KeyspaceName: targetKeyspaceName, + ShardNames: []string{"-80", "80-"}, + }, + req: &vtctldatapb.MoveTablesCompleteRequest{ + TargetKeyspace: targetKeyspaceName, + Workflow: workflowName, + KeepRoutingRules: true, + KeepData: true, + }, + expectedSourceQueries: []*queryResult{ + { + query: fmt.Sprintf("delete from _vt.vreplication where db_name = 'vt_%s' and workflow = '%s'", + sourceKeyspaceName, ReverseWorkflowName(workflowName)), + result: &querypb.QueryResult{}, + }, + }, + expectedTargetQueries: []*queryResult{ + { + query: fmt.Sprintf("delete from _vt.vreplication where db_name = 'vt_%s' and workflow = '%s'", + targetKeyspaceName, workflowName), + result: &querypb.QueryResult{}, + }, + }, + postFunc: func(t *testing.T, env *testEnv) { + env.confirmRoutingAllTablesToTarget(t) + }, + want: &vtctldatapb.MoveTablesCompleteResponse{ + Summary: fmt.Sprintf("Successfully completed the %s workflow in the %s keyspace", + workflowName, targetKeyspaceName), + }, + }, + { + name: "rename tables", + sourceKeyspace: &testKeyspace{ + KeyspaceName: sourceKeyspaceName, + ShardNames: []string{"0"}, + }, + targetKeyspace: &testKeyspace{ + KeyspaceName: targetKeyspaceName, + ShardNames: []string{"-80", "80-"}, + }, + req: &vtctldatapb.MoveTablesCompleteRequest{ + TargetKeyspace: targetKeyspaceName, + Workflow: workflowName, + RenameTables: true, + }, + expectedSourceQueries: []*queryResult{ + { + query: fmt.Sprintf("rename table `vt_%s`.`%s` TO `vt_%s`.`_%s_old`", sourceKeyspaceName, table1Name, sourceKeyspaceName, table1Name), + result: &querypb.QueryResult{}, + }, + { + query: fmt.Sprintf("rename table `vt_%s`.`%s` TO `vt_%s`.`_%s_old`", sourceKeyspaceName, table2Name, sourceKeyspaceName, table2Name), + result: &querypb.QueryResult{}, + }, + { + query: fmt.Sprintf("rename table `vt_%s`.`%s` TO `vt_%s`.`_%s_old`", sourceKeyspaceName, table3Name, sourceKeyspaceName, table3Name), + result: &querypb.QueryResult{}, + }, + { + query: fmt.Sprintf("delete from _vt.vreplication where db_name = 'vt_%s' and workflow = '%s'", + sourceKeyspaceName, ReverseWorkflowName(workflowName)), + result: &querypb.QueryResult{}, + }, + }, + expectedTargetQueries: []*queryResult{ + { + query: fmt.Sprintf("delete from _vt.vreplication where db_name = 'vt_%s' and workflow = '%s'", + targetKeyspaceName, workflowName), + result: &querypb.QueryResult{}, + }, + }, + want: &vtctldatapb.MoveTablesCompleteResponse{ + Summary: fmt.Sprintf("Successfully completed the %s workflow in the %s keyspace", + workflowName, targetKeyspaceName), + }, + }, + { + name: "named lock held", + sourceKeyspace: &testKeyspace{ + KeyspaceName: sourceKeyspaceName, + ShardNames: []string{"0"}, + }, + targetKeyspace: &testKeyspace{ + KeyspaceName: targetKeyspaceName, + ShardNames: []string{"-80", "80-"}, + }, + req: &vtctldatapb.MoveTablesCompleteRequest{ + TargetKeyspace: targetKeyspaceName, + Workflow: workflowName, + KeepRoutingRules: true, + }, + preFunc: func(t *testing.T, env *testEnv) { + _, _, err := env.ts.LockName(ctx, lockName, "test") + require.NoError(t, err) + topo.LockTimeout = 500 * time.Millisecond + }, + postFunc: func(t *testing.T, env *testEnv) { + topo.LockTimeout = 45 * time.Second // reset it to the default + }, + wantErr: fmt.Sprintf("failed to lock the %s workflow: deadline exceeded: internal/named_locks/%s", lockName, lockName), + }, + } + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + require.NotNil(t, tc.sourceKeyspace) + require.NotNil(t, tc.targetKeyspace) + require.NotNil(t, tc.req) + env := newTestEnv(t, ctx, defaultCellName, tc.sourceKeyspace, tc.targetKeyspace) + defer env.close() + env.tmc.schema = schema + env.tmc.frozen.Store(true) + if tc.expectedSourceQueries != nil { + require.NotNil(t, env.tablets[tc.sourceKeyspace.KeyspaceName]) + for _, eq := range tc.expectedSourceQueries { + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, eq) + } + } + if tc.expectedTargetQueries != nil { + require.NotNil(t, env.tablets[tc.targetKeyspace.KeyspaceName]) + for _, eq := range tc.expectedTargetQueries { + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, eq) + } + } + if tc.preFunc != nil { + tc.preFunc(t, env) + } + // Setup the routing rules as they would be after having previously done SwitchTraffic. + env.addTableRoutingRules(t, ctx, tabletTypes, []string{table1Name, table2Name, table3Name}) + got, err := env.ws.MoveTablesComplete(ctx, tc.req) + if tc.wantErr != "" { + require.EqualError(t, err, tc.wantErr) + } else { + require.NoError(t, err) + require.EqualValues(t, got, tc.want, "Server.MoveTablesComplete() = %v, want %v", got, tc.want) + } + if tc.postFunc != nil { + tc.postFunc(t, env) + } else { // Default post checks + // Confirm that we have no routing rules. + rr, err := env.ts.GetRoutingRules(ctx) + require.NoError(t, err) + require.Zero(t, rr.Rules) + + // Confirm that we have no shard tablet controls, which is where + // DeniedTables live. + for _, keyspace := range []*testKeyspace{tc.sourceKeyspace, tc.targetKeyspace} { + for _, shardName := range keyspace.ShardNames { + checkDenyList(t, env.ts, keyspace.KeyspaceName, shardName, nil) + } + } + } + }) + } +} + func TestWorkflowDelete(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) defer cancel() @@ -219,6 +482,7 @@ func TestWorkflowDelete(t *testing.T) { tableTemplate := "CREATE TABLE %s (id BIGINT, name VARCHAR(64), PRIMARY KEY (id))" sourceKeyspaceName := "sourceks" targetKeyspaceName := "targetks" + lockName := fmt.Sprintf("%s/%s", targetKeyspaceName, workflowName) schema := map[string]*tabletmanagerdatapb.SchemaDefinition{ table1Name: { TableDefinitions: []*tabletmanagerdatapb.TableDefinition{ @@ -254,7 +518,7 @@ func TestWorkflowDelete(t *testing.T) { expectedSourceQueries []*queryResult expectedTargetQueries []*queryResult want *vtctldatapb.WorkflowDeleteResponse - wantErr bool + wantErr string postFunc func(t *testing.T, env *testEnv) }{ { @@ -389,6 +653,30 @@ func TestWorkflowDelete(t *testing.T) { } }, }, + { + name: "named lock held", + sourceKeyspace: &testKeyspace{ + KeyspaceName: sourceKeyspaceName, + ShardNames: []string{"0"}, + }, + targetKeyspace: &testKeyspace{ + KeyspaceName: targetKeyspaceName, + ShardNames: []string{"-80", "80-"}, + }, + req: &vtctldatapb.WorkflowDeleteRequest{ + Keyspace: targetKeyspaceName, + Workflow: workflowName, + }, + preFunc: func(t *testing.T, env *testEnv) { + _, _, err := env.ts.LockName(ctx, lockName, "test") + require.NoError(t, err) + topo.LockTimeout = 500 * time.Millisecond + }, + postFunc: func(t *testing.T, env *testEnv) { + topo.LockTimeout = 45 * time.Second // reset it to the default + }, + wantErr: fmt.Sprintf("failed to lock the %s workflow: deadline exceeded: internal/named_locks/%s", lockName, lockName), + }, } for _, tc := range testcases { t.Run(tc.name, func(t *testing.T) { @@ -414,11 +702,12 @@ func TestWorkflowDelete(t *testing.T) { tc.preFunc(t, env) } got, err := env.ws.WorkflowDelete(ctx, tc.req) - if (err != nil) != tc.wantErr { - require.Fail(t, "unexpected error value", "Server.WorkflowDelete() error = %v, wantErr %v", err, tc.wantErr) - return + if tc.wantErr != "" { + require.EqualError(t, err, tc.wantErr) + } else { + require.NoError(t, err) + require.EqualValues(t, got, tc.want, "Server.WorkflowDelete() = %v, want %v", got, tc.want) } - require.EqualValues(t, got, tc.want, "Server.WorkflowDelete() = %v, want %v", got, tc.want) if tc.postFunc != nil { tc.postFunc(t, env) } else { // Default post checks @@ -431,9 +720,7 @@ func TestWorkflowDelete(t *testing.T) { // DeniedTables live. for _, keyspace := range []*testKeyspace{tc.sourceKeyspace, tc.targetKeyspace} { for _, shardName := range keyspace.ShardNames { - si, err := env.ts.GetShard(ctx, keyspace.KeyspaceName, shardName) - require.NoError(t, err) - require.Zero(t, si.Shard.TabletControls) + checkDenyList(t, env.ts, keyspace.KeyspaceName, shardName, nil) } } } diff --git a/go/vt/vtctl/workflow/utils.go b/go/vt/vtctl/workflow/utils.go index 9cedf01733e..fde7b36da6e 100644 --- a/go/vt/vtctl/workflow/utils.go +++ b/go/vt/vtctl/workflow/utils.go @@ -37,6 +37,7 @@ import ( "vitess.io/vitess/go/vt/discovery" "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/schema" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/topo" @@ -959,3 +960,11 @@ func IsTableDidNotExistError(err error) bool { } return false } + +// defaultErrorHandler provides a way to consistently handle errors by logging and +// returning them. +func defaultErrorHandler(logger logutil.Logger, message string, err error) (*[]string, error) { + werr := vterrors.Wrapf(err, message) + logger.Error(werr) + return nil, werr +}