From 054f445e2d2b0e7e47104757bf35dfd30f207d0b Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Fri, 6 Sep 2024 11:32:48 +0200 Subject: [PATCH 1/9] Add unit test to confirm workflow state is set correctly based on routing rule state for given workflow Signed-off-by: Rohit Nayak --- go/vt/vtctl/workflow/framework_test.go | 28 ++-- go/vt/vtctl/workflow/server_test.go | 11 +- go/vt/vtctl/workflow/workflow_state_test.go | 149 ++++++++++++++++++++ 3 files changed, 169 insertions(+), 19 deletions(-) create mode 100644 go/vt/vtctl/workflow/workflow_state_test.go diff --git a/go/vt/vtctl/workflow/framework_test.go b/go/vt/vtctl/workflow/framework_test.go index fb3c547964e..30979c98506 100644 --- a/go/vt/vtctl/workflow/framework_test.go +++ b/go/vt/vtctl/workflow/framework_test.go @@ -198,30 +198,36 @@ func (env *testEnv) addTablet(t *testing.T, ctx context.Context, id int, keyspac return tablet } -// addTableRoutingRules adds routing rules from the test env's source keyspace to -// its target keyspace for the given tablet types and tables. -func (env *testEnv) addTableRoutingRules(t *testing.T, ctx context.Context, tabletTypes []topodatapb.TabletType, tables []string) { - ks := env.targetKeyspace.KeyspaceName +func (env *testEnv) saveRoutingRules(t *testing.T, rules map[string][]string) { + err := topotools.SaveRoutingRules(context.Background(), env.ts, rules) + require.NoError(t, err) + err = env.ts.RebuildSrvVSchema(context.Background(), nil) + require.NoError(t, err) +} + +func (env *testEnv) updateTableRoutingRules(t *testing.T, ctx context.Context, + tabletTypes []topodatapb.TabletType, tables []string, toKeyspace string) { + + if len(tabletTypes) == 0 { + tabletTypes = []topodatapb.TabletType{topodatapb.TabletType_PRIMARY, topodatapb.TabletType_REPLICA, topodatapb.TabletType_RDONLY} + } rules := make(map[string][]string, len(tables)*(len(tabletTypes)*3)) for _, tabletType := range tabletTypes { for _, tableName := range tables { - toTarget := []string{ks + "." + tableName} + toTarget := []string{toKeyspace + "." + tableName} tt := strings.ToLower(tabletType.String()) if tabletType == topodatapb.TabletType_PRIMARY { rules[tableName] = toTarget - rules[ks+"."+tableName] = toTarget + rules[env.targetKeyspace.KeyspaceName+"."+tableName] = toTarget rules[env.sourceKeyspace.KeyspaceName+"."+tableName] = toTarget } else { rules[tableName+"@"+tt] = toTarget - rules[ks+"."+tableName+"@"+tt] = toTarget + rules[env.targetKeyspace.KeyspaceName+"."+tableName+"@"+tt] = toTarget rules[env.sourceKeyspace.KeyspaceName+"."+tableName+"@"+tt] = toTarget } } } - err := topotools.SaveRoutingRules(ctx, env.ts, rules) - require.NoError(t, err) - err = env.ts.RebuildSrvVSchema(ctx, nil) - require.NoError(t, err) + env.saveRoutingRules(t, rules) } func (env *testEnv) deleteTablet(tablet *topodatapb.Tablet) { diff --git a/go/vt/vtctl/workflow/server_test.go b/go/vt/vtctl/workflow/server_test.go index c9be9a4cc7e..a51bbf2ef35 100644 --- a/go/vt/vtctl/workflow/server_test.go +++ b/go/vt/vtctl/workflow/server_test.go @@ -416,11 +416,6 @@ func TestMoveTablesComplete(t *testing.T) { tableTemplate := "CREATE TABLE %s (id BIGINT, name VARCHAR(64), PRIMARY KEY (id))" sourceKeyspaceName := "sourceks" targetKeyspaceName := "targetks" - tabletTypes := []topodatapb.TabletType{ - topodatapb.TabletType_PRIMARY, - topodatapb.TabletType_REPLICA, - topodatapb.TabletType_RDONLY, - } lockName := fmt.Sprintf("%s/%s", targetKeyspaceName, workflowName) schema := map[string]*tabletmanagerdatapb.SchemaDefinition{ table1Name: { @@ -640,7 +635,7 @@ func TestMoveTablesComplete(t *testing.T) { tc.preFunc(t, env) } // Setup the routing rules as they would be after having previously done SwitchTraffic. - env.addTableRoutingRules(t, ctx, tabletTypes, []string{table1Name, table2Name, table3Name}) + env.updateTableRoutingRules(t, ctx, nil, []string{table1Name, table2Name, table3Name}, tc.targetKeyspace.KeyspaceName) got, err := env.ws.MoveTablesComplete(ctx, tc.req) if tc.wantErr != "" { require.EqualError(t, err, tc.wantErr) @@ -1103,7 +1098,7 @@ func TestMoveTablesTrafficSwitching(t *testing.T) { } else { env.tmc.reverse.Store(true) // Setup the routing rules as they would be after having previously done SwitchTraffic. - env.addTableRoutingRules(t, ctx, tabletTypes, []string{tableName}) + env.updateTableRoutingRules(t, ctx, tabletTypes, []string{tableName}, tc.targetKeyspace.KeyspaceName) env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, copyTableQR) for i := 0; i < len(tc.targetKeyspace.ShardNames); i++ { // Per stream env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, cutoverQR) @@ -1317,7 +1312,7 @@ func TestMoveTablesTrafficSwitchingDryRun(t *testing.T) { } else { env.tmc.reverse.Store(true) // Setup the routing rules as they would be after having previously done SwitchTraffic. - env.addTableRoutingRules(t, ctx, tabletTypes, tables) + env.updateTableRoutingRules(t, ctx, tabletTypes, tables, tc.targetKeyspace.KeyspaceName) env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, copyTableQR) for i := 0; i < len(tc.targetKeyspace.ShardNames); i++ { // Per stream env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, journalQR) diff --git a/go/vt/vtctl/workflow/workflow_state_test.go b/go/vt/vtctl/workflow/workflow_state_test.go new file mode 100644 index 00000000000..039788c4993 --- /dev/null +++ b/go/vt/vtctl/workflow/workflow_state_test.go @@ -0,0 +1,149 @@ +/* +Copyright 2021 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workflow + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/require" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + "vitess.io/vitess/go/vt/proto/tabletmanagerdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" +) + +func setupMoveTables(t *testing.T) (context.Context, *testEnv) { + ctx := context.Background() + schema := map[string]*tabletmanagerdata.SchemaDefinition{ + "t1": { + TableDefinitions: []*tabletmanagerdata.TableDefinition{ + { + Name: "t1", + Schema: fmt.Sprintf("CREATE TABLE %s (id BIGINT, name VARCHAR(64), PRIMARY KEY (id))", "t1"), + }, + }, + }, + } + sourceKeyspace := &testKeyspace{ + KeyspaceName: "source", + ShardNames: []string{"0"}, + } + targetKeyspace := &testKeyspace{ + KeyspaceName: "target", + ShardNames: []string{"0"}, + } + te := newTestEnv(t, ctx, "zone1", sourceKeyspace, targetKeyspace) + te.tmc.schema = schema + for k := range te.tablets { + for k2 := range te.tablets[k] { + fmt.Println(k, k2) + } + } + var wfs tabletmanagerdata.ReadVReplicationWorkflowsResponse + wfName := "wf1" + id := int32(1) + wfs.Workflows = append(wfs.Workflows, &tabletmanagerdata.ReadVReplicationWorkflowResponse{ + Workflow: wfName, + WorkflowType: binlogdatapb.VReplicationWorkflowType_MoveTables, + }) + wfs.Workflows[0].Streams = append(wfs.Workflows[0].Streams, &tabletmanagerdata.ReadVReplicationWorkflowResponse_Stream{ + Id: id, + Bls: &binlogdatapb.BinlogSource{ + Keyspace: te.sourceKeyspace.KeyspaceName, + Shard: "0", + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{ + {Match: "t1", Filter: "select * from t1"}, + }, + }, + Tables: []string{"t1"}, + }, + Pos: position, + State: binlogdatapb.VReplicationWorkflowState_Running, + }) + workflowKey := te.tmc.GetWorkflowKey("target", "wf1") + workflowResponses := []*tabletmanagerdata.ReadVReplicationWorkflowsResponse{ + nil, // this is the response for getting stopped workflows + &wfs, &wfs, &wfs, // return the full list for subsequent GetWorkflows calls + } + for _, resp := range workflowResponses { + te.tmc.AddVReplicationWorkflowsResponse(workflowKey, resp) + } + te.tmc.readVReplicationWorkflowRequests[200] = &tabletmanagerdata.ReadVReplicationWorkflowRequest{ + Workflow: wfName, + } + te.updateTableRoutingRules(t, ctx, nil, []string{"t1"}, te.sourceKeyspace.KeyspaceName) + return ctx, te +} + +func TestWorkflowStateMoveTables(t *testing.T) { + ctx, te := setupMoveTables(t) + require.NotNil(t, te) + type testCase struct { + name string + tabletTypes []topodatapb.TabletType + wantState string + } + testCases := []testCase{ + { + name: "switch reads", + tabletTypes: []topodatapb.TabletType{topodatapb.TabletType_REPLICA, topodatapb.TabletType_RDONLY}, + wantState: "All Reads Switched. Writes Not Switched", + }, + { + name: "switch writes", + tabletTypes: []topodatapb.TabletType{topodatapb.TabletType_PRIMARY}, + wantState: "Reads Not Switched. Writes Switched", + }, + { + name: "switch reads and writes", + tabletTypes: []topodatapb.TabletType{topodatapb.TabletType_PRIMARY, topodatapb.TabletType_REPLICA, topodatapb.TabletType_RDONLY}, + wantState: "All Reads Switched. Writes Switched", + }, + { + name: "switch rdonly only", + tabletTypes: []topodatapb.TabletType{topodatapb.TabletType_RDONLY}, + wantState: "Reads partially switched. Replica not switched. All Rdonly Reads Switched. Writes Not Switched", + }, + { + name: "switch replica only", + tabletTypes: []topodatapb.TabletType{topodatapb.TabletType_REPLICA}, + wantState: "Reads partially switched. All Replica Reads Switched. Rdonly not switched. Writes Not Switched", + }, + } + tables := []string{"t1"} + + getStateString := func() string { + tsw, state, err := te.ws.getWorkflowState(ctx, te.targetKeyspace.KeyspaceName, "wf1") + require.NoError(t, err) + require.NotNil(t, tsw) + require.NotNil(t, state) + return state.String() + } + initState := getStateString() + require.Equal(t, "Reads Not Switched. Writes Not Switched", initState) + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + te.updateTableRoutingRules(t, ctx, tc.tabletTypes, tables, te.targetKeyspace.KeyspaceName) + require.Equal(t, tc.wantState, getStateString()) + // reset to initial state + te.updateTableRoutingRules(t, ctx, tc.tabletTypes, tables, te.sourceKeyspace.KeyspaceName) + }) + } +} From 00ae6d194cf592ab41e8ee4eafcfd9f4512cdb25 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Fri, 6 Sep 2024 12:40:47 +0200 Subject: [PATCH 2/9] Use rule for the source keyspace rather than the global routing one to check for where writes are routed Signed-off-by: Rohit Nayak --- go/vt/vtctl/workflow/server.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 0a855e8094e..2b7c7c4c79c 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -1027,9 +1027,9 @@ func (s *Server) getWorkflowState(ctx context.Context, targetKeyspace, workflowN return nil, nil, err } for _, table := range ts.Tables() { - rr := globalRules[table] - // If a rule exists for the table and points to the target keyspace, then - // writes have been switched. + // If a rule exists for any table and points the source to the target keyspace, + // then writes have been switched. + rr := globalRules[fmt.Sprintf("%s.%s", ts.sourceKeyspace, table)] if len(rr) > 0 && rr[0] == fmt.Sprintf("%s.%s", targetKeyspace, table) { state.WritesSwitched = true break From 0f1bcb4119c329320609fcd0665b6743be317243 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Fri, 6 Sep 2024 18:26:07 +0200 Subject: [PATCH 3/9] Improve logic to handle multiple workflows with independent source/target keyspaces. Add test to simulate this in routing rules Signed-off-by: Rohit Nayak --- go/vt/vtctl/workflow/framework_test.go | 22 ++--- go/vt/vtctl/workflow/server.go | 15 ++-- go/vt/vtctl/workflow/server_test.go | 9 +- go/vt/vtctl/workflow/workflow_state_test.go | 98 +++++++++++++++------ go/vt/wrangler/traffic_switcher.go | 4 +- 5 files changed, 97 insertions(+), 51 deletions(-) diff --git a/go/vt/vtctl/workflow/framework_test.go b/go/vt/vtctl/workflow/framework_test.go index 30979c98506..44f7a6c6b04 100644 --- a/go/vt/vtctl/workflow/framework_test.go +++ b/go/vt/vtctl/workflow/framework_test.go @@ -206,24 +206,26 @@ func (env *testEnv) saveRoutingRules(t *testing.T, rules map[string][]string) { } func (env *testEnv) updateTableRoutingRules(t *testing.T, ctx context.Context, - tabletTypes []topodatapb.TabletType, tables []string, toKeyspace string) { + tabletTypes []topodatapb.TabletType, tables []string, sourceKeyspace, targetKeyspace, toKeyspace string) { if len(tabletTypes) == 0 { tabletTypes = []topodatapb.TabletType{topodatapb.TabletType_PRIMARY, topodatapb.TabletType_REPLICA, topodatapb.TabletType_RDONLY} } - rules := make(map[string][]string, len(tables)*(len(tabletTypes)*3)) + rr, err := env.ts.GetRoutingRules(ctx) + require.NoError(t, err) + rules := topotools.GetRoutingRulesMap(rr) for _, tabletType := range tabletTypes { for _, tableName := range tables { toTarget := []string{toKeyspace + "." + tableName} tt := strings.ToLower(tabletType.String()) if tabletType == topodatapb.TabletType_PRIMARY { rules[tableName] = toTarget - rules[env.targetKeyspace.KeyspaceName+"."+tableName] = toTarget - rules[env.sourceKeyspace.KeyspaceName+"."+tableName] = toTarget + rules[targetKeyspace+"."+tableName] = toTarget + rules[sourceKeyspace+"."+tableName] = toTarget } else { rules[tableName+"@"+tt] = toTarget - rules[env.targetKeyspace.KeyspaceName+"."+tableName+"@"+tt] = toTarget - rules[env.sourceKeyspace.KeyspaceName+"."+tableName+"@"+tt] = toTarget + rules[targetKeyspace+"."+tableName+"@"+tt] = toTarget + rules[sourceKeyspace+"."+tableName+"@"+tt] = toTarget } } } @@ -259,7 +261,7 @@ type testTMClient struct { mu sync.Mutex vrQueries map[int][]*queryResult createVReplicationWorkflowRequests map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest - readVReplicationWorkflowRequests map[uint32]*tabletmanagerdatapb.ReadVReplicationWorkflowRequest + readVReplicationWorkflowRequests map[string]*tabletmanagerdatapb.ReadVReplicationWorkflowRequest primaryPositions map[uint32]string vdiffRequests map[uint32]*vdiffRequestResponse @@ -280,7 +282,7 @@ func newTestTMClient(env *testEnv) *testTMClient { schema: make(map[string]*tabletmanagerdatapb.SchemaDefinition), vrQueries: make(map[int][]*queryResult), createVReplicationWorkflowRequests: make(map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest), - readVReplicationWorkflowRequests: make(map[uint32]*tabletmanagerdatapb.ReadVReplicationWorkflowRequest), + readVReplicationWorkflowRequests: make(map[string]*tabletmanagerdatapb.ReadVReplicationWorkflowRequest), readVReplicationWorkflowsResponses: make(map[string][]*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse), primaryPositions: make(map[uint32]string), env: env, @@ -307,8 +309,8 @@ func (tmc *testTMClient) GetWorkflowKey(keyspace, shard string) string { func (tmc *testTMClient) ReadVReplicationWorkflow(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.ReadVReplicationWorkflowRequest) (*tabletmanagerdatapb.ReadVReplicationWorkflowResponse, error) { tmc.mu.Lock() defer tmc.mu.Unlock() - - if expect := tmc.readVReplicationWorkflowRequests[tablet.Alias.Uid]; expect != nil { + key := fmt.Sprintf("%d/%s", tablet.Alias.Uid, req.Workflow) + if expect := tmc.readVReplicationWorkflowRequests[key]; expect != nil { if !proto.Equal(expect, req) { return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unexpected ReadVReplicationWorkflow request: got %+v, want %+v", req, expect) } diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 2b7c7c4c79c..13afa5e2bc5 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -299,7 +299,8 @@ func (s *Server) GetCellsWithShardReadsSwitched( // keyspace. func (s *Server) GetCellsWithTableReadsSwitched( ctx context.Context, - keyspace string, + sourceKeyspace string, + targetKeyspace string, table string, tabletType topodatapb.TabletType, ) (cellsSwitched []string, cellsNotSwitched []string, err error) { @@ -329,7 +330,7 @@ func (s *Server) GetCellsWithTableReadsSwitched( ) for _, rule := range srvVSchema.RoutingRules.Rules { - ruleName := fmt.Sprintf("%s.%s@%s", keyspace, table, strings.ToLower(tabletType.String())) + ruleName := fmt.Sprintf("%s.%s@%s", sourceKeyspace, table, strings.ToLower(tabletType.String())) if rule.FromTable == ruleName { found = true @@ -340,7 +341,7 @@ func (s *Server) GetCellsWithTableReadsSwitched( return nil, nil, err } - if ks == keyspace { + if ks != sourceKeyspace { switched = true break // if one table in the workflow switched, we are done. } @@ -1013,12 +1014,12 @@ func (s *Server) getWorkflowState(ctx context.Context, targetKeyspace, workflowN } } } else { - state.RdonlyCellsSwitched, state.RdonlyCellsNotSwitched, err = s.GetCellsWithTableReadsSwitched(ctx, targetKeyspace, table, topodatapb.TabletType_RDONLY) + state.RdonlyCellsSwitched, state.RdonlyCellsNotSwitched, err = s.GetCellsWithTableReadsSwitched(ctx, sourceKeyspace, targetKeyspace, table, topodatapb.TabletType_RDONLY) if err != nil { return nil, nil, err } - state.ReplicaCellsSwitched, state.ReplicaCellsNotSwitched, err = s.GetCellsWithTableReadsSwitched(ctx, targetKeyspace, table, topodatapb.TabletType_REPLICA) + state.ReplicaCellsSwitched, state.ReplicaCellsNotSwitched, err = s.GetCellsWithTableReadsSwitched(ctx, sourceKeyspace, targetKeyspace, table, topodatapb.TabletType_REPLICA) if err != nil { return nil, nil, err } @@ -1027,10 +1028,10 @@ func (s *Server) getWorkflowState(ctx context.Context, targetKeyspace, workflowN return nil, nil, err } for _, table := range ts.Tables() { - // If a rule exists for any table and points the source to the target keyspace, + // If a rule for primary exists for any table and points to the target keyspace, // then writes have been switched. rr := globalRules[fmt.Sprintf("%s.%s", ts.sourceKeyspace, table)] - if len(rr) > 0 && rr[0] == fmt.Sprintf("%s.%s", targetKeyspace, table) { + if len(rr) > 0 && rr[0] != fmt.Sprintf("%s.%s", ts.sourceKeyspace, table) { state.WritesSwitched = true break } diff --git a/go/vt/vtctl/workflow/server_test.go b/go/vt/vtctl/workflow/server_test.go index a51bbf2ef35..8a86e89a412 100644 --- a/go/vt/vtctl/workflow/server_test.go +++ b/go/vt/vtctl/workflow/server_test.go @@ -635,7 +635,8 @@ func TestMoveTablesComplete(t *testing.T) { tc.preFunc(t, env) } // Setup the routing rules as they would be after having previously done SwitchTraffic. - env.updateTableRoutingRules(t, ctx, nil, []string{table1Name, table2Name, table3Name}, tc.targetKeyspace.KeyspaceName) + env.updateTableRoutingRules(t, ctx, nil, []string{table1Name, table2Name, table3Name}, + tc.sourceKeyspace.KeyspaceName, tc.targetKeyspace.KeyspaceName, tc.targetKeyspace.KeyspaceName) got, err := env.ws.MoveTablesComplete(ctx, tc.req) if tc.wantErr != "" { require.EqualError(t, err, tc.wantErr) @@ -1098,7 +1099,8 @@ func TestMoveTablesTrafficSwitching(t *testing.T) { } else { env.tmc.reverse.Store(true) // Setup the routing rules as they would be after having previously done SwitchTraffic. - env.updateTableRoutingRules(t, ctx, tabletTypes, []string{tableName}, tc.targetKeyspace.KeyspaceName) + env.updateTableRoutingRules(t, ctx, tabletTypes, []string{tableName}, + tc.sourceKeyspace.KeyspaceName, tc.targetKeyspace.KeyspaceName, tc.targetKeyspace.KeyspaceName) env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, copyTableQR) for i := 0; i < len(tc.targetKeyspace.ShardNames); i++ { // Per stream env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, cutoverQR) @@ -1312,7 +1314,8 @@ func TestMoveTablesTrafficSwitchingDryRun(t *testing.T) { } else { env.tmc.reverse.Store(true) // Setup the routing rules as they would be after having previously done SwitchTraffic. - env.updateTableRoutingRules(t, ctx, tabletTypes, tables, tc.targetKeyspace.KeyspaceName) + env.updateTableRoutingRules(t, ctx, tabletTypes, tables, + tc.sourceKeyspace.KeyspaceName, tc.targetKeyspace.KeyspaceName, tc.targetKeyspace.KeyspaceName) env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, copyTableQR) for i := 0; i < len(tc.targetKeyspace.ShardNames); i++ { // Per stream env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, journalQR) diff --git a/go/vt/vtctl/workflow/workflow_state_test.go b/go/vt/vtctl/workflow/workflow_state_test.go index 039788c4993..55baa8b55ec 100644 --- a/go/vt/vtctl/workflow/workflow_state_test.go +++ b/go/vt/vtctl/workflow/workflow_state_test.go @@ -21,6 +21,8 @@ import ( "fmt" "testing" + "vitess.io/vitess/go/vt/log" + "github.com/stretchr/testify/require" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" @@ -61,6 +63,9 @@ func setupMoveTables(t *testing.T) (context.Context, *testEnv) { wfs.Workflows = append(wfs.Workflows, &tabletmanagerdata.ReadVReplicationWorkflowResponse{ Workflow: wfName, WorkflowType: binlogdatapb.VReplicationWorkflowType_MoveTables, + }, &tabletmanagerdata.ReadVReplicationWorkflowResponse{ + Workflow: "wf2", + WorkflowType: binlogdatapb.VReplicationWorkflowType_MoveTables, }) wfs.Workflows[0].Streams = append(wfs.Workflows[0].Streams, &tabletmanagerdata.ReadVReplicationWorkflowResponse_Stream{ Id: id, @@ -77,73 +82,108 @@ func setupMoveTables(t *testing.T) (context.Context, *testEnv) { Pos: position, State: binlogdatapb.VReplicationWorkflowState_Running, }) + wfs.Workflows[1].Streams = append(wfs.Workflows[1].Streams, &tabletmanagerdata.ReadVReplicationWorkflowResponse_Stream{ + Id: 2, + Bls: &binlogdatapb.BinlogSource{ + Keyspace: "source2", + Shard: "0", + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{ + {Match: "t1", Filter: "select * from t1"}, + }, + }, + Tables: []string{"t1"}, + }, + Pos: position, + State: binlogdatapb.VReplicationWorkflowState_Running, + }) + workflowKey := te.tmc.GetWorkflowKey("target", "wf1") workflowResponses := []*tabletmanagerdata.ReadVReplicationWorkflowsResponse{ - nil, // this is the response for getting stopped workflows - &wfs, &wfs, &wfs, // return the full list for subsequent GetWorkflows calls + nil, // this is the response for getting stopped workflows + &wfs, &wfs, &wfs, &wfs, &wfs, &wfs, // return the full list for subsequent GetWorkflows calls } for _, resp := range workflowResponses { te.tmc.AddVReplicationWorkflowsResponse(workflowKey, resp) } - te.tmc.readVReplicationWorkflowRequests[200] = &tabletmanagerdata.ReadVReplicationWorkflowRequest{ + te.tmc.readVReplicationWorkflowRequests["200/wf2"] = &tabletmanagerdata.ReadVReplicationWorkflowRequest{ + Workflow: "wf2", + } + te.tmc.readVReplicationWorkflowRequests["200/wf1"] = &tabletmanagerdata.ReadVReplicationWorkflowRequest{ Workflow: wfName, } - te.updateTableRoutingRules(t, ctx, nil, []string{"t1"}, te.sourceKeyspace.KeyspaceName) + te.updateTableRoutingRules(t, ctx, nil, []string{"t1"}, + "source", te.targetKeyspace.KeyspaceName, "source") + te.updateTableRoutingRules(t, ctx, nil, []string{"t1"}, + "source2", "target2", "source2") return ctx, te } func TestWorkflowStateMoveTables(t *testing.T) { ctx, te := setupMoveTables(t) require.NotNil(t, te) + rules, _ := te.ts.GetRoutingRules(ctx) + log.Infof("rules: %v", rules) type testCase struct { - name string - tabletTypes []topodatapb.TabletType - wantState string + name string + wf1SwitchedTabletTypes []topodatapb.TabletType + wf1ExpectedState string + wf2SwitchedTabletTypes []topodatapb.TabletType } testCases := []testCase{ { - name: "switch reads", - tabletTypes: []topodatapb.TabletType{topodatapb.TabletType_REPLICA, topodatapb.TabletType_RDONLY}, - wantState: "All Reads Switched. Writes Not Switched", + name: "switch reads", + wf1SwitchedTabletTypes: []topodatapb.TabletType{topodatapb.TabletType_REPLICA, topodatapb.TabletType_RDONLY}, + wf1ExpectedState: "All Reads Switched. Writes Not Switched", + wf2SwitchedTabletTypes: []topodatapb.TabletType{topodatapb.TabletType_PRIMARY}, }, { - name: "switch writes", - tabletTypes: []topodatapb.TabletType{topodatapb.TabletType_PRIMARY}, - wantState: "Reads Not Switched. Writes Switched", + name: "switch writes", + wf1SwitchedTabletTypes: []topodatapb.TabletType{topodatapb.TabletType_PRIMARY}, + wf1ExpectedState: "Reads Not Switched. Writes Switched", + wf2SwitchedTabletTypes: []topodatapb.TabletType{topodatapb.TabletType_REPLICA, topodatapb.TabletType_RDONLY}, }, { - name: "switch reads and writes", - tabletTypes: []topodatapb.TabletType{topodatapb.TabletType_PRIMARY, topodatapb.TabletType_REPLICA, topodatapb.TabletType_RDONLY}, - wantState: "All Reads Switched. Writes Switched", + name: "switch reads and writes", + wf1SwitchedTabletTypes: []topodatapb.TabletType{topodatapb.TabletType_PRIMARY, topodatapb.TabletType_REPLICA, topodatapb.TabletType_RDONLY}, + wf1ExpectedState: "All Reads Switched. Writes Switched", }, { - name: "switch rdonly only", - tabletTypes: []topodatapb.TabletType{topodatapb.TabletType_RDONLY}, - wantState: "Reads partially switched. Replica not switched. All Rdonly Reads Switched. Writes Not Switched", + name: "switch rdonly only", + wf1SwitchedTabletTypes: []topodatapb.TabletType{topodatapb.TabletType_RDONLY}, + wf1ExpectedState: "Reads partially switched. Replica not switched. All Rdonly Reads Switched. Writes Not Switched", + wf2SwitchedTabletTypes: []topodatapb.TabletType{topodatapb.TabletType_PRIMARY}, }, { - name: "switch replica only", - tabletTypes: []topodatapb.TabletType{topodatapb.TabletType_REPLICA}, - wantState: "Reads partially switched. All Replica Reads Switched. Rdonly not switched. Writes Not Switched", + name: "switch replica only", + wf1SwitchedTabletTypes: []topodatapb.TabletType{topodatapb.TabletType_REPLICA}, + wf1ExpectedState: "Reads partially switched. All Replica Reads Switched. Rdonly not switched. Writes Not Switched", + wf2SwitchedTabletTypes: []topodatapb.TabletType{topodatapb.TabletType_PRIMARY, topodatapb.TabletType_REPLICA, topodatapb.TabletType_RDONLY}, }, } tables := []string{"t1"} - getStateString := func() string { - tsw, state, err := te.ws.getWorkflowState(ctx, te.targetKeyspace.KeyspaceName, "wf1") + getStateString := func(targetKeyspace, wfName string) string { + tsw, state, err := te.ws.getWorkflowState(ctx, targetKeyspace, wfName) require.NoError(t, err) require.NotNil(t, tsw) require.NotNil(t, state) return state.String() } - initState := getStateString() - require.Equal(t, "Reads Not Switched. Writes Not Switched", initState) + require.Equal(t, "Reads Not Switched. Writes Not Switched", getStateString("target", "wf1")) + for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - te.updateTableRoutingRules(t, ctx, tc.tabletTypes, tables, te.targetKeyspace.KeyspaceName) - require.Equal(t, tc.wantState, getStateString()) + te.updateTableRoutingRules(t, ctx, tc.wf1SwitchedTabletTypes, tables, + "source", te.targetKeyspace.KeyspaceName, te.targetKeyspace.KeyspaceName) + te.updateTableRoutingRules(t, ctx, tc.wf2SwitchedTabletTypes, tables, + "source2", "target2", "target2") + require.Equal(t, tc.wf1ExpectedState, getStateString("target", "wf1")) // reset to initial state - te.updateTableRoutingRules(t, ctx, tc.tabletTypes, tables, te.sourceKeyspace.KeyspaceName) + te.updateTableRoutingRules(t, ctx, nil, tables, + "source", te.targetKeyspace.KeyspaceName, "source") + te.updateTableRoutingRules(t, ctx, nil, tables, + "source2", "target2", "source2") }) } } diff --git a/go/vt/wrangler/traffic_switcher.go b/go/vt/wrangler/traffic_switcher.go index 6c6bbd933a7..d337c1ee515 100644 --- a/go/vt/wrangler/traffic_switcher.go +++ b/go/vt/wrangler/traffic_switcher.go @@ -278,12 +278,12 @@ func (wr *Wrangler) getWorkflowState(ctx context.Context, targetKeyspace, workfl } } } else { - state.RdonlyCellsSwitched, state.RdonlyCellsNotSwitched, err = ws.GetCellsWithTableReadsSwitched(ctx, targetKeyspace, table, topodatapb.TabletType_RDONLY) + state.RdonlyCellsSwitched, state.RdonlyCellsNotSwitched, err = ws.GetCellsWithTableReadsSwitched(ctx, sourceKeyspace, targetKeyspace, table, topodatapb.TabletType_RDONLY) if err != nil { return nil, nil, err } - state.ReplicaCellsSwitched, state.ReplicaCellsNotSwitched, err = ws.GetCellsWithTableReadsSwitched(ctx, targetKeyspace, table, topodatapb.TabletType_REPLICA) + state.ReplicaCellsSwitched, state.ReplicaCellsNotSwitched, err = ws.GetCellsWithTableReadsSwitched(ctx, sourceKeyspace, targetKeyspace, table, topodatapb.TabletType_REPLICA) if err != nil { return nil, nil, err } From 16f6db7ee2864b65a282a0960efcb19cdb272036 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Fri, 6 Sep 2024 18:32:52 +0200 Subject: [PATCH 4/9] Remove unnecessary test setup Signed-off-by: Rohit Nayak --- go/vt/vtctl/workflow/framework_test.go | 7 ++-- go/vt/vtctl/workflow/workflow_state_test.go | 46 ++++++--------------- 2 files changed, 16 insertions(+), 37 deletions(-) diff --git a/go/vt/vtctl/workflow/framework_test.go b/go/vt/vtctl/workflow/framework_test.go index 44f7a6c6b04..59653d7b906 100644 --- a/go/vt/vtctl/workflow/framework_test.go +++ b/go/vt/vtctl/workflow/framework_test.go @@ -261,7 +261,7 @@ type testTMClient struct { mu sync.Mutex vrQueries map[int][]*queryResult createVReplicationWorkflowRequests map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest - readVReplicationWorkflowRequests map[string]*tabletmanagerdatapb.ReadVReplicationWorkflowRequest + readVReplicationWorkflowRequests map[uint32]*tabletmanagerdatapb.ReadVReplicationWorkflowRequest primaryPositions map[uint32]string vdiffRequests map[uint32]*vdiffRequestResponse @@ -282,7 +282,7 @@ func newTestTMClient(env *testEnv) *testTMClient { schema: make(map[string]*tabletmanagerdatapb.SchemaDefinition), vrQueries: make(map[int][]*queryResult), createVReplicationWorkflowRequests: make(map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest), - readVReplicationWorkflowRequests: make(map[string]*tabletmanagerdatapb.ReadVReplicationWorkflowRequest), + readVReplicationWorkflowRequests: make(map[uint32]*tabletmanagerdatapb.ReadVReplicationWorkflowRequest), readVReplicationWorkflowsResponses: make(map[string][]*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse), primaryPositions: make(map[uint32]string), env: env, @@ -309,8 +309,7 @@ func (tmc *testTMClient) GetWorkflowKey(keyspace, shard string) string { func (tmc *testTMClient) ReadVReplicationWorkflow(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.ReadVReplicationWorkflowRequest) (*tabletmanagerdatapb.ReadVReplicationWorkflowResponse, error) { tmc.mu.Lock() defer tmc.mu.Unlock() - key := fmt.Sprintf("%d/%s", tablet.Alias.Uid, req.Workflow) - if expect := tmc.readVReplicationWorkflowRequests[key]; expect != nil { + if expect := tmc.readVReplicationWorkflowRequests[tablet.Alias.Uid]; expect != nil { if !proto.Equal(expect, req) { return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unexpected ReadVReplicationWorkflow request: got %+v, want %+v", req, expect) } diff --git a/go/vt/vtctl/workflow/workflow_state_test.go b/go/vt/vtctl/workflow/workflow_state_test.go index 55baa8b55ec..ab2b161392a 100644 --- a/go/vt/vtctl/workflow/workflow_state_test.go +++ b/go/vt/vtctl/workflow/workflow_state_test.go @@ -52,16 +52,10 @@ func setupMoveTables(t *testing.T) (context.Context, *testEnv) { } te := newTestEnv(t, ctx, "zone1", sourceKeyspace, targetKeyspace) te.tmc.schema = schema - for k := range te.tablets { - for k2 := range te.tablets[k] { - fmt.Println(k, k2) - } - } var wfs tabletmanagerdata.ReadVReplicationWorkflowsResponse - wfName := "wf1" id := int32(1) wfs.Workflows = append(wfs.Workflows, &tabletmanagerdata.ReadVReplicationWorkflowResponse{ - Workflow: wfName, + Workflow: "wf1", WorkflowType: binlogdatapb.VReplicationWorkflowType_MoveTables, }, &tabletmanagerdata.ReadVReplicationWorkflowResponse{ Workflow: "wf2", @@ -82,21 +76,6 @@ func setupMoveTables(t *testing.T) (context.Context, *testEnv) { Pos: position, State: binlogdatapb.VReplicationWorkflowState_Running, }) - wfs.Workflows[1].Streams = append(wfs.Workflows[1].Streams, &tabletmanagerdata.ReadVReplicationWorkflowResponse_Stream{ - Id: 2, - Bls: &binlogdatapb.BinlogSource{ - Keyspace: "source2", - Shard: "0", - Filter: &binlogdatapb.Filter{ - Rules: []*binlogdatapb.Rule{ - {Match: "t1", Filter: "select * from t1"}, - }, - }, - Tables: []string{"t1"}, - }, - Pos: position, - State: binlogdatapb.VReplicationWorkflowState_Running, - }) workflowKey := te.tmc.GetWorkflowKey("target", "wf1") workflowResponses := []*tabletmanagerdata.ReadVReplicationWorkflowsResponse{ @@ -106,16 +85,11 @@ func setupMoveTables(t *testing.T) (context.Context, *testEnv) { for _, resp := range workflowResponses { te.tmc.AddVReplicationWorkflowsResponse(workflowKey, resp) } - te.tmc.readVReplicationWorkflowRequests["200/wf2"] = &tabletmanagerdata.ReadVReplicationWorkflowRequest{ - Workflow: "wf2", - } - te.tmc.readVReplicationWorkflowRequests["200/wf1"] = &tabletmanagerdata.ReadVReplicationWorkflowRequest{ - Workflow: wfName, + te.tmc.readVReplicationWorkflowRequests[200] = &tabletmanagerdata.ReadVReplicationWorkflowRequest{ + Workflow: "wf1", } te.updateTableRoutingRules(t, ctx, nil, []string{"t1"}, "source", te.targetKeyspace.KeyspaceName, "source") - te.updateTableRoutingRules(t, ctx, nil, []string{"t1"}, - "source2", "target2", "source2") return ctx, te } @@ -128,6 +102,8 @@ func TestWorkflowStateMoveTables(t *testing.T) { name string wf1SwitchedTabletTypes []topodatapb.TabletType wf1ExpectedState string + // Simulate a second workflow to validate that the logic used to determine the state of the first workflow + // from the routing rules is not affected by the presence of other workflows in different states. wf2SwitchedTabletTypes []topodatapb.TabletType } testCases := []testCase{ @@ -172,6 +148,13 @@ func TestWorkflowStateMoveTables(t *testing.T) { } require.Equal(t, "Reads Not Switched. Writes Not Switched", getStateString("target", "wf1")) + resetRoutingRules := func() { + te.updateTableRoutingRules(t, ctx, nil, tables, + "source", te.targetKeyspace.KeyspaceName, "source") + te.updateTableRoutingRules(t, ctx, nil, tables, + "source2", "target2", "source2") + } + resetRoutingRules() for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { te.updateTableRoutingRules(t, ctx, tc.wf1SwitchedTabletTypes, tables, @@ -180,10 +163,7 @@ func TestWorkflowStateMoveTables(t *testing.T) { "source2", "target2", "target2") require.Equal(t, tc.wf1ExpectedState, getStateString("target", "wf1")) // reset to initial state - te.updateTableRoutingRules(t, ctx, nil, tables, - "source", te.targetKeyspace.KeyspaceName, "source") - te.updateTableRoutingRules(t, ctx, nil, tables, - "source2", "target2", "source2") + resetRoutingRules() }) } } From e074b449262a542493ec2d04cb3a4a529dadf91c Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Fri, 6 Sep 2024 19:44:50 +0200 Subject: [PATCH 5/9] Fix TestMirrorRules Signed-off-by: Rohit Nayak --- go/vt/vtctl/workflow/server_test.go | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/go/vt/vtctl/workflow/server_test.go b/go/vt/vtctl/workflow/server_test.go index 8a86e89a412..7396e09c331 100644 --- a/go/vt/vtctl/workflow/server_test.go +++ b/go/vt/vtctl/workflow/server_test.go @@ -1354,6 +1354,15 @@ func TestMirrorTraffic(t *testing.T) { topodatapb.TabletType_RDONLY, } + initialRoutingRules := map[string][]string{ + fmt.Sprintf("%s.%s", sourceKs, table1): {fmt.Sprintf("%s.%s", sourceKs, table1)}, + fmt.Sprintf("%s.%s", sourceKs, table2): {fmt.Sprintf("%s.%s", sourceKs, table2)}, + fmt.Sprintf("%s.%s@replica", sourceKs, table1): {fmt.Sprintf("%s.%s@replica", sourceKs, table1)}, + fmt.Sprintf("%s.%s@replica", sourceKs, table2): {fmt.Sprintf("%s.%s@replica", sourceKs, table2)}, + fmt.Sprintf("%s.%s@rdonly", sourceKs, table1): {fmt.Sprintf("%s.%s@rdonly", sourceKs, table1)}, + fmt.Sprintf("%s.%s@rdonly", sourceKs, table2): {fmt.Sprintf("%s.%s@rdonly", sourceKs, table2)}, + } + tests := []struct { name string @@ -1441,8 +1450,8 @@ func TestMirrorTraffic(t *testing.T) { Percent: 50.0, }, routingRules: map[string][]string{ - fmt.Sprintf("%s.%s@rdonly", targetKs, table1): {fmt.Sprintf("%s.%s@rdonly", targetKs, table1)}, - fmt.Sprintf("%s.%s@rdonly", targetKs, table2): {fmt.Sprintf("%s.%s@rdonly", targetKs, table2)}, + fmt.Sprintf("%s.%s@rdonly", sourceKs, table1): {fmt.Sprintf("%s.%s@rdonly", targetKs, table1)}, + fmt.Sprintf("%s.%s@rdonly", sourceKs, table2): {fmt.Sprintf("%s.%s@rdonly", targetKs, table2)}, }, wantErr: "cannot mirror [rdonly] traffic for workflow src2target at this time: traffic for those tablet types is switched", wantMirrorRules: make(map[string]map[string]float32), @@ -1456,8 +1465,8 @@ func TestMirrorTraffic(t *testing.T) { Percent: 50.0, }, routingRules: map[string][]string{ - fmt.Sprintf("%s.%s@replica", targetKs, table1): {fmt.Sprintf("%s.%s@replica", targetKs, table1)}, - fmt.Sprintf("%s.%s@replica", targetKs, table2): {fmt.Sprintf("%s.%s@replica", targetKs, table2)}, + fmt.Sprintf("%s.%s@replica", sourceKs, table1): {fmt.Sprintf("%s.%s@replica", targetKs, table1)}, + fmt.Sprintf("%s.%s@replica", sourceKs, table2): {fmt.Sprintf("%s.%s@replica", targetKs, table2)}, }, wantErr: "cannot mirror [replica] traffic for workflow src2target at this time: traffic for those tablet types is switched", wantMirrorRules: make(map[string]map[string]float32), @@ -1471,8 +1480,8 @@ func TestMirrorTraffic(t *testing.T) { Percent: 50.0, }, routingRules: map[string][]string{ - table1: {fmt.Sprintf("%s.%s", targetKs, table1)}, - table2: {fmt.Sprintf("%s.%s", targetKs, table2)}, + fmt.Sprintf("%s.%s", sourceKs, table1): {fmt.Sprintf("%s.%s", targetKs, table1)}, + fmt.Sprintf("%s.%s", sourceKs, table2): {fmt.Sprintf("%s.%s", targetKs, table2)}, }, wantErr: "cannot mirror [primary] traffic for workflow src2target at this time: traffic for those tablet types is switched", wantMirrorRules: make(map[string]map[string]float32), @@ -1553,6 +1562,7 @@ func TestMirrorTraffic(t *testing.T) { TabletTypes: tabletTypes, Percent: 50.0, }, + routingRules: initialRoutingRules, wantMirrorRules: map[string]map[string]float32{ fmt.Sprintf("%s.%s", sourceKs, table1): { fmt.Sprintf("%s.%s", targetKs, table1): 50.0, @@ -1587,6 +1597,7 @@ func TestMirrorTraffic(t *testing.T) { TabletTypes: tabletTypes, Percent: 50.0, }, + routingRules: initialRoutingRules, wantMirrorRules: map[string]map[string]float32{ fmt.Sprintf("%s.%s", sourceKs, table1): { fmt.Sprintf("%s.%s", targetKs, table1): 50.0, @@ -1624,6 +1635,7 @@ func TestMirrorTraffic(t *testing.T) { fmt.Sprintf("%s.%s", targetKs, table1): 25.0, }, }, + routingRules: initialRoutingRules, req: &vtctldatapb.WorkflowMirrorTrafficRequest{ Keyspace: targetKs, Workflow: workflow, From 36b8e0b47567273b5cbcaa778b4ecd2e299ca276 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Sat, 7 Sep 2024 19:54:22 +0200 Subject: [PATCH 6/9] Fix logic for checking for route for writes Signed-off-by: Rohit Nayak --- go/vt/vtctl/workflow/server.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 13afa5e2bc5..a1ee1b364a1 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -1028,10 +1028,10 @@ func (s *Server) getWorkflowState(ctx context.Context, targetKeyspace, workflowN return nil, nil, err } for _, table := range ts.Tables() { - // If a rule for primary exists for any table and points to the target keyspace, + // If a rule for the primary tablet type exists for any table and points to the target keyspace, // then writes have been switched. - rr := globalRules[fmt.Sprintf("%s.%s", ts.sourceKeyspace, table)] - if len(rr) > 0 && rr[0] != fmt.Sprintf("%s.%s", ts.sourceKeyspace, table) { + rr := globalRules[fmt.Sprintf("%s.%s", sourceKeyspace, table)] + if len(rr) > 0 && rr[0] != fmt.Sprintf("%s.%s", sourceKeyspace, table) { state.WritesSwitched = true break } From d6c39dad62145db285eabd43e4be794f8cd99ebc Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Sat, 7 Sep 2024 22:56:43 +0200 Subject: [PATCH 7/9] Fix failing unit test Signed-off-by: Rohit Nayak --- go/vt/vtctl/workflow/server.go | 4 ++++ go/vt/vttablet/tabletmanager/rpc_vreplication_test.go | 5 ++--- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index a1ee1b364a1..10b92c78456 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -945,6 +945,10 @@ ORDER BY }, nil } +func (s *Server) GetWorkflowState(ctx context.Context, targetKeyspace, workflowName string) (*trafficSwitcher, *State, error) { + return s.getWorkflowState(ctx, targetKeyspace, workflowName) +} + func (s *Server) getWorkflowState(ctx context.Context, targetKeyspace, workflowName string) (*trafficSwitcher, *State, error) { ts, err := s.buildTrafficSwitcher(ctx, targetKeyspace, workflowName) if err != nil { diff --git a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go index 7ab959c1e17..8a33b682b22 100644 --- a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go +++ b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go @@ -478,7 +478,6 @@ func TestMoveTables(t *testing.T) { AutoStart: true, }) require.NoError(t, err) - for _, ftc := range targetShards { ftc.vrdbClient.ExpectRequest(fmt.Sprintf(readWorkflowsLimited, tenv.dbName, wf), sqltypes.MakeTestResult( sqltypes.MakeTestFields( @@ -524,13 +523,13 @@ func TestMoveTables(t *testing.T) { ), fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|0|Running||%s|1||0|0|0||0|1", vreplID, bls, position, sourceKs), ), nil) - sourceTablet.vrdbClient.ExpectRequest(fmt.Sprintf(readWorkflowsLimited, tenv.dbName, workflow.ReverseWorkflowName(wf)), sqltypes.MakeTestResult( + sourceTablet.vrdbClient.AddInvariant(fmt.Sprintf(readWorkflowsLimited, tenv.dbName, workflow.ReverseWorkflowName(wf)), sqltypes.MakeTestResult( sqltypes.MakeTestFields( "workflow|id|source|pos|stop_pos|max_tps|max_replication_lag|cell|tablet_types|time_updated|transaction_timestamp|state|message|db_name|rows_copied|tags|time_heartbeat|workflow_type|time_throttled|component_throttled|workflow_sub_type|defer_secondary_keys", "workflow|int64|varchar|blob|varchar|int64|int64|varchar|varchar|int64|int64|varchar|varchar|varchar|int64|varchar|int64|int64|int64|varchar|int64|int64", ), fmt.Sprintf("%s|%d|%s|%s|NULL|0|0|||1686577659|0|Running||%s|1||0|0|0||0|1", workflow.ReverseWorkflowName(wf), vreplID, bls, position, sourceKs), - ), nil) + )) sourceTablet.vrdbClient.ExpectRequest(fmt.Sprintf(readWorkflow, wf, tenv.dbName), &sqltypes.Result{}, nil) _, err = ws.WorkflowSwitchTraffic(ctx, &vtctldatapb.WorkflowSwitchTrafficRequest{ From abfcc8d552a4f68b95b806ef0d3def5e01b8a991 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Sun, 8 Sep 2024 13:30:34 +0200 Subject: [PATCH 8/9] Self-review Signed-off-by: Rohit Nayak --- go/vt/vtctl/workflow/workflow_state_test.go | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/go/vt/vtctl/workflow/workflow_state_test.go b/go/vt/vtctl/workflow/workflow_state_test.go index ab2b161392a..188965a3fc3 100644 --- a/go/vt/vtctl/workflow/workflow_state_test.go +++ b/go/vt/vtctl/workflow/workflow_state_test.go @@ -1,5 +1,5 @@ /* -Copyright 2021 The Vitess Authors. +Copyright 2024 The Vitess Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -21,8 +21,6 @@ import ( "fmt" "testing" - "vitess.io/vitess/go/vt/log" - "github.com/stretchr/testify/require" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" @@ -30,8 +28,7 @@ import ( topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) -func setupMoveTables(t *testing.T) (context.Context, *testEnv) { - ctx := context.Background() +func setupMoveTables(t *testing.T, ctx context.Context) *testEnv { schema := map[string]*tabletmanagerdata.SchemaDefinition{ "t1": { TableDefinitions: []*tabletmanagerdata.TableDefinition{ @@ -90,14 +87,15 @@ func setupMoveTables(t *testing.T) (context.Context, *testEnv) { } te.updateTableRoutingRules(t, ctx, nil, []string{"t1"}, "source", te.targetKeyspace.KeyspaceName, "source") - return ctx, te + return te } +// TestWorkflowStateMoveTables tests the logic used to determine the state of a MoveTables workflow based on the +// routing rules. We setup two workflows with the same table in both source and target keyspaces. func TestWorkflowStateMoveTables(t *testing.T) { - ctx, te := setupMoveTables(t) + ctx := context.Background() + te := setupMoveTables(t, ctx) require.NotNil(t, te) - rules, _ := te.ts.GetRoutingRules(ctx) - log.Infof("rules: %v", rules) type testCase struct { name string wf1SwitchedTabletTypes []topodatapb.TabletType From a847429605452e40c3430ad8e85a07213e360766 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Tue, 10 Sep 2024 09:54:27 +0200 Subject: [PATCH 9/9] Address review comments Signed-off-by: Rohit Nayak --- go/vt/vtctl/workflow/framework_test.go | 8 +++++++- go/vt/vtctl/workflow/server.go | 5 +++-- go/vt/vtctl/workflow/workflow_state_test.go | 8 +++----- 3 files changed, 13 insertions(+), 8 deletions(-) diff --git a/go/vt/vtctl/workflow/framework_test.go b/go/vt/vtctl/workflow/framework_test.go index 59653d7b906..9dd91d9d4b7 100644 --- a/go/vt/vtctl/workflow/framework_test.go +++ b/go/vt/vtctl/workflow/framework_test.go @@ -58,6 +58,12 @@ const ( tabletUIDStep = 10 ) +var defaultTabletTypes = []topodatapb.TabletType{ + topodatapb.TabletType_PRIMARY, + topodatapb.TabletType_REPLICA, + topodatapb.TabletType_RDONLY, +} + type testKeyspace struct { KeyspaceName string ShardNames []string @@ -209,7 +215,7 @@ func (env *testEnv) updateTableRoutingRules(t *testing.T, ctx context.Context, tabletTypes []topodatapb.TabletType, tables []string, sourceKeyspace, targetKeyspace, toKeyspace string) { if len(tabletTypes) == 0 { - tabletTypes = []topodatapb.TabletType{topodatapb.TabletType_PRIMARY, topodatapb.TabletType_REPLICA, topodatapb.TabletType_RDONLY} + tabletTypes = defaultTabletTypes } rr, err := env.ts.GetRoutingRules(ctx) require.NoError(t, err) diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 10b92c78456..6a48cf90c2e 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -1034,8 +1034,9 @@ func (s *Server) getWorkflowState(ctx context.Context, targetKeyspace, workflowN for _, table := range ts.Tables() { // If a rule for the primary tablet type exists for any table and points to the target keyspace, // then writes have been switched. - rr := globalRules[fmt.Sprintf("%s.%s", sourceKeyspace, table)] - if len(rr) > 0 && rr[0] != fmt.Sprintf("%s.%s", sourceKeyspace, table) { + ruleKey := fmt.Sprintf("%s.%s", sourceKeyspace, table) + rr := globalRules[ruleKey] + if len(rr) > 0 && rr[0] != ruleKey { state.WritesSwitched = true break } diff --git a/go/vt/vtctl/workflow/workflow_state_test.go b/go/vt/vtctl/workflow/workflow_state_test.go index 188965a3fc3..d64b6b36a86 100644 --- a/go/vt/vtctl/workflow/workflow_state_test.go +++ b/go/vt/vtctl/workflow/workflow_state_test.go @@ -119,7 +119,7 @@ func TestWorkflowStateMoveTables(t *testing.T) { }, { name: "switch reads and writes", - wf1SwitchedTabletTypes: []topodatapb.TabletType{topodatapb.TabletType_PRIMARY, topodatapb.TabletType_REPLICA, topodatapb.TabletType_RDONLY}, + wf1SwitchedTabletTypes: defaultTabletTypes, wf1ExpectedState: "All Reads Switched. Writes Switched", }, { @@ -132,7 +132,7 @@ func TestWorkflowStateMoveTables(t *testing.T) { name: "switch replica only", wf1SwitchedTabletTypes: []topodatapb.TabletType{topodatapb.TabletType_REPLICA}, wf1ExpectedState: "Reads partially switched. All Replica Reads Switched. Rdonly not switched. Writes Not Switched", - wf2SwitchedTabletTypes: []topodatapb.TabletType{topodatapb.TabletType_PRIMARY, topodatapb.TabletType_REPLICA, topodatapb.TabletType_RDONLY}, + wf2SwitchedTabletTypes: defaultTabletTypes, }, } tables := []string{"t1"} @@ -152,16 +152,14 @@ func TestWorkflowStateMoveTables(t *testing.T) { te.updateTableRoutingRules(t, ctx, nil, tables, "source2", "target2", "source2") } - resetRoutingRules() for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { + resetRoutingRules() te.updateTableRoutingRules(t, ctx, tc.wf1SwitchedTabletTypes, tables, "source", te.targetKeyspace.KeyspaceName, te.targetKeyspace.KeyspaceName) te.updateTableRoutingRules(t, ctx, tc.wf2SwitchedTabletTypes, tables, "source2", "target2", "target2") require.Equal(t, tc.wf1ExpectedState, getStateString("target", "wf1")) - // reset to initial state - resetRoutingRules() }) } }