Skip to content

Commit

Permalink
Add end to end test for vtctlclient
Browse files Browse the repository at this point in the history
Signed-off-by: Rohit Nayak <[email protected]>
  • Loading branch information
rohit-nayak-ps committed Jan 2, 2024
1 parent 2fcdaf2 commit 171db02
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 10 deletions.
33 changes: 23 additions & 10 deletions go/test/endtoend/vreplication/partial_movetables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,13 @@ func TestPartialMoveTablesBasic(t *testing.T) {
setupCustomer2Keyspace(t)

testCancel(t)

workflowExecOptsPartialDash80 := &workflowExecOptions{
deferSecondaryKeys: true,
shardSubset: "-80",
}
workflowExecOptsPartial80Dash := &workflowExecOptions{
deferSecondaryKeys: true,
}
currentWorkflowType = wrangler.MoveTablesWorkflow
wfName := "partial80Dash"
sourceKs := "customer"
Expand All @@ -143,7 +149,7 @@ func TestPartialMoveTablesBasic(t *testing.T) {

// start the partial movetables for 80-
err := tstWorkflowExec(t, defaultCellName, wfName, sourceKs, targetKs,
"customer,loadtest", workflowActionCreate, "", shard, "", defaultWorkflowExecOptions)
"customer,loadtest", workflowActionCreate, "", shard, "", workflowExecOptsPartial80Dash)
require.NoError(t, err)
var lg *loadGenerator
if runWithLoad { // start load after routing rules are set, otherwise we end up with ambiguous tables
Expand Down Expand Up @@ -218,7 +224,7 @@ func TestPartialMoveTablesBasic(t *testing.T) {
require.Contains(t, err.Error(), "target: customer.-80.primary", "Query was routed to the target before any SwitchTraffic")

// Switch all traffic for the shard
require.NoError(t, tstWorkflowExec(t, "", wfName, "", targetKs, "", workflowActionSwitchTraffic, "", "", "", defaultWorkflowExecOptions))
require.NoError(t, tstWorkflowExec(t, "", wfName, "", targetKs, "", workflowActionSwitchTraffic, "", "", "", workflowExecOptsPartial80Dash))
expectedSwitchOutput := fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s\nStart State: Reads Not Switched. Writes Not Switched\nCurrent State: Reads partially switched, for shards: %s. Writes partially switched, for shards: %s\n\n",
targetKs, wfName, shard, shard)
require.Equal(t, expectedSwitchOutput, lastOutput)
Expand Down Expand Up @@ -276,7 +282,7 @@ func TestPartialMoveTablesBasic(t *testing.T) {

// We cannot Complete a partial move tables at the moment because
// it will find that all traffic has (obviously) not been switched.
err = tstWorkflowExec(t, "", wfName, "", targetKs, "", workflowActionComplete, "", "", "", defaultWorkflowExecOptions)
err = tstWorkflowExec(t, "", wfName, "", targetKs, "", workflowActionComplete, "", "", "", workflowExecOptsPartial80Dash)
require.Error(t, err)

// Confirm global routing rules: -80 should still be be routed to customer
Expand All @@ -289,14 +295,14 @@ func TestPartialMoveTablesBasic(t *testing.T) {
ksWf = fmt.Sprintf("%s.%s", targetKs, wfName)
// Start the partial movetables for -80, 80- has already been switched
err = tstWorkflowExec(t, defaultCellName, wfName, sourceKs, targetKs,
"customer,loadtest", workflowActionCreate, "", shard, "", defaultWorkflowExecOptions)
"customer,loadtest", workflowActionCreate, "", shard, "", workflowExecOptsPartialDash80)
require.NoError(t, err)
targetTab2 := vc.getPrimaryTablet(t, targetKs, shard)
catchup(t, targetTab2, wfName, "Partial MoveTables Customer to Customer2: -80")
vdiffSideBySide(t, ksWf, "")

// Switch all traffic for the shard
require.NoError(t, tstWorkflowExec(t, "", wfName, "", targetKs, "", workflowActionSwitchTraffic, "", "", "", defaultWorkflowExecOptions))
require.NoError(t, tstWorkflowExec(t, "", wfName, "", targetKs, "", workflowActionSwitchTraffic, "", "", "", workflowExecOptsPartialDash80))
expectedSwitchOutput = fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s\nStart State: Reads partially switched, for shards: 80-. Writes partially switched, for shards: 80-\nCurrent State: All Reads Switched. All Writes Switched\n\n",
targetKs, wfName)
require.Equal(t, expectedSwitchOutput, lastOutput)
Expand All @@ -315,20 +321,27 @@ func TestPartialMoveTablesBasic(t *testing.T) {
// clean up both the global routing rules and the shard routing rules.
for _, wf := range []string{"partialDash80", "partial80Dash"} {
// We switched traffic, so it's the reverse workflow we want to cancel.
var opts *workflowExecOptions
switch wf {
case "partialDash80":
opts = workflowExecOptsPartialDash80
case "partial80Dash":
opts = workflowExecOptsPartial80Dash
}
reverseWf := wf + "_reverse"
reverseKs := sourceKs // customer
err = tstWorkflowExec(t, "", reverseWf, "", reverseKs, "", workflowActionCancel, "", "", "", defaultWorkflowExecOptions)
err = tstWorkflowExec(t, "", reverseWf, "", reverseKs, "", workflowActionCancel, "", "", "", opts)
require.NoError(t, err)

output, err := vc.VtctlClient.ExecuteCommandWithOutput("Workflow", fmt.Sprintf("%s.%s", reverseKs, reverseWf), "show")
output, err := vc.VtctlClient.ExecuteCommandWithOutput("Workflow", "--", "--shards", opts.shardSubset, fmt.Sprintf("%s.%s", reverseKs, reverseWf), "show")
require.Error(t, err)
require.Contains(t, output, "no streams found")

// Delete the original workflow
originalKsWf := fmt.Sprintf("%s.%s", targetKs, wf)
_, err = vc.VtctlClient.ExecuteCommandWithOutput("Workflow", originalKsWf, "delete")
_, err = vc.VtctlClient.ExecuteCommandWithOutput("Workflow", "--", "--shards", opts.shardSubset, originalKsWf, "delete")
require.NoError(t, err)
output, err = vc.VtctlClient.ExecuteCommandWithOutput("Workflow", originalKsWf, "show")
output, err = vc.VtctlClient.ExecuteCommandWithOutput("Workflow", "--", "--shards", opts.shardSubset, originalKsWf, "show")
require.Error(t, err)
require.Contains(t, output, "no streams found")
}
Expand Down
5 changes: 5 additions & 0 deletions go/test/endtoend/vreplication/resharding_workflows_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ var (
type workflowExecOptions struct {
deferSecondaryKeys bool
atomicCopy bool
shardSubset string
}

var defaultWorkflowExecOptions = &workflowExecOptions{
Expand Down Expand Up @@ -140,6 +141,10 @@ func tstWorkflowExec(t *testing.T, cells, workflow, sourceKs, targetKs, tables,
}
args = append(args, "--initialize-target-sequences") // Only used for MoveTables
}
default:
if options.shardSubset != "" {
args = append(args, "--shards", options.shardSubset)
}
}
if cells != "" {
args = append(args, "--cells", cells)
Expand Down

0 comments on commit 171db02

Please sign in to comment.