Skip to content

Commit

Permalink
Cherry-pick df1285c with conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
vitess-bot[bot] committed Mar 27, 2024
1 parent 1ef05bd commit 6b1046e
Show file tree
Hide file tree
Showing 12 changed files with 261 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func commandSwitchTraffic(cmd *cobra.Command, args []string) error {
req := &vtctldatapb.WorkflowSwitchTrafficRequest{
Keyspace: BaseOptions.TargetKeyspace,
Workflow: BaseOptions.Workflow,
Cells: SwitchTrafficOptions.Cells,
TabletTypes: SwitchTrafficOptions.TabletTypes,
MaxReplicationLagAllowed: protoutil.DurationToProto(SwitchTrafficOptions.MaxReplicationLagAllowed),
Timeout: protoutil.DurationToProto(SwitchTrafficOptions.Timeout),
Expand Down
8 changes: 4 additions & 4 deletions go/streamlog/streamlog_flaky_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func TestFile(t *testing.T) {
logger.Send(&logMessage{"test 2"})

// Allow time for propagation
time.Sleep(10 * time.Millisecond)
time.Sleep(100 * time.Millisecond)

want := "test 1\ntest 2\n"
contents, _ := os.ReadFile(logPath)
Expand All @@ -227,7 +227,7 @@ func TestFile(t *testing.T) {
os.Rename(logPath, rotatedPath)

logger.Send(&logMessage{"test 3"})
time.Sleep(10 * time.Millisecond)
time.Sleep(100 * time.Millisecond)

want = "test 1\ntest 2\ntest 3\n"
contents, _ = os.ReadFile(rotatedPath)
Expand All @@ -241,10 +241,10 @@ func TestFile(t *testing.T) {
if err := syscall.Kill(syscall.Getpid(), syscall.SIGUSR2); err != nil {
t.Logf("failed to send streamlog rotate signal: %v", err)
}
time.Sleep(10 * time.Millisecond)
time.Sleep(100 * time.Millisecond)

logger.Send(&logMessage{"test 4"})
time.Sleep(10 * time.Millisecond)
time.Sleep(100 * time.Millisecond)

want = "test 1\ntest 2\ntest 3\n"
contents, _ = os.ReadFile(rotatedPath)
Expand Down
1 change: 1 addition & 0 deletions go/test/endtoend/cluster/vttablet_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,7 @@ func VttabletProcessInstance(port, grpcPort, tabletUID int, cell, shard, keyspac
Binary: "vttablet",
FileToLogQueries: path.Join(tmpDirectory, fmt.Sprintf("/vt_%010d_querylog.txt", tabletUID)),
Directory: path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("/vt_%010d", tabletUID)),
Cell: cell,
TabletPath: fmt.Sprintf("%s-%010d", cell, tabletUID),
ServiceMap: "grpc-queryservice,grpc-tabletmanager,grpc-updatestream,grpc-throttler",
LogDir: tmpDirectory,
Expand Down
13 changes: 12 additions & 1 deletion go/test/endtoend/vreplication/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,11 @@ import (
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

const (
Expand Down Expand Up @@ -318,6 +320,9 @@ func waitForWorkflowState(t *testing.T, vc *VitessCluster, ksWorkflow string, wa
}
}
}
if wantState == binlogdatapb.VReplicationWorkflowState_Running.String() && attributeValue.Get("Pos").String() == "" {
done = false
}
} else {
done = false
}
Expand Down Expand Up @@ -351,7 +356,7 @@ func waitForWorkflowState(t *testing.T, vc *VitessCluster, ksWorkflow string, wa
// as a CSV have secondary keys. This is useful when testing the
// --defer-secondary-keys flag to confirm that the secondary keys
// were re-added by the time the workflow hits the running phase.
// For a Reshard workflow, where no tables are specififed, pass
// For a Reshard workflow, where no tables are specified, pass
// an empty string for the tables and all tables in the target
// keyspace will be checked.
func confirmTablesHaveSecondaryKeys(t *testing.T, tablets []*cluster.VttabletProcess, ksName string, tables string) {
Expand All @@ -371,6 +376,12 @@ func confirmTablesHaveSecondaryKeys(t *testing.T, tablets []*cluster.VttabletPro
}
}
for _, tablet := range tablets {
// Be sure that the schema is up to date.
err := vc.VtctldClient.ExecuteCommand("ReloadSchema", topoproto.TabletAliasString(&topodatapb.TabletAlias{
Cell: tablet.Cell,
Uid: uint32(tablet.TabletUID),
}))
require.NoError(t, err)
for _, table := range tableArr {
if schema.IsInternalOperationTableName(table) {
continue
Expand Down
3 changes: 1 addition & 2 deletions go/test/endtoend/vreplication/movetables_buffering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/wrangler"
)

func TestMoveTablesBuffering(t *testing.T) {
Expand All @@ -17,7 +16,7 @@ func TestMoveTablesBuffering(t *testing.T) {
defer vtgateConn.Close()
defer vc.TearDown(t)

currentWorkflowType = wrangler.MoveTablesWorkflow
currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables
setupMinimalCustomerKeyspace(t)
tables := "loadtest"
err := tstWorkflowExec(t, defaultCellName, workflowName, sourceKs, targetKs,
Expand Down
9 changes: 4 additions & 5 deletions go/test/endtoend/vreplication/partial_movetables_seq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/wrangler"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)
Expand Down Expand Up @@ -236,12 +235,12 @@ func (wf *workflow) create() {
cell := wf.tc.defaultCellName
switch typ {
case "movetables":
currentWorkflowType = wrangler.MoveTablesWorkflow
currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables
sourceShards := strings.Join(wf.options.sourceShards, ",")
err = tstWorkflowExec(t, cell, wf.name, wf.fromKeyspace, wf.toKeyspace,
strings.Join(wf.options.tables, ","), workflowActionCreate, "", sourceShards, "", false)
case "reshard":
currentWorkflowType = wrangler.ReshardWorkflow
currentWorkflowType = binlogdatapb.VReplicationWorkflowType_Reshard
sourceShards := strings.Join(wf.options.sourceShards, ",")
targetShards := strings.Join(wf.options.targetShards, ",")
if targetShards == "" {
Expand Down Expand Up @@ -395,7 +394,7 @@ func TestPartialMoveTablesWithSequences(t *testing.T) {

// Switch all traffic for the shard
wf80Dash.switchTraffic()
expectedSwitchOutput := fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s\nStart State: Reads Not Switched. Writes Not Switched\nCurrent State: Reads partially switched, for shards: %s. Writes partially switched, for shards: %s\n\n",
expectedSwitchOutput := fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s\n\nStart State: Reads Not Switched. Writes Not Switched\nCurrent State: Reads partially switched, for shards: %s. Writes partially switched, for shards: %s\n\n",
targetKs, wfName, shard, shard)
require.Equal(t, expectedSwitchOutput, lastOutput)
currentCustomerCount = getCustomerCount(t, "")
Expand Down Expand Up @@ -455,7 +454,7 @@ func TestPartialMoveTablesWithSequences(t *testing.T) {
wfDash80.create()
wfDash80.switchTraffic()

expectedSwitchOutput := fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s\nStart State: Reads partially switched, for shards: 80-. Writes partially switched, for shards: 80-\nCurrent State: All Reads Switched. All Writes Switched\n\n",
expectedSwitchOutput := fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s\n\nStart State: Reads partially switched, for shards: 80-. Writes partially switched, for shards: 80-\nCurrent State: All Reads Switched. All Writes Switched\n\n",
targetKs, wfName)
require.Equal(t, expectedSwitchOutput, lastOutput)

Expand Down
Loading

0 comments on commit 6b1046e

Please sign in to comment.