Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VReplication: Support reversing read-only traffic in vtctldclient #16920

Merged
merged 32 commits into from
Oct 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
3a59aac
Only block workflow cancel if writes have been switched
mattlord Oct 8, 2024
a48daf6
Support reversetraffic for read-only tablets
mattlord Oct 8, 2024
1d53be5
Revert cancel specific change
mattlord Oct 10, 2024
0cea47d
Add unit test case
mattlord Oct 10, 2024
fbd7ad6
Fixes and improvements
mattlord Oct 10, 2024
cdb5323
Cleanup and unify traffic switching handling
mattlord Oct 10, 2024
0e4e39b
Shrink diff
mattlord Oct 10, 2024
8297aae
Adjust and add to dry run unit test
mattlord Oct 10, 2024
07aca76
Fix e2e test
mattlord Oct 10, 2024
6984f5a
Merge remote-tracking branch 'origin/main' into vrepl_reads_switched_…
mattlord Oct 11, 2024
d83164a
Fix e2e test
mattlord Oct 11, 2024
ef645d2
Fix switchShardReads for reverse
mattlord Oct 12, 2024
e8622d6
Changes from self review
mattlord Oct 12, 2024
0ed6965
Incorporate additional tests from Rohit
mattlord Oct 14, 2024
c264a25
Fixes from new tests
mattlord Oct 14, 2024
dec4aa4
Corrections and improvements
mattlord Oct 14, 2024
20a6f7c
Minimize changes
mattlord Oct 14, 2024
4dd57d3
Line up wrangler and workflow impls
mattlord Oct 14, 2024
f10bb1e
Minor changes
mattlord Oct 14, 2024
4d2a992
Add workflow to log message
mattlord Oct 14, 2024
e5b45dc
Remove accidental change
mattlord Oct 14, 2024
a629cbb
Preserve previous TestMoveTablesBuffering behavior
mattlord Oct 14, 2024
7628c57
Remove unnecessary test changes
mattlord Oct 14, 2024
dba55f8
Merge remote-tracking branch 'origin/main' into vrepl_reads_switched_…
mattlord Oct 14, 2024
394ce81
Restore globals
mattlord Oct 14, 2024
9dbb24e
Check workflow states and do a final vdiff in new tests
mattlord Oct 14, 2024
8f241e8
Update TestBasicV2Workflows so that it fails on main
mattlord Oct 15, 2024
3fdfd6e
Merge remote-tracking branch 'origin/main' into vrepl_reads_switched_…
mattlord Oct 15, 2024
e8edd10
kick CI
mattlord Oct 15, 2024
118e0b3
Restore unintentionally deleted condition
mattlord Oct 16, 2024
3fa88df
Fix validateRoutingRule which had a bug causing the wrong value to be…
rohit-nayak-ps Oct 19, 2024
e9685d5
Minor changes after final self review
mattlord Oct 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion go/test/endtoend/vreplication/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,11 @@ func executeOnTablet(t *testing.T, conn *mysql.Conn, tablet *cluster.VttabletPro

// assertQueryExecutesOnTablet fails the test immediately unless the count
// reported by executeOnTablet for matchQuery increased by exactly one after
// running query against the ksName destination.
//
// The output of the GetRoutingRules command is captured before the query is
// executed so that it can be included in the failure message alongside the
// before/after bodies returned by executeOnTablet.
func assertQueryExecutesOnTablet(t *testing.T, conn *mysql.Conn, tablet *cluster.VttabletProcess, ksName string, query string, matchQuery string) {
	t.Helper()
	rr, err := vc.VtctldClient.ExecuteCommandWithOutput("GetRoutingRules")
	require.NoError(t, err)
	count0, body0, count1, body1 := executeOnTablet(t, conn, tablet, ksName, query, matchQuery)
	// Check the condition once, with require, so that a mismatch is reported a
	// single time (with the routing rules attached) and stops the test.
	require.Equalf(t, count0+1, count1, "query %q did not execute on destination %s (%s-%d);\ntried to match %q\nbefore:\n%s\n\nafter:\n%s\n\nrouting rules:\n%s\n\n",
		query, ksName, tablet.Cell, tablet.TabletUID, matchQuery, body0, body1, rr)
}

func assertQueryDoesNotExecutesOnTablet(t *testing.T, conn *mysql.Conn, tablet *cluster.VttabletProcess, ksName string, query string, matchQuery string) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,14 @@ import (
)

func TestMoveTablesBuffering(t *testing.T) {
defaultRdonly = 1
ogReplicas := defaultReplicas
ogRdOnly := defaultRdonly
defer func() {
defaultReplicas = ogReplicas
defaultRdonly = ogRdOnly
}()
defaultRdonly = 0
defaultReplicas = 0
vc = setupMinimalCluster(t)
defer vc.TearDown()

Expand Down
129 changes: 94 additions & 35 deletions go/test/endtoend/vreplication/resharding_workflows_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"math/rand/v2"
"net"
"slices"
"strconv"
"strings"
"sync"
Expand All @@ -35,9 +36,11 @@ import (
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/throttler"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/wrangler"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
)

Expand Down Expand Up @@ -169,9 +172,7 @@ func tstWorkflowExec(t *testing.T, cells, workflow, sourceKs, targetKs, tables,
args = append(args, "--tablet-types", tabletTypes)
}
args = append(args, "--action_timeout=10m") // At this point something is up so fail the test
if debugMode {
t.Logf("Executing workflow command: vtctldclient %v", strings.Join(args, " "))
}
t.Logf("Executing workflow command: vtctldclient %s", strings.Join(args, " "))
output, err := vc.VtctldClient.ExecuteCommandWithOutput(args...)
lastOutput = output
if err != nil {
Expand Down Expand Up @@ -334,33 +335,44 @@ func tstWorkflowCancel(t *testing.T) error {
return tstWorkflowAction(t, workflowActionCancel, "", "")
}

func validateReadsRoute(t *testing.T, tabletTypes string, tablet *cluster.VttabletProcess) {
func validateReadsRoute(t *testing.T, tabletType string, tablet *cluster.VttabletProcess) {
if tablet == nil {
return
}
if tabletTypes == "" {
tabletTypes = "replica,rdonly"
}
vtgateConn, closeConn := getVTGateConn()
defer closeConn()
for _, tt := range []string{"replica", "rdonly"} {
destination := fmt.Sprintf("%s:%s@%s", tablet.Keyspace, tablet.Shard, tt)
if strings.Contains(tabletTypes, tt) {
readQuery := "select cid from customer limit 10"
assertQueryExecutesOnTablet(t, vtgateConn, tablet, destination, readQuery, "select cid from customer limit :vtg1")
}
}
// We do NOT want to target a shard as that goes around the routing rules and
// defeats the purpose here. We are using a query w/o a WHERE clause so for
// sharded keyspaces it should hit all shards as a SCATTER query. So all we
// care about is the keyspace and tablet type.
destination := fmt.Sprintf("%s@%s", tablet.Keyspace, strings.ToLower(tabletType))
readQuery := "select cid from customer limit 50"
assertQueryExecutesOnTablet(t, vtgateConn, tablet, destination, readQuery, "select cid from customer limit :vtg1")
}

func validateReadsRouteToSource(t *testing.T, tabletTypes string) {
if sourceReplicaTab != nil {
validateReadsRoute(t, tabletTypes, sourceReplicaTab)
tt, err := topoproto.ParseTabletTypes(tabletTypes)
require.NoError(t, err)
if slices.Contains(tt, topodatapb.TabletType_REPLICA) {
require.NotNil(t, sourceReplicaTab)
validateReadsRoute(t, topodatapb.TabletType_REPLICA.String(), sourceReplicaTab)
}
if slices.Contains(tt, topodatapb.TabletType_RDONLY) {
require.NotNil(t, sourceRdonlyTab)
validateReadsRoute(t, topodatapb.TabletType_RDONLY.String(), sourceRdonlyTab)
}
}

func validateReadsRouteToTarget(t *testing.T, tabletTypes string) {
if targetReplicaTab1 != nil {
validateReadsRoute(t, tabletTypes, targetReplicaTab1)
tt, err := topoproto.ParseTabletTypes(tabletTypes)
require.NoError(t, err)
if slices.Contains(tt, topodatapb.TabletType_REPLICA) {
require.NotNil(t, targetReplicaTab1)
validateReadsRoute(t, topodatapb.TabletType_REPLICA.String(), targetReplicaTab1)
}
if slices.Contains(tt, topodatapb.TabletType_RDONLY) {
require.NotNil(t, targetRdonlyTab1)
validateReadsRoute(t, topodatapb.TabletType_RDONLY.String(), targetRdonlyTab1)
}
}

Expand Down Expand Up @@ -411,6 +423,13 @@ func getCurrentStatus(t *testing.T) string {
// but CI currently fails on creating multiple clusters even after the previous ones are torn down

func TestBasicV2Workflows(t *testing.T) {
ogReplicas := defaultReplicas
ogRdOnly := defaultRdonly
defer func() {
defaultReplicas = ogReplicas
defaultRdonly = ogRdOnly
}()
defaultReplicas = 1
defaultRdonly = 1
extraVTTabletArgs = []string{
parallelInsertWorkers,
Expand Down Expand Up @@ -664,7 +683,7 @@ func testMoveTablesV2Workflow(t *testing.T) {
// If it's not then we'll get an error as the table doesn't exist in the vschema
createMoveTablesWorkflow(t, "customer,loadtest,vdiff_order,reftable,_vt_PURGE_4f9194b43b2011eb8a0104ed332e05c2_20221210194431")
waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())
validateReadsRouteToSource(t, "replica")
validateReadsRouteToSource(t, "replica,rdonly")
validateWritesRouteToSource(t)

// Verify that we've properly ignored any internal operational tables
Expand Down Expand Up @@ -725,6 +744,12 @@ func testPartialSwitches(t *testing.T) {
tstWorkflowSwitchReads(t, "", "")
checkStates(t, nextState, nextState) // idempotency

tstWorkflowReverseReads(t, "replica,rdonly", "")
checkStates(t, wrangler.WorkflowStateReadsSwitched, wrangler.WorkflowStateNotSwitched)

tstWorkflowSwitchReads(t, "", "")
checkStates(t, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateReadsSwitched)

tstWorkflowSwitchWrites(t)
currentState = nextState
nextState = wrangler.WorkflowStateAllSwitched
Expand Down Expand Up @@ -771,12 +796,12 @@ func testRestOfWorkflow(t *testing.T) {
waitForLowLag(t, "customer", "wf1")
tstWorkflowSwitchReads(t, "", "")
checkStates(t, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateReadsSwitched)
validateReadsRouteToTarget(t, "replica")
validateReadsRouteToTarget(t, "replica,rdonly")
validateWritesRouteToSource(t)

tstWorkflowSwitchWrites(t)
checkStates(t, wrangler.WorkflowStateReadsSwitched, wrangler.WorkflowStateAllSwitched)
validateReadsRouteToTarget(t, "replica")
validateReadsRouteToTarget(t, "replica,rdonly")
validateWritesRouteToTarget(t)

// this function is called for both MoveTables and Reshard, so the reverse workflows exist in different keyspaces
Expand All @@ -787,42 +812,45 @@ func testRestOfWorkflow(t *testing.T) {
waitForLowLag(t, keyspace, "wf1_reverse")
tstWorkflowReverseReads(t, "", "")
checkStates(t, wrangler.WorkflowStateAllSwitched, wrangler.WorkflowStateWritesSwitched)
validateReadsRouteToSource(t, "replica")
validateReadsRouteToSource(t, "replica,rdonly")
validateWritesRouteToTarget(t)

tstWorkflowReverseWrites(t)
checkStates(t, wrangler.WorkflowStateWritesSwitched, wrangler.WorkflowStateNotSwitched)
validateReadsRouteToSource(t, "replica")
validateReadsRouteToSource(t, "replica,rdonly")
validateWritesRouteToSource(t)

waitForLowLag(t, "customer", "wf1")
tstWorkflowSwitchWrites(t)
checkStates(t, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateWritesSwitched)
validateReadsRouteToSource(t, "replica")
validateReadsRouteToSource(t, "replica,rdonly")
validateWritesRouteToTarget(t)

waitForLowLag(t, keyspace, "wf1_reverse")
tstWorkflowReverseWrites(t)
validateReadsRouteToSource(t, "replica")
checkStates(t, wrangler.WorkflowStateWritesSwitched, wrangler.WorkflowStateNotSwitched)
validateReadsRouteToSource(t, "replica,rdonly")
validateWritesRouteToSource(t)

waitForLowLag(t, "customer", "wf1")
tstWorkflowSwitchReads(t, "", "")
validateReadsRouteToTarget(t, "replica")
checkStates(t, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateReadsSwitched)
validateReadsRouteToTarget(t, "replica,rdonly")
validateWritesRouteToSource(t)

tstWorkflowReverseReads(t, "", "")
validateReadsRouteToSource(t, "replica")
checkStates(t, wrangler.WorkflowStateReadsSwitched, wrangler.WorkflowStateNotSwitched)
validateReadsRouteToSource(t, "replica,rdonly")
validateWritesRouteToSource(t)

tstWorkflowSwitchReadsAndWrites(t)
validateReadsRouteToTarget(t, "replica")
validateReadsRoute(t, "rdonly", targetRdonlyTab1)
checkStates(t, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateAllSwitched)
validateReadsRouteToTarget(t, "replica,rdonly")
validateWritesRouteToTarget(t)
waitForLowLag(t, keyspace, "wf1_reverse")
tstWorkflowReverseReadsAndWrites(t)
validateReadsRoute(t, "rdonly", sourceRdonlyTab)
validateReadsRouteToSource(t, "replica")
checkStates(t, wrangler.WorkflowStateAllSwitched, wrangler.WorkflowStateNotSwitched)
validateReadsRouteToSource(t, "replica,rdonly")
validateWritesRouteToSource(t)

// trying to complete an unswitched workflow should error
Expand All @@ -835,8 +863,7 @@ func testRestOfWorkflow(t *testing.T) {
waitForLowLag(t, "customer", "customer_name")
waitForLowLag(t, "customer", "enterprise_customer")
tstWorkflowSwitchReadsAndWrites(t)
validateReadsRoute(t, "rdonly", targetRdonlyTab1)
validateReadsRouteToTarget(t, "replica")
validateReadsRouteToTarget(t, "replica,rdonly")
validateWritesRouteToTarget(t)

err = tstWorkflowComplete(t)
Expand Down Expand Up @@ -899,7 +926,7 @@ func setupMinimalCluster(t *testing.T) *VitessCluster {

zone1 := vc.Cells["zone1"]

vc.AddKeyspace(t, []*Cell{zone1}, "product", "0", initialProductVSchema, initialProductSchema, 0, 0, 100, nil)
vc.AddKeyspace(t, []*Cell{zone1}, "product", "0", initialProductVSchema, initialProductSchema, defaultReplicas, defaultRdonly, 100, nil)

verifyClusterHealth(t, vc)
insertInitialData(t)
Expand All @@ -912,7 +939,7 @@ func setupMinimalCluster(t *testing.T) *VitessCluster {
func setupMinimalCustomerKeyspace(t *testing.T) map[string]*cluster.VttabletProcess {
tablets := make(map[string]*cluster.VttabletProcess)
if _, err := vc.AddKeyspace(t, []*Cell{vc.Cells["zone1"]}, "customer", "-80,80-",
customerVSchema, customerSchema, 0, 0, 200, nil); err != nil {
customerVSchema, customerSchema, defaultReplicas, defaultRdonly, 200, nil); err != nil {
t.Fatal(err)
}
defaultCell := vc.Cells[vc.CellNames[0]]
Expand Down Expand Up @@ -1048,6 +1075,7 @@ func createAdditionalCustomerShards(t *testing.T, shards string) {
targetTab2 = custKs.Shards["80-c0"].Tablets["zone1-600"].Vttablet
targetTab1 = custKs.Shards["40-80"].Tablets["zone1-500"].Vttablet
targetReplicaTab1 = custKs.Shards["-40"].Tablets["zone1-401"].Vttablet
targetRdonlyTab1 = custKs.Shards["-40"].Tablets["zone1-402"].Vttablet

sourceTab = custKs.Shards["-80"].Tablets["zone1-200"].Vttablet
sourceReplicaTab = custKs.Shards["-80"].Tablets["zone1-201"].Vttablet
Expand All @@ -1059,3 +1087,34 @@ func tstApplySchemaOnlineDDL(t *testing.T, sql string, keyspace string) {
"--sql", sql, keyspace)
require.NoError(t, err, fmt.Sprintf("ApplySchema Error: %s", err))
}

// validateTableRoutingRule checks how the routing rules returned by
// getRoutingRules route the given table.
//
// The rule looked up is "fromKeyspace.table", with "@tabletType" appended
// unless tabletType (after trimming and lower-casing) is empty or "primary".
// The test passes when either no rule with that FromTable exists, or one of
// the rules with that FromTable has a first ToTables entry of the form
// "toKeyspace.table". It fails when such rules exist but none of them routes
// to toKeyspace, when a found rule has no ToTables, or when its first entry is
// not "keyspace.table" for the given table.
func validateTableRoutingRule(t *testing.T, table, tabletType, fromKeyspace, toKeyspace string) {
	tabletType = strings.ToLower(strings.TrimSpace(tabletType))
	rr := getRoutingRules(t)

	// The FromTable value we are searching for is the same for every rule, so
	// build it once up front rather than on each iteration.
	fromRule := fmt.Sprintf("%s.%s", fromKeyspace, table)
	if tabletType != "" && tabletType != "primary" {
		fromRule = fmt.Sprintf("%s@%s", fromRule, tabletType)
	}

	// We set matched = true by default because it is possible, if --no-routing-rules is set while creating
	// a workflow, that the routing rules are empty when the workflow starts.
	// We set it to false below when the rule is found, but before matching the routed keyspace.
	matched := true
	for _, r := range rr.GetRules() {
		if r.FromTable != fromRule {
			continue
		}
		// We found the rule, so it must now route to the expected keyspace for
		// the validation to succeed.
		matched = false
		require.NotEmpty(t, r.ToTables)
		// The ToTables value is of the form "routedKeyspace.table".
		routedKeyspace, routedTable, ok := strings.Cut(r.ToTables[0], ".")
		require.True(t, ok)
		require.Equal(t, table, routedTable)
		if routedKeyspace == toKeyspace {
			// We found the rule, the table and keyspace matches, so our search is done.
			matched = true
			break
		}
	}
	require.Truef(t, matched, "routing rule for %s.%s from %s to %s not found", fromKeyspace, table, tabletType, toKeyspace)
}
Loading
Loading