From 1d49744d4d4ca0f609c8dff5625b4f71c5e09fc6 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 26 Mar 2024 18:02:21 -0400 Subject: [PATCH 1/8] Move the Reshard v2 workflow to vtctldclient And get the tests passing again with minor fixes. Signed-off-by: Matt Lord --- .../vreplication/common/switchtraffic.go | 1 + go/test/endtoend/vreplication/fk_test.go | 2 +- go/test/endtoend/vreplication/helper_test.go | 10 +- .../vreplication/movetables_buffering_test.go | 3 +- .../partial_movetables_seq_test.go | 9 +- .../resharding_workflows_v2_test.go | 93 ++++++++++--------- .../endtoend/vreplication/wrappers_test.go | 6 +- go/vt/vtctl/workflow/server.go | 57 +++++++----- go/vt/vtctl/workflow/traffic_switcher.go | 43 ++++----- .../tabletmanager/rpc_vreplication.go | 2 +- test/config.json | 26 +++--- 11 files changed, 133 insertions(+), 119 deletions(-) diff --git a/go/cmd/vtctldclient/command/vreplication/common/switchtraffic.go b/go/cmd/vtctldclient/command/vreplication/common/switchtraffic.go index 019367fe82b..4004afc0ac0 100644 --- a/go/cmd/vtctldclient/command/vreplication/common/switchtraffic.go +++ b/go/cmd/vtctldclient/command/vreplication/common/switchtraffic.go @@ -91,6 +91,7 @@ func commandSwitchTraffic(cmd *cobra.Command, args []string) error { req := &vtctldatapb.WorkflowSwitchTrafficRequest{ Keyspace: BaseOptions.TargetKeyspace, Workflow: BaseOptions.Workflow, + Cells: SwitchTrafficOptions.Cells, TabletTypes: SwitchTrafficOptions.TabletTypes, MaxReplicationLagAllowed: protoutil.DurationToProto(SwitchTrafficOptions.MaxReplicationLagAllowed), Timeout: protoutil.DurationToProto(SwitchTrafficOptions.Timeout), diff --git a/go/test/endtoend/vreplication/fk_test.go b/go/test/endtoend/vreplication/fk_test.go index a75b6727fdc..189857b71ee 100644 --- a/go/test/endtoend/vreplication/fk_test.go +++ b/go/test/endtoend/vreplication/fk_test.go @@ -34,7 +34,7 @@ import ( binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" ) -const testWorkflowFlavor = workflowFlavorRandom +const testWorkflowFlavor = workflowFlavorVtctld // TestFKWorkflow runs a MoveTables workflow with atomic copy for a db with foreign key constraints. // It inserts initial data, then simulates load. We insert both child rows with foreign keys and those without, diff --git a/go/test/endtoend/vreplication/helper_test.go b/go/test/endtoend/vreplication/helper_test.go index 022262d4b15..b4b31836af3 100644 --- a/go/test/endtoend/vreplication/helper_test.go +++ b/go/test/endtoend/vreplication/helper_test.go @@ -46,9 +46,11 @@ import ( "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/schema" "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + "vitess.io/vitess/go/vt/proto/topodata" ) const ( @@ -410,7 +412,7 @@ func waitForWorkflowState(t *testing.T, vc *VitessCluster, ksWorkflow string, wa // as a CSV have secondary keys. This is useful when testing the // --defer-secondary-keys flag to confirm that the secondary keys // were re-added by the time the workflow hits the running phase. -// For a Reshard workflow, where no tables are specififed, pass +// For a Reshard workflow, where no tables are specified, pass // an empty string for the tables and all tables in the target // keyspace will be checked. func confirmTablesHaveSecondaryKeys(t *testing.T, tablets []*cluster.VttabletProcess, ksName string, tables string) { @@ -430,6 +432,12 @@ func confirmTablesHaveSecondaryKeys(t *testing.T, tablets []*cluster.VttabletPro } } for _, tablet := range tablets { + // Be sure that the schema is up to date. + err := vc.VtctldClient.ExecuteCommand("ReloadSchema", topoproto.TabletAliasString(&topodata.TabletAlias{ + Cell: tablet.Cell, + Uid: uint32(tablet.TabletUID), + })) + require.NoError(t, err) for _, table := range tableArr { if schema.IsInternalOperationTableName(table) { continue diff --git a/go/test/endtoend/vreplication/movetables_buffering_test.go b/go/test/endtoend/vreplication/movetables_buffering_test.go index e853022bfd4..88231d4be87 100644 --- a/go/test/endtoend/vreplication/movetables_buffering_test.go +++ b/go/test/endtoend/vreplication/movetables_buffering_test.go @@ -8,7 +8,6 @@ import ( "github.com/stretchr/testify/require" "vitess.io/vitess/go/vt/log" - "vitess.io/vitess/go/vt/wrangler" ) func TestMoveTablesBuffering(t *testing.T) { @@ -16,7 +15,7 @@ func TestMoveTablesBuffering(t *testing.T) { vc = setupMinimalCluster(t) defer vc.TearDown() - currentWorkflowType = wrangler.MoveTablesWorkflow + currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables setupMinimalCustomerKeyspace(t) tables := "loadtest" err := tstWorkflowExec(t, defaultCellName, workflowName, sourceKs, targetKs, diff --git a/go/test/endtoend/vreplication/partial_movetables_seq_test.go b/go/test/endtoend/vreplication/partial_movetables_seq_test.go index bb354a5ec01..eec304e0a4d 100644 --- a/go/test/endtoend/vreplication/partial_movetables_seq_test.go +++ b/go/test/endtoend/vreplication/partial_movetables_seq_test.go @@ -27,7 +27,6 @@ import ( "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/vt/log" - "vitess.io/vitess/go/vt/wrangler" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" ) @@ -228,12 +227,12 @@ func (wf *workflow) create() { cell := wf.tc.defaultCellName switch typ { case "movetables": - currentWorkflowType = wrangler.MoveTablesWorkflow + currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables sourceShards := strings.Join(wf.options.sourceShards, ",") err = tstWorkflowExec(t, cell, wf.name, wf.fromKeyspace, wf.toKeyspace, strings.Join(wf.options.tables, ","), workflowActionCreate, "", sourceShards, "", defaultWorkflowExecOptions) case "reshard": - currentWorkflowType = wrangler.ReshardWorkflow + currentWorkflowType = binlogdatapb.VReplicationWorkflowType_Reshard sourceShards := strings.Join(wf.options.sourceShards, ",") targetShards := strings.Join(wf.options.targetShards, ",") if targetShards == "" { @@ -389,7 +388,7 @@ func TestPartialMoveTablesWithSequences(t *testing.T) { // Switch all traffic for the shard wf80Dash.switchTraffic() - expectedSwitchOutput := fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s\nStart State: Reads Not Switched. Writes Not Switched\nCurrent State: Reads partially switched, for shards: %s. Writes partially switched, for shards: %s\n\n", + expectedSwitchOutput := fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s\n\nStart State: Reads Not Switched. Writes Not Switched\nCurrent State: Reads partially switched, for shards: %s. Writes partially switched, for shards: %s\n\n", targetKs, wfName, shard, shard) require.Equal(t, expectedSwitchOutput, lastOutput) currentCustomerCount = getCustomerCount(t, "") @@ -449,7 +448,7 @@ func TestPartialMoveTablesWithSequences(t *testing.T) { wfDash80.create() wfDash80.switchTraffic() - expectedSwitchOutput := fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s\nStart State: Reads partially switched, for shards: 80-. Writes partially switched, for shards: 80-\nCurrent State: All Reads Switched. All Writes Switched\n\n", + expectedSwitchOutput := fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s\n\nStart State: Reads partially switched, for shards: 80-. Writes partially switched, for shards: 80-\nCurrent State: All Reads Switched. All Writes Switched\n\n", targetKs, wfName) require.Equal(t, expectedSwitchOutput, lastOutput) diff --git a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go index 0acb922cd30..956710edd8a 100644 --- a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go +++ b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go @@ -56,7 +56,7 @@ var ( sourceTab, sourceReplicaTab, sourceRdonlyTab *cluster.VttabletProcess lastOutput string - currentWorkflowType wrangler.VReplicationWorkflowType + currentWorkflowType binlogdatapb.VReplicationWorkflowType ) type workflowExecOptions struct { @@ -103,59 +103,62 @@ func tstWorkflowExec(t *testing.T, cells, workflow, sourceKs, targetKs, tables, sourceShards, targetShards string, options *workflowExecOptions) error { var args []string - if currentWorkflowType == wrangler.MoveTablesWorkflow { + if currentWorkflowType == binlogdatapb.VReplicationWorkflowType_MoveTables { args = append(args, "MoveTables") } else { args = append(args, "Reshard") } - args = append(args, "--") + args = append(args, "--workflow", workflow, "--target-keyspace", targetKs, action) - if BypassLagCheck { - args = append(args, "--max_replication_lag_allowed=2542087h") - } - if options.atomicCopy { - args = append(args, "--atomic-copy") - } switch action { case workflowActionCreate: - if currentWorkflowType == wrangler.MoveTablesWorkflow { - args = append(args, "--source", sourceKs) + if currentWorkflowType == binlogdatapb.VReplicationWorkflowType_MoveTables { + args = append(args, "--source-keyspace", sourceKs) if tables != "" { args = append(args, "--tables", tables) } else { - args = append(args, "--all") + args = append(args, "--all-tables") } if sourceShards != "" { - args = append(args, "--source_shards", sourceShards) + args = append(args, "--source-shards", sourceShards) } } else { - args = append(args, "--source_shards", sourceShards, "--target_shards", targetShards) + args = append(args, "--source-shards", sourceShards, "--target-shards", targetShards) } // Test new experimental --defer-secondary-keys flag switch currentWorkflowType { - case wrangler.MoveTablesWorkflow, wrangler.MigrateWorkflow, wrangler.ReshardWorkflow: - + case binlogdatapb.VReplicationWorkflowType_MoveTables, binlogdatapb.VReplicationWorkflowType_Migrate, binlogdatapb.VReplicationWorkflowType_Reshard: if !options.atomicCopy && options.deferSecondaryKeys { args = append(args, "--defer-secondary-keys") } - args = append(args, "--initialize-target-sequences") // Only used for MoveTables } default: if options.shardSubset != "" { args = append(args, "--shards", options.shardSubset) } } - if cells != "" { + if currentWorkflowType == binlogdatapb.VReplicationWorkflowType_MoveTables && action == workflowActionSwitchTraffic { + args = append(args, "--initialize-target-sequences") + } + if action == workflowActionSwitchTraffic || action == workflowActionReverseTraffic { + if BypassLagCheck { + args = append(args, "--max-replication-lag-allowed=2542087h") + } + args = append(args, "--timeout", time.Minute.String()) + } + if action == workflowActionCreate && options.atomicCopy { + args = append(args, "--atomic-copy") + } + if (action == workflowActionCreate || action == workflowActionSwitchTraffic || action == workflowActionReverseTraffic) && cells != "" { args = append(args, "--cells", cells) } - if tabletTypes != "" { - args = append(args, "--tablet_types", tabletTypes) + if action != workflowActionComplete && tabletTypes != "" { + args = append(args, "--tablet-types", tabletTypes) } - args = append(args, "--timeout", time.Minute.String()) - ksWorkflow := fmt.Sprintf("%s.%s", targetKs, workflow) - args = append(args, action, ksWorkflow) - output, err := vc.VtctlClient.ExecuteCommandWithOutput(args...) + args = append(args, "--action_timeout=2m") + t.Logf("Executing workflow command: vtctldclient %v", args) + output, err := vc.VtctldClient.ExecuteCommandWithOutput(args...) lastOutput = output if err != nil { return fmt.Errorf("%s: %s", err, output) @@ -285,8 +288,8 @@ func checkStates(t *testing.T, startState, endState string) { require.Contains(t, lastOutput, fmt.Sprintf("Current State: %s", endState)) } -func getCurrentState(t *testing.T) string { - if err := tstWorkflowAction(t, "GetState", "", ""); err != nil { +func getCurrentStatus(t *testing.T) string { + if err := tstWorkflowAction(t, "status", "", ""); err != nil { return err.Error() } return strings.TrimSpace(strings.Trim(lastOutput, "\n")) @@ -335,7 +338,7 @@ func testVSchemaForSequenceAfterMoveTables(t *testing.T) { // at this point the unsharded product and sharded customer keyspaces are created by previous tests // use MoveTables to move customer2 from product to customer using - currentWorkflowType = wrangler.MoveTablesWorkflow + currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables err := tstWorkflowExec(t, defaultCellName, "wf2", sourceKs, targetKs, "customer2", workflowActionCreate, "", "", "", defaultWorkflowExecOptions) require.NoError(t, err) @@ -432,7 +435,7 @@ func testReplicatingWithPKEnumCols(t *testing.T) { func testReshardV2Workflow(t *testing.T) { vtgateConn, closeConn := getVTGateConn() defer closeConn() - currentWorkflowType = wrangler.ReshardWorkflow + currentWorkflowType = binlogdatapb.VReplicationWorkflowType_Reshard // create internal tables on the original customer shards that should be // ignored and not show up on the new shards @@ -441,7 +444,7 @@ func testReshardV2Workflow(t *testing.T) { createAdditionalCustomerShards(t, "-40,40-80,80-c0,c0-") createReshardWorkflow(t, "-80,80-", "-40,40-80,80-c0,c0-") - if !strings.Contains(lastOutput, "Workflow started successfully") { + if !strings.Contains(lastOutput, "Status: Running") { t.Fail() } validateReadsRouteToSource(t, "replica") @@ -461,16 +464,15 @@ func testReshardV2Workflow(t *testing.T) { func testMoveTablesV2Workflow(t *testing.T) { vtgateConn, closeConn := getVTGateConn() defer closeConn() - currentWorkflowType = wrangler.MoveTablesWorkflow + currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables // test basic forward and reverse flows setupCustomerKeyspace(t) // The purge table should get skipped/ignored // If it's not then we'll get an error as the table doesn't exist in the vschema createMoveTablesWorkflow(t, "customer,loadtest,vdiff_order,reftable,_vt_PURGE_4f9194b43b2011eb8a0104ed332e05c2_20221210194431") - if !strings.Contains(lastOutput, "Workflow started successfully") { - t.Fail() - } + require.Contains(t, lastOutput, "Status: Running") + waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String()) validateReadsRouteToSource(t, "replica") validateWritesRouteToSource(t) @@ -485,26 +487,29 @@ func testMoveTablesV2Workflow(t *testing.T) { testRestOfWorkflow(t) - listAllArgs := []string{"workflow", "customer", "listall"} - output, _ := vc.VtctlClient.ExecuteCommandWithOutput(listAllArgs...) - require.Contains(t, output, "No workflows found in keyspace customer") + listAllArgs := []string{"workflow", "--keyspace", "customer", "list"} + output, err := vc.VtctldClient.ExecuteCommandWithOutput(listAllArgs...) + require.NoError(t, err) + require.Equal(t, output, "[]") testVSchemaForSequenceAfterMoveTables(t) createMoveTablesWorkflow(t, "Lead,Lead-1") - output, _ = vc.VtctlClient.ExecuteCommandWithOutput(listAllArgs...) - require.Contains(t, output, "Following workflow(s) found in keyspace customer: wf1") + output, err = vc.VtctldClient.ExecuteCommandWithOutput(listAllArgs...) + require.NoError(t, err) + require.Contains(t, output, "wf1") - err := tstWorkflowCancel(t) + err = tstWorkflowCancel(t) require.NoError(t, err) - output, _ = vc.VtctlClient.ExecuteCommandWithOutput(listAllArgs...) - require.Contains(t, output, "No workflows found in keyspace customer") + output, err = vc.VtctldClient.ExecuteCommandWithOutput(listAllArgs...) + require.NoError(t, err) + require.Equal(t, output, "[]") } func testPartialSwitches(t *testing.T) { // nothing switched - require.Equal(t, getCurrentState(t), wrangler.WorkflowStateNotSwitched) + require.Contains(t, getCurrentStatus(t), wrangler.WorkflowStateNotSwitched) tstWorkflowSwitchReads(t, "replica,rdonly", "zone1") nextState := "Reads partially switched. Replica switched in cells: zone1. Rdonly switched in cells: zone1. Writes Not Switched" checkStates(t, wrangler.WorkflowStateNotSwitched, nextState) @@ -526,7 +531,7 @@ func testPartialSwitches(t *testing.T) { checkStates(t, nextState, nextState) // idempotency keyspace := "product" - if currentWorkflowType == wrangler.ReshardWorkflow { + if currentWorkflowType == binlogdatapb.VReplicationWorkflowType_Reshard { keyspace = "customer" } waitForLowLag(t, keyspace, "wf1_reverse") @@ -563,7 +568,7 @@ func testRestOfWorkflow(t *testing.T) { // this function is called for both MoveTables and Reshard, so the reverse workflows exist in different keyspaces keyspace := "product" - if currentWorkflowType == wrangler.ReshardWorkflow { + if currentWorkflowType == binlogdatapb.VReplicationWorkflowType_Reshard { keyspace = "customer" } waitForLowLag(t, keyspace, "wf1_reverse") diff --git a/go/test/endtoend/vreplication/wrappers_test.go b/go/test/endtoend/vreplication/wrappers_test.go index 5c7e01aa155..407fe12c3ed 100644 --- a/go/test/endtoend/vreplication/wrappers_test.go +++ b/go/test/endtoend/vreplication/wrappers_test.go @@ -23,7 +23,7 @@ import ( "github.com/stretchr/testify/require" "vitess.io/vitess/go/vt/log" - "vitess.io/vitess/go/vt/wrangler" + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" ) type iWorkflow interface { @@ -117,7 +117,7 @@ func newVtctlMoveTables(mt *moveTablesWorkflow) *VtctlMoveTables { } func (vmt *VtctlMoveTables) Create() { - currentWorkflowType = wrangler.MoveTablesWorkflow + currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables vmt.exec(workflowActionCreate) } @@ -314,7 +314,7 @@ func newVtctlReshard(rs *reshardWorkflow) *VtctlReshard { } func (vrs *VtctlReshard) Create() { - currentWorkflowType = wrangler.ReshardWorkflow + currentWorkflowType = binlogdatapb.VReplicationWorkflowType_Reshard vrs.exec(workflowActionCreate) } diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index f63fb69dea7..0aa9a84d569 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -3005,13 +3005,15 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc } } + cellsStr := strings.Join(req.Cells, ",") + // Consistently handle errors by logging and returning them. handleError := func(message string, err error) (*[]string, error) { - ts.Logger().Error(err) + ts.Logger().Errorf("%s: %v", message, err) return nil, err } - log.Infof("Switching reads: %s.%s tablet types: %s, cells: %s, workflow state: %s", ts.targetKeyspace, ts.workflow, roTypesToSwitchStr, ts.optCells, state.String()) + log.Infof("Switching reads: %s.%s tablet types: %s, cells: %s, workflow state: %s", ts.targetKeyspace, ts.workflow, roTypesToSwitchStr, cellsStr, state.String()) if !switchReplica && !switchRdonly { return handleError("invalid tablet types", vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "tablet types must be REPLICA or RDONLY: %s", roTypesToSwitchStr)) } @@ -3023,14 +3025,6 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc return handleError("invalid request", vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "requesting reversal of SwitchReads for RDONLYs but RDONLY reads have not been switched")) } } - cells := req.Cells - // If no cells were provided in the command then use the value from the workflow. - if len(cells) == 0 && ts.optCells != "" { - cells = strings.Split(strings.TrimSpace(ts.optCells), ",") - } - for i, cell := range cells { - cells[i] = strings.TrimSpace(cell) - } // If there are no rdonly tablets in the cells ask to switch rdonly tablets as well so that routing rules // are updated for rdonly as well. Otherwise vitess will not know that the workflow has completed and will @@ -3038,7 +3032,7 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc // rdonly tablets. if switchReplica && !switchRdonly { var err error - rdonlyTabletsExist, err := topotools.DoCellsHaveRdonlyTablets(ctx, s.ts, cells) + rdonlyTabletsExist, err := topotools.DoCellsHaveRdonlyTablets(ctx, s.ts, req.Cells) if err != nil { return nil, err } @@ -3076,20 +3070,20 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc if ts.MigrationType() == binlogdatapb.MigrationType_TABLES { if ts.isPartialMigration { ts.Logger().Infof("Partial migration, skipping switchTableReads as traffic is all or nothing per shard and overridden for reads AND writes in the ShardRoutingRule created when switching writes.") - } else if err := sw.switchTableReads(ctx, cells, roTabletTypes, direction); err != nil { + } else if err := sw.switchTableReads(ctx, req.Cells, roTabletTypes, direction); err != nil { return handleError("failed to switch read traffic for the tables", err) } return sw.logs(), nil } - ts.Logger().Infof("About to switchShardReads: %+v, %+s, %+v", cells, roTypesToSwitchStr, direction) - if err := sw.switchShardReads(ctx, cells, roTabletTypes, direction); err != nil { + ts.Logger().Infof("About to switchShardReads: cells: %s, tablet types: %s, direction: %d", cellsStr, roTypesToSwitchStr, direction) + if err := sw.switchShardReads(ctx, req.Cells, roTabletTypes, direction); err != nil { return handleError("failed to switch read traffic for the shards", err) } - ts.Logger().Infof("switchShardReads Completed: %+v, %+s, %+v", cells, roTypesToSwitchStr, direction) - if err := s.ts.ValidateSrvKeyspace(ctx, ts.targetKeyspace, strings.Join(cells, ",")); err != nil { + ts.Logger().Infof("switchShardReads Completed: cells: %s, tablet types: %s, direction: %d", cellsStr, roTypesToSwitchStr, direction) + if err := s.ts.ValidateSrvKeyspace(ctx, ts.targetKeyspace, cellsStr); err != nil { err2 := vterrors.Wrapf(err, "after switching shard reads, found SrvKeyspace for %s is corrupt in cell %s", - ts.targetKeyspace, strings.Join(cells, ",")) + ts.targetKeyspace, cellsStr) return handleError("failed to validate SrvKeyspace record", err2) } return sw.logs(), nil @@ -3108,7 +3102,7 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit // Consistently handle errors by logging and returning them. handleError := func(message string, err error) (int64, *[]string, error) { - werr := vterrors.Errorf(vtrpcpb.Code_INTERNAL, fmt.Sprintf("%s: %v", message, err)) + werr := vterrors.Wrapf(err, message) ts.Logger().Error(werr) return 0, nil, werr } @@ -3175,8 +3169,27 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit return 0, sw.logs(), nil } + // We stop writes on the source before stopping the streams so that the catchup + // time is lessened and other workflows that we have to migrate such as + // materialize workflows that are within a single keyspace (source and target) + // also have a chance to catch up as well as those are internally generated + // GTIDs within the shard. For materialization streams that we migrate where + // the source and target are the keyspace being resharded, we wait for those + // to catchup in the stopStreams path before we actually stop them. + ts.Logger().Infof("Stopping source writes") + if err := sw.stopSourceWrites(ctx); err != nil { + sw.cancelMigration(ctx, sm) + return handleError(fmt.Sprintf("failed to stop writes in the %s keyspace", ts.SourceKeyspaceName()), err) + } + ts.Logger().Infof("Stopping streams") - sourceWorkflows, err = sw.stopStreams(ctx, sm) + // Use a shorter context for this since since when doing a Reshard, if there are intra keyspace + // materializations then we have to wait for them to catchup before switching traffic for the + // Reshard workflow. We use the timeout where which is used to limit the amount of time that we + // wait for VReplication to catch up. + stopCtx, stopCancel := context.WithTimeout(ctx, timeout) + defer stopCancel() + sourceWorkflows, err = sw.stopStreams(stopCtx, sm) if err != nil { for key, streams := range sm.Streams() { for _, stream := range streams { @@ -3187,12 +3200,6 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit return handleError("failed to stop the workflow streams", err) } - ts.Logger().Infof("Stopping source writes") - if err := sw.stopSourceWrites(ctx); err != nil { - sw.cancelMigration(ctx, sm) - return handleError(fmt.Sprintf("failed to stop writes in the %s keyspace", ts.SourceKeyspaceName()), err) - } - if ts.MigrationType() == binlogdatapb.MigrationType_TABLES { ts.Logger().Infof("Executing LOCK TABLES on source tables %d times", lockTablesCycles) // Doing this twice with a pause in-between to catch any writes that may have raced in between diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go index c8551c5ff73..12a15f4147b 100644 --- a/go/vt/vtctl/workflow/traffic_switcher.go +++ b/go/vt/vtctl/workflow/traffic_switcher.go @@ -547,15 +547,12 @@ func (ts *trafficSwitcher) dropSourceShards(ctx context.Context) error { } func (ts *trafficSwitcher) switchShardReads(ctx context.Context, cells []string, servedTypes []topodatapb.TabletType, direction TrafficSwitchDirection) error { - var fromShards, toShards []*topo.ShardInfo - if direction == DirectionForward { - fromShards, toShards = ts.SourceShards(), ts.TargetShards() - } else { - fromShards, toShards = ts.TargetShards(), ts.SourceShards() - } - if err := ts.TopoServer().ValidateSrvKeyspace(ctx, ts.TargetKeyspaceName(), strings.Join(cells, ",")); err != nil { + cellsStr := strings.Join(cells, ",") + log.Infof("switchShardReads: cells: %s, tablet types: %+v, direction %d", cellsStr, servedTypes, direction) + fromShards, toShards := ts.SourceShards(), ts.TargetShards() + if err := ts.TopoServer().ValidateSrvKeyspace(ctx, ts.TargetKeyspaceName(), cellsStr); err != nil { err2 := vterrors.Wrapf(err, "Before switching shard reads, found SrvKeyspace for %s is corrupt in cell %s", - ts.TargetKeyspaceName(), strings.Join(cells, ",")) + ts.TargetKeyspaceName(), cellsStr) log.Errorf("%w", err2) return err2 } @@ -571,9 +568,9 @@ func (ts *trafficSwitcher) switchShardReads(ctx context.Context, cells []string, return err } } - if err := ts.TopoServer().ValidateSrvKeyspace(ctx, ts.TargetKeyspaceName(), strings.Join(cells, ",")); err != nil { + if err := ts.TopoServer().ValidateSrvKeyspace(ctx, ts.TargetKeyspaceName(), cellsStr); err != nil { err2 := vterrors.Wrapf(err, "after switching shard reads, found SrvKeyspace for %s is corrupt in cell %s", - ts.TargetKeyspaceName(), strings.Join(cells, ",")) + ts.TargetKeyspaceName(), cellsStr) log.Errorf("%w", err2) return err2 } @@ -581,7 +578,7 @@ func (ts *trafficSwitcher) switchShardReads(ctx context.Context, cells []string, } func (ts *trafficSwitcher) switchTableReads(ctx context.Context, cells []string, servedTypes []topodatapb.TabletType, direction TrafficSwitchDirection) error { - log.Infof("switchTableReads: servedTypes: %+v, direction %t", servedTypes, direction) + log.Infof("switchTableReads: cells: %s, tablet types: %+v, direction %d", strings.Join(cells, ","), servedTypes, direction) rules, err := topotools.GetRoutingRules(ctx, ts.TopoServer()) if err != nil { return err @@ -939,7 +936,7 @@ func (ts *trafficSwitcher) waitForCatchup(ctx context.Context, filteredReplicati }); err != nil { return err } - // all targets have caught up, record their positions for setting up reverse workflows + // All targets have caught up, record their positions for setting up reverse workflows. return ts.ForAllTargets(func(target *MigrationTarget) error { var err error target.Position, err = ts.TabletManagerClient().PrimaryPosition(ctx, target.GetPrimary().Tablet) @@ -1005,7 +1002,7 @@ func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrat err = ts.changeShardsAccess(ctx, ts.SourceKeyspaceName(), ts.SourceShards(), allowWrites) } if err != nil { - ts.Logger().Errorf("Cancel migration failed:", err) + ts.Logger().Errorf("Cancel migration failed: %v", err) } sm.CancelStreamMigrations(ctx) @@ -1369,7 +1366,7 @@ func (ts *trafficSwitcher) findSequenceUsageInKeyspace(vschema *vschemapb.Keyspa for _, table := range ts.Tables() { vs, ok := vschema.Tables[table] - if !ok || vs == nil || vs.AutoIncrement == nil || vs.AutoIncrement.Sequence == "" { + if !ok || vs.GetAutoIncrement() == nil || vs.GetAutoIncrement().GetSequence() == "" { continue } sm := &sequenceMetadata{ @@ -1411,7 +1408,7 @@ func (ts *trafficSwitcher) findSequenceUsageInKeyspace(vschema *vschemapb.Keyspa // the primary tablet serving the sequence to refresh/reset its cache to // be sure that it does not provide a value that is less than the current max. func (ts *trafficSwitcher) initializeTargetSequences(ctx context.Context, sequencesByBackingTable map[string]*sequenceMetadata) error { - initSequenceTable := func(ictx context.Context, sequenceTableName string, sequenceMetadata *sequenceMetadata) error { + initSequenceTable := func(ictx context.Context, sequenceMetadata *sequenceMetadata) error { // Now we need to run this query on the target shards in order // to get the max value and set the next id for the sequence to // a higher value. @@ -1458,6 +1455,10 @@ func (ts *trafficSwitcher) initializeTargetSequences(ctx context.Context, sequen return ictx.Err() default: } + if len(shardResults) == 0 { // This should never happen + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "did not get any results for the max used sequence value for target table %s.%s in order to initialize the backing sequence table", + ts.targetKeyspace, sequenceMetadata.usingTableName) + } // Sort the values to find the max value across all shards. sort.Slice(shardResults, func(i, j int) bool { return shardResults[i] < shardResults[j] @@ -1492,12 +1493,7 @@ func (ts *trafficSwitcher) initializeTargetSequences(ctx context.Context, sequen ) // Now execute this on the primary tablet of the unsharded keyspace // housing the backing table. - primaryTablet, ierr := ts.TopoServer().GetTablet(ictx, sequenceShard.PrimaryAlias) - if ierr != nil { - return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get the primary tablet for %s.%s using alias %s: %v", - sequenceShard.Keyspace(), sequenceShard.ShardName(), sequenceShard.PrimaryAlias, ierr) - } - qr, ierr := ts.ws.tmc.ExecuteFetchAsApp(ictx, primaryTablet.Tablet, true, &tabletmanagerdatapb.ExecuteFetchAsAppRequest{ + qr, ierr := ts.ws.tmc.ExecuteFetchAsApp(ictx, sequenceTablet.Tablet, true, &tabletmanagerdatapb.ExecuteFetchAsAppRequest{ Query: []byte(query.Query), MaxRows: 1, }) @@ -1532,10 +1528,9 @@ func (ts *trafficSwitcher) initializeTargetSequences(ctx context.Context, sequen } initGroup, gctx := errgroup.WithContext(ctx) - for sequenceTableName, sequenceMetadata := range sequencesByBackingTable { - sequenceTableName, sequenceMetadata := sequenceTableName, sequenceMetadata // https://golang.org/doc/faq#closures_and_goroutines + for _, sequenceMetadata := range sequencesByBackingTable { initGroup.Go(func() error { - return initSequenceTable(gctx, sequenceTableName, sequenceMetadata) + return initSequenceTable(gctx, sequenceMetadata) }) } return initGroup.Wait() diff --git a/go/vt/vttablet/tabletmanager/rpc_vreplication.go b/go/vt/vttablet/tabletmanager/rpc_vreplication.go index 1489d6eccb7..de4a9c21c8d 100644 --- a/go/vt/vttablet/tabletmanager/rpc_vreplication.go +++ b/go/vt/vttablet/tabletmanager/rpc_vreplication.go @@ -79,7 +79,7 @@ func (tm *TabletManager) CreateVReplicationWorkflow(ctx context.Context, req *ta } // Use the local cell if none are specified. if len(req.Cells) == 0 || strings.TrimSpace(req.Cells[0]) == "" { - req.Cells = append(req.Cells, tm.Tablet().Alias.Cell) + req.Cells = []string{tm.Tablet().Alias.Cell} } wfState := binlogdatapb.VReplicationWorkflowState_Stopped.String() tabletTypesStr := topoproto.MakeStringTypeCSV(req.TabletTypes) diff --git a/test/config.json b/test/config.json index 7d429ce6978..9bbab938f77 100644 --- a/test/config.json +++ b/test/config.json @@ -1085,15 +1085,6 @@ "RetryMax": 1, "Tags": [] }, - "vreplication_vtctldclient_cli": { - "File": "unused.go", - "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestVtctldclientCLI", "-timeout", "20m"], - "Command": [], - "Manual": false, - "Shard": "vreplication_basic", - "RetryMax": 1, - "Tags": [] - }, "vreplication_copy_parallel": { "File": "unused.go", "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestVreplicationCopyParallel", "-timeout", "20m"], @@ -1256,12 +1247,21 @@ "RetryMax": 1, "Tags": [] }, + "vreplication_vtctldclient_cli": { + "File": "unused.go", + "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestVtctldclientCLI", "-timeout", "20m"], + "Command": [], + "Manual": false, + "Shard": "vreplication_cli_migrate_vdiff2_convert_tz", + "RetryMax": 1, + "Tags": [] + }, "vreplication_vtctl_migrate": { "File": "unused.go", "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestVtctlMigrate", "-timeout", "30m"], "Command": [], "Manual": false, - "Shard": "vreplication_migrate_vdiff2_convert_tz", + "Shard": "vreplication_cli_migrate_vdiff2_convert_tz", "RetryMax": 1, "Tags": [] }, @@ -1270,7 +1270,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestVtctldMigrate", "-timeout", "30m"], "Command": [], "Manual": false, - "Shard": "vreplication_migrate_vdiff2_convert_tz", + "Shard": "vreplication_cli_migrate_vdiff2_convert_tz", "RetryMax": 1, "Tags": [] }, @@ -1279,7 +1279,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestVDiff2", "-timeout", "30m"], "Command": [], "Manual": false, - "Shard": "vreplication_migrate_vdiff2_convert_tz", + "Shard": "vreplication_cli_migrate_vdiff2_convert_tz", "RetryMax": 1, "Tags": [] }, @@ -1288,7 +1288,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestMoveTablesTZ"], "Command": [], "Manual": false, - "Shard": "vreplication_migrate_vdiff2_convert_tz", + "Shard": "vreplication_cli_migrate_vdiff2_convert_tz", "RetryMax": 1, "Tags": [] }, From c9655b24c9cb1f69dbd1615519686c98fb970f30 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 26 Mar 2024 18:12:57 -0400 Subject: [PATCH 2/8] Only print command in debug mode Signed-off-by: Matt Lord --- go/test/endtoend/vreplication/resharding_workflows_v2_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go index 956710edd8a..888de7f701d 100644 --- a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go +++ b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go @@ -157,7 +157,9 @@ func tstWorkflowExec(t *testing.T, cells, workflow, sourceKs, targetKs, tables, args = append(args, "--tablet-types", tabletTypes) } args = append(args, "--action_timeout=2m") - t.Logf("Executing workflow command: vtctldclient %v", args) + if debugMode { + t.Logf("Executing workflow command: vtctldclient %v", strings.Join(args, " ")) + } output, err := vc.VtctldClient.ExecuteCommandWithOutput(args...) lastOutput = output if err != nil { From 30430117f5379e391fdd1f8ddcfcedd0d50deb1c Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 26 Mar 2024 19:48:53 -0400 Subject: [PATCH 3/8] More porting and fixing Signed-off-by: Matt Lord --- go/test/endtoend/cluster/vttablet_process.go | 1 + .../resharding_workflows_v2_test.go | 42 ++++++++++++++----- 2 files changed, 33 insertions(+), 10 deletions(-) diff --git a/go/test/endtoend/cluster/vttablet_process.go b/go/test/endtoend/cluster/vttablet_process.go index c98ed37afc0..bd016b1c1c4 100644 --- a/go/test/endtoend/cluster/vttablet_process.go +++ b/go/test/endtoend/cluster/vttablet_process.go @@ -690,6 +690,7 @@ func VttabletProcessInstance(port, grpcPort, tabletUID int, cell, shard, keyspac Binary: "vttablet", FileToLogQueries: path.Join(tmpDirectory, fmt.Sprintf("/vt_%010d_querylog.txt", tabletUID)), Directory: path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("/vt_%010d", tabletUID)), + Cell: cell, TabletPath: fmt.Sprintf("%s-%010d", cell, tabletUID), ServiceMap: "grpc-queryservice,grpc-tabletmanager,grpc-updatestream,grpc-throttler", LogDir: tmpDirectory, diff --git a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go index 888de7f701d..73dda1801e5 100644 --- a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go +++ b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go @@ -17,6 +17,7 @@ limitations under the License. package vreplication import ( + "encoding/json" "fmt" "net" "strconv" @@ -358,13 +359,13 @@ func testVSchemaForSequenceAfterMoveTables(t *testing.T) { vtgateConn, closeConn := getVTGateConn() defer closeConn() // sanity check - output, err := vc.VtctlClient.ExecuteCommandWithOutput("GetVSchema", "product") + output, err := vc.VtctldClient.ExecuteCommandWithOutput("GetVSchema", "product") require.NoError(t, err) assert.NotContains(t, output, "customer2\"", "customer2 still found in keyspace product") waitForRowCount(t, vtgateConn, "customer", "customer2", 3) // check that customer2 has the sequence tag - output, err = vc.VtctlClient.ExecuteCommandWithOutput("GetVSchema", "customer") + output, err = vc.VtctldClient.ExecuteCommandWithOutput("GetVSchema", "customer") require.NoError(t, err) assert.Contains(t, output, "\"sequence\": \"customer_seq2\"", "customer2 sequence missing in keyspace customer") @@ -392,12 +393,12 @@ func testVSchemaForSequenceAfterMoveTables(t *testing.T) { require.NoError(t, err) // sanity check - output, err = vc.VtctlClient.ExecuteCommandWithOutput("GetVSchema", "product") + output, err = vc.VtctldClient.ExecuteCommandWithOutput("GetVSchema", "product") require.NoError(t, err) assert.Contains(t, output, "customer2\"", "customer2 not found in keyspace product ") // check that customer2 still has the sequence tag - output, err = vc.VtctlClient.ExecuteCommandWithOutput("GetVSchema", "product") + output, err = vc.VtctldClient.ExecuteCommandWithOutput("GetVSchema", "product") require.NoError(t, err) assert.Contains(t, output, "\"sequence\": \"customer_seq2\"", "customer2 still found in keyspace product") @@ -489,24 +490,42 @@ func testMoveTablesV2Workflow(t *testing.T) { testRestOfWorkflow(t) + listOutputContainsWorkflow := func(output string, workflow string) bool { + workflows := []string{} + err := json.Unmarshal([]byte(output), &workflows) + require.NoError(t, err) + for _, w := range workflows { + if w == workflow { + return true + } + } + return false + } + listOutputIsEmpty := func(output string) bool { + workflows := []string{} + err := json.Unmarshal([]byte(output), &workflows) + require.NoError(t, err) + return len(workflows) == 0 + } + listAllArgs := []string{"workflow", "--keyspace", "customer", "list"} output, err := vc.VtctldClient.ExecuteCommandWithOutput(listAllArgs...) require.NoError(t, err) - require.Equal(t, output, "[]") + require.True(t, listOutputIsEmpty(output)) testVSchemaForSequenceAfterMoveTables(t) createMoveTablesWorkflow(t, "Lead,Lead-1") output, err = vc.VtctldClient.ExecuteCommandWithOutput(listAllArgs...) require.NoError(t, err) - require.Contains(t, output, "wf1") + require.True(t, listOutputContainsWorkflow(output, "wf1")) err = tstWorkflowCancel(t) require.NoError(t, err) output, err = vc.VtctldClient.ExecuteCommandWithOutput(listAllArgs...) require.NoError(t, err) - require.Equal(t, output, "[]") + require.True(t, listOutputIsEmpty(output)) } func testPartialSwitches(t *testing.T) { @@ -714,8 +733,11 @@ func switchReadsNew(t *testing.T, workflowType, cells, ksWorkflow string, revers if reverse { command = "ReverseTraffic" } - output, err := vc.VtctlClient.ExecuteCommandWithOutput(workflowType, "--", "--cells="+cells, - "--tablet_types=rdonly,replica", command, ksWorkflow) + parts := strings.Split(ksWorkflow, ".") + require.Len(t, parts, 2) + ks, wf := parts[0], parts[1] + output, err := vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", wf, "--target-keyspace", ks, command, + "--cells", cells, "--tablet-types=rdonly,replica") require.NoError(t, err, fmt.Sprintf("SwitchReads Error: %s: %s", err, output)) if output != "" { fmt.Printf("SwitchReads output: %s\n", output) @@ -831,7 +853,7 @@ func createAdditionalCustomerShards(t *testing.T, shards string) { } func tstApplySchemaOnlineDDL(t *testing.T, sql string, keyspace string) { - err := vc.VtctlClient.ExecuteCommand("ApplySchema", "--", "--ddl_strategy=online", + err := vc.VtctldClient.ExecuteCommand("ApplySchema", "--ddl-strategy=online", "--sql", sql, keyspace) require.NoError(t, err, fmt.Sprintf("ApplySchema Error: %s", err)) } From 2f066de034412171b102e405a4bbb9002cd5df8c Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 26 Mar 2024 20:18:23 -0400 Subject: [PATCH 4/8] Changes after self review Signed-off-by: Matt Lord --- go/test/endtoend/vreplication/helper_test.go | 4 +-- .../resharding_workflows_v2_test.go | 5 ++- go/vt/vtctl/workflow/server.go | 32 ++++++------------- go/vt/vtctl/workflow/traffic_switcher.go | 2 +- 4 files changed, 15 insertions(+), 28 deletions(-) diff --git a/go/test/endtoend/vreplication/helper_test.go b/go/test/endtoend/vreplication/helper_test.go index b4b31836af3..19a4f731ec6 100644 --- a/go/test/endtoend/vreplication/helper_test.go +++ b/go/test/endtoend/vreplication/helper_test.go @@ -50,7 +50,7 @@ import ( "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" - "vitess.io/vitess/go/vt/proto/topodata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) const ( @@ -433,7 +433,7 @@ func confirmTablesHaveSecondaryKeys(t *testing.T, tablets []*cluster.VttabletPro } for _, tablet := range tablets { // Be sure that the schema is up to date. - err := vc.VtctldClient.ExecuteCommand("ReloadSchema", topoproto.TabletAliasString(&topodata.TabletAlias{ + err := vc.VtctldClient.ExecuteCommand("ReloadSchema", topoproto.TabletAliasString(&topodatapb.TabletAlias{ Cell: tablet.Cell, Uid: uint32(tablet.TabletUID), })) diff --git a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go index 73dda1801e5..f84f8d09da5 100644 --- a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go +++ b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go @@ -23,7 +23,6 @@ import ( "strconv" "strings" "testing" - "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -146,7 +145,7 @@ func tstWorkflowExec(t *testing.T, cells, workflow, sourceKs, targetKs, tables, if BypassLagCheck { args = append(args, "--max-replication-lag-allowed=2542087h") } - args = append(args, "--timeout", time.Minute.String()) + args = append(args, "--timeout=1m") } if action == workflowActionCreate && options.atomicCopy { args = append(args, "--atomic-copy") @@ -157,7 +156,7 @@ func tstWorkflowExec(t *testing.T, cells, workflow, sourceKs, targetKs, tables, if action != workflowActionComplete && tabletTypes != "" { args = append(args, "--tablet-types", tabletTypes) } - args = append(args, "--action_timeout=2m") + args = append(args, "--action_timeout=10m") // At this point something is up so fail the test if debugMode { t.Logf("Executing workflow command: vtctldclient %v", strings.Join(args, " ")) } diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 0aa9a84d569..c29cd3228db 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -3009,8 +3009,9 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc // Consistently handle errors by logging and returning them. handleError := func(message string, err error) (*[]string, error) { - ts.Logger().Errorf("%s: %v", message, err) - return nil, err + werr := vterrors.Wrapf(err, message) + ts.Logger().Error(werr) + return nil, werr } log.Infof("Switching reads: %s.%s tablet types: %s, cells: %s, workflow state: %s", ts.targetKeyspace, ts.workflow, roTypesToSwitchStr, cellsStr, state.String()) @@ -3169,27 +3170,8 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit return 0, sw.logs(), nil } - // We stop writes on the source before stopping the streams so that the catchup - // time is lessened and other workflows that we have to migrate such as - // materialize workflows that are within a single keyspace (source and target) - // also have a chance to catch up as well as those are internally generated - // GTIDs within the shard. For materialization streams that we migrate where - // the source and target are the keyspace being resharded, we wait for those - // to catchup in the stopStreams path before we actually stop them. - ts.Logger().Infof("Stopping source writes") - if err := sw.stopSourceWrites(ctx); err != nil { - sw.cancelMigration(ctx, sm) - return handleError(fmt.Sprintf("failed to stop writes in the %s keyspace", ts.SourceKeyspaceName()), err) - } - ts.Logger().Infof("Stopping streams") - // Use a shorter context for this since since when doing a Reshard, if there are intra keyspace - // materializations then we have to wait for them to catchup before switching traffic for the - // Reshard workflow. We use the timeout where which is used to limit the amount of time that we - // wait for VReplication to catch up. - stopCtx, stopCancel := context.WithTimeout(ctx, timeout) - defer stopCancel() - sourceWorkflows, err = sw.stopStreams(stopCtx, sm) + sourceWorkflows, err = sw.stopStreams(ctx, sm) if err != nil { for key, streams := range sm.Streams() { for _, stream := range streams { @@ -3200,6 +3182,12 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit return handleError("failed to stop the workflow streams", err) } + ts.Logger().Infof("Stopping source writes") + if err := sw.stopSourceWrites(ctx); err != nil { + sw.cancelMigration(ctx, sm) + return handleError(fmt.Sprintf("failed to stop writes in the %s keyspace", ts.SourceKeyspaceName()), err) + } + if ts.MigrationType() == binlogdatapb.MigrationType_TABLES { ts.Logger().Infof("Executing LOCK TABLES on source tables %d times", lockTablesCycles) // Doing this twice with a pause in-between to catch any writes that may have raced in between diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go index 12a15f4147b..2f6e0659025 100644 --- a/go/vt/vtctl/workflow/traffic_switcher.go +++ b/go/vt/vtctl/workflow/traffic_switcher.go @@ -1366,7 +1366,7 @@ func (ts *trafficSwitcher) findSequenceUsageInKeyspace(vschema *vschemapb.Keyspa for _, table := range ts.Tables() { vs, ok := vschema.Tables[table] - if !ok || vs.GetAutoIncrement() == nil || vs.GetAutoIncrement().GetSequence() == "" { + if !ok || vs.GetAutoIncrement().GetSequence() == "" { continue } sm := &sequenceMetadata{ From d65c3453160d9a2163b03d511f786296a1d34e03 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 26 Mar 2024 22:58:21 -0400 Subject: [PATCH 5/8] Debug logging for flake hunting Signed-off-by: Matt Lord --- go/test/endtoend/vreplication/helper_test.go | 4 ++++ go/test/endtoend/vreplication/resharding_workflows_v2_test.go | 4 ---- go/vt/vtctl/workflow/server.go | 4 ++++ 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/go/test/endtoend/vreplication/helper_test.go b/go/test/endtoend/vreplication/helper_test.go index 19a4f731ec6..659c5330811 100644 --- a/go/test/endtoend/vreplication/helper_test.go +++ b/go/test/endtoend/vreplication/helper_test.go @@ -368,6 +368,7 @@ func waitForWorkflowState(t *testing.T, vc *VitessCluster, ksWorkflow string, wa streamInfos.ForEach(func(attributeKey, attributeValue gjson.Result) bool { // for each attribute in the stream // we need to wait for all streams to have the desired state state = attributeValue.Get("State").String() + t.Logf("Workflow %s, on %s, state: %s", ksWorkflow, tabletId.String(), state) if state == wantState { for i := 0; i < len(fieldEqualityChecks); i++ { if kvparts := strings.Split(fieldEqualityChecks[i], "=="); len(kvparts) == 2 { @@ -379,6 +380,9 @@ func waitForWorkflowState(t *testing.T, vc *VitessCluster, ksWorkflow string, wa } } } + if wantState == binlogdatapb.VReplicationWorkflowState_Running.String() && attributeValue.Get("Pos").String() == "" { + done = false + } } else { done = false } diff --git a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go index f84f8d09da5..06127b628c7 100644 --- a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go +++ b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go @@ -446,9 +446,6 @@ func testReshardV2Workflow(t *testing.T) { createAdditionalCustomerShards(t, "-40,40-80,80-c0,c0-") createReshardWorkflow(t, "-80,80-", "-40,40-80,80-c0,c0-") - if !strings.Contains(lastOutput, "Status: Running") { - t.Fail() - } validateReadsRouteToSource(t, "replica") validateWritesRouteToSource(t) @@ -473,7 +470,6 @@ func testMoveTablesV2Workflow(t *testing.T) { // The purge table should get skipped/ignored // If it's not then we'll get an error as the table doesn't exist in the vschema createMoveTablesWorkflow(t, "customer,loadtest,vdiff_order,reftable,_vt_PURGE_4f9194b43b2011eb8a0104ed332e05c2_20221210194431") - require.Contains(t, lastOutput, "Status: Running") waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String()) validateReadsRouteToSource(t, "replica") validateWritesRouteToSource(t) diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index c29cd3228db..5cfe876eb9d 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -3285,10 +3285,14 @@ func (s *Server) canSwitch(ctx context.Context, ts *trafficSwitcher, state *Stat log.Infof("writes already switched no need to check lag") return "", nil } + ts.Logger().Infof("Checking if we can switch traffic for workflow %s.%s with starting state: %s", + state.TargetKeyspace, state.Workflow, state.String()) wf, err := s.GetWorkflow(ctx, state.TargetKeyspace, state.Workflow, false, shards) if err != nil { return "", err } + ts.Logger().Infof("Checking if we can switch traffic for workflow %s.%s with GetWorkflow result: %+v", + state.TargetKeyspace, state.Workflow, wf) for _, stream := range wf.ShardStreams { for _, st := range stream.GetStreams() { if st.Message == Frozen { From 40a94239404c5bd0d6669c19448c65e482fb3d1d Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Wed, 27 Mar 2024 01:22:59 -0400 Subject: [PATCH 6/8] Remove debug logging Signed-off-by: Matt Lord --- go/test/endtoend/vreplication/helper_test.go | 1 - go/test/endtoend/vreplication/resharding_workflows_v2_test.go | 2 +- go/vt/vtctl/workflow/server.go | 4 ---- 3 files changed, 1 insertion(+), 6 deletions(-) diff --git a/go/test/endtoend/vreplication/helper_test.go b/go/test/endtoend/vreplication/helper_test.go index 659c5330811..e187c8398b6 100644 --- a/go/test/endtoend/vreplication/helper_test.go +++ b/go/test/endtoend/vreplication/helper_test.go @@ -368,7 +368,6 @@ func waitForWorkflowState(t *testing.T, vc *VitessCluster, ksWorkflow string, wa streamInfos.ForEach(func(attributeKey, attributeValue gjson.Result) bool { // for each attribute in the stream // we need to wait for all streams to have the desired state state = attributeValue.Get("State").String() - t.Logf("Workflow %s, on %s, state: %s", ksWorkflow, tabletId.String(), state) if state == wantState { for i := 0; i < len(fieldEqualityChecks); i++ { if kvparts := strings.Split(fieldEqualityChecks[i], "=="); len(kvparts) == 2 { diff --git a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go index 06127b628c7..b14b649d6b7 100644 --- a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go +++ b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go @@ -145,7 +145,7 @@ func tstWorkflowExec(t *testing.T, cells, workflow, sourceKs, targetKs, tables, if BypassLagCheck { args = append(args, "--max-replication-lag-allowed=2542087h") } - args = append(args, "--timeout=1m") + args = append(args, "--timeout=90s") } if action == workflowActionCreate && options.atomicCopy { args = append(args, "--atomic-copy") diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 5cfe876eb9d..c29cd3228db 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -3285,14 +3285,10 @@ func (s *Server) canSwitch(ctx context.Context, ts *trafficSwitcher, state *Stat log.Infof("writes already switched no need to check lag") return "", nil } - ts.Logger().Infof("Checking if we can switch traffic for workflow %s.%s with starting state: %s", - state.TargetKeyspace, state.Workflow, state.String()) wf, err := s.GetWorkflow(ctx, state.TargetKeyspace, state.Workflow, false, shards) if err != nil { return "", err } - ts.Logger().Infof("Checking if we can switch traffic for workflow %s.%s with GetWorkflow result: %+v", - state.TargetKeyspace, state.Workflow, wf) for _, stream := range wf.ShardStreams { for _, st := range stream.GetStreams() { if st.Message == Frozen { From ef8d4fe0b8cefa0bbb81877c10763cf2f1ebe6fd Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Wed, 27 Mar 2024 10:45:51 -0400 Subject: [PATCH 7/8] Go back to random flavor for FK tests Signed-off-by: Matt Lord --- go/test/endtoend/vreplication/fk_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/test/endtoend/vreplication/fk_test.go b/go/test/endtoend/vreplication/fk_test.go index 189857b71ee..a75b6727fdc 100644 --- a/go/test/endtoend/vreplication/fk_test.go +++ b/go/test/endtoend/vreplication/fk_test.go @@ -34,7 +34,7 @@ import ( binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" ) -const testWorkflowFlavor = workflowFlavorVtctld +const testWorkflowFlavor = workflowFlavorRandom // TestFKWorkflow runs a MoveTables workflow with atomic copy for a db with foreign key constraints. // It inserts initial data, then simulates load. We insert both child rows with foreign keys and those without, From 0548af707f2a99c71cc33c7634fcbb3f7132974d Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Wed, 27 Mar 2024 11:38:42 -0400 Subject: [PATCH 8/8] Deflake unit test Signed-off-by: Matt Lord --- go/streamlog/streamlog_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/go/streamlog/streamlog_test.go b/go/streamlog/streamlog_test.go index ab01dd4ccb2..538cae99b54 100644 --- a/go/streamlog/streamlog_test.go +++ b/go/streamlog/streamlog_test.go @@ -216,7 +216,7 @@ func TestFile(t *testing.T) { logger.Send(&logMessage{"test 2"}) // Allow time for propagation - time.Sleep(10 * time.Millisecond) + time.Sleep(100 * time.Millisecond) want := "test 1\ntest 2\n" contents, _ := os.ReadFile(logPath) @@ -230,7 +230,7 @@ func TestFile(t *testing.T) { os.Rename(logPath, rotatedPath) logger.Send(&logMessage{"test 3"}) - time.Sleep(10 * time.Millisecond) + time.Sleep(100 * time.Millisecond) want = "test 1\ntest 2\ntest 3\n" contents, _ = os.ReadFile(rotatedPath) @@ -244,10 +244,10 @@ func TestFile(t *testing.T) { if err := syscall.Kill(syscall.Getpid(), syscall.SIGUSR2); err != nil { t.Logf("failed to send streamlog rotate signal: %v", err) } - time.Sleep(10 * time.Millisecond) + time.Sleep(100 * time.Millisecond) logger.Send(&logMessage{"test 4"}) - time.Sleep(10 * time.Millisecond) + time.Sleep(100 * time.Millisecond) want = "test 1\ntest 2\ntest 3\n" contents, _ = os.ReadFile(rotatedPath)