From 490bb0c3181a2e5eeeaa147ae69b30b0c199ef5e Mon Sep 17 00:00:00 2001 From: Rohit Nayak <57520317+rohit-nayak-ps@users.noreply.github.com> Date: Fri, 20 Sep 2024 12:43:08 +0200 Subject: [PATCH] Workflow Status: change logic to determine whether `MoveTables` writes are switched (#16731) Signed-off-by: Rohit Nayak --- go/vt/vtctl/workflow/framework_test.go | 43 +++-- go/vt/vtctl/workflow/server.go | 24 ++- go/vt/vtctl/workflow/server_test.go | 38 ++-- go/vt/vtctl/workflow/workflow_state_test.go | 165 ++++++++++++++++++ .../tabletmanager/rpc_vreplication_test.go | 5 +- go/vt/wrangler/traffic_switcher.go | 4 +- 6 files changed, 236 insertions(+), 43 deletions(-) create mode 100644 go/vt/vtctl/workflow/workflow_state_test.go diff --git a/go/vt/vtctl/workflow/framework_test.go b/go/vt/vtctl/workflow/framework_test.go index 5c1bdfbd9c0..f3bf5869ab1 100644 --- a/go/vt/vtctl/workflow/framework_test.go +++ b/go/vt/vtctl/workflow/framework_test.go @@ -58,6 +58,12 @@ const ( tabletUIDStep = 10 ) +var defaultTabletTypes = []topodatapb.TabletType{ + topodatapb.TabletType_PRIMARY, + topodatapb.TabletType_REPLICA, + topodatapb.TabletType_RDONLY, +} + type testKeyspace struct { KeyspaceName string ShardNames []string @@ -199,30 +205,38 @@ func (env *testEnv) addTablet(t *testing.T, ctx context.Context, id int, keyspac return tablet } -// addTableRoutingRules adds routing rules from the test env's source keyspace to -// its target keyspace for the given tablet types and tables. -func (env *testEnv) addTableRoutingRules(t *testing.T, ctx context.Context, tabletTypes []topodatapb.TabletType, tables []string) { - ks := env.targetKeyspace.KeyspaceName - rules := make(map[string][]string, len(tables)*(len(tabletTypes)*3)) +func (env *testEnv) saveRoutingRules(t *testing.T, rules map[string][]string) { + err := topotools.SaveRoutingRules(context.Background(), env.ts, rules) + require.NoError(t, err) + err = env.ts.RebuildSrvVSchema(context.Background(), nil) + require.NoError(t, err) +} + +func (env *testEnv) updateTableRoutingRules(t *testing.T, ctx context.Context, + tabletTypes []topodatapb.TabletType, tables []string, sourceKeyspace, targetKeyspace, toKeyspace string) { + + if len(tabletTypes) == 0 { + tabletTypes = defaultTabletTypes + } + rr, err := env.ts.GetRoutingRules(ctx) + require.NoError(t, err) + rules := topotools.GetRoutingRulesMap(rr) for _, tabletType := range tabletTypes { for _, tableName := range tables { - toTarget := []string{ks + "." + tableName} + toTarget := []string{toKeyspace + "." + tableName} tt := strings.ToLower(tabletType.String()) if tabletType == topodatapb.TabletType_PRIMARY { rules[tableName] = toTarget - rules[ks+"."+tableName] = toTarget - rules[env.sourceKeyspace.KeyspaceName+"."+tableName] = toTarget + rules[targetKeyspace+"."+tableName] = toTarget + rules[sourceKeyspace+"."+tableName] = toTarget } else { rules[tableName+"@"+tt] = toTarget - rules[ks+"."+tableName+"@"+tt] = toTarget - rules[env.sourceKeyspace.KeyspaceName+"."+tableName+"@"+tt] = toTarget + rules[targetKeyspace+"."+tableName+"@"+tt] = toTarget + rules[sourceKeyspace+"."+tableName+"@"+tt] = toTarget } } } - err := topotools.SaveRoutingRules(ctx, env.ts, rules) - require.NoError(t, err) - err = env.ts.RebuildSrvVSchema(ctx, nil) - require.NoError(t, err) + env.saveRoutingRules(t, rules) } func (env *testEnv) deleteTablet(tablet *topodatapb.Tablet) { @@ -305,7 +319,6 @@ func (tmc *testTMClient) GetWorkflowKey(keyspace, shard string) string { func (tmc *testTMClient) ReadVReplicationWorkflow(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.ReadVReplicationWorkflowRequest) (*tabletmanagerdatapb.ReadVReplicationWorkflowResponse, error) { tmc.mu.Lock() defer tmc.mu.Unlock() - if expect := tmc.readVReplicationWorkflowRequests[tablet.Alias.Uid]; expect != nil { if !proto.Equal(expect, req) { return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unexpected ReadVReplicationWorkflow request: got %+v, want %+v", req, expect) diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 84cb15c87f0..8443b2098f5 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -300,7 +300,8 @@ func (s *Server) GetCellsWithShardReadsSwitched( // keyspace. func (s *Server) GetCellsWithTableReadsSwitched( ctx context.Context, - keyspace string, + sourceKeyspace string, + targetKeyspace string, table string, tabletType topodatapb.TabletType, ) (cellsSwitched []string, cellsNotSwitched []string, err error) { @@ -330,7 +331,7 @@ func (s *Server) GetCellsWithTableReadsSwitched( ) for _, rule := range srvVSchema.RoutingRules.Rules { - ruleName := fmt.Sprintf("%s.%s@%s", keyspace, table, strings.ToLower(tabletType.String())) + ruleName := fmt.Sprintf("%s.%s@%s", sourceKeyspace, table, strings.ToLower(tabletType.String())) if rule.FromTable == ruleName { found = true @@ -341,7 +342,7 @@ func (s *Server) GetCellsWithTableReadsSwitched( return nil, nil, err } - if ks == keyspace { + if ks != sourceKeyspace { switched = true break // if one table in the workflow switched, we are done. } @@ -945,6 +946,10 @@ ORDER BY }, nil } +func (s *Server) GetWorkflowState(ctx context.Context, targetKeyspace, workflowName string) (*trafficSwitcher, *State, error) { + return s.getWorkflowState(ctx, targetKeyspace, workflowName) +} + func (s *Server) getWorkflowState(ctx context.Context, targetKeyspace, workflowName string) (*trafficSwitcher, *State, error) { ts, err := s.buildTrafficSwitcher(ctx, targetKeyspace, workflowName) if err != nil { @@ -1014,12 +1019,12 @@ func (s *Server) getWorkflowState(ctx context.Context, targetKeyspace, workflowN } } } else { - state.RdonlyCellsSwitched, state.RdonlyCellsNotSwitched, err = s.GetCellsWithTableReadsSwitched(ctx, targetKeyspace, table, topodatapb.TabletType_RDONLY) + state.RdonlyCellsSwitched, state.RdonlyCellsNotSwitched, err = s.GetCellsWithTableReadsSwitched(ctx, sourceKeyspace, targetKeyspace, table, topodatapb.TabletType_RDONLY) if err != nil { return nil, nil, err } - state.ReplicaCellsSwitched, state.ReplicaCellsNotSwitched, err = s.GetCellsWithTableReadsSwitched(ctx, targetKeyspace, table, topodatapb.TabletType_REPLICA) + state.ReplicaCellsSwitched, state.ReplicaCellsNotSwitched, err = s.GetCellsWithTableReadsSwitched(ctx, sourceKeyspace, targetKeyspace, table, topodatapb.TabletType_REPLICA) if err != nil { return nil, nil, err } @@ -1028,10 +1033,11 @@ func (s *Server) getWorkflowState(ctx context.Context, targetKeyspace, workflowN return nil, nil, err } for _, table := range ts.Tables() { - rr := globalRules[table] - // If a rule exists for the table and points to the target keyspace, then - // writes have been switched. - if len(rr) > 0 && rr[0] == fmt.Sprintf("%s.%s", targetKeyspace, table) { + // If a rule for the primary tablet type exists for any table and points to the target keyspace, + // then writes have been switched. + ruleKey := fmt.Sprintf("%s.%s", sourceKeyspace, table) + rr := globalRules[ruleKey] + if len(rr) > 0 && rr[0] != ruleKey { state.WritesSwitched = true break } diff --git a/go/vt/vtctl/workflow/server_test.go b/go/vt/vtctl/workflow/server_test.go index efc649e1ebe..5d35b205ed3 100644 --- a/go/vt/vtctl/workflow/server_test.go +++ b/go/vt/vtctl/workflow/server_test.go @@ -417,11 +417,6 @@ func TestMoveTablesComplete(t *testing.T) { tableTemplate := "CREATE TABLE %s (id BIGINT, name VARCHAR(64), PRIMARY KEY (id))" sourceKeyspaceName := "sourceks" targetKeyspaceName := "targetks" - tabletTypes := []topodatapb.TabletType{ - topodatapb.TabletType_PRIMARY, - topodatapb.TabletType_REPLICA, - topodatapb.TabletType_RDONLY, - } lockName := fmt.Sprintf("%s/%s", targetKeyspaceName, workflowName) schema := map[string]*tabletmanagerdatapb.SchemaDefinition{ table1Name: { @@ -641,7 +636,8 @@ func TestMoveTablesComplete(t *testing.T) { tc.preFunc(t, env) } // Setup the routing rules as they would be after having previously done SwitchTraffic. - env.addTableRoutingRules(t, ctx, tabletTypes, []string{table1Name, table2Name, table3Name}) + env.updateTableRoutingRules(t, ctx, nil, []string{table1Name, table2Name, table3Name}, + tc.sourceKeyspace.KeyspaceName, tc.targetKeyspace.KeyspaceName, tc.targetKeyspace.KeyspaceName) got, err := env.ws.MoveTablesComplete(ctx, tc.req) if tc.wantErr != "" { require.EqualError(t, err, tc.wantErr) @@ -1154,7 +1150,8 @@ func TestMoveTablesTrafficSwitching(t *testing.T) { } else { env.tmc.reverse.Store(true) // Setup the routing rules as they would be after having previously done SwitchTraffic. - env.addTableRoutingRules(t, ctx, tabletTypes, []string{tableName}) + env.updateTableRoutingRules(t, ctx, tabletTypes, []string{tableName}, + tc.sourceKeyspace.KeyspaceName, tc.targetKeyspace.KeyspaceName, tc.targetKeyspace.KeyspaceName) env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, copyTableQR) for i := 0; i < len(tc.targetKeyspace.ShardNames); i++ { // Per stream env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, cutoverQR) @@ -1372,7 +1369,8 @@ func TestMoveTablesTrafficSwitchingDryRun(t *testing.T) { } else { env.tmc.reverse.Store(true) // Setup the routing rules as they would be after having previously done SwitchTraffic. - env.addTableRoutingRules(t, ctx, tabletTypes, tables) + env.updateTableRoutingRules(t, ctx, tabletTypes, tables, + tc.sourceKeyspace.KeyspaceName, tc.targetKeyspace.KeyspaceName, tc.targetKeyspace.KeyspaceName) env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, copyTableQR) for i := 0; i < len(tc.targetKeyspace.ShardNames); i++ { // Per stream env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, journalQR) @@ -1411,6 +1409,15 @@ func TestMirrorTraffic(t *testing.T) { topodatapb.TabletType_RDONLY, } + initialRoutingRules := map[string][]string{ + fmt.Sprintf("%s.%s", sourceKs, table1): {fmt.Sprintf("%s.%s", sourceKs, table1)}, + fmt.Sprintf("%s.%s", sourceKs, table2): {fmt.Sprintf("%s.%s", sourceKs, table2)}, + fmt.Sprintf("%s.%s@replica", sourceKs, table1): {fmt.Sprintf("%s.%s@replica", sourceKs, table1)}, + fmt.Sprintf("%s.%s@replica", sourceKs, table2): {fmt.Sprintf("%s.%s@replica", sourceKs, table2)}, + fmt.Sprintf("%s.%s@rdonly", sourceKs, table1): {fmt.Sprintf("%s.%s@rdonly", sourceKs, table1)}, + fmt.Sprintf("%s.%s@rdonly", sourceKs, table2): {fmt.Sprintf("%s.%s@rdonly", sourceKs, table2)}, + } + tests := []struct { name string @@ -1498,8 +1505,8 @@ func TestMirrorTraffic(t *testing.T) { Percent: 50.0, }, routingRules: map[string][]string{ - fmt.Sprintf("%s.%s@rdonly", targetKs, table1): {fmt.Sprintf("%s.%s@rdonly", targetKs, table1)}, - fmt.Sprintf("%s.%s@rdonly", targetKs, table2): {fmt.Sprintf("%s.%s@rdonly", targetKs, table2)}, + fmt.Sprintf("%s.%s@rdonly", sourceKs, table1): {fmt.Sprintf("%s.%s@rdonly", targetKs, table1)}, + fmt.Sprintf("%s.%s@rdonly", sourceKs, table2): {fmt.Sprintf("%s.%s@rdonly", targetKs, table2)}, }, wantErr: "cannot mirror [rdonly] traffic for workflow src2target at this time: traffic for those tablet types is switched", wantMirrorRules: make(map[string]map[string]float32), @@ -1513,8 +1520,8 @@ func TestMirrorTraffic(t *testing.T) { Percent: 50.0, }, routingRules: map[string][]string{ - fmt.Sprintf("%s.%s@replica", targetKs, table1): {fmt.Sprintf("%s.%s@replica", targetKs, table1)}, - fmt.Sprintf("%s.%s@replica", targetKs, table2): {fmt.Sprintf("%s.%s@replica", targetKs, table2)}, + fmt.Sprintf("%s.%s@replica", sourceKs, table1): {fmt.Sprintf("%s.%s@replica", targetKs, table1)}, + fmt.Sprintf("%s.%s@replica", sourceKs, table2): {fmt.Sprintf("%s.%s@replica", targetKs, table2)}, }, wantErr: "cannot mirror [replica] traffic for workflow src2target at this time: traffic for those tablet types is switched", wantMirrorRules: make(map[string]map[string]float32), @@ -1528,8 +1535,8 @@ func TestMirrorTraffic(t *testing.T) { Percent: 50.0, }, routingRules: map[string][]string{ - table1: {fmt.Sprintf("%s.%s", targetKs, table1)}, - table2: {fmt.Sprintf("%s.%s", targetKs, table2)}, + fmt.Sprintf("%s.%s", sourceKs, table1): {fmt.Sprintf("%s.%s", targetKs, table1)}, + fmt.Sprintf("%s.%s", sourceKs, table2): {fmt.Sprintf("%s.%s", targetKs, table2)}, }, wantErr: "cannot mirror [primary] traffic for workflow src2target at this time: traffic for those tablet types is switched", wantMirrorRules: make(map[string]map[string]float32), @@ -1610,6 +1617,7 @@ func TestMirrorTraffic(t *testing.T) { TabletTypes: tabletTypes, Percent: 50.0, }, + routingRules: initialRoutingRules, wantMirrorRules: map[string]map[string]float32{ fmt.Sprintf("%s.%s", sourceKs, table1): { fmt.Sprintf("%s.%s", targetKs, table1): 50.0, @@ -1644,6 +1652,7 @@ func TestMirrorTraffic(t *testing.T) { TabletTypes: tabletTypes, Percent: 50.0, }, + routingRules: initialRoutingRules, wantMirrorRules: map[string]map[string]float32{ fmt.Sprintf("%s.%s", sourceKs, table1): { fmt.Sprintf("%s.%s", targetKs, table1): 50.0, @@ -1681,6 +1690,7 @@ func TestMirrorTraffic(t *testing.T) { fmt.Sprintf("%s.%s", targetKs, table1): 25.0, }, }, + routingRules: initialRoutingRules, req: &vtctldatapb.WorkflowMirrorTrafficRequest{ Keyspace: targetKs, Workflow: workflow, diff --git a/go/vt/vtctl/workflow/workflow_state_test.go b/go/vt/vtctl/workflow/workflow_state_test.go new file mode 100644 index 00000000000..d64b6b36a86 --- /dev/null +++ b/go/vt/vtctl/workflow/workflow_state_test.go @@ -0,0 +1,165 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workflow + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/require" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + "vitess.io/vitess/go/vt/proto/tabletmanagerdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" +) + +func setupMoveTables(t *testing.T, ctx context.Context) *testEnv { + schema := map[string]*tabletmanagerdata.SchemaDefinition{ + "t1": { + TableDefinitions: []*tabletmanagerdata.TableDefinition{ + { + Name: "t1", + Schema: fmt.Sprintf("CREATE TABLE %s (id BIGINT, name VARCHAR(64), PRIMARY KEY (id))", "t1"), + }, + }, + }, + } + sourceKeyspace := &testKeyspace{ + KeyspaceName: "source", + ShardNames: []string{"0"}, + } + targetKeyspace := &testKeyspace{ + KeyspaceName: "target", + ShardNames: []string{"0"}, + } + te := newTestEnv(t, ctx, "zone1", sourceKeyspace, targetKeyspace) + te.tmc.schema = schema + var wfs tabletmanagerdata.ReadVReplicationWorkflowsResponse + id := int32(1) + wfs.Workflows = append(wfs.Workflows, &tabletmanagerdata.ReadVReplicationWorkflowResponse{ + Workflow: "wf1", + WorkflowType: binlogdatapb.VReplicationWorkflowType_MoveTables, + }, &tabletmanagerdata.ReadVReplicationWorkflowResponse{ + Workflow: "wf2", + WorkflowType: binlogdatapb.VReplicationWorkflowType_MoveTables, + }) + wfs.Workflows[0].Streams = append(wfs.Workflows[0].Streams, &tabletmanagerdata.ReadVReplicationWorkflowResponse_Stream{ + Id: id, + Bls: &binlogdatapb.BinlogSource{ + Keyspace: te.sourceKeyspace.KeyspaceName, + Shard: "0", + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{ + {Match: "t1", Filter: "select * from t1"}, + }, + }, + Tables: []string{"t1"}, + }, + Pos: position, + State: binlogdatapb.VReplicationWorkflowState_Running, + }) + + workflowKey := te.tmc.GetWorkflowKey("target", "wf1") + workflowResponses := []*tabletmanagerdata.ReadVReplicationWorkflowsResponse{ + nil, // this is the response for getting stopped workflows + &wfs, &wfs, &wfs, &wfs, &wfs, &wfs, // return the full list for subsequent GetWorkflows calls + } + for _, resp := range workflowResponses { + te.tmc.AddVReplicationWorkflowsResponse(workflowKey, resp) + } + te.tmc.readVReplicationWorkflowRequests[200] = &tabletmanagerdata.ReadVReplicationWorkflowRequest{ + Workflow: "wf1", + } + te.updateTableRoutingRules(t, ctx, nil, []string{"t1"}, + "source", te.targetKeyspace.KeyspaceName, "source") + return te +} + +// TestWorkflowStateMoveTables tests the logic used to determine the state of a MoveTables workflow based on the +// routing rules. We setup two workflows with the same table in both source and target keyspaces. +func TestWorkflowStateMoveTables(t *testing.T) { + ctx := context.Background() + te := setupMoveTables(t, ctx) + require.NotNil(t, te) + type testCase struct { + name string + wf1SwitchedTabletTypes []topodatapb.TabletType + wf1ExpectedState string + // Simulate a second workflow to validate that the logic used to determine the state of the first workflow + // from the routing rules is not affected by the presence of other workflows in different states. + wf2SwitchedTabletTypes []topodatapb.TabletType + } + testCases := []testCase{ + { + name: "switch reads", + wf1SwitchedTabletTypes: []topodatapb.TabletType{topodatapb.TabletType_REPLICA, topodatapb.TabletType_RDONLY}, + wf1ExpectedState: "All Reads Switched. Writes Not Switched", + wf2SwitchedTabletTypes: []topodatapb.TabletType{topodatapb.TabletType_PRIMARY}, + }, + { + name: "switch writes", + wf1SwitchedTabletTypes: []topodatapb.TabletType{topodatapb.TabletType_PRIMARY}, + wf1ExpectedState: "Reads Not Switched. Writes Switched", + wf2SwitchedTabletTypes: []topodatapb.TabletType{topodatapb.TabletType_REPLICA, topodatapb.TabletType_RDONLY}, + }, + { + name: "switch reads and writes", + wf1SwitchedTabletTypes: defaultTabletTypes, + wf1ExpectedState: "All Reads Switched. Writes Switched", + }, + { + name: "switch rdonly only", + wf1SwitchedTabletTypes: []topodatapb.TabletType{topodatapb.TabletType_RDONLY}, + wf1ExpectedState: "Reads partially switched. Replica not switched. All Rdonly Reads Switched. Writes Not Switched", + wf2SwitchedTabletTypes: []topodatapb.TabletType{topodatapb.TabletType_PRIMARY}, + }, + { + name: "switch replica only", + wf1SwitchedTabletTypes: []topodatapb.TabletType{topodatapb.TabletType_REPLICA}, + wf1ExpectedState: "Reads partially switched. All Replica Reads Switched. Rdonly not switched. Writes Not Switched", + wf2SwitchedTabletTypes: defaultTabletTypes, + }, + } + tables := []string{"t1"} + + getStateString := func(targetKeyspace, wfName string) string { + tsw, state, err := te.ws.getWorkflowState(ctx, targetKeyspace, wfName) + require.NoError(t, err) + require.NotNil(t, tsw) + require.NotNil(t, state) + return state.String() + } + require.Equal(t, "Reads Not Switched. Writes Not Switched", getStateString("target", "wf1")) + + resetRoutingRules := func() { + te.updateTableRoutingRules(t, ctx, nil, tables, + "source", te.targetKeyspace.KeyspaceName, "source") + te.updateTableRoutingRules(t, ctx, nil, tables, + "source2", "target2", "source2") + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + resetRoutingRules() + te.updateTableRoutingRules(t, ctx, tc.wf1SwitchedTabletTypes, tables, + "source", te.targetKeyspace.KeyspaceName, te.targetKeyspace.KeyspaceName) + te.updateTableRoutingRules(t, ctx, tc.wf2SwitchedTabletTypes, tables, + "source2", "target2", "target2") + require.Equal(t, tc.wf1ExpectedState, getStateString("target", "wf1")) + }) + } +} diff --git a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go index 5bcfff7d4c4..d43394ccb5c 100644 --- a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go +++ b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go @@ -730,7 +730,6 @@ func TestMoveTablesSharded(t *testing.T) { AutoStart: true, }) require.NoError(t, err) - for _, ftc := range targetShards { ftc.vrdbClient.Reset() ftc.vrdbClient.AddInvariant(binlogplayer.TestGetWorkflowQueryId1, sqltypes.MakeTestResult( @@ -790,13 +789,13 @@ func TestMoveTablesSharded(t *testing.T) { ), fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|0|Running||%s|1||0|0|0||0|1", vreplID, bls, position, sourceKs), ), nil) - sourceTablet.vrdbClient.ExpectRequest(fmt.Sprintf(readWorkflowsLimited, tenv.dbName, workflow.ReverseWorkflowName(wf)), sqltypes.MakeTestResult( + sourceTablet.vrdbClient.AddInvariant(fmt.Sprintf(readWorkflowsLimited, tenv.dbName, workflow.ReverseWorkflowName(wf)), sqltypes.MakeTestResult( sqltypes.MakeTestFields( "workflow|id|source|pos|stop_pos|max_tps|max_replication_lag|cell|tablet_types|time_updated|transaction_timestamp|state|message|db_name|rows_copied|tags|time_heartbeat|workflow_type|time_throttled|component_throttled|workflow_sub_type|defer_secondary_keys", "workflow|int64|varchar|blob|varchar|int64|int64|varchar|varchar|int64|int64|varchar|varchar|varchar|int64|varchar|int64|int64|int64|varchar|int64|int64", ), fmt.Sprintf("%s|%d|%s|%s|NULL|0|0|||1686577659|0|Running||%s|1||0|0|0||0|1", workflow.ReverseWorkflowName(wf), vreplID, bls, position, sourceKs), - ), nil) + )) sourceTablet.vrdbClient.ExpectRequest(fmt.Sprintf(readWorkflow, wf, tenv.dbName), &sqltypes.Result{}, nil) _, err = ws.WorkflowSwitchTraffic(ctx, &vtctldatapb.WorkflowSwitchTrafficRequest{ diff --git a/go/vt/wrangler/traffic_switcher.go b/go/vt/wrangler/traffic_switcher.go index 6c6bbd933a7..d337c1ee515 100644 --- a/go/vt/wrangler/traffic_switcher.go +++ b/go/vt/wrangler/traffic_switcher.go @@ -278,12 +278,12 @@ func (wr *Wrangler) getWorkflowState(ctx context.Context, targetKeyspace, workfl } } } else { - state.RdonlyCellsSwitched, state.RdonlyCellsNotSwitched, err = ws.GetCellsWithTableReadsSwitched(ctx, targetKeyspace, table, topodatapb.TabletType_RDONLY) + state.RdonlyCellsSwitched, state.RdonlyCellsNotSwitched, err = ws.GetCellsWithTableReadsSwitched(ctx, sourceKeyspace, targetKeyspace, table, topodatapb.TabletType_RDONLY) if err != nil { return nil, nil, err } - state.ReplicaCellsSwitched, state.ReplicaCellsNotSwitched, err = ws.GetCellsWithTableReadsSwitched(ctx, targetKeyspace, table, topodatapb.TabletType_REPLICA) + state.ReplicaCellsSwitched, state.ReplicaCellsNotSwitched, err = ws.GetCellsWithTableReadsSwitched(ctx, sourceKeyspace, targetKeyspace, table, topodatapb.TabletType_REPLICA) if err != nil { return nil, nil, err }