diff --git a/go/cmd/vtctldclient/command/vreplication/common/switchtraffic.go b/go/cmd/vtctldclient/command/vreplication/common/switchtraffic.go index 019367fe82b..4004afc0ac0 100644 --- a/go/cmd/vtctldclient/command/vreplication/common/switchtraffic.go +++ b/go/cmd/vtctldclient/command/vreplication/common/switchtraffic.go @@ -91,6 +91,7 @@ func commandSwitchTraffic(cmd *cobra.Command, args []string) error { req := &vtctldatapb.WorkflowSwitchTrafficRequest{ Keyspace: BaseOptions.TargetKeyspace, Workflow: BaseOptions.Workflow, + Cells: SwitchTrafficOptions.Cells, TabletTypes: SwitchTrafficOptions.TabletTypes, MaxReplicationLagAllowed: protoutil.DurationToProto(SwitchTrafficOptions.MaxReplicationLagAllowed), Timeout: protoutil.DurationToProto(SwitchTrafficOptions.Timeout), diff --git a/go/streamlog/streamlog_test.go b/go/streamlog/streamlog_test.go index 9c0b0366a1d..17d3c4148e2 100644 --- a/go/streamlog/streamlog_test.go +++ b/go/streamlog/streamlog_test.go @@ -213,7 +213,7 @@ func TestFile(t *testing.T) { logger.Send(&logMessage{"test 2"}) // Allow time for propagation - time.Sleep(10 * time.Millisecond) + time.Sleep(100 * time.Millisecond) want := "test 1\ntest 2\n" contents, _ := os.ReadFile(logPath) @@ -227,7 +227,7 @@ func TestFile(t *testing.T) { os.Rename(logPath, rotatedPath) logger.Send(&logMessage{"test 3"}) - time.Sleep(10 * time.Millisecond) + time.Sleep(100 * time.Millisecond) want = "test 1\ntest 2\ntest 3\n" contents, _ = os.ReadFile(rotatedPath) @@ -241,10 +241,10 @@ func TestFile(t *testing.T) { if err := syscall.Kill(syscall.Getpid(), syscall.SIGUSR2); err != nil { t.Logf("failed to send streamlog rotate signal: %v", err) } - time.Sleep(10 * time.Millisecond) + time.Sleep(100 * time.Millisecond) logger.Send(&logMessage{"test 4"}) - time.Sleep(10 * time.Millisecond) + time.Sleep(100 * time.Millisecond) want = "test 1\ntest 2\ntest 3\n" contents, _ = os.ReadFile(rotatedPath) diff --git a/go/test/endtoend/cluster/vttablet_process.go b/go/test/endtoend/cluster/vttablet_process.go index f92382d5f2d..dade584750f 100644 --- a/go/test/endtoend/cluster/vttablet_process.go +++ b/go/test/endtoend/cluster/vttablet_process.go @@ -660,6 +660,7 @@ func VttabletProcessInstance(port, grpcPort, tabletUID int, cell, shard, keyspac Binary: "vttablet", FileToLogQueries: path.Join(tmpDirectory, fmt.Sprintf("/vt_%010d_querylog.txt", tabletUID)), Directory: path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("/vt_%010d", tabletUID)), + Cell: cell, TabletPath: fmt.Sprintf("%s-%010d", cell, tabletUID), ServiceMap: "grpc-queryservice,grpc-tabletmanager,grpc-updatestream,grpc-throttler", LogDir: tmpDirectory, diff --git a/go/test/endtoend/vreplication/helper_test.go b/go/test/endtoend/vreplication/helper_test.go index 54d057fe6e9..f5e89e157da 100644 --- a/go/test/endtoend/vreplication/helper_test.go +++ b/go/test/endtoend/vreplication/helper_test.go @@ -46,9 +46,11 @@ import ( "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/schema" "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) const ( @@ -377,6 +379,9 @@ func waitForWorkflowState(t *testing.T, vc *VitessCluster, ksWorkflow string, wa } } } + if wantState == binlogdatapb.VReplicationWorkflowState_Running.String() && attributeValue.Get("Pos").String() == "" { + done = false + } } else { done = false } @@ -410,7 +415,7 @@ func waitForWorkflowState(t *testing.T, vc *VitessCluster, ksWorkflow string, wa // as a CSV have secondary keys. This is useful when testing the // --defer-secondary-keys flag to confirm that the secondary keys // were re-added by the time the workflow hits the running phase. -// For a Reshard workflow, where no tables are specififed, pass +// For a Reshard workflow, where no tables are specified, pass // an empty string for the tables and all tables in the target // keyspace will be checked. func confirmTablesHaveSecondaryKeys(t *testing.T, tablets []*cluster.VttabletProcess, ksName string, tables string) { @@ -430,6 +435,12 @@ func confirmTablesHaveSecondaryKeys(t *testing.T, tablets []*cluster.VttabletPro } } for _, tablet := range tablets { + // Be sure that the schema is up to date. + err := vc.VtctldClient.ExecuteCommand("ReloadSchema", topoproto.TabletAliasString(&topodatapb.TabletAlias{ + Cell: tablet.Cell, + Uid: uint32(tablet.TabletUID), + })) + require.NoError(t, err) for _, table := range tableArr { if schema.IsInternalOperationTableName(table) { continue diff --git a/go/test/endtoend/vreplication/movetables_buffering_test.go b/go/test/endtoend/vreplication/movetables_buffering_test.go index e853022bfd4..88231d4be87 100644 --- a/go/test/endtoend/vreplication/movetables_buffering_test.go +++ b/go/test/endtoend/vreplication/movetables_buffering_test.go @@ -8,7 +8,6 @@ import ( "github.com/stretchr/testify/require" "vitess.io/vitess/go/vt/log" - "vitess.io/vitess/go/vt/wrangler" ) func TestMoveTablesBuffering(t *testing.T) { @@ -16,7 +15,7 @@ func TestMoveTablesBuffering(t *testing.T) { vc = setupMinimalCluster(t) defer vc.TearDown() - currentWorkflowType = wrangler.MoveTablesWorkflow + currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables setupMinimalCustomerKeyspace(t) tables := "loadtest" err := tstWorkflowExec(t, defaultCellName, workflowName, sourceKs, targetKs, diff --git a/go/test/endtoend/vreplication/partial_movetables_seq_test.go b/go/test/endtoend/vreplication/partial_movetables_seq_test.go index bb354a5ec01..eec304e0a4d 100644 --- a/go/test/endtoend/vreplication/partial_movetables_seq_test.go +++ b/go/test/endtoend/vreplication/partial_movetables_seq_test.go @@ -27,7 +27,6 @@ import ( "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/vt/log" - "vitess.io/vitess/go/vt/wrangler" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" ) @@ -228,12 +227,12 @@ func (wf *workflow) create() { cell := wf.tc.defaultCellName switch typ { case "movetables": - currentWorkflowType = wrangler.MoveTablesWorkflow + currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables sourceShards := strings.Join(wf.options.sourceShards, ",") err = tstWorkflowExec(t, cell, wf.name, wf.fromKeyspace, wf.toKeyspace, strings.Join(wf.options.tables, ","), workflowActionCreate, "", sourceShards, "", defaultWorkflowExecOptions) case "reshard": - currentWorkflowType = wrangler.ReshardWorkflow + currentWorkflowType = binlogdatapb.VReplicationWorkflowType_Reshard sourceShards := strings.Join(wf.options.sourceShards, ",") targetShards := strings.Join(wf.options.targetShards, ",") if targetShards == "" { @@ -389,7 +388,7 @@ func TestPartialMoveTablesWithSequences(t *testing.T) { // Switch all traffic for the shard wf80Dash.switchTraffic() - expectedSwitchOutput := fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s\nStart State: Reads Not Switched. Writes Not Switched\nCurrent State: Reads partially switched, for shards: %s. Writes partially switched, for shards: %s\n\n", + expectedSwitchOutput := fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s\n\nStart State: Reads Not Switched. Writes Not Switched\nCurrent State: Reads partially switched, for shards: %s. Writes partially switched, for shards: %s\n\n", targetKs, wfName, shard, shard) require.Equal(t, expectedSwitchOutput, lastOutput) currentCustomerCount = getCustomerCount(t, "") @@ -449,7 +448,7 @@ func TestPartialMoveTablesWithSequences(t *testing.T) { wfDash80.create() wfDash80.switchTraffic() - expectedSwitchOutput := fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s\nStart State: Reads partially switched, for shards: 80-. Writes partially switched, for shards: 80-\nCurrent State: All Reads Switched. All Writes Switched\n\n", + expectedSwitchOutput := fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s\n\nStart State: Reads partially switched, for shards: 80-. Writes partially switched, for shards: 80-\nCurrent State: All Reads Switched. All Writes Switched\n\n", targetKs, wfName) require.Equal(t, expectedSwitchOutput, lastOutput) diff --git a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go index 0acb922cd30..b14b649d6b7 100644 --- a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go +++ b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go @@ -17,12 +17,12 @@ limitations under the License. package vreplication import ( + "encoding/json" "fmt" "net" "strconv" "strings" "testing" - "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -56,7 +56,7 @@ var ( sourceTab, sourceReplicaTab, sourceRdonlyTab *cluster.VttabletProcess lastOutput string - currentWorkflowType wrangler.VReplicationWorkflowType + currentWorkflowType binlogdatapb.VReplicationWorkflowType ) type workflowExecOptions struct { @@ -103,59 +103,64 @@ func tstWorkflowExec(t *testing.T, cells, workflow, sourceKs, targetKs, tables, sourceShards, targetShards string, options *workflowExecOptions) error { var args []string - if currentWorkflowType == wrangler.MoveTablesWorkflow { + if currentWorkflowType == binlogdatapb.VReplicationWorkflowType_MoveTables { args = append(args, "MoveTables") } else { args = append(args, "Reshard") } - args = append(args, "--") + args = append(args, "--workflow", workflow, "--target-keyspace", targetKs, action) - if BypassLagCheck { - args = append(args, "--max_replication_lag_allowed=2542087h") - } - if options.atomicCopy { - args = append(args, "--atomic-copy") - } switch action { case workflowActionCreate: - if currentWorkflowType == wrangler.MoveTablesWorkflow { - args = append(args, "--source", sourceKs) + if currentWorkflowType == binlogdatapb.VReplicationWorkflowType_MoveTables { + args = append(args, "--source-keyspace", sourceKs) if tables != "" { args = append(args, "--tables", tables) } else { - args = append(args, "--all") + args = append(args, "--all-tables") } if sourceShards != "" { - args = append(args, "--source_shards", sourceShards) + args = append(args, "--source-shards", sourceShards) } } else { - args = append(args, "--source_shards", sourceShards, "--target_shards", targetShards) + args = append(args, "--source-shards", sourceShards, "--target-shards", targetShards) } // Test new experimental --defer-secondary-keys flag switch currentWorkflowType { - case wrangler.MoveTablesWorkflow, wrangler.MigrateWorkflow, wrangler.ReshardWorkflow: - + case binlogdatapb.VReplicationWorkflowType_MoveTables, binlogdatapb.VReplicationWorkflowType_Migrate, binlogdatapb.VReplicationWorkflowType_Reshard: if !options.atomicCopy && options.deferSecondaryKeys { args = append(args, "--defer-secondary-keys") } - args = append(args, "--initialize-target-sequences") // Only used for MoveTables } default: if options.shardSubset != "" { args = append(args, "--shards", options.shardSubset) } } - if cells != "" { + if currentWorkflowType == binlogdatapb.VReplicationWorkflowType_MoveTables && action == workflowActionSwitchTraffic { + args = append(args, "--initialize-target-sequences") + } + if action == workflowActionSwitchTraffic || action == workflowActionReverseTraffic { + if BypassLagCheck { + args = append(args, "--max-replication-lag-allowed=2542087h") + } + args = append(args, "--timeout=90s") + } + if action == workflowActionCreate && options.atomicCopy { + args = append(args, "--atomic-copy") + } + if (action == workflowActionCreate || action == workflowActionSwitchTraffic || action == workflowActionReverseTraffic) && cells != "" { args = append(args, "--cells", cells) } - if tabletTypes != "" { - args = append(args, "--tablet_types", tabletTypes) + if action != workflowActionComplete && tabletTypes != "" { + args = append(args, "--tablet-types", tabletTypes) } - args = append(args, "--timeout", time.Minute.String()) - ksWorkflow := fmt.Sprintf("%s.%s", targetKs, workflow) - args = append(args, action, ksWorkflow) - output, err := vc.VtctlClient.ExecuteCommandWithOutput(args...) + args = append(args, "--action_timeout=10m") // At this point something is up so fail the test + if debugMode { + t.Logf("Executing workflow command: vtctldclient %v", strings.Join(args, " ")) + } + output, err := vc.VtctldClient.ExecuteCommandWithOutput(args...) lastOutput = output if err != nil { return fmt.Errorf("%s: %s", err, output) @@ -285,8 +290,8 @@ func checkStates(t *testing.T, startState, endState string) { require.Contains(t, lastOutput, fmt.Sprintf("Current State: %s", endState)) } -func getCurrentState(t *testing.T) string { - if err := tstWorkflowAction(t, "GetState", "", ""); err != nil { +func getCurrentStatus(t *testing.T) string { + if err := tstWorkflowAction(t, "status", "", ""); err != nil { return err.Error() } return strings.TrimSpace(strings.Trim(lastOutput, "\n")) @@ -335,7 +340,7 @@ func testVSchemaForSequenceAfterMoveTables(t *testing.T) { // at this point the unsharded product and sharded customer keyspaces are created by previous tests // use MoveTables to move customer2 from product to customer using - currentWorkflowType = wrangler.MoveTablesWorkflow + currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables err := tstWorkflowExec(t, defaultCellName, "wf2", sourceKs, targetKs, "customer2", workflowActionCreate, "", "", "", defaultWorkflowExecOptions) require.NoError(t, err) @@ -353,13 +358,13 @@ func testVSchemaForSequenceAfterMoveTables(t *testing.T) { vtgateConn, closeConn := getVTGateConn() defer closeConn() // sanity check - output, err := vc.VtctlClient.ExecuteCommandWithOutput("GetVSchema", "product") + output, err := vc.VtctldClient.ExecuteCommandWithOutput("GetVSchema", "product") require.NoError(t, err) assert.NotContains(t, output, "customer2\"", "customer2 still found in keyspace product") waitForRowCount(t, vtgateConn, "customer", "customer2", 3) // check that customer2 has the sequence tag - output, err = vc.VtctlClient.ExecuteCommandWithOutput("GetVSchema", "customer") + output, err = vc.VtctldClient.ExecuteCommandWithOutput("GetVSchema", "customer") require.NoError(t, err) assert.Contains(t, output, "\"sequence\": \"customer_seq2\"", "customer2 sequence missing in keyspace customer") @@ -387,12 +392,12 @@ func testVSchemaForSequenceAfterMoveTables(t *testing.T) { require.NoError(t, err) // sanity check - output, err = vc.VtctlClient.ExecuteCommandWithOutput("GetVSchema", "product") + output, err = vc.VtctldClient.ExecuteCommandWithOutput("GetVSchema", "product") require.NoError(t, err) assert.Contains(t, output, "customer2\"", "customer2 not found in keyspace product ") // check that customer2 still has the sequence tag - output, err = vc.VtctlClient.ExecuteCommandWithOutput("GetVSchema", "product") + output, err = vc.VtctldClient.ExecuteCommandWithOutput("GetVSchema", "product") require.NoError(t, err) assert.Contains(t, output, "\"sequence\": \"customer_seq2\"", "customer2 still found in keyspace product") @@ -432,7 +437,7 @@ func testReplicatingWithPKEnumCols(t *testing.T) { func testReshardV2Workflow(t *testing.T) { vtgateConn, closeConn := getVTGateConn() defer closeConn() - currentWorkflowType = wrangler.ReshardWorkflow + currentWorkflowType = binlogdatapb.VReplicationWorkflowType_Reshard // create internal tables on the original customer shards that should be // ignored and not show up on the new shards @@ -441,9 +446,6 @@ func testReshardV2Workflow(t *testing.T) { createAdditionalCustomerShards(t, "-40,40-80,80-c0,c0-") createReshardWorkflow(t, "-80,80-", "-40,40-80,80-c0,c0-") - if !strings.Contains(lastOutput, "Workflow started successfully") { - t.Fail() - } validateReadsRouteToSource(t, "replica") validateWritesRouteToSource(t) @@ -461,16 +463,14 @@ func testReshardV2Workflow(t *testing.T) { func testMoveTablesV2Workflow(t *testing.T) { vtgateConn, closeConn := getVTGateConn() defer closeConn() - currentWorkflowType = wrangler.MoveTablesWorkflow + currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables // test basic forward and reverse flows setupCustomerKeyspace(t) // The purge table should get skipped/ignored // If it's not then we'll get an error as the table doesn't exist in the vschema createMoveTablesWorkflow(t, "customer,loadtest,vdiff_order,reftable,_vt_PURGE_4f9194b43b2011eb8a0104ed332e05c2_20221210194431") - if !strings.Contains(lastOutput, "Workflow started successfully") { - t.Fail() - } + waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String()) validateReadsRouteToSource(t, "replica") validateWritesRouteToSource(t) @@ -485,26 +485,47 @@ func testMoveTablesV2Workflow(t *testing.T) { testRestOfWorkflow(t) - listAllArgs := []string{"workflow", "customer", "listall"} - output, _ := vc.VtctlClient.ExecuteCommandWithOutput(listAllArgs...) - require.Contains(t, output, "No workflows found in keyspace customer") + listOutputContainsWorkflow := func(output string, workflow string) bool { + workflows := []string{} + err := json.Unmarshal([]byte(output), &workflows) + require.NoError(t, err) + for _, w := range workflows { + if w == workflow { + return true + } + } + return false + } + listOutputIsEmpty := func(output string) bool { + workflows := []string{} + err := json.Unmarshal([]byte(output), &workflows) + require.NoError(t, err) + return len(workflows) == 0 + } + + listAllArgs := []string{"workflow", "--keyspace", "customer", "list"} + output, err := vc.VtctldClient.ExecuteCommandWithOutput(listAllArgs...) + require.NoError(t, err) + require.True(t, listOutputIsEmpty(output)) testVSchemaForSequenceAfterMoveTables(t) createMoveTablesWorkflow(t, "Lead,Lead-1") - output, _ = vc.VtctlClient.ExecuteCommandWithOutput(listAllArgs...) - require.Contains(t, output, "Following workflow(s) found in keyspace customer: wf1") + output, err = vc.VtctldClient.ExecuteCommandWithOutput(listAllArgs...) + require.NoError(t, err) + require.True(t, listOutputContainsWorkflow(output, "wf1")) - err := tstWorkflowCancel(t) + err = tstWorkflowCancel(t) require.NoError(t, err) - output, _ = vc.VtctlClient.ExecuteCommandWithOutput(listAllArgs...) - require.Contains(t, output, "No workflows found in keyspace customer") + output, err = vc.VtctldClient.ExecuteCommandWithOutput(listAllArgs...) + require.NoError(t, err) + require.True(t, listOutputIsEmpty(output)) } func testPartialSwitches(t *testing.T) { // nothing switched - require.Equal(t, getCurrentState(t), wrangler.WorkflowStateNotSwitched) + require.Contains(t, getCurrentStatus(t), wrangler.WorkflowStateNotSwitched) tstWorkflowSwitchReads(t, "replica,rdonly", "zone1") nextState := "Reads partially switched. Replica switched in cells: zone1. Rdonly switched in cells: zone1. Writes Not Switched" checkStates(t, wrangler.WorkflowStateNotSwitched, nextState) @@ -526,7 +547,7 @@ func testPartialSwitches(t *testing.T) { checkStates(t, nextState, nextState) // idempotency keyspace := "product" - if currentWorkflowType == wrangler.ReshardWorkflow { + if currentWorkflowType == binlogdatapb.VReplicationWorkflowType_Reshard { keyspace = "customer" } waitForLowLag(t, keyspace, "wf1_reverse") @@ -563,7 +584,7 @@ func testRestOfWorkflow(t *testing.T) { // this function is called for both MoveTables and Reshard, so the reverse workflows exist in different keyspaces keyspace := "product" - if currentWorkflowType == wrangler.ReshardWorkflow { + if currentWorkflowType == binlogdatapb.VReplicationWorkflowType_Reshard { keyspace = "customer" } waitForLowLag(t, keyspace, "wf1_reverse") @@ -707,8 +728,11 @@ func switchReadsNew(t *testing.T, workflowType, cells, ksWorkflow string, revers if reverse { command = "ReverseTraffic" } - output, err := vc.VtctlClient.ExecuteCommandWithOutput(workflowType, "--", "--cells="+cells, - "--tablet_types=rdonly,replica", command, ksWorkflow) + parts := strings.Split(ksWorkflow, ".") + require.Len(t, parts, 2) + ks, wf := parts[0], parts[1] + output, err := vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", wf, "--target-keyspace", ks, command, + "--cells", cells, "--tablet-types=rdonly,replica") require.NoError(t, err, fmt.Sprintf("SwitchReads Error: %s: %s", err, output)) if output != "" { fmt.Printf("SwitchReads output: %s\n", output) @@ -824,7 +848,7 @@ func createAdditionalCustomerShards(t *testing.T, shards string) { } func tstApplySchemaOnlineDDL(t *testing.T, sql string, keyspace string) { - err := vc.VtctlClient.ExecuteCommand("ApplySchema", "--", "--ddl_strategy=online", + err := vc.VtctldClient.ExecuteCommand("ApplySchema", "--ddl-strategy=online", "--sql", sql, keyspace) require.NoError(t, err, fmt.Sprintf("ApplySchema Error: %s", err)) } diff --git a/go/test/endtoend/vreplication/wrappers_test.go b/go/test/endtoend/vreplication/wrappers_test.go index 2d4949b60dc..fb87b21bde5 100644 --- a/go/test/endtoend/vreplication/wrappers_test.go +++ b/go/test/endtoend/vreplication/wrappers_test.go @@ -23,7 +23,7 @@ import ( "github.com/stretchr/testify/require" "vitess.io/vitess/go/vt/log" - "vitess.io/vitess/go/vt/wrangler" + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" ) type iWorkflow interface { @@ -117,7 +117,7 @@ func newVtctlMoveTables(mt *moveTablesWorkflow) *VtctlMoveTables { } func (vmt *VtctlMoveTables) Create() { - currentWorkflowType = wrangler.MoveTablesWorkflow + currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables vmt.exec(workflowActionCreate) } @@ -314,7 +314,7 @@ func newVtctlReshard(rs *reshardWorkflow) *VtctlReshard { } func (vrs *VtctlReshard) Create() { - currentWorkflowType = wrangler.ReshardWorkflow + currentWorkflowType = binlogdatapb.VReplicationWorkflowType_Reshard vrs.exec(workflowActionCreate) } diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index fc215b84c22..af0f16b2138 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -3082,13 +3082,16 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc } } + cellsStr := strings.Join(req.Cells, ",") + // Consistently handle errors by logging and returning them. handleError := func(message string, err error) (*[]string, error) { - ts.Logger().Error(err) - return nil, err + werr := vterrors.Wrapf(err, message) + ts.Logger().Error(werr) + return nil, werr } - log.Infof("Switching reads: %s.%s tablet types: %s, cells: %s, workflow state: %s", ts.targetKeyspace, ts.workflow, roTypesToSwitchStr, ts.optCells, state.String()) + log.Infof("Switching reads: %s.%s tablet types: %s, cells: %s, workflow state: %s", ts.targetKeyspace, ts.workflow, roTypesToSwitchStr, cellsStr, state.String()) if !switchReplica && !switchRdonly { return handleError("invalid tablet types", vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "tablet types must be REPLICA or RDONLY: %s", roTypesToSwitchStr)) } @@ -3100,14 +3103,6 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc return handleError("invalid request", vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "requesting reversal of SwitchReads for RDONLYs but RDONLY reads have not been switched")) } } - cells := req.Cells - // If no cells were provided in the command then use the value from the workflow. - if len(cells) == 0 && ts.optCells != "" { - cells = strings.Split(strings.TrimSpace(ts.optCells), ",") - } - for i, cell := range cells { - cells[i] = strings.TrimSpace(cell) - } // If there are no rdonly tablets in the cells ask to switch rdonly tablets as well so that routing rules // are updated for rdonly as well. Otherwise vitess will not know that the workflow has completed and will @@ -3115,7 +3110,7 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc // rdonly tablets. if switchReplica && !switchRdonly { var err error - rdonlyTabletsExist, err := topotools.DoCellsHaveRdonlyTablets(ctx, s.ts, cells) + rdonlyTabletsExist, err := topotools.DoCellsHaveRdonlyTablets(ctx, s.ts, req.Cells) if err != nil { return nil, err } @@ -3153,20 +3148,20 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc if ts.MigrationType() == binlogdatapb.MigrationType_TABLES { if ts.isPartialMigration { ts.Logger().Infof("Partial migration, skipping switchTableReads as traffic is all or nothing per shard and overridden for reads AND writes in the ShardRoutingRule created when switching writes.") - } else if err := sw.switchTableReads(ctx, cells, roTabletTypes, direction); err != nil { + } else if err := sw.switchTableReads(ctx, req.Cells, roTabletTypes, direction); err != nil { return handleError("failed to switch read traffic for the tables", err) } return sw.logs(), nil } - ts.Logger().Infof("About to switchShardReads: %+v, %+s, %+v", cells, roTypesToSwitchStr, direction) - if err := sw.switchShardReads(ctx, cells, roTabletTypes, direction); err != nil { + ts.Logger().Infof("About to switchShardReads: cells: %s, tablet types: %s, direction: %d", cellsStr, roTypesToSwitchStr, direction) + if err := sw.switchShardReads(ctx, req.Cells, roTabletTypes, direction); err != nil { return handleError("failed to switch read traffic for the shards", err) } - ts.Logger().Infof("switchShardReads Completed: %+v, %+s, %+v", cells, roTypesToSwitchStr, direction) - if err := s.ts.ValidateSrvKeyspace(ctx, ts.targetKeyspace, strings.Join(cells, ",")); err != nil { + ts.Logger().Infof("switchShardReads Completed: cells: %s, tablet types: %s, direction: %d", cellsStr, roTypesToSwitchStr, direction) + if err := s.ts.ValidateSrvKeyspace(ctx, ts.targetKeyspace, cellsStr); err != nil { err2 := vterrors.Wrapf(err, "after switching shard reads, found SrvKeyspace for %s is corrupt in cell %s", - ts.targetKeyspace, strings.Join(cells, ",")) + ts.targetKeyspace, cellsStr) return handleError("failed to validate SrvKeyspace record", err2) } return sw.logs(), nil @@ -3185,7 +3180,7 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit // Consistently handle errors by logging and returning them. handleError := func(message string, err error) (int64, *[]string, error) { - werr := vterrors.Errorf(vtrpcpb.Code_INTERNAL, fmt.Sprintf("%s: %v", message, err)) + werr := vterrors.Wrapf(err, message) ts.Logger().Error(werr) return 0, nil, werr } diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go index c8551c5ff73..2f6e0659025 100644 --- a/go/vt/vtctl/workflow/traffic_switcher.go +++ b/go/vt/vtctl/workflow/traffic_switcher.go @@ -547,15 +547,12 @@ func (ts *trafficSwitcher) dropSourceShards(ctx context.Context) error { } func (ts *trafficSwitcher) switchShardReads(ctx context.Context, cells []string, servedTypes []topodatapb.TabletType, direction TrafficSwitchDirection) error { - var fromShards, toShards []*topo.ShardInfo - if direction == DirectionForward { - fromShards, toShards = ts.SourceShards(), ts.TargetShards() - } else { - fromShards, toShards = ts.TargetShards(), ts.SourceShards() - } - if err := ts.TopoServer().ValidateSrvKeyspace(ctx, ts.TargetKeyspaceName(), strings.Join(cells, ",")); err != nil { + cellsStr := strings.Join(cells, ",") + log.Infof("switchShardReads: cells: %s, tablet types: %+v, direction %d", cellsStr, servedTypes, direction) + fromShards, toShards := ts.SourceShards(), ts.TargetShards() + if err := ts.TopoServer().ValidateSrvKeyspace(ctx, ts.TargetKeyspaceName(), cellsStr); err != nil { err2 := vterrors.Wrapf(err, "Before switching shard reads, found SrvKeyspace for %s is corrupt in cell %s", - ts.TargetKeyspaceName(), strings.Join(cells, ",")) + ts.TargetKeyspaceName(), cellsStr) log.Errorf("%w", err2) return err2 } @@ -571,9 +568,9 @@ func (ts *trafficSwitcher) switchShardReads(ctx context.Context, cells []string, return err } } - if err := ts.TopoServer().ValidateSrvKeyspace(ctx, ts.TargetKeyspaceName(), strings.Join(cells, ",")); err != nil { + if err := ts.TopoServer().ValidateSrvKeyspace(ctx, ts.TargetKeyspaceName(), cellsStr); err != nil { err2 := vterrors.Wrapf(err, "after switching shard reads, found SrvKeyspace for %s is corrupt in cell %s", - ts.TargetKeyspaceName(), strings.Join(cells, ",")) + ts.TargetKeyspaceName(), cellsStr) log.Errorf("%w", err2) return err2 } @@ -581,7 +578,7 @@ func (ts *trafficSwitcher) switchShardReads(ctx context.Context, cells []string, } func (ts *trafficSwitcher) switchTableReads(ctx context.Context, cells []string, servedTypes []topodatapb.TabletType, direction TrafficSwitchDirection) error { - log.Infof("switchTableReads: servedTypes: %+v, direction %t", servedTypes, direction) + log.Infof("switchTableReads: cells: %s, tablet types: %+v, direction %d", strings.Join(cells, ","), servedTypes, direction) rules, err := topotools.GetRoutingRules(ctx, ts.TopoServer()) if err != nil { return err @@ -939,7 +936,7 @@ func (ts *trafficSwitcher) waitForCatchup(ctx context.Context, filteredReplicati }); err != nil { return err } - // all targets have caught up, record their positions for setting up reverse workflows + // All targets have caught up, record their positions for setting up reverse workflows. return ts.ForAllTargets(func(target *MigrationTarget) error { var err error target.Position, err = ts.TabletManagerClient().PrimaryPosition(ctx, target.GetPrimary().Tablet) @@ -1005,7 +1002,7 @@ func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrat err = ts.changeShardsAccess(ctx, ts.SourceKeyspaceName(), ts.SourceShards(), allowWrites) } if err != nil { - ts.Logger().Errorf("Cancel migration failed:", err) + ts.Logger().Errorf("Cancel migration failed: %v", err) } sm.CancelStreamMigrations(ctx) @@ -1369,7 +1366,7 @@ func (ts *trafficSwitcher) findSequenceUsageInKeyspace(vschema *vschemapb.Keyspa for _, table := range ts.Tables() { vs, ok := vschema.Tables[table] - if !ok || vs == nil || vs.AutoIncrement == nil || vs.AutoIncrement.Sequence == "" { + if !ok || vs.GetAutoIncrement().GetSequence() == "" { continue } sm := &sequenceMetadata{ @@ -1411,7 +1408,7 @@ func (ts *trafficSwitcher) findSequenceUsageInKeyspace(vschema *vschemapb.Keyspa // the primary tablet serving the sequence to refresh/reset its cache to // be sure that it does not provide a value that is less than the current max. func (ts *trafficSwitcher) initializeTargetSequences(ctx context.Context, sequencesByBackingTable map[string]*sequenceMetadata) error { - initSequenceTable := func(ictx context.Context, sequenceTableName string, sequenceMetadata *sequenceMetadata) error { + initSequenceTable := func(ictx context.Context, sequenceMetadata *sequenceMetadata) error { // Now we need to run this query on the target shards in order // to get the max value and set the next id for the sequence to // a higher value. @@ -1458,6 +1455,10 @@ func (ts *trafficSwitcher) initializeTargetSequences(ctx context.Context, sequen return ictx.Err() default: } + if len(shardResults) == 0 { // This should never happen + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "did not get any results for the max used sequence value for target table %s.%s in order to initialize the backing sequence table", + ts.targetKeyspace, sequenceMetadata.usingTableName) + } // Sort the values to find the max value across all shards. sort.Slice(shardResults, func(i, j int) bool { return shardResults[i] < shardResults[j] @@ -1492,12 +1493,7 @@ func (ts *trafficSwitcher) initializeTargetSequences(ctx context.Context, sequen ) // Now execute this on the primary tablet of the unsharded keyspace // housing the backing table. - primaryTablet, ierr := ts.TopoServer().GetTablet(ictx, sequenceShard.PrimaryAlias) - if ierr != nil { - return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get the primary tablet for %s.%s using alias %s: %v", - sequenceShard.Keyspace(), sequenceShard.ShardName(), sequenceShard.PrimaryAlias, ierr) - } - qr, ierr := ts.ws.tmc.ExecuteFetchAsApp(ictx, primaryTablet.Tablet, true, &tabletmanagerdatapb.ExecuteFetchAsAppRequest{ + qr, ierr := ts.ws.tmc.ExecuteFetchAsApp(ictx, sequenceTablet.Tablet, true, &tabletmanagerdatapb.ExecuteFetchAsAppRequest{ Query: []byte(query.Query), MaxRows: 1, }) @@ -1532,10 +1528,9 @@ func (ts *trafficSwitcher) initializeTargetSequences(ctx context.Context, sequen } initGroup, gctx := errgroup.WithContext(ctx) - for sequenceTableName, sequenceMetadata := range sequencesByBackingTable { - sequenceTableName, sequenceMetadata := sequenceTableName, sequenceMetadata // https://golang.org/doc/faq#closures_and_goroutines + for _, sequenceMetadata := range sequencesByBackingTable { initGroup.Go(func() error { - return initSequenceTable(gctx, sequenceTableName, sequenceMetadata) + return initSequenceTable(gctx, sequenceMetadata) }) } return initGroup.Wait() diff --git a/go/vt/vttablet/tabletmanager/rpc_vreplication.go b/go/vt/vttablet/tabletmanager/rpc_vreplication.go index 0319146321a..02c2eb10056 100644 --- a/go/vt/vttablet/tabletmanager/rpc_vreplication.go +++ b/go/vt/vttablet/tabletmanager/rpc_vreplication.go @@ -63,7 +63,7 @@ func (tm *TabletManager) CreateVReplicationWorkflow(ctx context.Context, req *ta } // Use the local cell if none are specified. if len(req.Cells) == 0 || strings.TrimSpace(req.Cells[0]) == "" { - req.Cells = append(req.Cells, tm.Tablet().Alias.Cell) + req.Cells = []string{tm.Tablet().Alias.Cell} } wfState := binlogdatapb.VReplicationWorkflowState_Stopped.String() tabletTypesStr := topoproto.MakeStringTypeCSV(req.TabletTypes) diff --git a/test/config.json b/test/config.json index 14c05cb8df6..7effb7b396f 100644 --- a/test/config.json +++ b/test/config.json @@ -1085,15 +1085,6 @@ "RetryMax": 1, "Tags": [] }, - "vreplication_vtctldclient_cli": { - "File": "unused.go", - "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestVtctldclientCLI", "-timeout", "20m"], - "Command": [], - "Manual": false, - "Shard": "vreplication_basic", - "RetryMax": 1, - "Tags": [] - }, "vreplication_copy_parallel": { "File": "unused.go", "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestVreplicationCopyParallel", "-timeout", "20m"], @@ -1247,12 +1238,21 @@ "RetryMax": 1, "Tags": [] }, + "vreplication_vtctldclient_cli": { + "File": "unused.go", + "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestVtctldclientCLI", "-timeout", "20m"], + "Command": [], + "Manual": false, + "Shard": "vreplication_cli_migrate_vdiff2_convert_tz", + "RetryMax": 1, + "Tags": [] + }, "vreplication_vtctl_migrate": { "File": "unused.go", "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestVtctlMigrate", "-timeout", "30m"], "Command": [], "Manual": false, - "Shard": "vreplication_migrate_vdiff2_convert_tz", + "Shard": "vreplication_cli_migrate_vdiff2_convert_tz", "RetryMax": 1, "Tags": [] }, @@ -1261,7 +1261,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestVtctldMigrate", "-timeout", "30m"], "Command": [], "Manual": false, - "Shard": "vreplication_migrate_vdiff2_convert_tz", + "Shard": "vreplication_cli_migrate_vdiff2_convert_tz", "RetryMax": 1, "Tags": [] }, @@ -1270,7 +1270,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestVDiff2", "-timeout", "30m"], "Command": [], "Manual": false, - "Shard": "vreplication_migrate_vdiff2_convert_tz", + "Shard": "vreplication_cli_migrate_vdiff2_convert_tz", "RetryMax": 1, "Tags": [] }, @@ -1279,7 +1279,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestMoveTablesTZ"], "Command": [], "Manual": false, - "Shard": "vreplication_migrate_vdiff2_convert_tz", + "Shard": "vreplication_cli_migrate_vdiff2_convert_tz", "RetryMax": 1, "Tags": [] },