Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VReplication: Fix vtctldclient SwitchReads related bugs and move the TestBasicV2Workflows e2e test to vtctldclient #15579

Merged
merged 8 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func commandSwitchTraffic(cmd *cobra.Command, args []string) error {
req := &vtctldatapb.WorkflowSwitchTrafficRequest{
Keyspace: BaseOptions.TargetKeyspace,
Workflow: BaseOptions.Workflow,
Cells: SwitchTrafficOptions.Cells,
TabletTypes: SwitchTrafficOptions.TabletTypes,
MaxReplicationLagAllowed: protoutil.DurationToProto(SwitchTrafficOptions.MaxReplicationLagAllowed),
Timeout: protoutil.DurationToProto(SwitchTrafficOptions.Timeout),
Expand Down
8 changes: 4 additions & 4 deletions go/streamlog/streamlog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func TestFile(t *testing.T) {
logger.Send(&logMessage{"test 2"})

// Allow time for propagation
time.Sleep(10 * time.Millisecond)
time.Sleep(100 * time.Millisecond)

want := "test 1\ntest 2\n"
contents, _ := os.ReadFile(logPath)
Expand All @@ -230,7 +230,7 @@ func TestFile(t *testing.T) {
os.Rename(logPath, rotatedPath)

logger.Send(&logMessage{"test 3"})
time.Sleep(10 * time.Millisecond)
time.Sleep(100 * time.Millisecond)

want = "test 1\ntest 2\ntest 3\n"
contents, _ = os.ReadFile(rotatedPath)
Expand All @@ -244,10 +244,10 @@ func TestFile(t *testing.T) {
if err := syscall.Kill(syscall.Getpid(), syscall.SIGUSR2); err != nil {
t.Logf("failed to send streamlog rotate signal: %v", err)
}
time.Sleep(10 * time.Millisecond)
time.Sleep(100 * time.Millisecond)

logger.Send(&logMessage{"test 4"})
time.Sleep(10 * time.Millisecond)
time.Sleep(100 * time.Millisecond)

want = "test 1\ntest 2\ntest 3\n"
contents, _ = os.ReadFile(rotatedPath)
Expand Down
1 change: 1 addition & 0 deletions go/test/endtoend/cluster/vttablet_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,7 @@ func VttabletProcessInstance(port, grpcPort, tabletUID int, cell, shard, keyspac
Binary: "vttablet",
FileToLogQueries: path.Join(tmpDirectory, fmt.Sprintf("/vt_%010d_querylog.txt", tabletUID)),
Directory: path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("/vt_%010d", tabletUID)),
Cell: cell,
TabletPath: fmt.Sprintf("%s-%010d", cell, tabletUID),
ServiceMap: "grpc-queryservice,grpc-tabletmanager,grpc-updatestream,grpc-throttler",
LogDir: tmpDirectory,
Expand Down
13 changes: 12 additions & 1 deletion go/test/endtoend/vreplication/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,11 @@ import (
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

const (
Expand Down Expand Up @@ -377,6 +379,9 @@ func waitForWorkflowState(t *testing.T, vc *VitessCluster, ksWorkflow string, wa
}
}
}
if wantState == binlogdatapb.VReplicationWorkflowState_Running.String() && attributeValue.Get("Pos").String() == "" {
done = false
}
} else {
done = false
}
Expand Down Expand Up @@ -410,7 +415,7 @@ func waitForWorkflowState(t *testing.T, vc *VitessCluster, ksWorkflow string, wa
// as a CSV have secondary keys. This is useful when testing the
// --defer-secondary-keys flag to confirm that the secondary keys
// were re-added by the time the workflow hits the running phase.
// For a Reshard workflow, where no tables are specififed, pass
// For a Reshard workflow, where no tables are specified, pass
// an empty string for the tables and all tables in the target
// keyspace will be checked.
func confirmTablesHaveSecondaryKeys(t *testing.T, tablets []*cluster.VttabletProcess, ksName string, tables string) {
Expand All @@ -430,6 +435,12 @@ func confirmTablesHaveSecondaryKeys(t *testing.T, tablets []*cluster.VttabletPro
}
}
for _, tablet := range tablets {
// Be sure that the schema is up to date.
err := vc.VtctldClient.ExecuteCommand("ReloadSchema", topoproto.TabletAliasString(&topodatapb.TabletAlias{
Cell: tablet.Cell,
Uid: uint32(tablet.TabletUID),
}))
require.NoError(t, err)
for _, table := range tableArr {
if schema.IsInternalOperationTableName(table) {
continue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,14 @@ import (
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/wrangler"
)

func TestMoveTablesBuffering(t *testing.T) {
defaultRdonly = 1
vc = setupMinimalCluster(t)
defer vc.TearDown()

currentWorkflowType = wrangler.MoveTablesWorkflow
currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables
setupMinimalCustomerKeyspace(t)
tables := "loadtest"
err := tstWorkflowExec(t, defaultCellName, workflowName, sourceKs, targetKs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/wrangler"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)
Expand Down Expand Up @@ -228,12 +227,12 @@ func (wf *workflow) create() {
cell := wf.tc.defaultCellName
switch typ {
case "movetables":
currentWorkflowType = wrangler.MoveTablesWorkflow
currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables
sourceShards := strings.Join(wf.options.sourceShards, ",")
err = tstWorkflowExec(t, cell, wf.name, wf.fromKeyspace, wf.toKeyspace,
strings.Join(wf.options.tables, ","), workflowActionCreate, "", sourceShards, "", defaultWorkflowExecOptions)
case "reshard":
currentWorkflowType = wrangler.ReshardWorkflow
currentWorkflowType = binlogdatapb.VReplicationWorkflowType_Reshard
sourceShards := strings.Join(wf.options.sourceShards, ",")
targetShards := strings.Join(wf.options.targetShards, ",")
if targetShards == "" {
Expand Down Expand Up @@ -389,7 +388,7 @@ func TestPartialMoveTablesWithSequences(t *testing.T) {

// Switch all traffic for the shard
wf80Dash.switchTraffic()
expectedSwitchOutput := fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s\nStart State: Reads Not Switched. Writes Not Switched\nCurrent State: Reads partially switched, for shards: %s. Writes partially switched, for shards: %s\n\n",
expectedSwitchOutput := fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s\n\nStart State: Reads Not Switched. Writes Not Switched\nCurrent State: Reads partially switched, for shards: %s. Writes partially switched, for shards: %s\n\n",
targetKs, wfName, shard, shard)
require.Equal(t, expectedSwitchOutput, lastOutput)
currentCustomerCount = getCustomerCount(t, "")
Expand Down Expand Up @@ -449,7 +448,7 @@ func TestPartialMoveTablesWithSequences(t *testing.T) {
wfDash80.create()
wfDash80.switchTraffic()

expectedSwitchOutput := fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s\nStart State: Reads partially switched, for shards: 80-. Writes partially switched, for shards: 80-\nCurrent State: All Reads Switched. All Writes Switched\n\n",
expectedSwitchOutput := fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s\n\nStart State: Reads partially switched, for shards: 80-. Writes partially switched, for shards: 80-\nCurrent State: All Reads Switched. All Writes Switched\n\n",
targetKs, wfName)
require.Equal(t, expectedSwitchOutput, lastOutput)

Expand Down
Loading
Loading