Skip to content

Commit

Permalink
Move the Reshard v2 workflow to vtctldclient
Browse files Browse the repository at this point in the history
And get the tests passing again with minor fixes.

Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Mar 26, 2024
1 parent 308f1fc commit 1d49744
Show file tree
Hide file tree
Showing 11 changed files with 133 additions and 119 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func commandSwitchTraffic(cmd *cobra.Command, args []string) error {
req := &vtctldatapb.WorkflowSwitchTrafficRequest{
Keyspace: BaseOptions.TargetKeyspace,
Workflow: BaseOptions.Workflow,
Cells: SwitchTrafficOptions.Cells,
TabletTypes: SwitchTrafficOptions.TabletTypes,
MaxReplicationLagAllowed: protoutil.DurationToProto(SwitchTrafficOptions.MaxReplicationLagAllowed),
Timeout: protoutil.DurationToProto(SwitchTrafficOptions.Timeout),
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/vreplication/fk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)

const testWorkflowFlavor = workflowFlavorRandom
const testWorkflowFlavor = workflowFlavorVtctld

// TestFKWorkflow runs a MoveTables workflow with atomic copy for a db with foreign key constraints.
// It inserts initial data, then simulates load. We insert both child rows with foreign keys and those without,
Expand Down
10 changes: 9 additions & 1 deletion go/test/endtoend/vreplication/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,11 @@ import (
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
"vitess.io/vitess/go/vt/proto/topodata"
)

const (
Expand Down Expand Up @@ -410,7 +412,7 @@ func waitForWorkflowState(t *testing.T, vc *VitessCluster, ksWorkflow string, wa
// as a CSV have secondary keys. This is useful when testing the
// --defer-secondary-keys flag to confirm that the secondary keys
// were re-added by the time the workflow hits the running phase.
// For a Reshard workflow, where no tables are specififed, pass
// For a Reshard workflow, where no tables are specified, pass
// an empty string for the tables and all tables in the target
// keyspace will be checked.
func confirmTablesHaveSecondaryKeys(t *testing.T, tablets []*cluster.VttabletProcess, ksName string, tables string) {
Expand All @@ -430,6 +432,12 @@ func confirmTablesHaveSecondaryKeys(t *testing.T, tablets []*cluster.VttabletPro
}
}
for _, tablet := range tablets {
// Be sure that the schema is up to date.
err := vc.VtctldClient.ExecuteCommand("ReloadSchema", topoproto.TabletAliasString(&topodata.TabletAlias{
Cell: tablet.Cell,
Uid: uint32(tablet.TabletUID),
}))
require.NoError(t, err)
for _, table := range tableArr {
if schema.IsInternalOperationTableName(table) {
continue
Expand Down
3 changes: 1 addition & 2 deletions go/test/endtoend/vreplication/movetables_buffering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,14 @@ import (
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/wrangler"
)

func TestMoveTablesBuffering(t *testing.T) {
defaultRdonly = 1
vc = setupMinimalCluster(t)
defer vc.TearDown()

currentWorkflowType = wrangler.MoveTablesWorkflow
currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables
setupMinimalCustomerKeyspace(t)
tables := "loadtest"
err := tstWorkflowExec(t, defaultCellName, workflowName, sourceKs, targetKs,
Expand Down
9 changes: 4 additions & 5 deletions go/test/endtoend/vreplication/partial_movetables_seq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/wrangler"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)
Expand Down Expand Up @@ -228,12 +227,12 @@ func (wf *workflow) create() {
cell := wf.tc.defaultCellName
switch typ {
case "movetables":
currentWorkflowType = wrangler.MoveTablesWorkflow
currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables
sourceShards := strings.Join(wf.options.sourceShards, ",")
err = tstWorkflowExec(t, cell, wf.name, wf.fromKeyspace, wf.toKeyspace,
strings.Join(wf.options.tables, ","), workflowActionCreate, "", sourceShards, "", defaultWorkflowExecOptions)
case "reshard":
currentWorkflowType = wrangler.ReshardWorkflow
currentWorkflowType = binlogdatapb.VReplicationWorkflowType_Reshard
sourceShards := strings.Join(wf.options.sourceShards, ",")
targetShards := strings.Join(wf.options.targetShards, ",")
if targetShards == "" {
Expand Down Expand Up @@ -389,7 +388,7 @@ func TestPartialMoveTablesWithSequences(t *testing.T) {

// Switch all traffic for the shard
wf80Dash.switchTraffic()
expectedSwitchOutput := fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s\nStart State: Reads Not Switched. Writes Not Switched\nCurrent State: Reads partially switched, for shards: %s. Writes partially switched, for shards: %s\n\n",
expectedSwitchOutput := fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s\n\nStart State: Reads Not Switched. Writes Not Switched\nCurrent State: Reads partially switched, for shards: %s. Writes partially switched, for shards: %s\n\n",
targetKs, wfName, shard, shard)
require.Equal(t, expectedSwitchOutput, lastOutput)
currentCustomerCount = getCustomerCount(t, "")
Expand Down Expand Up @@ -449,7 +448,7 @@ func TestPartialMoveTablesWithSequences(t *testing.T) {
wfDash80.create()
wfDash80.switchTraffic()

expectedSwitchOutput := fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s\nStart State: Reads partially switched, for shards: 80-. Writes partially switched, for shards: 80-\nCurrent State: All Reads Switched. All Writes Switched\n\n",
expectedSwitchOutput := fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s\n\nStart State: Reads partially switched, for shards: 80-. Writes partially switched, for shards: 80-\nCurrent State: All Reads Switched. All Writes Switched\n\n",
targetKs, wfName)
require.Equal(t, expectedSwitchOutput, lastOutput)

Expand Down
93 changes: 49 additions & 44 deletions go/test/endtoend/vreplication/resharding_workflows_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ var (
sourceTab, sourceReplicaTab, sourceRdonlyTab *cluster.VttabletProcess

lastOutput string
currentWorkflowType wrangler.VReplicationWorkflowType
currentWorkflowType binlogdatapb.VReplicationWorkflowType
)

type workflowExecOptions struct {
Expand Down Expand Up @@ -103,59 +103,62 @@ func tstWorkflowExec(t *testing.T, cells, workflow, sourceKs, targetKs, tables,
sourceShards, targetShards string, options *workflowExecOptions) error {

var args []string
if currentWorkflowType == wrangler.MoveTablesWorkflow {
if currentWorkflowType == binlogdatapb.VReplicationWorkflowType_MoveTables {
args = append(args, "MoveTables")
} else {
args = append(args, "Reshard")
}

args = append(args, "--")
args = append(args, "--workflow", workflow, "--target-keyspace", targetKs, action)

if BypassLagCheck {
args = append(args, "--max_replication_lag_allowed=2542087h")
}
if options.atomicCopy {
args = append(args, "--atomic-copy")
}
switch action {
case workflowActionCreate:
if currentWorkflowType == wrangler.MoveTablesWorkflow {
args = append(args, "--source", sourceKs)
if currentWorkflowType == binlogdatapb.VReplicationWorkflowType_MoveTables {
args = append(args, "--source-keyspace", sourceKs)
if tables != "" {
args = append(args, "--tables", tables)
} else {
args = append(args, "--all")
args = append(args, "--all-tables")
}
if sourceShards != "" {
args = append(args, "--source_shards", sourceShards)
args = append(args, "--source-shards", sourceShards)
}
} else {
args = append(args, "--source_shards", sourceShards, "--target_shards", targetShards)
args = append(args, "--source-shards", sourceShards, "--target-shards", targetShards)
}
// Test new experimental --defer-secondary-keys flag
switch currentWorkflowType {
case wrangler.MoveTablesWorkflow, wrangler.MigrateWorkflow, wrangler.ReshardWorkflow:

case binlogdatapb.VReplicationWorkflowType_MoveTables, binlogdatapb.VReplicationWorkflowType_Migrate, binlogdatapb.VReplicationWorkflowType_Reshard:
if !options.atomicCopy && options.deferSecondaryKeys {
args = append(args, "--defer-secondary-keys")
}
args = append(args, "--initialize-target-sequences") // Only used for MoveTables
}
default:
if options.shardSubset != "" {
args = append(args, "--shards", options.shardSubset)
}
}
if cells != "" {
if currentWorkflowType == binlogdatapb.VReplicationWorkflowType_MoveTables && action == workflowActionSwitchTraffic {
args = append(args, "--initialize-target-sequences")
}
if action == workflowActionSwitchTraffic || action == workflowActionReverseTraffic {
if BypassLagCheck {
args = append(args, "--max-replication-lag-allowed=2542087h")
}
args = append(args, "--timeout", time.Minute.String())
}
if action == workflowActionCreate && options.atomicCopy {
args = append(args, "--atomic-copy")
}
if (action == workflowActionCreate || action == workflowActionSwitchTraffic || action == workflowActionReverseTraffic) && cells != "" {
args = append(args, "--cells", cells)
}
if tabletTypes != "" {
args = append(args, "--tablet_types", tabletTypes)
if action != workflowActionComplete && tabletTypes != "" {
args = append(args, "--tablet-types", tabletTypes)
}
args = append(args, "--timeout", time.Minute.String())
ksWorkflow := fmt.Sprintf("%s.%s", targetKs, workflow)
args = append(args, action, ksWorkflow)
output, err := vc.VtctlClient.ExecuteCommandWithOutput(args...)
args = append(args, "--action_timeout=2m")
t.Logf("Executing workflow command: vtctldclient %v", args)
output, err := vc.VtctldClient.ExecuteCommandWithOutput(args...)
lastOutput = output
if err != nil {
return fmt.Errorf("%s: %s", err, output)
Expand Down Expand Up @@ -285,8 +288,8 @@ func checkStates(t *testing.T, startState, endState string) {
require.Contains(t, lastOutput, fmt.Sprintf("Current State: %s", endState))
}

func getCurrentState(t *testing.T) string {
if err := tstWorkflowAction(t, "GetState", "", ""); err != nil {
func getCurrentStatus(t *testing.T) string {
if err := tstWorkflowAction(t, "status", "", ""); err != nil {
return err.Error()
}
return strings.TrimSpace(strings.Trim(lastOutput, "\n"))
Expand Down Expand Up @@ -335,7 +338,7 @@ func testVSchemaForSequenceAfterMoveTables(t *testing.T) {
// at this point the unsharded product and sharded customer keyspaces are created by previous tests

// use MoveTables to move customer2 from product to customer using
currentWorkflowType = wrangler.MoveTablesWorkflow
currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables
err := tstWorkflowExec(t, defaultCellName, "wf2", sourceKs, targetKs,
"customer2", workflowActionCreate, "", "", "", defaultWorkflowExecOptions)
require.NoError(t, err)
Expand Down Expand Up @@ -432,7 +435,7 @@ func testReplicatingWithPKEnumCols(t *testing.T) {
func testReshardV2Workflow(t *testing.T) {
vtgateConn, closeConn := getVTGateConn()
defer closeConn()
currentWorkflowType = wrangler.ReshardWorkflow
currentWorkflowType = binlogdatapb.VReplicationWorkflowType_Reshard

// create internal tables on the original customer shards that should be
// ignored and not show up on the new shards
Expand All @@ -441,7 +444,7 @@ func testReshardV2Workflow(t *testing.T) {

createAdditionalCustomerShards(t, "-40,40-80,80-c0,c0-")
createReshardWorkflow(t, "-80,80-", "-40,40-80,80-c0,c0-")
if !strings.Contains(lastOutput, "Workflow started successfully") {
if !strings.Contains(lastOutput, "Status: Running") {
t.Fail()
}
validateReadsRouteToSource(t, "replica")
Expand All @@ -461,16 +464,15 @@ func testReshardV2Workflow(t *testing.T) {
func testMoveTablesV2Workflow(t *testing.T) {
vtgateConn, closeConn := getVTGateConn()
defer closeConn()
currentWorkflowType = wrangler.MoveTablesWorkflow
currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables

// test basic forward and reverse flows
setupCustomerKeyspace(t)
// The purge table should get skipped/ignored
// If it's not then we'll get an error as the table doesn't exist in the vschema
createMoveTablesWorkflow(t, "customer,loadtest,vdiff_order,reftable,_vt_PURGE_4f9194b43b2011eb8a0104ed332e05c2_20221210194431")
if !strings.Contains(lastOutput, "Workflow started successfully") {
t.Fail()
}
require.Contains(t, lastOutput, "Status: Running")
waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())
validateReadsRouteToSource(t, "replica")
validateWritesRouteToSource(t)

Expand All @@ -485,26 +487,29 @@ func testMoveTablesV2Workflow(t *testing.T) {

testRestOfWorkflow(t)

listAllArgs := []string{"workflow", "customer", "listall"}
output, _ := vc.VtctlClient.ExecuteCommandWithOutput(listAllArgs...)
require.Contains(t, output, "No workflows found in keyspace customer")
listAllArgs := []string{"workflow", "--keyspace", "customer", "list"}
output, err := vc.VtctldClient.ExecuteCommandWithOutput(listAllArgs...)
require.NoError(t, err)
require.Equal(t, output, "[]")

testVSchemaForSequenceAfterMoveTables(t)

createMoveTablesWorkflow(t, "Lead,Lead-1")
output, _ = vc.VtctlClient.ExecuteCommandWithOutput(listAllArgs...)
require.Contains(t, output, "Following workflow(s) found in keyspace customer: wf1")
output, err = vc.VtctldClient.ExecuteCommandWithOutput(listAllArgs...)
require.NoError(t, err)
require.Contains(t, output, "wf1")

err := tstWorkflowCancel(t)
err = tstWorkflowCancel(t)
require.NoError(t, err)

output, _ = vc.VtctlClient.ExecuteCommandWithOutput(listAllArgs...)
require.Contains(t, output, "No workflows found in keyspace customer")
output, err = vc.VtctldClient.ExecuteCommandWithOutput(listAllArgs...)
require.NoError(t, err)
require.Equal(t, output, "[]")
}

func testPartialSwitches(t *testing.T) {
// nothing switched
require.Equal(t, getCurrentState(t), wrangler.WorkflowStateNotSwitched)
require.Contains(t, getCurrentStatus(t), wrangler.WorkflowStateNotSwitched)
tstWorkflowSwitchReads(t, "replica,rdonly", "zone1")
nextState := "Reads partially switched. Replica switched in cells: zone1. Rdonly switched in cells: zone1. Writes Not Switched"
checkStates(t, wrangler.WorkflowStateNotSwitched, nextState)
Expand All @@ -526,7 +531,7 @@ func testPartialSwitches(t *testing.T) {
checkStates(t, nextState, nextState) // idempotency

keyspace := "product"
if currentWorkflowType == wrangler.ReshardWorkflow {
if currentWorkflowType == binlogdatapb.VReplicationWorkflowType_Reshard {
keyspace = "customer"
}
waitForLowLag(t, keyspace, "wf1_reverse")
Expand Down Expand Up @@ -563,7 +568,7 @@ func testRestOfWorkflow(t *testing.T) {

// this function is called for both MoveTables and Reshard, so the reverse workflows exist in different keyspaces
keyspace := "product"
if currentWorkflowType == wrangler.ReshardWorkflow {
if currentWorkflowType == binlogdatapb.VReplicationWorkflowType_Reshard {
keyspace = "customer"
}
waitForLowLag(t, keyspace, "wf1_reverse")
Expand Down
6 changes: 3 additions & 3 deletions go/test/endtoend/vreplication/wrappers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/wrangler"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)

type iWorkflow interface {
Expand Down Expand Up @@ -117,7 +117,7 @@ func newVtctlMoveTables(mt *moveTablesWorkflow) *VtctlMoveTables {
}

func (vmt *VtctlMoveTables) Create() {
currentWorkflowType = wrangler.MoveTablesWorkflow
currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables
vmt.exec(workflowActionCreate)
}

Expand Down Expand Up @@ -314,7 +314,7 @@ func newVtctlReshard(rs *reshardWorkflow) *VtctlReshard {
}

func (vrs *VtctlReshard) Create() {
currentWorkflowType = wrangler.ReshardWorkflow
currentWorkflowType = binlogdatapb.VReplicationWorkflowType_Reshard
vrs.exec(workflowActionCreate)
}

Expand Down
Loading

0 comments on commit 1d49744

Please sign in to comment.