Skip to content

Commit

Permalink
Get workflowFlavorVtctl working properly again
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Apr 3, 2024
1 parent 0e2f175 commit a2d114c
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 8 deletions.
1 change: 1 addition & 0 deletions go/test/endtoend/vreplication/fk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func TestFKWorkflow(t *testing.T) {

sourceKeyspace := "fksource"
shardName := "0"
currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables

defer vc.TearDown()

Expand Down
4 changes: 2 additions & 2 deletions go/test/endtoend/vreplication/movetables_buffering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ package vreplication
import (
"testing"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/vt/log"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)

func TestMoveTablesBuffering(t *testing.T) {
Expand Down
11 changes: 9 additions & 2 deletions go/test/endtoend/vreplication/partial_movetables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,9 +297,15 @@ func testPartialMoveTablesBasic(t *testing.T, flavor workflowFlavor) {
_, err = vtgateConn.ExecuteFetch(shardDash80RoutedQuery, 0, false)
require.Error(t, err)
require.Contains(t, err.Error(), "target: customer.-80.replica", "Query was routed to the target before partial SwitchTraffic")

workflowExec := tstWorkflowExec
if flavor == workflowFlavorVtctl {
workflowExec = tstWorkflowExecVtctl
}

// We cannot Complete a partial move tables at the moment because
// it will find that all traffic has (obviously) not been switched.
err = tstWorkflowExec(t, "", workflowName, "", targetKs, "", workflowActionComplete, "", "", "", workflowExecOptsPartial80Dash)
err = workflowExec(t, "", workflowName, "", targetKs, "", workflowActionComplete, "", "", "", workflowExecOptsPartial80Dash)
require.Error(t, err)

// Confirm global routing rules: -80 should still be be routed to customer
Expand Down Expand Up @@ -348,7 +354,7 @@ func testPartialMoveTablesBasic(t *testing.T, flavor workflowFlavor) {
}
reverseWf := wf + "_reverse"
reverseKs := sourceKeyspace
err = tstWorkflowExec(t, "", reverseWf, "", reverseKs, "", workflowActionCancel, "", "", "", opts)
err = workflowExec(t, "", reverseWf, "", reverseKs, "", workflowActionCancel, "", "", "", opts)
require.NoError(t, err)

output, err := vc.VtctlClient.ExecuteCommandWithOutput("Workflow", "--", "--shards", opts.shardSubset, fmt.Sprintf("%s.%s", reverseKs, reverseWf), "show")
Expand Down Expand Up @@ -377,6 +383,7 @@ func testPartialMoveTablesBasic(t *testing.T, flavor workflowFlavor) {
// customer shard -- -80,80- -- once a a time to customer2.
// We test with both the vtctlclient and vtctldclient flavors.
func TestPartialMoveTablesBasic(t *testing.T) {
currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables
for _, flavor := range workflowFlavors {
t.Run(workflowFlavorNames[flavor], func(t *testing.T) {
testPartialMoveTablesBasic(t, flavor)
Expand Down
64 changes: 64 additions & 0 deletions go/test/endtoend/vreplication/resharding_workflows_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"strconv"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -170,6 +171,69 @@ func tstWorkflowExec(t *testing.T, cells, workflow, sourceKs, targetKs, tables,
return nil
}

func tstWorkflowExecVtctl(t *testing.T, cells, workflow, sourceKs, targetKs, tables, action, tabletTypes,
sourceShards, targetShards string, options *workflowExecOptions) error {

var args []string
if currentWorkflowType == binlogdatapb.VReplicationWorkflowType_MoveTables {
args = append(args, "MoveTables")
} else {
args = append(args, "Reshard")
}

args = append(args, "--")

if BypassLagCheck {
args = append(args, "--max_replication_lag_allowed=2542087h")
}
if options.atomicCopy {
args = append(args, "--atomic-copy")
}
switch action {
case workflowActionCreate:
if currentWorkflowType == binlogdatapb.VReplicationWorkflowType_MoveTables {
args = append(args, "--source", sourceKs)
if tables != "" {
args = append(args, "--tables", tables)
} else {
args = append(args, "--all")
}
if sourceShards != "" {
args = append(args, "--source_shards", sourceShards)
}
} else {
args = append(args, "--source_shards", sourceShards, "--target_shards", targetShards)
}
// Test new experimental --defer-secondary-keys flag
switch currentWorkflowType {
case binlogdatapb.VReplicationWorkflowType_MoveTables, binlogdatapb.VReplicationWorkflowType_Migrate, binlogdatapb.VReplicationWorkflowType_Reshard:
if !options.atomicCopy && options.deferSecondaryKeys {
args = append(args, "--defer-secondary-keys")
}
args = append(args, "--initialize-target-sequences") // Only used for MoveTables
}
default:
if options.shardSubset != "" {
args = append(args, "--shards", options.shardSubset)
}
}
if cells != "" {
args = append(args, "--cells", cells)
}
if tabletTypes != "" {
args = append(args, "--tablet_types", tabletTypes)
}
args = append(args, "--timeout", time.Minute.String())
ksWorkflow := fmt.Sprintf("%s.%s", targetKs, workflow)
args = append(args, action, ksWorkflow)
output, err := vc.VtctlClient.ExecuteCommandWithOutput(args...)
lastOutput = output
if err != nil {
return fmt.Errorf("%s: %s", err, output)
}
return nil
}

func tstWorkflowSwitchReads(t *testing.T, tabletTypes, cells string) {
if tabletTypes == "" {
tabletTypes = "replica,rdonly"
Expand Down
8 changes: 4 additions & 4 deletions go/test/endtoend/vreplication/wrappers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,13 @@ func (vmt *VtctlMoveTables) Create() {
}

func (vmt *VtctlMoveTables) SwitchReadsAndWrites() {
err := tstWorkflowExec(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace,
err := tstWorkflowExecVtctl(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace,
vmt.tables, workflowActionSwitchTraffic, "", "", "", defaultWorkflowExecOptions)
require.NoError(vmt.vc.t, err)
}

func (vmt *VtctlMoveTables) ReverseReadsAndWrites() {
err := tstWorkflowExec(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace,
err := tstWorkflowExecVtctl(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace,
vmt.tables, workflowActionReverseTraffic, "", "", "", defaultWorkflowExecOptions)
require.NoError(vmt.vc.t, err)
}
Expand All @@ -143,7 +143,7 @@ func (vmt *VtctlMoveTables) exec(action string) {
deferSecondaryKeys: false,
atomicCopy: vmt.atomicCopy,
}
err := tstWorkflowExec(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace,
err := tstWorkflowExecVtctl(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace,
vmt.tables, action, vmt.tabletTypes, vmt.sourceShards, "", options)
require.NoError(vmt.vc.t, err)
}
Expand Down Expand Up @@ -333,7 +333,7 @@ func (vrs *VtctlReshard) Show() {

func (vrs *VtctlReshard) exec(action string) {
options := &workflowExecOptions{}
err := tstWorkflowExec(vrs.vc.t, "", vrs.workflowName, "", vrs.targetKeyspace,
err := tstWorkflowExecVtctl(vrs.vc.t, "", vrs.workflowName, "", vrs.targetKeyspace,
"", action, vrs.tabletTypes, vrs.sourceShards, vrs.targetShards, options)
require.NoError(vrs.vc.t, err)
}
Expand Down

0 comments on commit a2d114c

Please sign in to comment.