Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-19.0] VtctldClient Reshard: add e2e tests to confirm CLI options and fix discovered issues. (#15353) #15364

Merged
merged 1 commit into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions go/cmd/vtctldclient/command/vreplication/reshard/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,20 +60,18 @@
cli.FinishedParsing(cmd)

req := &vtctldatapb.ReshardCreateRequest{
Workflow: common.BaseOptions.Workflow,
Keyspace: common.BaseOptions.TargetKeyspace,

Workflow: common.BaseOptions.Workflow,
Keyspace: common.BaseOptions.TargetKeyspace,

Check warning on line 64 in go/cmd/vtctldclient/command/vreplication/reshard/create.go

View check run for this annotation

Codecov / codecov/patch

go/cmd/vtctldclient/command/vreplication/reshard/create.go#L63-L64

Added lines #L63 - L64 were not covered by tests
TabletTypes: common.CreateOptions.TabletTypes,
TabletSelectionPreference: tsp,
Cells: common.CreateOptions.Cells,
OnDdl: common.CreateOptions.OnDDL,
DeferSecondaryKeys: common.CreateOptions.DeferSecondaryKeys,
AutoStart: common.CreateOptions.AutoStart,
StopAfterCopy: common.CreateOptions.StopAfterCopy,

SourceShards: reshardCreateOptions.sourceShards,
TargetShards: reshardCreateOptions.targetShards,
SkipSchemaCopy: reshardCreateOptions.skipSchemaCopy,
SourceShards: reshardCreateOptions.sourceShards,
TargetShards: reshardCreateOptions.targetShards,
SkipSchemaCopy: reshardCreateOptions.skipSchemaCopy,

Check warning on line 74 in go/cmd/vtctldclient/command/vreplication/reshard/create.go

View check run for this annotation

Codecov / codecov/patch

go/cmd/vtctldclient/command/vreplication/reshard/create.go#L72-L74

Added lines #L72 - L74 were not covered by tests
}
resp, err := common.GetClient().ReshardCreate(common.GetCommandCtx(), req)
if err != nil {
Expand Down
191 changes: 165 additions & 26 deletions go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"google.golang.org/protobuf/encoding/protojson"

"vitess.io/vitess/go/test/endtoend/cluster"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
Expand All @@ -54,20 +53,33 @@ func TestVtctldclientCLI(t *testing.T) {
require.NotNil(t, zone2)
defer vc.TearDown()

sourceKeyspace := "product"
targetKeyspace := "customer"
sourceKeyspaceName := "product"
targetKeyspaceName := "customer"
var mt iMoveTables
workflowName := "wf1"
targetTabs := setupMinimalCustomerKeyspace(t)

t.Run("MoveTablesCreateFlags1", func(t *testing.T) {
testMoveTablesFlags1(t, &mt, sourceKeyspace, targetKeyspace, workflowName, targetTabs)
testMoveTablesFlags1(t, &mt, sourceKeyspaceName, targetKeyspaceName, workflowName, targetTabs)
})
t.Run("MoveTablesCreateFlags2", func(t *testing.T) {
testMoveTablesFlags2(t, &mt, sourceKeyspace, targetKeyspace, workflowName, targetTabs)
testMoveTablesFlags2(t, &mt, sourceKeyspaceName, targetKeyspaceName, workflowName, targetTabs)
})
t.Run("MoveTablesCompleteFlags3", func(t *testing.T) {
testMoveTablesFlags3(t, sourceKeyspaceName, targetKeyspaceName, targetTabs)
})
t.Run("MoveTablesCompleteFlags", func(t *testing.T) {
testMoveTablesFlags3(t, sourceKeyspace, targetKeyspace, targetTabs)
t.Run("Reshard", func(t *testing.T) {
cell := vc.Cells["zone1"]
targetKeyspace := cell.Keyspaces[targetKeyspaceName]
sourceShard := "-80"
newShards := "-40,40-80"
require.NoError(t, vc.AddShards(t, []*Cell{cell}, targetKeyspace, newShards, 1, 0, 400, nil))
reshardWorkflowName := "reshard"
tablets := map[string]*cluster.VttabletProcess{
"-40": targetKeyspace.Shards["-40"].Tablets["zone1-400"].Vttablet,
"40-80": targetKeyspace.Shards["40-80"].Tablets["zone1-500"].Vttablet,
}
splitShard(t, targetKeyspaceName, reshardWorkflowName, sourceShard, newShards, tablets)
})
}

Expand All @@ -81,34 +93,31 @@ func testMoveTablesFlags1(t *testing.T, mt *iMoveTables, sourceKeyspace, targetK
}
completeFlags := []string{"--keep-routing-rules", "--keep-data"}
switchFlags := []string{}
// Test one set of MoveTable flags.
*mt = createMoveTables(t, sourceKeyspace, targetKeyspace, workflowName, tables, createFlags, completeFlags, switchFlags)
(*mt).Show()
moveTablesOutput := (*mt).GetLastOutput()
// Test one set of MoveTable flags.
moveTablesResponse := getMoveTablesShowResponse(mt)
workflowResponse := getWorkflow(targetKeyspace, workflowName)

workflowOutput, err := vc.VtctldClient.ExecuteCommandWithOutput("Workflow", "--keyspace", "customer", "show", "--workflow", "wf1")
require.NoError(t, err)
var moveTablesResponse vtctldatapb.GetWorkflowsResponse
err = protojson.Unmarshal([]byte(moveTablesOutput), &moveTablesResponse)
require.NoError(t, err)

var workflowResponse vtctldatapb.GetWorkflowsResponse
err = protojson.Unmarshal([]byte(workflowOutput), &workflowResponse)
require.NoError(t, err)

moveTablesResponse.Workflows[0].MaxVReplicationTransactionLag = 0
moveTablesResponse.Workflows[0].MaxVReplicationLag = 0
workflowResponse.Workflows[0].MaxVReplicationTransactionLag = 0
workflowResponse.Workflows[0].MaxVReplicationLag = 0
// also validates that MoveTables Show and Workflow Show return the same output.
require.EqualValues(t, moveTablesResponse.CloneVT(), workflowResponse.CloneVT())
require.EqualValues(t, moveTablesResponse.CloneVT(), workflowResponse)

// Validate that the flags are set correctly in the database.
validateWorkflow1(t, workflowResponse.Workflows)
validateMoveTablesWorkflow(t, workflowResponse.Workflows)
// Since we used --no-routing-rules, there should be no routing rules.
confirmNoRoutingRules(t)
}

func getMoveTablesShowResponse(mt *iMoveTables) *vtctldatapb.GetWorkflowsResponse {
moveTablesOutput := (*mt).GetLastOutput()
var moveTablesResponse vtctldatapb.GetWorkflowsResponse
err := protojson.Unmarshal([]byte(moveTablesOutput), &moveTablesResponse)
require.NoError(vc.t, err)
moveTablesResponse.Workflows[0].MaxVReplicationTransactionLag = 0
moveTablesResponse.Workflows[0].MaxVReplicationLag = 0
return moveTablesResponse.CloneVT()
}

// Validates some of the flags created from the previous test.
func testMoveTablesFlags2(t *testing.T, mt *iMoveTables, sourceKeyspace, targetKeyspace, workflowName string, targetTabs map[string]*cluster.VttabletProcess) {
ksWorkflow := fmt.Sprintf("%s.%s", targetKeyspace, workflowName)
Expand Down Expand Up @@ -184,6 +193,135 @@ func createMoveTables(t *testing.T, sourceKeyspace, targetKeyspace, workflowName
return mt
}

// reshard helpers

func splitShard(t *testing.T, keyspace, workflowName, sourceShards, targetShards string, targetTabs map[string]*cluster.VttabletProcess) {
createFlags := []string{"--auto-start=false", "--defer-secondary-keys=false", "--stop-after-copy",
"--on-ddl", "STOP", "--tablet-types", "primary,rdonly", "--tablet-types-in-preference-order=true",
"--all-cells", "--format=json",
}
rs := newReshard(vc, &reshardWorkflow{
workflowInfo: &workflowInfo{
vc: vc,
workflowName: workflowName,
targetKeyspace: keyspace,
},
sourceShards: sourceShards,
targetShards: targetShards,
createFlags: createFlags,
}, workflowFlavorVtctld)

ksWorkflow := fmt.Sprintf("%s.%s", keyspace, workflowName)
rs.Create()
validateReshardResponse(rs)
workflowResponse := getWorkflow(keyspace, workflowName)
reshardShowResponse := getReshardShowResponse(&rs)
require.EqualValues(t, reshardShowResponse, workflowResponse)
validateReshardWorkflow(t, workflowResponse.Workflows)
waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", keyspace, workflowName), binlogdatapb.VReplicationWorkflowState_Stopped.String())
rs.Start()
waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Stopped.String())
for _, tab := range targetTabs {
alias := fmt.Sprintf("zone1-%d", tab.TabletUID)
query := "update _vt.vreplication set source := replace(source, 'stop_after_copy:true', 'stop_after_copy:false') where db_name = 'vt_customer' and workflow = '" + workflowName + "'"
output, err := vc.VtctlClient.ExecuteCommandWithOutput("ExecuteFetchAsDba", alias, query)
require.NoError(t, err, output)
}
rs.Start()
waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", keyspace, workflowName), binlogdatapb.VReplicationWorkflowState_Running.String())
rs.Stop()
waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Stopped.String())
rs.Start()
waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", keyspace, workflowName), binlogdatapb.VReplicationWorkflowState_Running.String())
for _, targetTab := range targetTabs {
catchup(t, targetTab, workflowName, "Reshard")
}
vdiff(t, keyspace, workflowName, "zone1", false, true, nil)

rs.SwitchReadsAndWrites()
waitForLowLag(t, keyspace, workflowName+"_reverse")
vdiff(t, keyspace, workflowName+"_reverse", "zone1", true, false, nil)

rs.ReverseReadsAndWrites()
waitForLowLag(t, keyspace, workflowName)
vdiff(t, keyspace, workflowName, "zone1", false, true, nil)
rs.SwitchReadsAndWrites()
rs.Complete()
}

func getReshardShowResponse(rs *iReshard) *vtctldatapb.GetWorkflowsResponse {
(*rs).Show()
reshardOutput := (*rs).GetLastOutput()
var reshardResponse vtctldatapb.GetWorkflowsResponse
err := protojson.Unmarshal([]byte(reshardOutput), &reshardResponse)
require.NoError(vc.t, err)
reshardResponse.Workflows[0].MaxVReplicationTransactionLag = 0
reshardResponse.Workflows[0].MaxVReplicationLag = 0
return reshardResponse.CloneVT()
}

func validateReshardResponse(rs iReshard) {
resp := getReshardResponse(rs)
require.NotNil(vc.t, resp)
require.NotNil(vc.t, resp.ShardStreams)
require.Equal(vc.t, len(resp.ShardStreams), 2)
keyspace := "customer"
for _, shard := range []string{"-40", "40-80"} {
streams := resp.ShardStreams[fmt.Sprintf("%s/%s", keyspace, shard)]
require.Equal(vc.t, 1, len(streams.Streams))
require.Equal(vc.t, binlogdatapb.VReplicationWorkflowState_Stopped.String(), streams.Streams[0].Status)
}
}

func validateReshardWorkflow(t *testing.T, workflows []*vtctldatapb.Workflow) {
require.Equal(t, 1, len(workflows))
wf := workflows[0]
require.Equal(t, "reshard", wf.Name)
require.Equal(t, binlogdatapb.VReplicationWorkflowType_Reshard.String(), wf.WorkflowType)
require.Equal(t, "None", wf.WorkflowSubType)
require.Equal(t, "customer", wf.Target.Keyspace)
require.Equal(t, 2, len(wf.Target.Shards))
require.Equal(t, "customer", wf.Source.Keyspace)
require.Equal(t, 1, len(wf.Source.Shards))
require.False(t, wf.DeferSecondaryKeys)

require.GreaterOrEqual(t, len(wf.ShardStreams), int(1))
oneStream := maps.Values(wf.ShardStreams)[0]
require.NotNil(t, oneStream)

stream := oneStream.Streams[0]
require.Equal(t, binlogdatapb.VReplicationWorkflowState_Stopped.String(), stream.State)
require.Equal(t, stream.TabletSelectionPreference, tabletmanagerdatapb.TabletSelectionPreference_INORDER)
require.True(t, slices.Equal([]topodatapb.TabletType{topodatapb.TabletType_PRIMARY, topodatapb.TabletType_RDONLY}, stream.TabletTypes))
require.True(t, slices.Equal([]string{"zone1", "zone2"}, stream.Cells))

bls := stream.BinlogSource
require.Equal(t, binlogdatapb.OnDDLAction_STOP, bls.OnDdl)
require.True(t, bls.StopAfterCopy)

}

func getReshardResponse(rs iReshard) *vtctldatapb.WorkflowStatusResponse {
reshardOutput := rs.GetLastOutput()
var reshardResponse vtctldatapb.WorkflowStatusResponse
err := protojson.Unmarshal([]byte(reshardOutput), &reshardResponse)
require.NoError(vc.t, err)
return reshardResponse.CloneVT()
}

// helper functions

func getWorkflow(targetKeyspace, workflow string) *vtctldatapb.GetWorkflowsResponse {
workflowOutput, err := vc.VtctldClient.ExecuteCommandWithOutput("Workflow", "--keyspace", targetKeyspace, "show", "--workflow", workflow)
require.NoError(vc.t, err)
var workflowResponse vtctldatapb.GetWorkflowsResponse
err = protojson.Unmarshal([]byte(workflowOutput), &workflowResponse)
require.NoError(vc.t, err)
workflowResponse.Workflows[0].MaxVReplicationTransactionLag = 0
workflowResponse.Workflows[0].MaxVReplicationLag = 0
return workflowResponse.CloneVT()
}

func checkTablesExist(t *testing.T, tabletAlias string, tables []string) bool {
tablesResponse, err := vc.VtctldClient.ExecuteCommandWithOutput("GetSchema", tabletAlias, "--tables", strings.Join(tables, ","), "--table-names-only")
require.NoError(t, err)
Expand Down Expand Up @@ -211,6 +349,7 @@ func getRoutingRules(t *testing.T) *vschemapb.RoutingRules {
require.NoError(t, err)
return &routingRulesResponse
}

func confirmNoRoutingRules(t *testing.T) {
routingRulesResponse := getRoutingRules(t)
require.Zero(t, len(routingRulesResponse.Rules))
Expand All @@ -223,7 +362,7 @@ func confirmRoutingRulesExist(t *testing.T) {

// We only want to validate non-standard attributes that are set by the CLI. The other end-to-end tests validate the rest.
// We also check some of the standard attributes to make sure they are set correctly.
func validateWorkflow1(t *testing.T, workflows []*vtctldatapb.Workflow) {
func validateMoveTablesWorkflow(t *testing.T, workflows []*vtctldatapb.Workflow) {
require.Equal(t, 1, len(workflows))
wf := workflows[0]
require.Equal(t, "wf1", wf.Name)
Expand Down
31 changes: 22 additions & 9 deletions go/test/endtoend/vreplication/wrappers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,10 @@ type moveTablesWorkflow struct {
tables string
atomicCopy bool
sourceShards string
createFlags []string // currently only used by vtctld

// currently only used by vtctld
lastOutput string
createFlags []string
completeFlags []string
switchFlags []string
}
Expand Down Expand Up @@ -270,7 +271,12 @@ type reshardWorkflow struct {
targetShards string
skipSchemaCopy bool

lastOutput string
// currently only used by vtctld
lastOutput string
createFlags []string
completeFlags []string
cancelFlags []string
switchFlags []string
}

type iReshard interface {
Expand Down Expand Up @@ -379,8 +385,9 @@ func (v VtctldReshard) Flavor() string {
func (v VtctldReshard) exec(args ...string) {
args2 := []string{"Reshard", "--workflow=" + v.workflowName, "--target-keyspace=" + v.targetKeyspace}
args2 = append(args2, args...)
if err := vc.VtctldClient.ExecuteCommand(args2...); err != nil {
v.vc.t.Fatalf("failed to create Reshard workflow: %v", err)
var err error
if v.lastOutput, err = vc.VtctldClient.ExecuteCommandWithOutput(args2...); err != nil {
v.vc.t.Fatalf("failed to create Reshard workflow: %v: %s", err, v.lastOutput)
}
}

Expand All @@ -395,20 +402,22 @@ func (v VtctldReshard) Create() {
if v.skipSchemaCopy {
args = append(args, "--skip-schema-copy="+strconv.FormatBool(v.skipSchemaCopy))
}
args = append(args, v.createFlags...)
v.exec(args...)
}

func (v VtctldReshard) SwitchReadsAndWrites() {
v.exec("SwitchTraffic")
args := []string{"SwitchTraffic"}
args = append(args, v.switchFlags...)
v.exec(args...)
}

func (v VtctldReshard) ReverseReadsAndWrites() {
v.exec("ReverseTraffic")
}

func (v VtctldReshard) Show() {
//TODO implement me
panic("implement me")
v.exec("Show")
}

func (v VtctldReshard) SwitchReads() {
Expand All @@ -422,11 +431,15 @@ func (v VtctldReshard) SwitchWrites() {
}

func (v VtctldReshard) Cancel() {
v.exec("Cancel")
args := []string{"Cancel"}
args = append(args, v.cancelFlags...)
v.exec(args...)
}

func (v VtctldReshard) Complete() {
v.exec("Complete")
args := []string{"Complete"}
args = append(args, v.completeFlags...)
v.exec(args...)
}

func (v VtctldReshard) GetLastOutput() string {
Expand Down
11 changes: 9 additions & 2 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1672,7 +1672,11 @@
log.Errorf("%w", err2)
return nil, err
}
rs, err := s.buildResharder(ctx, keyspace, req.Workflow, req.SourceShards, req.TargetShards, strings.Join(cells, ","), "")
tabletTypesStr := topoproto.MakeStringTypeCSV(req.TabletTypes)
if req.TabletSelectionPreference == tabletmanagerdatapb.TabletSelectionPreference_INORDER {
tabletTypesStr = discovery.InOrderHint + tabletTypesStr

Check warning on line 1677 in go/vt/vtctl/workflow/server.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtctl/workflow/server.go#L1675-L1677

Added lines #L1675 - L1677 were not covered by tests
}
rs, err := s.buildResharder(ctx, keyspace, req.Workflow, req.SourceShards, req.TargetShards, strings.Join(cells, ","), tabletTypesStr)

Check warning on line 1679 in go/vt/vtctl/workflow/server.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtctl/workflow/server.go#L1679

Added line #L1679 was not covered by tests
if err != nil {
return nil, vterrors.Wrap(err, "buildResharder")
}
Expand All @@ -1695,7 +1699,10 @@
} else {
log.Warningf("Streams will not be started since --auto-start is set to false")
}
return nil, nil
return s.WorkflowStatus(ctx, &vtctldatapb.WorkflowStatusRequest{
Keyspace: keyspace,
Workflow: req.Workflow,
})

Check warning on line 1705 in go/vt/vtctl/workflow/server.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtctl/workflow/server.go#L1702-L1705

Added lines #L1702 - L1705 were not covered by tests
}

// VDiffCreate is part of the vtctlservicepb.VtctldServer interface.
Expand Down
Loading