diff --git a/go/test/endtoend/vreplication/fk_test.go b/go/test/endtoend/vreplication/fk_test.go index a75b6727fdc..09692930c5c 100644 --- a/go/test/endtoend/vreplication/fk_test.go +++ b/go/test/endtoend/vreplication/fk_test.go @@ -54,6 +54,7 @@ func TestFKWorkflow(t *testing.T) { sourceKeyspace := "fksource" shardName := "0" + currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables defer vc.TearDown() diff --git a/go/test/endtoend/vreplication/movetables_buffering_test.go b/go/test/endtoend/vreplication/movetables_buffering_test.go index 88231d4be87..a977320ec4a 100644 --- a/go/test/endtoend/vreplication/movetables_buffering_test.go +++ b/go/test/endtoend/vreplication/movetables_buffering_test.go @@ -3,11 +3,11 @@ package vreplication import ( "testing" - binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" - "github.com/stretchr/testify/require" "vitess.io/vitess/go/vt/log" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" ) func TestMoveTablesBuffering(t *testing.T) { diff --git a/go/test/endtoend/vreplication/partial_movetables_test.go b/go/test/endtoend/vreplication/partial_movetables_test.go index 7155dc9be91..2f0c7c71d29 100644 --- a/go/test/endtoend/vreplication/partial_movetables_test.go +++ b/go/test/endtoend/vreplication/partial_movetables_test.go @@ -297,9 +297,15 @@ func testPartialMoveTablesBasic(t *testing.T, flavor workflowFlavor) { _, err = vtgateConn.ExecuteFetch(shardDash80RoutedQuery, 0, false) require.Error(t, err) require.Contains(t, err.Error(), "target: customer.-80.replica", "Query was routed to the target before partial SwitchTraffic") + + workflowExec := tstWorkflowExec + if flavor == workflowFlavorVtctl { + workflowExec = tstWorkflowExecVtctl + } + // We cannot Complete a partial move tables at the moment because // it will find that all traffic has (obviously) not been switched. - err = tstWorkflowExec(t, "", workflowName, "", targetKs, "", workflowActionComplete, "", "", "", workflowExecOptsPartial80Dash) + err = workflowExec(t, "", workflowName, "", targetKs, "", workflowActionComplete, "", "", "", workflowExecOptsPartial80Dash) require.Error(t, err) // Confirm global routing rules: -80 should still be be routed to customer @@ -348,7 +354,7 @@ func testPartialMoveTablesBasic(t *testing.T, flavor workflowFlavor) { } reverseWf := wf + "_reverse" reverseKs := sourceKeyspace - err = tstWorkflowExec(t, "", reverseWf, "", reverseKs, "", workflowActionCancel, "", "", "", opts) + err = workflowExec(t, "", reverseWf, "", reverseKs, "", workflowActionCancel, "", "", "", opts) require.NoError(t, err) output, err := vc.VtctlClient.ExecuteCommandWithOutput("Workflow", "--", "--shards", opts.shardSubset, fmt.Sprintf("%s.%s", reverseKs, reverseWf), "show") @@ -377,6 +383,7 @@ func testPartialMoveTablesBasic(t *testing.T, flavor workflowFlavor) { // customer shard -- -80,80- -- once a a time to customer2. // We test with both the vtctlclient and vtctldclient flavors. func TestPartialMoveTablesBasic(t *testing.T) { + currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables for _, flavor := range workflowFlavors { t.Run(workflowFlavorNames[flavor], func(t *testing.T) { testPartialMoveTablesBasic(t, flavor) diff --git a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go index c63fc245535..f2976d7ab02 100644 --- a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go +++ b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go @@ -23,6 +23,7 @@ import ( "strconv" "strings" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -170,6 +171,69 @@ func tstWorkflowExec(t *testing.T, cells, workflow, sourceKs, targetKs, tables, return nil } +func tstWorkflowExecVtctl(t *testing.T, cells, workflow, sourceKs, targetKs, tables, action, tabletTypes, + sourceShards, targetShards string, options *workflowExecOptions) error { + + var args []string + if currentWorkflowType == binlogdatapb.VReplicationWorkflowType_MoveTables { + args = append(args, "MoveTables") + } else { + args = append(args, "Reshard") + } + + args = append(args, "--") + + if BypassLagCheck { + args = append(args, "--max_replication_lag_allowed=2542087h") + } + if options.atomicCopy { + args = append(args, "--atomic-copy") + } + switch action { + case workflowActionCreate: + if currentWorkflowType == binlogdatapb.VReplicationWorkflowType_MoveTables { + args = append(args, "--source", sourceKs) + if tables != "" { + args = append(args, "--tables", tables) + } else { + args = append(args, "--all") + } + if sourceShards != "" { + args = append(args, "--source_shards", sourceShards) + } + } else { + args = append(args, "--source_shards", sourceShards, "--target_shards", targetShards) + } + // Test new experimental --defer-secondary-keys flag + switch currentWorkflowType { + case binlogdatapb.VReplicationWorkflowType_MoveTables, binlogdatapb.VReplicationWorkflowType_Migrate, binlogdatapb.VReplicationWorkflowType_Reshard: + if !options.atomicCopy && options.deferSecondaryKeys { + args = append(args, "--defer-secondary-keys") + } + args = append(args, "--initialize-target-sequences") // Only used for MoveTables + } + default: + if options.shardSubset != "" { + args = append(args, "--shards", options.shardSubset) + } + } + if cells != "" { + args = append(args, "--cells", cells) + } + if tabletTypes != "" { + args = append(args, "--tablet_types", tabletTypes) + } + args = append(args, "--timeout", time.Minute.String()) + ksWorkflow := fmt.Sprintf("%s.%s", targetKs, workflow) + args = append(args, action, ksWorkflow) + output, err := vc.VtctlClient.ExecuteCommandWithOutput(args...) + lastOutput = output + if err != nil { + return fmt.Errorf("%s: %s", err, output) + } + return nil +} + func tstWorkflowSwitchReads(t *testing.T, tabletTypes, cells string) { if tabletTypes == "" { tabletTypes = "replica,rdonly" diff --git a/go/test/endtoend/vreplication/wrappers_test.go b/go/test/endtoend/vreplication/wrappers_test.go index 407fe12c3ed..cadcaed261f 100644 --- a/go/test/endtoend/vreplication/wrappers_test.go +++ b/go/test/endtoend/vreplication/wrappers_test.go @@ -122,13 +122,13 @@ func (vmt *VtctlMoveTables) Create() { } func (vmt *VtctlMoveTables) SwitchReadsAndWrites() { - err := tstWorkflowExec(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace, + err := tstWorkflowExecVtctl(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace, vmt.tables, workflowActionSwitchTraffic, "", "", "", defaultWorkflowExecOptions) require.NoError(vmt.vc.t, err) } func (vmt *VtctlMoveTables) ReverseReadsAndWrites() { - err := tstWorkflowExec(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace, + err := tstWorkflowExecVtctl(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace, vmt.tables, workflowActionReverseTraffic, "", "", "", defaultWorkflowExecOptions) require.NoError(vmt.vc.t, err) } @@ -143,7 +143,7 @@ func (vmt *VtctlMoveTables) exec(action string) { deferSecondaryKeys: false, atomicCopy: vmt.atomicCopy, } - err := tstWorkflowExec(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace, + err := tstWorkflowExecVtctl(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace, vmt.tables, action, vmt.tabletTypes, vmt.sourceShards, "", options) require.NoError(vmt.vc.t, err) } @@ -333,7 +333,7 @@ func (vrs *VtctlReshard) Show() { func (vrs *VtctlReshard) exec(action string) { options := &workflowExecOptions{} - err := tstWorkflowExec(vrs.vc.t, "", vrs.workflowName, "", vrs.targetKeyspace, + err := tstWorkflowExecVtctl(vrs.vc.t, "", vrs.workflowName, "", vrs.targetKeyspace, "", action, vrs.tabletTypes, vrs.sourceShards, vrs.targetShards, options) require.NoError(vrs.vc.t, err) }