Skip to content

Commit

Permalink
[release-19.0] VReplication: Move the Reshard v2 workflow to vtctldcl…
Browse files Browse the repository at this point in the history
…ient (#15579) (#15584)

Signed-off-by: Matt Lord <[email protected]>
Co-authored-by: vitess-bot[bot] <108069721+vitess-bot[bot]@users.noreply.github.com>
  • Loading branch information
vitess-bot[bot] authored Mar 27, 2024
1 parent 3ce8f87 commit 52ef808
Show file tree
Hide file tree
Showing 12 changed files with 151 additions and 126 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func commandSwitchTraffic(cmd *cobra.Command, args []string) error {
req := &vtctldatapb.WorkflowSwitchTrafficRequest{
Keyspace: BaseOptions.TargetKeyspace,
Workflow: BaseOptions.Workflow,
Cells: SwitchTrafficOptions.Cells,
TabletTypes: SwitchTrafficOptions.TabletTypes,
MaxReplicationLagAllowed: protoutil.DurationToProto(SwitchTrafficOptions.MaxReplicationLagAllowed),
Timeout: protoutil.DurationToProto(SwitchTrafficOptions.Timeout),
Expand Down
8 changes: 4 additions & 4 deletions go/streamlog/streamlog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func TestFile(t *testing.T) {
logger.Send(&logMessage{"test 2"})

// Allow time for propagation
time.Sleep(10 * time.Millisecond)
time.Sleep(100 * time.Millisecond)

want := "test 1\ntest 2\n"
contents, _ := os.ReadFile(logPath)
Expand All @@ -227,7 +227,7 @@ func TestFile(t *testing.T) {
os.Rename(logPath, rotatedPath)

logger.Send(&logMessage{"test 3"})
time.Sleep(10 * time.Millisecond)
time.Sleep(100 * time.Millisecond)

want = "test 1\ntest 2\ntest 3\n"
contents, _ = os.ReadFile(rotatedPath)
Expand All @@ -241,10 +241,10 @@ func TestFile(t *testing.T) {
if err := syscall.Kill(syscall.Getpid(), syscall.SIGUSR2); err != nil {
t.Logf("failed to send streamlog rotate signal: %v", err)
}
time.Sleep(10 * time.Millisecond)
time.Sleep(100 * time.Millisecond)

logger.Send(&logMessage{"test 4"})
time.Sleep(10 * time.Millisecond)
time.Sleep(100 * time.Millisecond)

want = "test 1\ntest 2\ntest 3\n"
contents, _ = os.ReadFile(rotatedPath)
Expand Down
1 change: 1 addition & 0 deletions go/test/endtoend/cluster/vttablet_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,7 @@ func VttabletProcessInstance(port, grpcPort, tabletUID int, cell, shard, keyspac
Binary: "vttablet",
FileToLogQueries: path.Join(tmpDirectory, fmt.Sprintf("/vt_%010d_querylog.txt", tabletUID)),
Directory: path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("/vt_%010d", tabletUID)),
Cell: cell,
TabletPath: fmt.Sprintf("%s-%010d", cell, tabletUID),
ServiceMap: "grpc-queryservice,grpc-tabletmanager,grpc-updatestream,grpc-throttler",
LogDir: tmpDirectory,
Expand Down
13 changes: 12 additions & 1 deletion go/test/endtoend/vreplication/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,11 @@ import (
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

const (
Expand Down Expand Up @@ -377,6 +379,9 @@ func waitForWorkflowState(t *testing.T, vc *VitessCluster, ksWorkflow string, wa
}
}
}
if wantState == binlogdatapb.VReplicationWorkflowState_Running.String() && attributeValue.Get("Pos").String() == "" {
done = false
}
} else {
done = false
}
Expand Down Expand Up @@ -410,7 +415,7 @@ func waitForWorkflowState(t *testing.T, vc *VitessCluster, ksWorkflow string, wa
// as a CSV have secondary keys. This is useful when testing the
// --defer-secondary-keys flag to confirm that the secondary keys
// were re-added by the time the workflow hits the running phase.
// For a Reshard workflow, where no tables are specififed, pass
// For a Reshard workflow, where no tables are specified, pass
// an empty string for the tables and all tables in the target
// keyspace will be checked.
func confirmTablesHaveSecondaryKeys(t *testing.T, tablets []*cluster.VttabletProcess, ksName string, tables string) {
Expand All @@ -430,6 +435,12 @@ func confirmTablesHaveSecondaryKeys(t *testing.T, tablets []*cluster.VttabletPro
}
}
for _, tablet := range tablets {
// Be sure that the schema is up to date.
err := vc.VtctldClient.ExecuteCommand("ReloadSchema", topoproto.TabletAliasString(&topodatapb.TabletAlias{
Cell: tablet.Cell,
Uid: uint32(tablet.TabletUID),
}))
require.NoError(t, err)
for _, table := range tableArr {
if schema.IsInternalOperationTableName(table) {
continue
Expand Down
3 changes: 1 addition & 2 deletions go/test/endtoend/vreplication/movetables_buffering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,14 @@ import (
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/wrangler"
)

func TestMoveTablesBuffering(t *testing.T) {
defaultRdonly = 1
vc = setupMinimalCluster(t)
defer vc.TearDown()

currentWorkflowType = wrangler.MoveTablesWorkflow
currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables
setupMinimalCustomerKeyspace(t)
tables := "loadtest"
err := tstWorkflowExec(t, defaultCellName, workflowName, sourceKs, targetKs,
Expand Down
9 changes: 4 additions & 5 deletions go/test/endtoend/vreplication/partial_movetables_seq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/wrangler"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)
Expand Down Expand Up @@ -228,12 +227,12 @@ func (wf *workflow) create() {
cell := wf.tc.defaultCellName
switch typ {
case "movetables":
currentWorkflowType = wrangler.MoveTablesWorkflow
currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables
sourceShards := strings.Join(wf.options.sourceShards, ",")
err = tstWorkflowExec(t, cell, wf.name, wf.fromKeyspace, wf.toKeyspace,
strings.Join(wf.options.tables, ","), workflowActionCreate, "", sourceShards, "", defaultWorkflowExecOptions)
case "reshard":
currentWorkflowType = wrangler.ReshardWorkflow
currentWorkflowType = binlogdatapb.VReplicationWorkflowType_Reshard
sourceShards := strings.Join(wf.options.sourceShards, ",")
targetShards := strings.Join(wf.options.targetShards, ",")
if targetShards == "" {
Expand Down Expand Up @@ -389,7 +388,7 @@ func TestPartialMoveTablesWithSequences(t *testing.T) {

// Switch all traffic for the shard
wf80Dash.switchTraffic()
expectedSwitchOutput := fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s\nStart State: Reads Not Switched. Writes Not Switched\nCurrent State: Reads partially switched, for shards: %s. Writes partially switched, for shards: %s\n\n",
expectedSwitchOutput := fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s\n\nStart State: Reads Not Switched. Writes Not Switched\nCurrent State: Reads partially switched, for shards: %s. Writes partially switched, for shards: %s\n\n",
targetKs, wfName, shard, shard)
require.Equal(t, expectedSwitchOutput, lastOutput)
currentCustomerCount = getCustomerCount(t, "")
Expand Down Expand Up @@ -449,7 +448,7 @@ func TestPartialMoveTablesWithSequences(t *testing.T) {
wfDash80.create()
wfDash80.switchTraffic()

expectedSwitchOutput := fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s\nStart State: Reads partially switched, for shards: 80-. Writes partially switched, for shards: 80-\nCurrent State: All Reads Switched. All Writes Switched\n\n",
expectedSwitchOutput := fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s\n\nStart State: Reads partially switched, for shards: 80-. Writes partially switched, for shards: 80-\nCurrent State: All Reads Switched. All Writes Switched\n\n",
targetKs, wfName)
require.Equal(t, expectedSwitchOutput, lastOutput)

Expand Down
Loading

0 comments on commit 52ef808

Please sign in to comment.