diff --git a/go/cmd/vtctldclient/command/vreplication/common/switchtraffic.go b/go/cmd/vtctldclient/command/vreplication/common/switchtraffic.go index 019367fe82b..4004afc0ac0 100644 --- a/go/cmd/vtctldclient/command/vreplication/common/switchtraffic.go +++ b/go/cmd/vtctldclient/command/vreplication/common/switchtraffic.go @@ -91,6 +91,7 @@ func commandSwitchTraffic(cmd *cobra.Command, args []string) error { req := &vtctldatapb.WorkflowSwitchTrafficRequest{ Keyspace: BaseOptions.TargetKeyspace, Workflow: BaseOptions.Workflow, + Cells: SwitchTrafficOptions.Cells, TabletTypes: SwitchTrafficOptions.TabletTypes, MaxReplicationLagAllowed: protoutil.DurationToProto(SwitchTrafficOptions.MaxReplicationLagAllowed), Timeout: protoutil.DurationToProto(SwitchTrafficOptions.Timeout), diff --git a/go/test/endtoend/vreplication/fk_test.go b/go/test/endtoend/vreplication/fk_test.go index a75b6727fdc..189857b71ee 100644 --- a/go/test/endtoend/vreplication/fk_test.go +++ b/go/test/endtoend/vreplication/fk_test.go @@ -34,7 +34,7 @@ import ( binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" ) -const testWorkflowFlavor = workflowFlavorRandom +const testWorkflowFlavor = workflowFlavorVtctld // TestFKWorkflow runs a MoveTables workflow with atomic copy for a db with foreign key constraints. // It inserts initial data, then simulates load. 
We insert both child rows with foreign keys and those without, diff --git a/go/test/endtoend/vreplication/helper_test.go b/go/test/endtoend/vreplication/helper_test.go index 022262d4b15..b4b31836af3 100644 --- a/go/test/endtoend/vreplication/helper_test.go +++ b/go/test/endtoend/vreplication/helper_test.go @@ -46,9 +46,11 @@ import ( "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/schema" "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + "vitess.io/vitess/go/vt/proto/topodata" ) const ( @@ -410,7 +412,7 @@ func waitForWorkflowState(t *testing.T, vc *VitessCluster, ksWorkflow string, wa // as a CSV have secondary keys. This is useful when testing the // --defer-secondary-keys flag to confirm that the secondary keys // were re-added by the time the workflow hits the running phase. -// For a Reshard workflow, where no tables are specififed, pass +// For a Reshard workflow, where no tables are specified, pass // an empty string for the tables and all tables in the target // keyspace will be checked. func confirmTablesHaveSecondaryKeys(t *testing.T, tablets []*cluster.VttabletProcess, ksName string, tables string) { @@ -430,6 +432,12 @@ func confirmTablesHaveSecondaryKeys(t *testing.T, tablets []*cluster.VttabletPro } } for _, tablet := range tablets { + // Be sure that the schema is up to date. 
+ err := vc.VtctldClient.ExecuteCommand("ReloadSchema", topoproto.TabletAliasString(&topodata.TabletAlias{ + Cell: tablet.Cell, + Uid: uint32(tablet.TabletUID), + })) + require.NoError(t, err) for _, table := range tableArr { if schema.IsInternalOperationTableName(table) { continue diff --git a/go/test/endtoend/vreplication/movetables_buffering_test.go b/go/test/endtoend/vreplication/movetables_buffering_test.go index e853022bfd4..88231d4be87 100644 --- a/go/test/endtoend/vreplication/movetables_buffering_test.go +++ b/go/test/endtoend/vreplication/movetables_buffering_test.go @@ -8,7 +8,6 @@ import ( "github.com/stretchr/testify/require" "vitess.io/vitess/go/vt/log" - "vitess.io/vitess/go/vt/wrangler" ) func TestMoveTablesBuffering(t *testing.T) { @@ -16,7 +15,7 @@ func TestMoveTablesBuffering(t *testing.T) { vc = setupMinimalCluster(t) defer vc.TearDown() - currentWorkflowType = wrangler.MoveTablesWorkflow + currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables setupMinimalCustomerKeyspace(t) tables := "loadtest" err := tstWorkflowExec(t, defaultCellName, workflowName, sourceKs, targetKs, diff --git a/go/test/endtoend/vreplication/partial_movetables_seq_test.go b/go/test/endtoend/vreplication/partial_movetables_seq_test.go index bb354a5ec01..eec304e0a4d 100644 --- a/go/test/endtoend/vreplication/partial_movetables_seq_test.go +++ b/go/test/endtoend/vreplication/partial_movetables_seq_test.go @@ -27,7 +27,6 @@ import ( "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/vt/log" - "vitess.io/vitess/go/vt/wrangler" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" ) @@ -228,12 +227,12 @@ func (wf *workflow) create() { cell := wf.tc.defaultCellName switch typ { case "movetables": - currentWorkflowType = wrangler.MoveTablesWorkflow + currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables sourceShards := strings.Join(wf.options.sourceShards, ",") err = tstWorkflowExec(t, cell, wf.name, 
wf.fromKeyspace, wf.toKeyspace, strings.Join(wf.options.tables, ","), workflowActionCreate, "", sourceShards, "", defaultWorkflowExecOptions) case "reshard": - currentWorkflowType = wrangler.ReshardWorkflow + currentWorkflowType = binlogdatapb.VReplicationWorkflowType_Reshard sourceShards := strings.Join(wf.options.sourceShards, ",") targetShards := strings.Join(wf.options.targetShards, ",") if targetShards == "" { @@ -389,7 +388,7 @@ func TestPartialMoveTablesWithSequences(t *testing.T) { // Switch all traffic for the shard wf80Dash.switchTraffic() - expectedSwitchOutput := fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s\nStart State: Reads Not Switched. Writes Not Switched\nCurrent State: Reads partially switched, for shards: %s. Writes partially switched, for shards: %s\n\n", + expectedSwitchOutput := fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s\n\nStart State: Reads Not Switched. Writes Not Switched\nCurrent State: Reads partially switched, for shards: %s. Writes partially switched, for shards: %s\n\n", targetKs, wfName, shard, shard) require.Equal(t, expectedSwitchOutput, lastOutput) currentCustomerCount = getCustomerCount(t, "") @@ -449,7 +448,7 @@ func TestPartialMoveTablesWithSequences(t *testing.T) { wfDash80.create() wfDash80.switchTraffic() - expectedSwitchOutput := fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s\nStart State: Reads partially switched, for shards: 80-. Writes partially switched, for shards: 80-\nCurrent State: All Reads Switched. All Writes Switched\n\n", + expectedSwitchOutput := fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s\n\nStart State: Reads partially switched, for shards: 80-. Writes partially switched, for shards: 80-\nCurrent State: All Reads Switched. 
All Writes Switched\n\n", targetKs, wfName) require.Equal(t, expectedSwitchOutput, lastOutput) diff --git a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go index 0acb922cd30..956710edd8a 100644 --- a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go +++ b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go @@ -56,7 +56,7 @@ var ( sourceTab, sourceReplicaTab, sourceRdonlyTab *cluster.VttabletProcess lastOutput string - currentWorkflowType wrangler.VReplicationWorkflowType + currentWorkflowType binlogdatapb.VReplicationWorkflowType ) type workflowExecOptions struct { @@ -103,59 +103,62 @@ func tstWorkflowExec(t *testing.T, cells, workflow, sourceKs, targetKs, tables, sourceShards, targetShards string, options *workflowExecOptions) error { var args []string - if currentWorkflowType == wrangler.MoveTablesWorkflow { + if currentWorkflowType == binlogdatapb.VReplicationWorkflowType_MoveTables { args = append(args, "MoveTables") } else { args = append(args, "Reshard") } - args = append(args, "--") + args = append(args, "--workflow", workflow, "--target-keyspace", targetKs, action) - if BypassLagCheck { - args = append(args, "--max_replication_lag_allowed=2542087h") - } - if options.atomicCopy { - args = append(args, "--atomic-copy") - } switch action { case workflowActionCreate: - if currentWorkflowType == wrangler.MoveTablesWorkflow { - args = append(args, "--source", sourceKs) + if currentWorkflowType == binlogdatapb.VReplicationWorkflowType_MoveTables { + args = append(args, "--source-keyspace", sourceKs) if tables != "" { args = append(args, "--tables", tables) } else { - args = append(args, "--all") + args = append(args, "--all-tables") } if sourceShards != "" { - args = append(args, "--source_shards", sourceShards) + args = append(args, "--source-shards", sourceShards) } } else { - args = append(args, "--source_shards", sourceShards, "--target_shards", targetShards) + 
args = append(args, "--source-shards", sourceShards, "--target-shards", targetShards) } // Test new experimental --defer-secondary-keys flag switch currentWorkflowType { - case wrangler.MoveTablesWorkflow, wrangler.MigrateWorkflow, wrangler.ReshardWorkflow: - + case binlogdatapb.VReplicationWorkflowType_MoveTables, binlogdatapb.VReplicationWorkflowType_Migrate, binlogdatapb.VReplicationWorkflowType_Reshard: if !options.atomicCopy && options.deferSecondaryKeys { args = append(args, "--defer-secondary-keys") } - args = append(args, "--initialize-target-sequences") // Only used for MoveTables } default: if options.shardSubset != "" { args = append(args, "--shards", options.shardSubset) } } - if cells != "" { + if currentWorkflowType == binlogdatapb.VReplicationWorkflowType_MoveTables && action == workflowActionSwitchTraffic { + args = append(args, "--initialize-target-sequences") + } + if action == workflowActionSwitchTraffic || action == workflowActionReverseTraffic { + if BypassLagCheck { + args = append(args, "--max-replication-lag-allowed=2542087h") + } + args = append(args, "--timeout", time.Minute.String()) + } + if action == workflowActionCreate && options.atomicCopy { + args = append(args, "--atomic-copy") + } + if (action == workflowActionCreate || action == workflowActionSwitchTraffic || action == workflowActionReverseTraffic) && cells != "" { args = append(args, "--cells", cells) } - if tabletTypes != "" { - args = append(args, "--tablet_types", tabletTypes) + if action != workflowActionComplete && tabletTypes != "" { + args = append(args, "--tablet-types", tabletTypes) } - args = append(args, "--timeout", time.Minute.String()) - ksWorkflow := fmt.Sprintf("%s.%s", targetKs, workflow) - args = append(args, action, ksWorkflow) - output, err := vc.VtctlClient.ExecuteCommandWithOutput(args...) 
+ args = append(args, "--action_timeout=2m") + t.Logf("Executing workflow command: vtctldclient %v", args) + output, err := vc.VtctldClient.ExecuteCommandWithOutput(args...) lastOutput = output if err != nil { return fmt.Errorf("%s: %s", err, output) @@ -285,8 +288,8 @@ func checkStates(t *testing.T, startState, endState string) { require.Contains(t, lastOutput, fmt.Sprintf("Current State: %s", endState)) } -func getCurrentState(t *testing.T) string { - if err := tstWorkflowAction(t, "GetState", "", ""); err != nil { +func getCurrentStatus(t *testing.T) string { + if err := tstWorkflowAction(t, "status", "", ""); err != nil { return err.Error() } return strings.TrimSpace(strings.Trim(lastOutput, "\n")) @@ -335,7 +338,7 @@ func testVSchemaForSequenceAfterMoveTables(t *testing.T) { // at this point the unsharded product and sharded customer keyspaces are created by previous tests // use MoveTables to move customer2 from product to customer using - currentWorkflowType = wrangler.MoveTablesWorkflow + currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables err := tstWorkflowExec(t, defaultCellName, "wf2", sourceKs, targetKs, "customer2", workflowActionCreate, "", "", "", defaultWorkflowExecOptions) require.NoError(t, err) @@ -432,7 +435,7 @@ func testReplicatingWithPKEnumCols(t *testing.T) { func testReshardV2Workflow(t *testing.T) { vtgateConn, closeConn := getVTGateConn() defer closeConn() - currentWorkflowType = wrangler.ReshardWorkflow + currentWorkflowType = binlogdatapb.VReplicationWorkflowType_Reshard // create internal tables on the original customer shards that should be // ignored and not show up on the new shards @@ -441,7 +444,7 @@ func testReshardV2Workflow(t *testing.T) { createAdditionalCustomerShards(t, "-40,40-80,80-c0,c0-") createReshardWorkflow(t, "-80,80-", "-40,40-80,80-c0,c0-") - if !strings.Contains(lastOutput, "Workflow started successfully") { + if !strings.Contains(lastOutput, "Status: Running") { t.Fail() } 
validateReadsRouteToSource(t, "replica") @@ -461,16 +464,15 @@ func testReshardV2Workflow(t *testing.T) { func testMoveTablesV2Workflow(t *testing.T) { vtgateConn, closeConn := getVTGateConn() defer closeConn() - currentWorkflowType = wrangler.MoveTablesWorkflow + currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables // test basic forward and reverse flows setupCustomerKeyspace(t) // The purge table should get skipped/ignored // If it's not then we'll get an error as the table doesn't exist in the vschema createMoveTablesWorkflow(t, "customer,loadtest,vdiff_order,reftable,_vt_PURGE_4f9194b43b2011eb8a0104ed332e05c2_20221210194431") - if !strings.Contains(lastOutput, "Workflow started successfully") { - t.Fail() - } + require.Contains(t, lastOutput, "Status: Running") + waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String()) validateReadsRouteToSource(t, "replica") validateWritesRouteToSource(t) @@ -485,26 +487,29 @@ func testMoveTablesV2Workflow(t *testing.T) { testRestOfWorkflow(t) - listAllArgs := []string{"workflow", "customer", "listall"} - output, _ := vc.VtctlClient.ExecuteCommandWithOutput(listAllArgs...) - require.Contains(t, output, "No workflows found in keyspace customer") + listAllArgs := []string{"workflow", "--keyspace", "customer", "list"} + output, err := vc.VtctldClient.ExecuteCommandWithOutput(listAllArgs...) + require.NoError(t, err) + require.Equal(t, output, "[]") testVSchemaForSequenceAfterMoveTables(t) createMoveTablesWorkflow(t, "Lead,Lead-1") - output, _ = vc.VtctlClient.ExecuteCommandWithOutput(listAllArgs...) - require.Contains(t, output, "Following workflow(s) found in keyspace customer: wf1") + output, err = vc.VtctldClient.ExecuteCommandWithOutput(listAllArgs...) + require.NoError(t, err) + require.Contains(t, output, "wf1") - err := tstWorkflowCancel(t) + err = tstWorkflowCancel(t) require.NoError(t, err) - output, _ = vc.VtctlClient.ExecuteCommandWithOutput(listAllArgs...) 
- require.Contains(t, output, "No workflows found in keyspace customer") + output, err = vc.VtctldClient.ExecuteCommandWithOutput(listAllArgs...) + require.NoError(t, err) + require.Equal(t, output, "[]") } func testPartialSwitches(t *testing.T) { // nothing switched - require.Equal(t, getCurrentState(t), wrangler.WorkflowStateNotSwitched) + require.Contains(t, getCurrentStatus(t), wrangler.WorkflowStateNotSwitched) tstWorkflowSwitchReads(t, "replica,rdonly", "zone1") nextState := "Reads partially switched. Replica switched in cells: zone1. Rdonly switched in cells: zone1. Writes Not Switched" checkStates(t, wrangler.WorkflowStateNotSwitched, nextState) @@ -526,7 +531,7 @@ func testPartialSwitches(t *testing.T) { checkStates(t, nextState, nextState) // idempotency keyspace := "product" - if currentWorkflowType == wrangler.ReshardWorkflow { + if currentWorkflowType == binlogdatapb.VReplicationWorkflowType_Reshard { keyspace = "customer" } waitForLowLag(t, keyspace, "wf1_reverse") @@ -563,7 +568,7 @@ func testRestOfWorkflow(t *testing.T) { // this function is called for both MoveTables and Reshard, so the reverse workflows exist in different keyspaces keyspace := "product" - if currentWorkflowType == wrangler.ReshardWorkflow { + if currentWorkflowType == binlogdatapb.VReplicationWorkflowType_Reshard { keyspace = "customer" } waitForLowLag(t, keyspace, "wf1_reverse") diff --git a/go/test/endtoend/vreplication/wrappers_test.go b/go/test/endtoend/vreplication/wrappers_test.go index 5c7e01aa155..407fe12c3ed 100644 --- a/go/test/endtoend/vreplication/wrappers_test.go +++ b/go/test/endtoend/vreplication/wrappers_test.go @@ -23,7 +23,7 @@ import ( "github.com/stretchr/testify/require" "vitess.io/vitess/go/vt/log" - "vitess.io/vitess/go/vt/wrangler" + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" ) type iWorkflow interface { @@ -117,7 +117,7 @@ func newVtctlMoveTables(mt *moveTablesWorkflow) *VtctlMoveTables { } func (vmt *VtctlMoveTables) Create() { - 
currentWorkflowType = wrangler.MoveTablesWorkflow + currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables vmt.exec(workflowActionCreate) } @@ -314,7 +314,7 @@ func newVtctlReshard(rs *reshardWorkflow) *VtctlReshard { } func (vrs *VtctlReshard) Create() { - currentWorkflowType = wrangler.ReshardWorkflow + currentWorkflowType = binlogdatapb.VReplicationWorkflowType_Reshard vrs.exec(workflowActionCreate) } diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index f63fb69dea7..0aa9a84d569 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -3005,13 +3005,15 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc } } + cellsStr := strings.Join(req.Cells, ",") + // Consistently handle errors by logging and returning them. handleError := func(message string, err error) (*[]string, error) { - ts.Logger().Error(err) + ts.Logger().Errorf("%s: %v", message, err) return nil, err } - log.Infof("Switching reads: %s.%s tablet types: %s, cells: %s, workflow state: %s", ts.targetKeyspace, ts.workflow, roTypesToSwitchStr, ts.optCells, state.String()) + log.Infof("Switching reads: %s.%s tablet types: %s, cells: %s, workflow state: %s", ts.targetKeyspace, ts.workflow, roTypesToSwitchStr, cellsStr, state.String()) if !switchReplica && !switchRdonly { return handleError("invalid tablet types", vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "tablet types must be REPLICA or RDONLY: %s", roTypesToSwitchStr)) } @@ -3023,14 +3025,6 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc return handleError("invalid request", vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "requesting reversal of SwitchReads for RDONLYs but RDONLY reads have not been switched")) } } - cells := req.Cells - // If no cells were provided in the command then use the value from the workflow. 
- if len(cells) == 0 && ts.optCells != "" { - cells = strings.Split(strings.TrimSpace(ts.optCells), ",") - } - for i, cell := range cells { - cells[i] = strings.TrimSpace(cell) - } // If there are no rdonly tablets in the cells ask to switch rdonly tablets as well so that routing rules // are updated for rdonly as well. Otherwise vitess will not know that the workflow has completed and will @@ -3038,7 +3032,7 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc // rdonly tablets. if switchReplica && !switchRdonly { var err error - rdonlyTabletsExist, err := topotools.DoCellsHaveRdonlyTablets(ctx, s.ts, cells) + rdonlyTabletsExist, err := topotools.DoCellsHaveRdonlyTablets(ctx, s.ts, req.Cells) if err != nil { return nil, err } @@ -3076,20 +3070,20 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc if ts.MigrationType() == binlogdatapb.MigrationType_TABLES { if ts.isPartialMigration { ts.Logger().Infof("Partial migration, skipping switchTableReads as traffic is all or nothing per shard and overridden for reads AND writes in the ShardRoutingRule created when switching writes.") - } else if err := sw.switchTableReads(ctx, cells, roTabletTypes, direction); err != nil { + } else if err := sw.switchTableReads(ctx, req.Cells, roTabletTypes, direction); err != nil { return handleError("failed to switch read traffic for the tables", err) } return sw.logs(), nil } - ts.Logger().Infof("About to switchShardReads: %+v, %+s, %+v", cells, roTypesToSwitchStr, direction) - if err := sw.switchShardReads(ctx, cells, roTabletTypes, direction); err != nil { + ts.Logger().Infof("About to switchShardReads: cells: %s, tablet types: %s, direction: %d", cellsStr, roTypesToSwitchStr, direction) + if err := sw.switchShardReads(ctx, req.Cells, roTabletTypes, direction); err != nil { return handleError("failed to switch read traffic for the shards", err) } - ts.Logger().Infof("switchShardReads Completed: %+v, %+s, %+v", cells, 
roTypesToSwitchStr, direction) - if err := s.ts.ValidateSrvKeyspace(ctx, ts.targetKeyspace, strings.Join(cells, ",")); err != nil { + ts.Logger().Infof("switchShardReads Completed: cells: %s, tablet types: %s, direction: %d", cellsStr, roTypesToSwitchStr, direction) + if err := s.ts.ValidateSrvKeyspace(ctx, ts.targetKeyspace, cellsStr); err != nil { err2 := vterrors.Wrapf(err, "after switching shard reads, found SrvKeyspace for %s is corrupt in cell %s", - ts.targetKeyspace, strings.Join(cells, ",")) + ts.targetKeyspace, cellsStr) return handleError("failed to validate SrvKeyspace record", err2) } return sw.logs(), nil @@ -3108,7 +3102,7 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit // Consistently handle errors by logging and returning them. handleError := func(message string, err error) (int64, *[]string, error) { - werr := vterrors.Errorf(vtrpcpb.Code_INTERNAL, fmt.Sprintf("%s: %v", message, err)) + werr := vterrors.Wrapf(err, message) ts.Logger().Error(werr) return 0, nil, werr } @@ -3175,8 +3169,27 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit return 0, sw.logs(), nil } + // We stop writes on the source before stopping the streams so that the catchup + // time is lessened and other workflows that we have to migrate such as + // materialize workflows that are within a single keyspace (source and target) + // also have a chance to catch up, as those are internally generated + // GTIDs within the shard. For materialization streams that we migrate where + // the source and target are the keyspace being resharded, we wait for those + // to catch up in the stopStreams path before we actually stop them. 
+ ts.Logger().Infof("Stopping source writes") + if err := sw.stopSourceWrites(ctx); err != nil { + sw.cancelMigration(ctx, sm) + return handleError(fmt.Sprintf("failed to stop writes in the %s keyspace", ts.SourceKeyspaceName()), err) + } + ts.Logger().Infof("Stopping streams") - sourceWorkflows, err = sw.stopStreams(ctx, sm) + // Use a shorter context for this since when doing a Reshard, if there are intra keyspace + // materializations then we have to wait for them to catch up before switching traffic for the + // Reshard workflow. We use the timeout that is used to limit the amount of time that we + // wait for VReplication to catch up. + stopCtx, stopCancel := context.WithTimeout(ctx, timeout) + defer stopCancel() + sourceWorkflows, err = sw.stopStreams(stopCtx, sm) if err != nil { for key, streams := range sm.Streams() { for _, stream := range streams { @@ -3187,12 +3200,6 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit return handleError("failed to stop the workflow streams", err) } - ts.Logger().Infof("Stopping source writes") - if err := sw.stopSourceWrites(ctx); err != nil { - sw.cancelMigration(ctx, sm) - return handleError(fmt.Sprintf("failed to stop writes in the %s keyspace", ts.SourceKeyspaceName()), err) - } - if ts.MigrationType() == binlogdatapb.MigrationType_TABLES { ts.Logger().Infof("Executing LOCK TABLES on source tables %d times", lockTablesCycles) // Doing this twice with a pause in-between to catch any writes that may have raced in between diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go index c8551c5ff73..12a15f4147b 100644 --- a/go/vt/vtctl/workflow/traffic_switcher.go +++ b/go/vt/vtctl/workflow/traffic_switcher.go @@ -547,15 +547,12 @@ func (ts *trafficSwitcher) dropSourceShards(ctx context.Context) error { } func (ts *trafficSwitcher) switchShardReads(ctx context.Context, cells []string, servedTypes []topodatapb.TabletType, direction 
TrafficSwitchDirection) error { - var fromShards, toShards []*topo.ShardInfo - if direction == DirectionForward { - fromShards, toShards = ts.SourceShards(), ts.TargetShards() - } else { - fromShards, toShards = ts.TargetShards(), ts.SourceShards() - } - if err := ts.TopoServer().ValidateSrvKeyspace(ctx, ts.TargetKeyspaceName(), strings.Join(cells, ",")); err != nil { + cellsStr := strings.Join(cells, ",") + log.Infof("switchShardReads: cells: %s, tablet types: %+v, direction %d", cellsStr, servedTypes, direction) + fromShards, toShards := ts.SourceShards(), ts.TargetShards() + if err := ts.TopoServer().ValidateSrvKeyspace(ctx, ts.TargetKeyspaceName(), cellsStr); err != nil { err2 := vterrors.Wrapf(err, "Before switching shard reads, found SrvKeyspace for %s is corrupt in cell %s", - ts.TargetKeyspaceName(), strings.Join(cells, ",")) + ts.TargetKeyspaceName(), cellsStr) log.Errorf("%w", err2) return err2 } @@ -571,9 +568,9 @@ func (ts *trafficSwitcher) switchShardReads(ctx context.Context, cells []string, return err } } - if err := ts.TopoServer().ValidateSrvKeyspace(ctx, ts.TargetKeyspaceName(), strings.Join(cells, ",")); err != nil { + if err := ts.TopoServer().ValidateSrvKeyspace(ctx, ts.TargetKeyspaceName(), cellsStr); err != nil { err2 := vterrors.Wrapf(err, "after switching shard reads, found SrvKeyspace for %s is corrupt in cell %s", - ts.TargetKeyspaceName(), strings.Join(cells, ",")) + ts.TargetKeyspaceName(), cellsStr) log.Errorf("%w", err2) return err2 } @@ -581,7 +578,7 @@ func (ts *trafficSwitcher) switchShardReads(ctx context.Context, cells []string, } func (ts *trafficSwitcher) switchTableReads(ctx context.Context, cells []string, servedTypes []topodatapb.TabletType, direction TrafficSwitchDirection) error { - log.Infof("switchTableReads: servedTypes: %+v, direction %t", servedTypes, direction) + log.Infof("switchTableReads: cells: %s, tablet types: %+v, direction %d", strings.Join(cells, ","), servedTypes, direction) rules, err := 
topotools.GetRoutingRules(ctx, ts.TopoServer()) if err != nil { return err @@ -939,7 +936,7 @@ func (ts *trafficSwitcher) waitForCatchup(ctx context.Context, filteredReplicati }); err != nil { return err } - // all targets have caught up, record their positions for setting up reverse workflows + // All targets have caught up, record their positions for setting up reverse workflows. return ts.ForAllTargets(func(target *MigrationTarget) error { var err error target.Position, err = ts.TabletManagerClient().PrimaryPosition(ctx, target.GetPrimary().Tablet) @@ -1005,7 +1002,7 @@ func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrat err = ts.changeShardsAccess(ctx, ts.SourceKeyspaceName(), ts.SourceShards(), allowWrites) } if err != nil { - ts.Logger().Errorf("Cancel migration failed:", err) + ts.Logger().Errorf("Cancel migration failed: %v", err) } sm.CancelStreamMigrations(ctx) @@ -1369,7 +1366,7 @@ func (ts *trafficSwitcher) findSequenceUsageInKeyspace(vschema *vschemapb.Keyspa for _, table := range ts.Tables() { vs, ok := vschema.Tables[table] - if !ok || vs == nil || vs.AutoIncrement == nil || vs.AutoIncrement.Sequence == "" { + if !ok || vs.GetAutoIncrement() == nil || vs.GetAutoIncrement().GetSequence() == "" { continue } sm := &sequenceMetadata{ @@ -1411,7 +1408,7 @@ func (ts *trafficSwitcher) findSequenceUsageInKeyspace(vschema *vschemapb.Keyspa // the primary tablet serving the sequence to refresh/reset its cache to // be sure that it does not provide a value that is less than the current max. 
func (ts *trafficSwitcher) initializeTargetSequences(ctx context.Context, sequencesByBackingTable map[string]*sequenceMetadata) error { - initSequenceTable := func(ictx context.Context, sequenceTableName string, sequenceMetadata *sequenceMetadata) error { + initSequenceTable := func(ictx context.Context, sequenceMetadata *sequenceMetadata) error { // Now we need to run this query on the target shards in order // to get the max value and set the next id for the sequence to // a higher value. @@ -1458,6 +1455,10 @@ func (ts *trafficSwitcher) initializeTargetSequences(ctx context.Context, sequen return ictx.Err() default: } + if len(shardResults) == 0 { // This should never happen + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "did not get any results for the max used sequence value for target table %s.%s in order to initialize the backing sequence table", + ts.targetKeyspace, sequenceMetadata.usingTableName) + } // Sort the values to find the max value across all shards. sort.Slice(shardResults, func(i, j int) bool { return shardResults[i] < shardResults[j] @@ -1492,12 +1493,7 @@ func (ts *trafficSwitcher) initializeTargetSequences(ctx context.Context, sequen ) // Now execute this on the primary tablet of the unsharded keyspace // housing the backing table. 
- primaryTablet, ierr := ts.TopoServer().GetTablet(ictx, sequenceShard.PrimaryAlias) - if ierr != nil { - return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get the primary tablet for %s.%s using alias %s: %v", - sequenceShard.Keyspace(), sequenceShard.ShardName(), sequenceShard.PrimaryAlias, ierr) - } - qr, ierr := ts.ws.tmc.ExecuteFetchAsApp(ictx, primaryTablet.Tablet, true, &tabletmanagerdatapb.ExecuteFetchAsAppRequest{ + qr, ierr := ts.ws.tmc.ExecuteFetchAsApp(ictx, sequenceTablet.Tablet, true, &tabletmanagerdatapb.ExecuteFetchAsAppRequest{ Query: []byte(query.Query), MaxRows: 1, }) @@ -1532,10 +1528,9 @@ func (ts *trafficSwitcher) initializeTargetSequences(ctx context.Context, sequen } initGroup, gctx := errgroup.WithContext(ctx) - for sequenceTableName, sequenceMetadata := range sequencesByBackingTable { - sequenceTableName, sequenceMetadata := sequenceTableName, sequenceMetadata // https://golang.org/doc/faq#closures_and_goroutines + for _, sequenceMetadata := range sequencesByBackingTable { initGroup.Go(func() error { - return initSequenceTable(gctx, sequenceTableName, sequenceMetadata) + return initSequenceTable(gctx, sequenceMetadata) }) } return initGroup.Wait() diff --git a/go/vt/vttablet/tabletmanager/rpc_vreplication.go b/go/vt/vttablet/tabletmanager/rpc_vreplication.go index 1489d6eccb7..de4a9c21c8d 100644 --- a/go/vt/vttablet/tabletmanager/rpc_vreplication.go +++ b/go/vt/vttablet/tabletmanager/rpc_vreplication.go @@ -79,7 +79,7 @@ func (tm *TabletManager) CreateVReplicationWorkflow(ctx context.Context, req *ta } // Use the local cell if none are specified. 
if len(req.Cells) == 0 || strings.TrimSpace(req.Cells[0]) == "" { - req.Cells = append(req.Cells, tm.Tablet().Alias.Cell) + req.Cells = []string{tm.Tablet().Alias.Cell} } wfState := binlogdatapb.VReplicationWorkflowState_Stopped.String() tabletTypesStr := topoproto.MakeStringTypeCSV(req.TabletTypes) diff --git a/test/config.json b/test/config.json index 7d429ce6978..9bbab938f77 100644 --- a/test/config.json +++ b/test/config.json @@ -1085,15 +1085,6 @@ "RetryMax": 1, "Tags": [] }, - "vreplication_vtctldclient_cli": { - "File": "unused.go", - "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestVtctldclientCLI", "-timeout", "20m"], - "Command": [], - "Manual": false, - "Shard": "vreplication_basic", - "RetryMax": 1, - "Tags": [] - }, "vreplication_copy_parallel": { "File": "unused.go", "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestVreplicationCopyParallel", "-timeout", "20m"], @@ -1256,12 +1247,21 @@ "RetryMax": 1, "Tags": [] }, + "vreplication_vtctldclient_cli": { + "File": "unused.go", + "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestVtctldclientCLI", "-timeout", "20m"], + "Command": [], + "Manual": false, + "Shard": "vreplication_cli_migrate_vdiff2_convert_tz", + "RetryMax": 1, + "Tags": [] + }, "vreplication_vtctl_migrate": { "File": "unused.go", "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestVtctlMigrate", "-timeout", "30m"], "Command": [], "Manual": false, - "Shard": "vreplication_migrate_vdiff2_convert_tz", + "Shard": "vreplication_cli_migrate_vdiff2_convert_tz", "RetryMax": 1, "Tags": [] }, @@ -1270,7 +1270,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestVtctldMigrate", "-timeout", "30m"], "Command": [], "Manual": false, - "Shard": "vreplication_migrate_vdiff2_convert_tz", + "Shard": "vreplication_cli_migrate_vdiff2_convert_tz", "RetryMax": 1, "Tags": [] }, @@ -1279,7 +1279,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", 
"-run", "TestVDiff2", "-timeout", "30m"], "Command": [], "Manual": false, - "Shard": "vreplication_migrate_vdiff2_convert_tz", + "Shard": "vreplication_cli_migrate_vdiff2_convert_tz", "RetryMax": 1, "Tags": [] }, @@ -1288,7 +1288,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestMoveTablesTZ"], "Command": [], "Manual": false, - "Shard": "vreplication_migrate_vdiff2_convert_tz", + "Shard": "vreplication_cli_migrate_vdiff2_convert_tz", "RetryMax": 1, "Tags": [] },