Skip to content

Commit

Permalink
[release-18.0] VReplication: Get workflowFlavorVtctl endtoend testing…
Browse files Browse the repository at this point in the history
… working properly again (#15636) (#15666)

Signed-off-by: Matt Lord <[email protected]>
Co-authored-by: vitess-bot[bot] <108069721+vitess-bot[bot]@users.noreply.github.com>
Co-authored-by: Matt Lord <[email protected]>
  • Loading branch information
vitess-bot[bot] and mattlord authored Apr 4, 2024
1 parent 7495fd0 commit 536071a
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 9 deletions.
1 change: 1 addition & 0 deletions go/test/endtoend/vreplication/fk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func TestFKWorkflow(t *testing.T) {
defaultCell = vc.Cells[defaultCellName]
sourceKeyspace := "fksource"
shardName := "0"
currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables

defer vc.TearDown(t)

Expand Down
4 changes: 2 additions & 2 deletions go/test/endtoend/vreplication/movetables_buffering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ package vreplication
import (
"testing"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/vt/log"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)

func TestMoveTablesBuffering(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions go/test/endtoend/vreplication/partial_movetables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ func TestPartialMoveTablesBasic(t *testing.T) {

// We cannot Complete a partial move tables at the moment because
// it will find that all traffic has (obviously) not been switched.
err = tstWorkflowExec(t, "", wfName, "", targetKs, "", workflowActionComplete, "", "", "", false)
err = tstWorkflowExecVtctl(t, "", wfName, "", targetKs, "", workflowActionComplete, "", "", "", false)
require.Error(t, err)

// Confirm global routing rules: -80 should still be be routed to customer
Expand Down Expand Up @@ -313,7 +313,7 @@ func TestPartialMoveTablesBasic(t *testing.T) {
// We switched traffic, so it's the reverse workflow we want to cancel.
reverseWf := wf + "_reverse"
reverseKs := sourceKs // customer
err = tstWorkflowExec(t, "", reverseWf, "", reverseKs, "", workflowActionCancel, "", "", "", false)
err = tstWorkflowExecVtctl(t, "", reverseWf, "", reverseKs, "", workflowActionCancel, "", "", "", false)
require.NoError(t, err)

output, err := vc.VtctlClient.ExecuteCommandWithOutput("Workflow", fmt.Sprintf("%s.%s", reverseKs, reverseWf), "show")
Expand Down
65 changes: 64 additions & 1 deletion go/test/endtoend/vreplication/resharding_workflows_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ func tstWorkflowAction(t *testing.T, action, tabletTypes, cells string) error {
return tstWorkflowExec(t, cells, workflowName, sourceKs, targetKs, tablesToMove, action, tabletTypes, "", "", false)
}

// tstWorkflowExec executes a MoveTables or Reshard workflow command using
// vtctldclient. If you need to use the legacy vtctlclient, use
// tstWorkflowExecVtctl instead.
func tstWorkflowExec(t *testing.T, cells, workflow, sourceKs, targetKs, tables, action, tabletTypes, sourceShards, targetShards string, atomicCopy bool) error {
var args []string
if currentWorkflowType == binlogdatapb.VReplicationWorkflowType_MoveTables {
Expand Down Expand Up @@ -136,7 +139,7 @@ func tstWorkflowExec(t *testing.T, cells, workflow, sourceKs, targetKs, tables,
}
args = append(args, "--timeout=90s")
}
if action == workflowActionCreate && atomicCopy {
if currentWorkflowType == binlogdatapb.VReplicationWorkflowType_MoveTables && action == workflowActionCreate && atomicCopy {
args = append(args, "--atomic-copy")
}
if (action == workflowActionCreate || action == workflowActionSwitchTraffic || action == workflowActionReverseTraffic) && cells != "" {
Expand All @@ -157,6 +160,66 @@ func tstWorkflowExec(t *testing.T, cells, workflow, sourceKs, targetKs, tables,
return nil
}

// tstWorkflowExecVtctl executes a MoveTables or Reshard workflow command using
// vtctlclient. It should operate exactly the same way as tstWorkflowExec, but
// using the legacy client.
func tstWorkflowExecVtctl(t *testing.T, cells, workflow, sourceKs, targetKs, tables, action, tabletTypes, sourceShards, targetShards string, atomicCopy bool) error {
var args []string
if currentWorkflowType == binlogdatapb.VReplicationWorkflowType_MoveTables {
args = append(args, "MoveTables")
} else {
args = append(args, "Reshard")
}

args = append(args, "--")

if BypassLagCheck {
args = append(args, "--max_replication_lag_allowed=2542087h")
}
if atomicCopy {
args = append(args, "--atomic-copy")
}
switch action {
case workflowActionCreate:
if currentWorkflowType == binlogdatapb.VReplicationWorkflowType_MoveTables {
args = append(args, "--source", sourceKs)
if tables != "" {
args = append(args, "--tables", tables)
} else {
args = append(args, "--all")
}
if sourceShards != "" {
args = append(args, "--source_shards", sourceShards)
}
} else {
args = append(args, "--source_shards", sourceShards, "--target_shards", targetShards)
}
// Test new experimental --defer-secondary-keys flag
switch currentWorkflowType {
case binlogdatapb.VReplicationWorkflowType_MoveTables, binlogdatapb.VReplicationWorkflowType_Migrate, binlogdatapb.VReplicationWorkflowType_Reshard:
if !atomicCopy {
args = append(args, "--defer-secondary-keys")
}
args = append(args, "--initialize-target-sequences") // Only used for MoveTables
}
}
if cells != "" {
args = append(args, "--cells", cells)
}
if tabletTypes != "" {
args = append(args, "--tablet_types", tabletTypes)
}
args = append(args, "--timeout", time.Minute.String())
ksWorkflow := fmt.Sprintf("%s.%s", targetKs, workflow)
args = append(args, action, ksWorkflow)
output, err := vc.VtctlClient.ExecuteCommandWithOutput(args...)
lastOutput = output
if err != nil {
return fmt.Errorf("%s: %s", err, output)
}
return nil
}

func tstWorkflowSwitchReads(t *testing.T, tabletTypes, cells string) {
if tabletTypes == "" {
tabletTypes = "replica,rdonly"
Expand Down
8 changes: 4 additions & 4 deletions go/test/endtoend/vreplication/wrappers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,19 +92,19 @@ func newVtctlMoveTables(mt *moveTables) *VtctlMoveTables {

func (vmt *VtctlMoveTables) Create() {
log.Infof("vmt is %+v", vmt.vc, vmt.tables)
err := tstWorkflowExec(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace,
err := tstWorkflowExecVtctl(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace,
vmt.tables, workflowActionCreate, "", vmt.sourceShards, "", vmt.atomicCopy)
require.NoError(vmt.vc.t, err)
}

func (vmt *VtctlMoveTables) SwitchReadsAndWrites() {
err := tstWorkflowExec(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace,
err := tstWorkflowExecVtctl(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace,
vmt.tables, workflowActionSwitchTraffic, "", "", "", vmt.atomicCopy)
require.NoError(vmt.vc.t, err)
}

func (vmt *VtctlMoveTables) ReverseReadsAndWrites() {
err := tstWorkflowExec(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace,
err := tstWorkflowExecVtctl(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace,
vmt.tables, workflowActionReverseTraffic, "", "", "", vmt.atomicCopy)
require.NoError(vmt.vc.t, err)
}
Expand All @@ -125,7 +125,7 @@ func (vmt *VtctlMoveTables) SwitchWrites() {
}

func (vmt *VtctlMoveTables) Cancel() {
err := tstWorkflowExec(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace,
err := tstWorkflowExecVtctl(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace,
vmt.tables, workflowActionCancel, "", "", "", vmt.atomicCopy)
require.NoError(vmt.vc.t, err)
}
Expand Down

0 comments on commit 536071a

Please sign in to comment.