From 3a59aac243b038ffa7ca2b0de226cebbeb6cc1e2 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 8 Oct 2024 19:10:41 -0400 Subject: [PATCH 01/29] Only block workflow cancel if writes have been switched Signed-off-by: Matt Lord --- go/vt/vtctl/workflow/server.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index c247ed91e9a..6c37b089a86 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -2102,8 +2102,11 @@ func (s *Server) WorkflowDelete(ctx context.Context, req *vtctldatapb.WorkflowDe } if ts.workflowType != binlogdatapb.VReplicationWorkflowType_CreateLookupIndex { - // Return an error if the workflow traffic is partially switched. - if state.WritesSwitched || len(state.ReplicaCellsSwitched) > 0 || len(state.RdonlyCellsSwitched) > 0 { + // Return an error if the read-write traffic for the workflow is switched. + // If any or all read-only traffic has been switched, then we can still + // cancel as the routing rules, denied table entries, etc will be cleaned up + // and things will be back in the original state before the workflow was created. + if state.WritesSwitched { return nil, ErrWorkflowPartiallySwitched } } From a48daf6570e15af88f99090d6e535e31a90ac9ee Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 8 Oct 2024 19:42:26 -0400 Subject: [PATCH 02/29] Support reversetraffic for read-only tablets Signed-off-by: Matt Lord --- go/vt/vtctl/workflow/server.go | 42 +++++++++++++++++++++++++++++----- 1 file changed, 36 insertions(+), 6 deletions(-) diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 6c37b089a86..8246218fbf3 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -3186,16 +3186,46 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor } direction := TrafficSwitchDirection(req.Direction) if direction == DirectionBackward { + if ts.IsMultiTenantMigration() { + // In a multi-tenant migration, multiple migrations would be writing to the same + // table, so we can't stop writes like we do with MoveTables, using denied tables, + // since it would block all other migrations as well as traffic for tenants which + // have already been migrated. + return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "cannot reverse traffic for multi-tenant migrations") + } + if !startState.WritesSwitched && !slices.Contains(req.TabletTypes, topodatapb.TabletType_PRIMARY) { + // We don't need to do anything but update the routing rules in the other direction. + ts.targetKeyspace = startState.SourceKeyspace + ts.sourceKeyspace = startState.TargetKeyspace + var err error + if ts.workflowType == binlogdatapb.VReplicationWorkflowType_Reshard { + err = ts.switchShardReads(ctx, req.Cells, req.TabletTypes, direction) + } else { + err = ts.switchTableReads(ctx, req.Cells, req.TabletTypes, true, direction) + } + if err != nil { + return nil, vterrors.Wrapf(err, "failed to reverse traffic for %v tablets in workflow %s", + req.TabletTypes, req.Workflow) + } + resp := &vtctldatapb.WorkflowSwitchTrafficResponse{ + Summary: fmt.Sprintf("ReverseTraffic was successful for workflow %s.%s", req.Keyspace, req.Workflow), + StartState: startState.String(), + } + _, currentState, err := s.getWorkflowState(ctx, req.Keyspace, req.Workflow) + if err != nil { + resp.CurrentState = fmt.Sprintf("Error reloading workflow state after switching traffic: %v", err) + } else { + resp.CurrentState = currentState.String() + } + return resp, nil + } + // Update the starting state so that we're using the reverse workflow so that we can + // move forward with a normal traffic switch operation. ts, startState, err = s.getWorkflowState(ctx, startState.SourceKeyspace, ts.reverseWorkflow) if err != nil { return nil, err } - if ts.IsMultiTenantMigration() { - // In a multi-tenant migration, multiple migrations would be writing to the same table, so we can't stop writes like - // we do with MoveTables, using denied tables, since it would block all other migrations as well as traffic for - // tenants which have already been migrated. - return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "cannot reverse traffic for multi-tenant migrations") - } + } ts.force = req.GetForce() From 1d53be5e54365d33b4b562fb3120a12a42d17310 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Wed, 9 Oct 2024 21:06:03 -0400 Subject: [PATCH 03/29] Revert cancel specific change Signed-off-by: Matt Lord --- go/vt/vtctl/workflow/server.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 8246218fbf3..f097f874e7e 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -2102,11 +2102,8 @@ func (s *Server) WorkflowDelete(ctx context.Context, req *vtctldatapb.WorkflowDe } if ts.workflowType != binlogdatapb.VReplicationWorkflowType_CreateLookupIndex { - // Return an error if the read-write traffic for the workflow is switched. - // If any or all read-only traffic has been switched, then we can still - // cancel as the routing rules, denied table entries, etc will be cleaned up - // and things will be back in the original state before the workflow was created. - if state.WritesSwitched { + // Return an error if the workflow traffic is partially switched. + if state.WritesSwitched || len(state.ReplicaCellsSwitched) > 0 || len(state.RdonlyCellsSwitched) > 0 { return nil, ErrWorkflowPartiallySwitched } } From 0cea47d101cfffb10db44f633a427cdc06029ff4 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Wed, 9 Oct 2024 21:57:12 -0400 Subject: [PATCH 04/29] Add unit test case Signed-off-by: Matt Lord --- go/vt/vtctl/workflow/server_test.go | 64 ++++++++++++++++++++++++----- 1 file changed, 53 insertions(+), 11 deletions(-) diff --git a/go/vt/vtctl/workflow/server_test.go b/go/vt/vtctl/workflow/server_test.go index 5d35b205ed3..d3b46ba8d1d 100644 --- a/go/vt/vtctl/workflow/server_test.go +++ b/go/vt/vtctl/workflow/server_test.go @@ -949,11 +949,15 @@ func TestMoveTablesTrafficSwitching(t *testing.T) { targetKeyspaceName := "targetks" vrID := 1 - tabletTypes := []topodatapb.TabletType{ + allTabletTypes := []topodatapb.TabletType{ topodatapb.TabletType_PRIMARY, topodatapb.TabletType_REPLICA, topodatapb.TabletType_RDONLY, } + roTabletTypes := []topodatapb.TabletType{ + topodatapb.TabletType_REPLICA, + topodatapb.TabletType_RDONLY, + } schema := map[string]*tabletmanagerdatapb.SchemaDefinition{ tableName: { @@ -1044,7 +1048,7 @@ func TestMoveTablesTrafficSwitching(t *testing.T) { Keyspace: targetKeyspaceName, Workflow: workflowName, Direction: int32(DirectionForward), - TabletTypes: tabletTypes, + TabletTypes: allTabletTypes, }, want: &vtctldatapb.WorkflowSwitchTrafficResponse{ Summary: fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s", targetKeyspaceName, workflowName), @@ -1066,7 +1070,7 @@ func TestMoveTablesTrafficSwitching(t *testing.T) { Keyspace: targetKeyspaceName, Workflow: workflowName, Direction: int32(DirectionBackward), - TabletTypes: tabletTypes, + TabletTypes: allTabletTypes, }, want: &vtctldatapb.WorkflowSwitchTrafficResponse{ Summary: fmt.Sprintf("ReverseTraffic was successful for workflow %s.%s", targetKeyspaceName, workflowName), @@ -1074,6 +1078,28 @@ func TestMoveTablesTrafficSwitching(t *testing.T) { CurrentState: "Reads Not Switched. Writes Not Switched", }, }, + { + name: "backward for read-only tablets", + sourceKeyspace: &testKeyspace{ + KeyspaceName: sourceKeyspaceName, + ShardNames: []string{"0"}, + }, + targetKeyspace: &testKeyspace{ + KeyspaceName: targetKeyspaceName, + ShardNames: []string{"-80", "80-"}, + }, + req: &vtctldatapb.WorkflowSwitchTrafficRequest{ + Keyspace: targetKeyspaceName, + Workflow: workflowName, + Direction: int32(DirectionBackward), + TabletTypes: roTabletTypes, + }, + want: &vtctldatapb.WorkflowSwitchTrafficResponse{ + Summary: fmt.Sprintf("ReverseTraffic was successful for workflow %s.%s", targetKeyspaceName, workflowName), + StartState: "All Reads Switched. Writes Not Switched", + CurrentState: "Reads Not Switched. Writes Not Switched", + }, + }, { name: "forward with tablet refresh error", sourceKeyspace: &testKeyspace{ @@ -1088,7 +1114,7 @@ func TestMoveTablesTrafficSwitching(t *testing.T) { Keyspace: targetKeyspaceName, Workflow: workflowName, Direction: int32(DirectionForward), - TabletTypes: tabletTypes, + TabletTypes: allTabletTypes, }, preFunc: func(env *testEnv) { env.tmc.SetRefreshStateError(env.tablets[sourceKeyspaceName][startingSourceTabletUID], errors.New("tablet refresh error")) @@ -1110,7 +1136,7 @@ func TestMoveTablesTrafficSwitching(t *testing.T) { Keyspace: targetKeyspaceName, Workflow: workflowName, Direction: int32(DirectionForward), - TabletTypes: tabletTypes, + TabletTypes: allTabletTypes, Force: true, }, preFunc: func(env *testEnv) { @@ -1150,7 +1176,7 @@ func TestMoveTablesTrafficSwitching(t *testing.T) { } else { env.tmc.reverse.Store(true) // Setup the routing rules as they would be after having previously done SwitchTraffic. - env.updateTableRoutingRules(t, ctx, tabletTypes, []string{tableName}, + env.updateTableRoutingRules(t, ctx, tc.req.TabletTypes, []string{tableName}, tc.sourceKeyspace.KeyspaceName, tc.targetKeyspace.KeyspaceName, tc.targetKeyspace.KeyspaceName) env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, copyTableQR) for i := 0; i < len(tc.targetKeyspace.ShardNames); i++ { // Per stream @@ -1181,13 +1207,29 @@ func TestMoveTablesTrafficSwitching(t *testing.T) { // Confirm that we have the expected routing rules. rr, err := env.ts.GetRoutingRules(ctx) require.NoError(t, err) - to := fmt.Sprintf("%s.%s", tc.targetKeyspace.KeyspaceName, tableName) - if tc.req.Direction == int32(DirectionBackward) { - to = fmt.Sprintf("%s.%s", tc.sourceKeyspace.KeyspaceName, tableName) - } for _, rr := range rr.Rules { + _, rrTabletType, found := strings.Cut(rr.FromTable, "@") + if !found { // No @ is primary + rrTabletType = topodatapb.TabletType_PRIMARY.String() + } + tabletType, err := topoproto.ParseTabletType(rrTabletType) + require.NoError(t, err) + + var to string + if slices.Contains(tc.req.TabletTypes, tabletType) { + to = fmt.Sprintf("%s.%s", tc.targetKeyspace.KeyspaceName, tableName) + if tc.req.Direction == int32(DirectionBackward) { + to = fmt.Sprintf("%s.%s", tc.sourceKeyspace.KeyspaceName, tableName) + } + } else { + to = fmt.Sprintf("%s.%s", tc.sourceKeyspace.KeyspaceName, tableName) + if tc.req.Direction == int32(DirectionBackward) { + to = fmt.Sprintf("%s.%s", tc.targetKeyspace.KeyspaceName, tableName) + } + } for _, tt := range rr.ToTables { - require.Equal(t, to, tt) + require.Equal(t, to, tt, "Additional info: tablet type: %s, rr.FromTable: %s, rr.ToTables: %v, to string: %s", + tabletType.String(), rr.FromTable, rr.ToTables, to) } } // Confirm that we have the expected denied tables entries. From fbd7ad6a3bceaf834088ce367bf38c96ebaa6a97 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Wed, 9 Oct 2024 22:52:54 -0400 Subject: [PATCH 05/29] Fixes and improvements Signed-off-by: Matt Lord --- go/vt/vtctl/workflow/server.go | 23 ++++++++++++++----- go/vt/vtctl/workflow/server_test.go | 28 +++++++++++++----------- go/vt/vtctl/workflow/traffic_switcher.go | 7 +++--- 3 files changed, 36 insertions(+), 22 deletions(-) diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index f097f874e7e..ff421d72e4e 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -3192,22 +3192,34 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor } if !startState.WritesSwitched && !slices.Contains(req.TabletTypes, topodatapb.TabletType_PRIMARY) { // We don't need to do anything but update the routing rules in the other direction. - ts.targetKeyspace = startState.SourceKeyspace - ts.sourceKeyspace = startState.TargetKeyspace + var sw iswitcher + if req.DryRun { + sw = &switcherDryRun{ts: ts, drLog: NewLogRecorder()} + } else { + ts.targetKeyspace = startState.SourceKeyspace + ts.sourceKeyspace = startState.TargetKeyspace + sw = &switcher{ts: ts, s: s} + } var err error if ts.workflowType == binlogdatapb.VReplicationWorkflowType_Reshard { - err = ts.switchShardReads(ctx, req.Cells, req.TabletTypes, direction) + err = sw.switchShardReads(ctx, req.Cells, req.TabletTypes, direction) } else { - err = ts.switchTableReads(ctx, req.Cells, req.TabletTypes, true, direction) + err = sw.switchTableReads(ctx, req.Cells, req.TabletTypes, true, direction) } if err != nil { return nil, vterrors.Wrapf(err, "failed to reverse traffic for %v tablets in workflow %s", req.TabletTypes, req.Workflow) } resp := &vtctldatapb.WorkflowSwitchTrafficResponse{ - Summary: fmt.Sprintf("ReverseTraffic was successful for workflow %s.%s", req.Keyspace, req.Workflow), StartState: startState.String(), } + if req.DryRun { + resp.Summary = fmt.Sprintf("ReverseTraffic dry run results for workflow %s.%s at %v", + req.Keyspace, req.Workflow, time.Now().UTC().Format(time.RFC822)) + resp.DryRunResults = *sw.logs() + } else { + resp.Summary = fmt.Sprintf("ReverseTraffic was successful for workflow %s.%s", req.Keyspace, req.Workflow) + } _, currentState, err := s.getWorkflowState(ctx, req.Keyspace, req.Workflow) if err != nil { resp.CurrentState = fmt.Sprintf("Error reloading workflow state after switching traffic: %v", err) @@ -3263,6 +3275,7 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor if req.DryRun && len(dryRunResults) == 0 { dryRunResults = append(dryRunResults, "No changes required") } + cmd := "SwitchTraffic" if direction == DirectionBackward { cmd = "ReverseTraffic" diff --git a/go/vt/vtctl/workflow/server_test.go b/go/vt/vtctl/workflow/server_test.go index d3b46ba8d1d..1570369872a 100644 --- a/go/vt/vtctl/workflow/server_test.go +++ b/go/vt/vtctl/workflow/server_test.go @@ -1233,19 +1233,21 @@ func TestMoveTablesTrafficSwitching(t *testing.T) { } } // Confirm that we have the expected denied tables entries. - for _, keyspace := range []*testKeyspace{tc.sourceKeyspace, tc.targetKeyspace} { - for _, shardName := range keyspace.ShardNames { - si, err := env.ts.GetShard(ctx, keyspace.KeyspaceName, shardName) - require.NoError(t, err) - switch { - case keyspace == tc.sourceKeyspace && tc.req.Direction == int32(DirectionForward): - require.True(t, hasDeniedTableEntry(si)) - case keyspace == tc.sourceKeyspace && tc.req.Direction == int32(DirectionBackward): - require.False(t, hasDeniedTableEntry(si)) - case keyspace == tc.targetKeyspace && tc.req.Direction == int32(DirectionForward): - require.False(t, hasDeniedTableEntry(si)) - case keyspace == tc.targetKeyspace && tc.req.Direction == int32(DirectionBackward): - require.True(t, hasDeniedTableEntry(si)) + if slices.Contains(tc.req.TabletTypes, topodatapb.TabletType_PRIMARY) { + for _, keyspace := range []*testKeyspace{tc.sourceKeyspace, tc.targetKeyspace} { + for _, shardName := range keyspace.ShardNames { + si, err := env.ts.GetShard(ctx, keyspace.KeyspaceName, shardName) + require.NoError(t, err) + switch { + case keyspace == tc.sourceKeyspace && tc.req.Direction == int32(DirectionForward): + require.True(t, hasDeniedTableEntry(si)) + case keyspace == tc.sourceKeyspace && tc.req.Direction == int32(DirectionBackward): + require.False(t, hasDeniedTableEntry(si)) + case keyspace == tc.targetKeyspace && tc.req.Direction == int32(DirectionForward): + require.False(t, hasDeniedTableEntry(si)) + case keyspace == tc.targetKeyspace && tc.req.Direction == int32(DirectionBackward): + require.True(t, hasDeniedTableEntry(si)) + } } } } diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go index 80638cd5973..b4454818f5c 100644 --- a/go/vt/vtctl/workflow/traffic_switcher.go +++ b/go/vt/vtctl/workflow/traffic_switcher.go @@ -652,13 +652,12 @@ func (ts *trafficSwitcher) switchTableReads(ctx context.Context, cells []string, if servedType != topodatapb.TabletType_REPLICA && servedType != topodatapb.TabletType_RDONLY { return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid tablet type specified when switching reads: %v", servedType) } - tt := strings.ToLower(servedType.String()) for _, table := range ts.Tables() { - toTarget := []string{ts.TargetKeyspaceName() + "." + table} + toTarget := []string{ts.targetKeyspace + "." + table} rules[table+"@"+tt] = toTarget - rules[ts.TargetKeyspaceName()+"."+table+"@"+tt] = toTarget - rules[ts.SourceKeyspaceName()+"."+table+"@"+tt] = toTarget + rules[ts.targetKeyspace+"."+table+"@"+tt] = toTarget + rules[ts.sourceKeyspace+"."+table+"@"+tt] = toTarget } } if err := topotools.SaveRoutingRules(ctx, ts.TopoServer(), rules); err != nil { From cdb53231c73aea782beae0bbdf9cc9bdc2186cb6 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Thu, 10 Oct 2024 13:55:38 -0400 Subject: [PATCH 06/29] Cleanup and unify traffic switching handling Signed-off-by: Matt Lord --- go/vt/vtctl/workflow/server.go | 145 ++++++++--------------- go/vt/vtctl/workflow/server_test.go | 31 +++-- go/vt/vtctl/workflow/traffic_switcher.go | 6 +- 3 files changed, 74 insertions(+), 108 deletions(-) diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index ff421d72e4e..c80185dbd43 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -3164,15 +3164,6 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor if timeout.Seconds() < 1 { return nil, vterrors.Wrap(err, "timeout must be at least 1 second") } - ts, startState, err := s.getWorkflowState(ctx, req.Keyspace, req.Workflow) - if err != nil { - return nil, err - } - - if startState.WorkflowType == TypeMigrate { - return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid action for Migrate workflow: SwitchTraffic") - } - maxReplicationLagAllowed, set, err := protoutil.DurationFromProto(req.MaxReplicationLagAllowed) if err != nil { err = vterrors.Wrapf(err, "unable to parse MaxReplicationLagAllowed into a valid duration") @@ -3182,59 +3173,67 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor maxReplicationLagAllowed = DefaultTimeout } direction := TrafficSwitchDirection(req.Direction) - if direction == DirectionBackward { - if ts.IsMultiTenantMigration() { - // In a multi-tenant migration, multiple migrations would be writing to the same - // table, so we can't stop writes like we do with MoveTables, using denied tables, - // since it would block all other migrations as well as traffic for tenants which - // have already been migrated. - return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "cannot reverse traffic for multi-tenant migrations") - } - if !startState.WritesSwitched && !slices.Contains(req.TabletTypes, topodatapb.TabletType_PRIMARY) { - // We don't need to do anything but update the routing rules in the other direction. - var sw iswitcher - if req.DryRun { - sw = &switcherDryRun{ts: ts, drLog: NewLogRecorder()} - } else { - ts.targetKeyspace = startState.SourceKeyspace - ts.sourceKeyspace = startState.TargetKeyspace - sw = &switcher{ts: ts, s: s} - } - var err error - if ts.workflowType == binlogdatapb.VReplicationWorkflowType_Reshard { - err = sw.switchShardReads(ctx, req.Cells, req.TabletTypes, direction) - } else { - err = sw.switchTableReads(ctx, req.Cells, req.TabletTypes, true, direction) - } - if err != nil { - return nil, vterrors.Wrapf(err, "failed to reverse traffic for %v tablets in workflow %s", - req.TabletTypes, req.Workflow) - } - resp := &vtctldatapb.WorkflowSwitchTrafficResponse{ - StartState: startState.String(), - } - if req.DryRun { - resp.Summary = fmt.Sprintf("ReverseTraffic dry run results for workflow %s.%s at %v", - req.Keyspace, req.Workflow, time.Now().UTC().Format(time.RFC822)) - resp.DryRunResults = *sw.logs() - } else { - resp.Summary = fmt.Sprintf("ReverseTraffic was successful for workflow %s.%s", req.Keyspace, req.Workflow) - } - _, currentState, err := s.getWorkflowState(ctx, req.Keyspace, req.Workflow) + hasReplica, hasRdonly, hasPrimary, err = parseTabletTypes(req.TabletTypes) + if err != nil { + return nil, err + } + + ts, startState, err := s.getWorkflowState(ctx, req.Keyspace, req.Workflow) + if err != nil { + return nil, err + } + onlySwitchingReads := !startState.WritesSwitched && !hasPrimary + + if startState.WorkflowType == TypeMigrate { + return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid action for Migrate workflow: SwitchTraffic") + } + + buildResponse := func() *vtctldatapb.WorkflowSwitchTrafficResponse { + if wrDryRunResults != nil { + dryRunResults = append(dryRunResults, *wrDryRunResults...) + } + if req.DryRun && len(dryRunResults) == 0 { + dryRunResults = append(dryRunResults, "No changes required") + } + + cmd := "SwitchTraffic" + // We must check the original direction requested. + if TrafficSwitchDirection(req.Direction) == DirectionBackward { + cmd = "ReverseTraffic" + } + s.Logger().Infof("%s done for workflow %s.%s", cmd, req.Keyspace, req.Workflow) + resp := &vtctldatapb.WorkflowSwitchTrafficResponse{} + if req.DryRun { + resp.Summary = fmt.Sprintf("%s dry run results for workflow %s.%s at %v", + cmd, req.Keyspace, req.Workflow, time.Now().UTC().Format(time.RFC822)) + resp.DryRunResults = dryRunResults + } else { + s.Logger().Infof("%s done for workflow %s.%s", cmd, req.Keyspace, req.Workflow) + resp.Summary = fmt.Sprintf("%s was successful for workflow %s.%s", cmd, req.Keyspace, req.Workflow) + // Reload the state after the SwitchTraffic operation and return that + // as a string. + resp.StartState = startState.String() + s.Logger().Infof("Before reloading workflow state after switching traffic: %+v\n", resp.StartState) + _, currentState, err := s.getWorkflowState(ctx, ts.targetKeyspace, ts.workflow) if err != nil { resp.CurrentState = fmt.Sprintf("Error reloading workflow state after switching traffic: %v", err) } else { resp.CurrentState = currentState.String() } - return resp, nil + s.Logger().Infof("%s done for workflow %s.%s, returning response %v", cmd, req.Keyspace, req.Workflow, resp) } + return resp + } + + if direction == DirectionBackward && !onlySwitchingReads { // Update the starting state so that we're using the reverse workflow so that we can - // move forward with a normal traffic switch operation. + // move forward with a normal traffic switch forward operation, from the _reverse + // workflow's perspective. + direction = DirectionForward ts, startState, err = s.getWorkflowState(ctx, startState.SourceKeyspace, ts.reverseWorkflow) if err != nil { return nil, err } - } ts.force = req.GetForce() @@ -3247,10 +3246,6 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "cannot switch traffic for workflow %s at this time: %s", startState.Workflow, reason) } - hasReplica, hasRdonly, hasPrimary, err = parseTabletTypes(req.TabletTypes) - if err != nil { - return nil, err - } if hasReplica || hasRdonly { // If we're going to switch writes immediately after then we don't need to // rebuild the SrvVSchema here as we will do it after switching writes. @@ -3269,45 +3264,7 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor s.Logger().Infof("Switch Writes done for workflow %s.%s", req.Keyspace, req.Workflow) } - if wrDryRunResults != nil { - dryRunResults = append(dryRunResults, *wrDryRunResults...) - } - if req.DryRun && len(dryRunResults) == 0 { - dryRunResults = append(dryRunResults, "No changes required") - } - - cmd := "SwitchTraffic" - if direction == DirectionBackward { - cmd = "ReverseTraffic" - } - s.Logger().Infof("%s done for workflow %s.%s", cmd, req.Keyspace, req.Workflow) - resp := &vtctldatapb.WorkflowSwitchTrafficResponse{} - if req.DryRun { - resp.Summary = fmt.Sprintf("%s dry run results for workflow %s.%s at %v", - cmd, req.Keyspace, req.Workflow, time.Now().UTC().Format(time.RFC822)) - resp.DryRunResults = dryRunResults - } else { - s.Logger().Infof("%s done for workflow %s.%s", cmd, req.Keyspace, req.Workflow) - resp.Summary = fmt.Sprintf("%s was successful for workflow %s.%s", cmd, req.Keyspace, req.Workflow) - // Reload the state after the SwitchTraffic operation - // and return that as a string. - keyspace := req.Keyspace - workflow := req.Workflow - if direction == DirectionBackward { - keyspace = startState.SourceKeyspace - workflow = ts.reverseWorkflow - } - resp.StartState = startState.String() - s.Logger().Infof("Before reloading workflow state after switching traffic: %+v\n", resp.StartState) - _, currentState, err := s.getWorkflowState(ctx, keyspace, workflow) - if err != nil { - resp.CurrentState = fmt.Sprintf("Error reloading workflow state after switching traffic: %v", err) - } else { - resp.CurrentState = currentState.String() - } - s.Logger().Infof("%s done for workflow %s.%s, returning response %v", cmd, req.Keyspace, req.Workflow, resp) - } - return resp, nil + return buildResponse(), nil } // switchReads is a generic way of switching read traffic for a workflow. diff --git a/go/vt/vtctl/workflow/server_test.go b/go/vt/vtctl/workflow/server_test.go index 1570369872a..aac0e2ecefc 100644 --- a/go/vt/vtctl/workflow/server_test.go +++ b/go/vt/vtctl/workflow/server_test.go @@ -1178,20 +1178,25 @@ func TestMoveTablesTrafficSwitching(t *testing.T) { // Setup the routing rules as they would be after having previously done SwitchTraffic. env.updateTableRoutingRules(t, ctx, tc.req.TabletTypes, []string{tableName}, tc.sourceKeyspace.KeyspaceName, tc.targetKeyspace.KeyspaceName, tc.targetKeyspace.KeyspaceName) - env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, copyTableQR) - for i := 0; i < len(tc.targetKeyspace.ShardNames); i++ { // Per stream - env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, cutoverQR) - } - for i := 0; i < len(tc.targetKeyspace.ShardNames); i++ { // Per stream - env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, journalQR) - } - for i := 0; i < len(tc.targetKeyspace.ShardNames); i++ { // Per stream - env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, lockTableQR) + if !slices.Contains(tc.req.TabletTypes, topodatapb.TabletType_PRIMARY) { + for i := 0; i < len(tc.targetKeyspace.ShardNames); i++ { // Per stream + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, journalQR) + } + } else { + for i := 0; i < len(tc.targetKeyspace.ShardNames); i++ { // Per stream + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, cutoverQR) + } + for i := 0; i < len(tc.targetKeyspace.ShardNames); i++ { // Per stream + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, journalQR) + } + for i := 0; i < len(tc.targetKeyspace.ShardNames); i++ { // Per stream + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, lockTableQR) + } + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, deleteWFQR) + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, createWFQR) + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, createJournalQR) + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, freezeReverseWFQR) } - env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, deleteWFQR) - env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, createWFQR) - env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, createJournalQR) - env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, freezeReverseWFQR) } if tc.preFunc != nil { tc.preFunc(env) diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go index b4454818f5c..9ac5302f19f 100644 --- a/go/vt/vtctl/workflow/traffic_switcher.go +++ b/go/vt/vtctl/workflow/traffic_switcher.go @@ -648,13 +648,17 @@ func (ts *trafficSwitcher) switchTableReads(ctx context.Context, cells []string, // targetKeyspace.table -> sourceKeyspace.table // For forward migration, we add tablet type specific rules to redirect traffic to the target. // For backward, we redirect to source. + targetKeyspace := ts.targetKeyspace + if direction == DirectionBackward { + targetKeyspace = ts.sourceKeyspace + } for _, servedType := range servedTypes { if servedType != topodatapb.TabletType_REPLICA && servedType != topodatapb.TabletType_RDONLY { return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid tablet type specified when switching reads: %v", servedType) } tt := strings.ToLower(servedType.String()) for _, table := range ts.Tables() { - toTarget := []string{ts.targetKeyspace + "." + table} + toTarget := []string{targetKeyspace + "." + table} rules[table+"@"+tt] = toTarget rules[ts.targetKeyspace+"."+table+"@"+tt] = toTarget rules[ts.sourceKeyspace+"."+table+"@"+tt] = toTarget From 0e4e39b909297d051c83e06f71042ab53503d7ee Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Thu, 10 Oct 2024 14:29:41 -0400 Subject: [PATCH 07/29] Shrink diff Signed-off-by: Matt Lord --- go/vt/vtctl/workflow/server.go | 73 ++++++++++++++++------------------ 1 file changed, 35 insertions(+), 38 deletions(-) diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index c80185dbd43..002d856eea1 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -3188,43 +3188,6 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid action for Migrate workflow: SwitchTraffic") } - buildResponse := func() *vtctldatapb.WorkflowSwitchTrafficResponse { - if wrDryRunResults != nil { - dryRunResults = append(dryRunResults, *wrDryRunResults...) - } - if req.DryRun && len(dryRunResults) == 0 { - dryRunResults = append(dryRunResults, "No changes required") - } - - cmd := "SwitchTraffic" - // We must check the original direction requested. - if TrafficSwitchDirection(req.Direction) == DirectionBackward { - cmd = "ReverseTraffic" - } - s.Logger().Infof("%s done for workflow %s.%s", cmd, req.Keyspace, req.Workflow) - resp := &vtctldatapb.WorkflowSwitchTrafficResponse{} - if req.DryRun { - resp.Summary = fmt.Sprintf("%s dry run results for workflow %s.%s at %v", - cmd, req.Keyspace, req.Workflow, time.Now().UTC().Format(time.RFC822)) - resp.DryRunResults = dryRunResults - } else { - s.Logger().Infof("%s done for workflow %s.%s", cmd, req.Keyspace, req.Workflow) - resp.Summary = fmt.Sprintf("%s was successful for workflow %s.%s", cmd, req.Keyspace, req.Workflow) - // Reload the state after the SwitchTraffic operation and return that - // as a string. - resp.StartState = startState.String() - s.Logger().Infof("Before reloading workflow state after switching traffic: %+v\n", resp.StartState) - _, currentState, err := s.getWorkflowState(ctx, ts.targetKeyspace, ts.workflow) - if err != nil { - resp.CurrentState = fmt.Sprintf("Error reloading workflow state after switching traffic: %v", err) - } else { - resp.CurrentState = currentState.String() - } - s.Logger().Infof("%s done for workflow %s.%s, returning response %v", cmd, req.Keyspace, req.Workflow, resp) - } - return resp - } - if direction == DirectionBackward && !onlySwitchingReads { // Update the starting state so that we're using the reverse workflow so that we can // move forward with a normal traffic switch forward operation, from the _reverse @@ -3264,7 +3227,41 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor s.Logger().Infof("Switch Writes done for workflow %s.%s", req.Keyspace, req.Workflow) } - return buildResponse(), nil + if wrDryRunResults != nil { + dryRunResults = append(dryRunResults, *wrDryRunResults...) + } + if req.DryRun && len(dryRunResults) == 0 { + dryRunResults = append(dryRunResults, "No changes required") + } + + cmd := "SwitchTraffic" + // We must check the original direction requested. + if TrafficSwitchDirection(req.Direction) == DirectionBackward { + cmd = "ReverseTraffic" + } + s.Logger().Infof("%s done for workflow %s.%s", cmd, req.Keyspace, req.Workflow) + resp := &vtctldatapb.WorkflowSwitchTrafficResponse{} + if req.DryRun { + resp.Summary = fmt.Sprintf("%s dry run results for workflow %s.%s at %v", + cmd, req.Keyspace, req.Workflow, time.Now().UTC().Format(time.RFC822)) + resp.DryRunResults = dryRunResults + } else { + s.Logger().Infof("%s done for workflow %s.%s", cmd, req.Keyspace, req.Workflow) + resp.Summary = fmt.Sprintf("%s was successful for workflow %s.%s", cmd, req.Keyspace, req.Workflow) + // Reload the state after the SwitchTraffic operation and return that + // as a string. + resp.StartState = startState.String() + s.Logger().Infof("Before reloading workflow state after switching traffic: %+v\n", resp.StartState) + _, currentState, err := s.getWorkflowState(ctx, ts.targetKeyspace, ts.workflow) + if err != nil { + resp.CurrentState = fmt.Sprintf("Error reloading workflow state after switching traffic: %v", err) + } else { + resp.CurrentState = currentState.String() + } + s.Logger().Infof("%s done for workflow %s.%s, returning response %v", cmd, req.Keyspace, req.Workflow, resp) + } + + return resp, nil } // switchReads is a generic way of switching read traffic for a workflow. From 8297aae036c7e7bf13ae3805b5cabbb2b50f017b Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Thu, 10 Oct 2024 14:50:28 -0400 Subject: [PATCH 08/29] Adjust and add to dry run unit test Signed-off-by: Matt Lord --- go/vt/vtctl/workflow/server_test.go | 80 ++++++++++++++++++++--------- 1 file changed, 55 insertions(+), 25 deletions(-) diff --git a/go/vt/vtctl/workflow/server_test.go b/go/vt/vtctl/workflow/server_test.go index aac0e2ecefc..baf2979b6f2 100644 --- a/go/vt/vtctl/workflow/server_test.go +++ b/go/vt/vtctl/workflow/server_test.go @@ -50,6 +50,19 @@ import ( vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" ) +var ( + allTabletTypes = []topodatapb.TabletType{ + topodatapb.TabletType_PRIMARY, + topodatapb.TabletType_REPLICA, + topodatapb.TabletType_RDONLY, + } + + roTabletTypes = []topodatapb.TabletType{ + topodatapb.TabletType_REPLICA, + topodatapb.TabletType_RDONLY, + } +) + type fakeTMC struct { tmclient.TabletManagerClient vrepQueriesByTablet map[string]map[string]*querypb.QueryResult @@ -949,16 +962,6 @@ func TestMoveTablesTrafficSwitching(t *testing.T) { targetKeyspaceName := "targetks" vrID := 1 - allTabletTypes := []topodatapb.TabletType{ - topodatapb.TabletType_PRIMARY, - topodatapb.TabletType_REPLICA, - topodatapb.TabletType_RDONLY, - } - roTabletTypes := []topodatapb.TabletType{ - topodatapb.TabletType_REPLICA, - topodatapb.TabletType_RDONLY, - } - schema := map[string]*tabletmanagerdatapb.SchemaDefinition{ tableName: { TableDefinitions: []*tabletmanagerdatapb.TableDefinition{ @@ -1273,11 +1276,6 @@ func TestMoveTablesTrafficSwitchingDryRun(t *testing.T) { sourceKeyspaceName := "sourceks" targetKeyspaceName := "targetks" vrID := 1 - tabletTypes := []topodatapb.TabletType{ - topodatapb.TabletType_PRIMARY, - topodatapb.TabletType_REPLICA, - topodatapb.TabletType_RDONLY, - } schema := map[string]*tabletmanagerdatapb.SchemaDefinition{ table1Name: { TableDefinitions: []*tabletmanagerdatapb.TableDefinition{ @@ -1330,7 +1328,7 @@ func TestMoveTablesTrafficSwitchingDryRun(t *testing.T) { Keyspace: targetKeyspaceName, Workflow: workflowName, Direction: int32(DirectionForward), - TabletTypes: tabletTypes, + TabletTypes: allTabletTypes, DryRun: true, }, want: []string{ @@ -1371,13 +1369,13 @@ func TestMoveTablesTrafficSwitchingDryRun(t *testing.T) { Keyspace: targetKeyspaceName, Workflow: workflowName, Direction: int32(DirectionBackward), - TabletTypes: tabletTypes, + TabletTypes: allTabletTypes, DryRun: true, }, want: []string{ fmt.Sprintf("Lock keyspace %s", targetKeyspaceName), fmt.Sprintf("Mirroring 0.00 percent of traffic from keyspace %s to keyspace %s for tablet types [REPLICA,RDONLY]", targetKeyspaceName, sourceKeyspaceName), - fmt.Sprintf("Switch reads for tables [%s] to keyspace %s for tablet types [REPLICA,RDONLY]", tablesStr, targetKeyspaceName), + fmt.Sprintf("Switch reads for tables [%s] to keyspace %s for tablet types [REPLICA,RDONLY]", tablesStr, sourceKeyspaceName), fmt.Sprintf("Routing rules for tables [%s] will be updated", tablesStr), fmt.Sprintf("Unlock keyspace %s", targetKeyspaceName), fmt.Sprintf("Lock keyspace %s", targetKeyspaceName), @@ -1398,6 +1396,32 @@ func TestMoveTablesTrafficSwitchingDryRun(t *testing.T) { fmt.Sprintf("Unlock keyspace %s", targetKeyspaceName), }, }, + { + name: "backward for read-only tablets", + sourceKeyspace: &testKeyspace{ + KeyspaceName: sourceKeyspaceName, + ShardNames: []string{"-80", "80-"}, + }, + targetKeyspace: &testKeyspace{ + KeyspaceName: targetKeyspaceName, + ShardNames: []string{"-80", "80-"}, + }, + req: &vtctldatapb.WorkflowSwitchTrafficRequest{ + Keyspace: targetKeyspaceName, + Workflow: workflowName, + Direction: int32(DirectionBackward), + TabletTypes: roTabletTypes, + DryRun: true, + }, + want: []string{ + fmt.Sprintf("Lock keyspace %s", sourceKeyspaceName), + fmt.Sprintf("Mirroring 0.00 percent of traffic from keyspace %s to keyspace %s for tablet types [REPLICA,RDONLY]", sourceKeyspaceName, targetKeyspaceName), + fmt.Sprintf("Switch reads for tables [%s] to keyspace %s for tablet types [REPLICA,RDONLY]", tablesStr, sourceKeyspaceName), + fmt.Sprintf("Routing rules for tables [%s] will be updated", tablesStr), + fmt.Sprintf("Serving VSchema will be rebuilt for the %s keyspace", sourceKeyspaceName), + fmt.Sprintf("Unlock keyspace %s", sourceKeyspaceName), + }, + }, } for _, tc := range testcases { t.Run(tc.name, func(t *testing.T) { @@ -1418,14 +1442,20 @@ func TestMoveTablesTrafficSwitchingDryRun(t *testing.T) { } else { env.tmc.reverse.Store(true) // Setup the routing rules as they would be after having previously done SwitchTraffic. - env.updateTableRoutingRules(t, ctx, tabletTypes, tables, + env.updateTableRoutingRules(t, ctx, tc.req.TabletTypes, tables, tc.sourceKeyspace.KeyspaceName, tc.targetKeyspace.KeyspaceName, tc.targetKeyspace.KeyspaceName) - env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, copyTableQR) - for i := 0; i < len(tc.targetKeyspace.ShardNames); i++ { // Per stream - env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, journalQR) - } - for i := 0; i < len(tc.targetKeyspace.ShardNames); i++ { // Per stream - env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, lockTableQR) + if !slices.Contains(tc.req.TabletTypes, topodatapb.TabletType_PRIMARY) { + for i := 0; i < len(tc.sourceKeyspace.ShardNames); i++ { // Per stream + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, journalQR) + } + } else { + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, copyTableQR) + for i := 0; i < len(tc.sourceKeyspace.ShardNames); i++ { // Per stream + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, journalQR) + } + for i := 0; i < len(tc.sourceKeyspace.ShardNames); i++ { // Per stream + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, lockTableQR) + } } } got, err := env.ws.WorkflowSwitchTraffic(ctx, tc.req) From 07aca7697d4514487deea400639695b8ae72ca86 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Thu, 10 Oct 2024 18:58:00 -0400 Subject: [PATCH 09/29] Fix e2e test Signed-off-by: Matt Lord --- go/test/endtoend/vreplication/helper_test.go | 5 +- .../resharding_workflows_v2_test.go | 50 ++++++++++++------- go/vt/vtctl/workflow/server.go | 6 ++- 3 files changed, 42 insertions(+), 19 deletions(-) diff --git a/go/test/endtoend/vreplication/helper_test.go b/go/test/endtoend/vreplication/helper_test.go index 44c35d0acea..c2dc3d0dade 100644 --- a/go/test/endtoend/vreplication/helper_test.go +++ b/go/test/endtoend/vreplication/helper_test.go @@ -331,8 +331,11 @@ func executeOnTablet(t *testing.T, conn *mysql.Conn, tablet *cluster.VttabletPro func assertQueryExecutesOnTablet(t *testing.T, conn *mysql.Conn, tablet *cluster.VttabletProcess, ksName string, query string, matchQuery string) { t.Helper() + rr, err := vc.VtctldClient.ExecuteCommandWithOutput("GetRoutingRules") + require.NoError(t, err) count0, body0, count1, body1 := executeOnTablet(t, conn, tablet, ksName, query, matchQuery) - assert.Equalf(t, count0+1, count1, "query %q did not execute in target;\ntried to match %q\nbefore:\n%s\n\nafter:\n%s\n\n", query, matchQuery, body0, body1) + assert.Equalf(t, count0+1, count1, "query %q did not execute on target %s:%s@%s tablet;\ntried to match %q\nbefore:\n%s\n\nafter:\n%s\n\nrouting rules:\n%s\n\n", + query, tablet.Keyspace, tablet.Shard, tablet.TabletType, matchQuery, body0, body1, rr) } func assertQueryDoesNotExecutesOnTablet(t *testing.T, conn *mysql.Conn, tablet *cluster.VttabletProcess, ksName string, query string, matchQuery string) { diff --git a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go index 4c6dea61912..bc062b1e70b 100644 --- a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go +++ b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go @@ -22,6 +22,7 @@ import ( "fmt" "math/rand/v2" "net" + "slices" "strconv" "strings" "sync" @@ -35,9 +36,11 @@ import ( "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/test/endtoend/throttler" "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/wrangler" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" ) @@ -353,15 +356,29 @@ func validateReadsRoute(t *testing.T, tabletTypes string, tablet *cluster.Vttabl } func validateReadsRouteToSource(t *testing.T, tabletTypes string) { - if sourceReplicaTab != nil { + tt, err := topoproto.ParseTabletTypes(tabletTypes) + require.NoError(t, err) + if slices.Contains(tt, topodatapb.TabletType_REPLICA) { + require.NotNil(t, sourceReplicaTab) validateReadsRoute(t, tabletTypes, sourceReplicaTab) } + if slices.Contains(tt, topodatapb.TabletType_RDONLY) { + require.NotNil(t, sourceRdonlyTab) + validateReadsRoute(t, tabletTypes, sourceRdonlyTab) + } } func validateReadsRouteToTarget(t *testing.T, tabletTypes string) { - if targetReplicaTab1 != nil { + tt, err := topoproto.ParseTabletTypes(tabletTypes) + require.NoError(t, err) + if slices.Contains(tt, topodatapb.TabletType_REPLICA) { + require.NotNil(t, targetReplicaTab1) validateReadsRoute(t, tabletTypes, targetReplicaTab1) } + if slices.Contains(tt, topodatapb.TabletType_RDONLY) { + require.NotNil(t, targetRdonlyTab1) + validateReadsRoute(t, tabletTypes, targetRdonlyTab1) + } } func validateWritesRouteToSource(t *testing.T) { @@ -664,7 +681,7 @@ func testMoveTablesV2Workflow(t *testing.T) { // If it's not then we'll get an error as the table doesn't exist in the vschema createMoveTablesWorkflow(t, "customer,loadtest,vdiff_order,reftable,_vt_PURGE_4f9194b43b2011eb8a0104ed332e05c2_20221210194431") waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String()) - validateReadsRouteToSource(t, "replica") + validateReadsRouteToSource(t, "replica,rdonly") validateWritesRouteToSource(t) // Verify that we've properly ignored any internal operational tables @@ -771,12 +788,12 @@ func testRestOfWorkflow(t *testing.T) { waitForLowLag(t, "customer", "wf1") tstWorkflowSwitchReads(t, "", "") checkStates(t, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateReadsSwitched) - validateReadsRouteToTarget(t, "replica") + validateReadsRouteToTarget(t, "replica,rdonly") validateWritesRouteToSource(t) tstWorkflowSwitchWrites(t) checkStates(t, wrangler.WorkflowStateReadsSwitched, wrangler.WorkflowStateAllSwitched) - validateReadsRouteToTarget(t, "replica") + validateReadsRouteToTarget(t, "replica,rdonly") validateWritesRouteToTarget(t) // this function is called for both MoveTables and Reshard, so the reverse workflows exist in different keyspaces @@ -787,42 +804,41 @@ func testRestOfWorkflow(t *testing.T) { waitForLowLag(t, keyspace, "wf1_reverse") tstWorkflowReverseReads(t, "", "") checkStates(t, wrangler.WorkflowStateAllSwitched, wrangler.WorkflowStateWritesSwitched) - validateReadsRouteToSource(t, "replica") + validateReadsRouteToSource(t, "replica,rdonly") validateWritesRouteToTarget(t) tstWorkflowReverseWrites(t) checkStates(t, wrangler.WorkflowStateWritesSwitched, wrangler.WorkflowStateNotSwitched) - validateReadsRouteToSource(t, "replica") + validateReadsRouteToSource(t, "replica,rdonly") validateWritesRouteToSource(t) waitForLowLag(t, "customer", "wf1") tstWorkflowSwitchWrites(t) checkStates(t, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateWritesSwitched) - validateReadsRouteToSource(t, "replica") + validateReadsRouteToSource(t, "replica,rdonly") validateWritesRouteToTarget(t) waitForLowLag(t, keyspace, "wf1_reverse") tstWorkflowReverseWrites(t) - validateReadsRouteToSource(t, "replica") + validateReadsRouteToSource(t, "replica,rdonly") validateWritesRouteToSource(t) waitForLowLag(t, "customer", "wf1") tstWorkflowSwitchReads(t, "", "") - validateReadsRouteToTarget(t, "replica") + validateReadsRouteToTarget(t, "replica,rdonly") validateWritesRouteToSource(t) tstWorkflowReverseReads(t, "", "") - validateReadsRouteToSource(t, "replica") + validateReadsRouteToSource(t, "replica,rdonly") validateWritesRouteToSource(t) tstWorkflowSwitchReadsAndWrites(t) - validateReadsRouteToTarget(t, "replica") - validateReadsRoute(t, "rdonly", targetRdonlyTab1) + validateReadsRouteToTarget(t, "replica,rdonly") + validateReadsRouteToTarget(t, "rdonly,rdonly") validateWritesRouteToTarget(t) waitForLowLag(t, keyspace, "wf1_reverse") tstWorkflowReverseReadsAndWrites(t) - validateReadsRoute(t, "rdonly", sourceRdonlyTab) - validateReadsRouteToSource(t, "replica") + validateReadsRouteToSource(t, "replica,rdonly") validateWritesRouteToSource(t) // trying to complete an unswitched workflow should error @@ -835,8 +851,7 @@ func testRestOfWorkflow(t *testing.T) { waitForLowLag(t, "customer", "customer_name") waitForLowLag(t, "customer", "enterprise_customer") tstWorkflowSwitchReadsAndWrites(t) - validateReadsRoute(t, "rdonly", targetRdonlyTab1) - validateReadsRouteToTarget(t, "replica") + validateReadsRouteToTarget(t, "replica,rdonly") validateWritesRouteToTarget(t) err = tstWorkflowComplete(t) @@ -1048,6 +1063,7 @@ func createAdditionalCustomerShards(t *testing.T, shards string) { targetTab2 = custKs.Shards["80-c0"].Tablets["zone1-600"].Vttablet targetTab1 = custKs.Shards["40-80"].Tablets["zone1-500"].Vttablet targetReplicaTab1 = custKs.Shards["-40"].Tablets["zone1-401"].Vttablet + targetRdonlyTab1 = custKs.Shards["-40"].Tablets["zone1-402"].Vttablet sourceTab = custKs.Shards["-80"].Tablets["zone1-200"].Vttablet sourceReplicaTab = custKs.Shards["-80"].Tablets["zone1-201"].Vttablet diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 002d856eea1..27dbcafd161 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -3138,6 +3138,10 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor span, ctx := trace.NewSpan(ctx, "workflow.Server.WorkflowSwitchTraffic") defer span.Finish() + if req == nil { + return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid nil request") + } + span.Annotate("keyspace", req.Keyspace) span.Annotate("workflow", req.Workflow) span.Annotate("tablet-types", req.TabletTypes) @@ -3192,11 +3196,11 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor // Update the starting state so that we're using the reverse workflow so that we can // move forward with a normal traffic switch forward operation, from the _reverse // workflow's perspective. - direction = DirectionForward ts, startState, err = s.getWorkflowState(ctx, startState.SourceKeyspace, ts.reverseWorkflow) if err != nil { return nil, err } + direction = DirectionForward } ts.force = req.GetForce() From d83164ac9410ff3a545a2d52f8392bbc1ab7d02e Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Thu, 10 Oct 2024 22:46:45 -0400 Subject: [PATCH 10/29] Fix e2e test Signed-off-by: Matt Lord --- go/test/endtoend/vreplication/helper_test.go | 4 +-- .../resharding_workflows_v2_test.go | 29 +++++++++---------- 2 files changed, 15 insertions(+), 18 deletions(-) diff --git a/go/test/endtoend/vreplication/helper_test.go b/go/test/endtoend/vreplication/helper_test.go index c2dc3d0dade..3795b6f52d5 100644 --- a/go/test/endtoend/vreplication/helper_test.go +++ b/go/test/endtoend/vreplication/helper_test.go @@ -334,8 +334,8 @@ func assertQueryExecutesOnTablet(t *testing.T, conn *mysql.Conn, tablet *cluster rr, err := vc.VtctldClient.ExecuteCommandWithOutput("GetRoutingRules") require.NoError(t, err) count0, body0, count1, body1 := executeOnTablet(t, conn, tablet, ksName, query, matchQuery) - assert.Equalf(t, count0+1, count1, "query %q did not execute on target %s:%s@%s tablet;\ntried to match %q\nbefore:\n%s\n\nafter:\n%s\n\nrouting rules:\n%s\n\n", - query, tablet.Keyspace, tablet.Shard, tablet.TabletType, matchQuery, body0, body1, rr) + require.Equalf(t, count0+1, count1, "query %q did not execute on destination %s (%s-%d);\ntried to match %q\nbefore:\n%s\n\nafter:\n%s\n\nrouting rules:\n%s\n\n", + query, ksName, tablet.Cell, tablet.TabletUID, matchQuery, body0, body1, rr) } func assertQueryDoesNotExecutesOnTablet(t *testing.T, conn *mysql.Conn, tablet *cluster.VttabletProcess, ksName string, query string, matchQuery string) { diff --git a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go index bc062b1e70b..c7aaaf71351 100644 --- a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go +++ b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go @@ -337,22 +337,19 @@ func tstWorkflowCancel(t *testing.T) error { return tstWorkflowAction(t, workflowActionCancel, "", "") } -func validateReadsRoute(t *testing.T, tabletTypes string, tablet *cluster.VttabletProcess) { +func validateReadsRoute(t *testing.T, tabletType string, tablet *cluster.VttabletProcess) { if tablet == nil { return } - if tabletTypes == "" { - tabletTypes = "replica,rdonly" - } vtgateConn, closeConn := getVTGateConn() defer closeConn() - for _, tt := range []string{"replica", "rdonly"} { - destination := fmt.Sprintf("%s:%s@%s", tablet.Keyspace, tablet.Shard, tt) - if strings.Contains(tabletTypes, tt) { - readQuery := "select cid from customer limit 10" - assertQueryExecutesOnTablet(t, vtgateConn, tablet, destination, readQuery, "select cid from customer limit :vtg1") - } - } + // We do NOT want to target a shard as that goes around the routing rules and + // defeats the purpose here. We are using a query w/o a WHERE clause so for + // sharded keyspaces it should hit all shards as a SCATTER query. So all we + // care about is the keyspace and tablet type. + destination := fmt.Sprintf("%s@%s", tablet.Keyspace, strings.ToLower(tabletType)) + readQuery := "select cid from customer limit 50" + assertQueryExecutesOnTablet(t, vtgateConn, tablet, destination, readQuery, "select cid from customer limit :vtg1") } func validateReadsRouteToSource(t *testing.T, tabletTypes string) { @@ -360,11 +357,11 @@ func validateReadsRouteToSource(t *testing.T, tabletTypes string) { require.NoError(t, err) if slices.Contains(tt, topodatapb.TabletType_REPLICA) { require.NotNil(t, sourceReplicaTab) - validateReadsRoute(t, tabletTypes, sourceReplicaTab) + validateReadsRoute(t, topodatapb.TabletType_REPLICA.String(), sourceReplicaTab) } if slices.Contains(tt, topodatapb.TabletType_RDONLY) { require.NotNil(t, sourceRdonlyTab) - validateReadsRoute(t, tabletTypes, sourceRdonlyTab) + validateReadsRoute(t, topodatapb.TabletType_RDONLY.String(), sourceRdonlyTab) } } @@ -373,11 +370,11 @@ func validateReadsRouteToTarget(t *testing.T, tabletTypes string) { require.NoError(t, err) if slices.Contains(tt, topodatapb.TabletType_REPLICA) { require.NotNil(t, targetReplicaTab1) - validateReadsRoute(t, tabletTypes, targetReplicaTab1) + validateReadsRoute(t, topodatapb.TabletType_REPLICA.String(), targetReplicaTab1) } if slices.Contains(tt, topodatapb.TabletType_RDONLY) { require.NotNil(t, targetRdonlyTab1) - validateReadsRoute(t, tabletTypes, targetRdonlyTab1) + validateReadsRoute(t, topodatapb.TabletType_RDONLY.String(), targetRdonlyTab1) } } @@ -428,6 +425,7 @@ func getCurrentStatus(t *testing.T) string { // but CI currently fails on creating multiple clusters even after the previous ones are torn down func TestBasicV2Workflows(t *testing.T) { + defaultReplicas = 1 defaultRdonly = 1 extraVTTabletArgs = []string{ parallelInsertWorkers, @@ -834,7 +832,6 @@ func testRestOfWorkflow(t *testing.T) { tstWorkflowSwitchReadsAndWrites(t) validateReadsRouteToTarget(t, "replica,rdonly") - validateReadsRouteToTarget(t, "rdonly,rdonly") validateWritesRouteToTarget(t) waitForLowLag(t, keyspace, "wf1_reverse") tstWorkflowReverseReadsAndWrites(t) From ef645d2067f639c9e97da781cf513b73cdc7f14f Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sat, 12 Oct 2024 10:31:56 -0400 Subject: [PATCH 11/29] Fix switchShardReads for reverse Signed-off-by: Matt Lord --- go/vt/vtctl/workflow/traffic_switcher.go | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go index 9ac5302f19f..1c29a060843 100644 --- a/go/vt/vtctl/workflow/traffic_switcher.go +++ b/go/vt/vtctl/workflow/traffic_switcher.go @@ -609,28 +609,35 @@ func (ts *trafficSwitcher) dropSourceShards(ctx context.Context) error { func (ts *trafficSwitcher) switchShardReads(ctx context.Context, cells []string, servedTypes []topodatapb.TabletType, direction TrafficSwitchDirection) error { cellsStr := strings.Join(cells, ",") ts.Logger().Infof("switchShardReads: cells: %s, tablet types: %+v, direction %d", cellsStr, servedTypes, direction) + + targetKeyspace, sourceKeyspace := ts.targetKeyspace, ts.sourceKeyspace fromShards, toShards := ts.SourceShards(), ts.TargetShards() - if err := ts.TopoServer().ValidateSrvKeyspace(ctx, ts.TargetKeyspaceName(), cellsStr); err != nil { + if direction == DirectionBackward { + targetKeyspace, sourceKeyspace = ts.sourceKeyspace, ts.targetKeyspace + fromShards, toShards = ts.TargetShards(), ts.SourceShards() + } + + if err := ts.TopoServer().ValidateSrvKeyspace(ctx, targetKeyspace, cellsStr); err != nil { err2 := vterrors.Wrapf(err, "Before switching shard reads, found SrvKeyspace for %s is corrupt in cell %s", - ts.TargetKeyspaceName(), cellsStr) + targetKeyspace, cellsStr) ts.Logger().Errorf("%w", err2) return err2 } for _, servedType := range servedTypes { - if err := ts.ws.updateShardRecords(ctx, ts.SourceKeyspaceName(), fromShards, cells, servedType, true /* isFrom */, false /* clearSourceShards */, ts.Logger()); err != nil { + if err := ts.ws.updateShardRecords(ctx, sourceKeyspace, fromShards, cells, servedType, true /* isFrom */, false /* clearSourceShards */, ts.Logger()); err != nil { return err } - if err := ts.ws.updateShardRecords(ctx, ts.SourceKeyspaceName(), toShards, cells, servedType, false, false, ts.Logger()); err != nil { + if err := ts.ws.updateShardRecords(ctx, sourceKeyspace, toShards, cells, servedType, false, false, ts.Logger()); err != nil { return err } - err := ts.TopoServer().MigrateServedType(ctx, ts.SourceKeyspaceName(), toShards, fromShards, servedType, cells) + err := ts.TopoServer().MigrateServedType(ctx, sourceKeyspace, toShards, fromShards, servedType, cells) if err != nil { return err } } - if err := ts.TopoServer().ValidateSrvKeyspace(ctx, ts.TargetKeyspaceName(), cellsStr); err != nil { + if err := ts.TopoServer().ValidateSrvKeyspace(ctx, targetKeyspace, cellsStr); err != nil { err2 := vterrors.Wrapf(err, "after switching shard reads, found SrvKeyspace for %s is corrupt in cell %s", - ts.TargetKeyspaceName(), cellsStr) + targetKeyspace, cellsStr) ts.Logger().Errorf("%w", err2) return err2 } From e8622d6d4ea6041bcb427946ae1e3490af77c999 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sat, 12 Oct 2024 11:05:04 -0400 Subject: [PATCH 12/29] Changes from self review Signed-off-by: Matt Lord --- .../endtoend/vreplication/resharding_workflows_v2_test.go | 5 +++++ go/vt/vtctl/workflow/server.go | 7 +++---- go/vt/vtctl/workflow/server_test.go | 2 +- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go index c7aaaf71351..45c89d86331 100644 --- a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go +++ b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go @@ -818,23 +818,28 @@ func testRestOfWorkflow(t *testing.T) { waitForLowLag(t, keyspace, "wf1_reverse") tstWorkflowReverseWrites(t) + checkStates(t, wrangler.WorkflowStateWritesSwitched, wrangler.WorkflowStateNotSwitched) validateReadsRouteToSource(t, "replica,rdonly") validateWritesRouteToSource(t) waitForLowLag(t, "customer", "wf1") tstWorkflowSwitchReads(t, "", "") + checkStates(t, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateReadsSwitched) validateReadsRouteToTarget(t, "replica,rdonly") validateWritesRouteToSource(t) tstWorkflowReverseReads(t, "", "") + checkStates(t, wrangler.WorkflowStateReadsSwitched, wrangler.WorkflowStateNotSwitched) validateReadsRouteToSource(t, "replica,rdonly") validateWritesRouteToSource(t) tstWorkflowSwitchReadsAndWrites(t) + checkStates(t, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateAllSwitched) validateReadsRouteToTarget(t, "replica,rdonly") validateWritesRouteToTarget(t) waitForLowLag(t, keyspace, "wf1_reverse") tstWorkflowReverseReadsAndWrites(t) + checkStates(t, wrangler.WorkflowStateAllSwitched, wrangler.WorkflowStateNotSwitched) validateReadsRouteToSource(t, "replica,rdonly") validateWritesRouteToSource(t) diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 27dbcafd161..9955719c415 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -3181,7 +3181,6 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor if err != nil { return nil, err } - ts, startState, err := s.getWorkflowState(ctx, req.Keyspace, req.Workflow) if err != nil { return nil, err @@ -3193,9 +3192,9 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor } if direction == DirectionBackward && !onlySwitchingReads { - // Update the starting state so that we're using the reverse workflow so that we can - // move forward with a normal traffic switch forward operation, from the _reverse - // workflow's perspective. + // This means that the reverse workflow exists. So we update the starting state + // so that we're using the reverse workflow and we can move forward with a normal + // traffic switch forward operation, from the reverse workflow's perspective. ts, startState, err = s.getWorkflowState(ctx, startState.SourceKeyspace, ts.reverseWorkflow) if err != nil { return nil, err diff --git a/go/vt/vtctl/workflow/server_test.go b/go/vt/vtctl/workflow/server_test.go index baf2979b6f2..2784581f92a 100644 --- a/go/vt/vtctl/workflow/server_test.go +++ b/go/vt/vtctl/workflow/server_test.go @@ -1217,7 +1217,7 @@ func TestMoveTablesTrafficSwitching(t *testing.T) { require.NoError(t, err) for _, rr := range rr.Rules { _, rrTabletType, found := strings.Cut(rr.FromTable, "@") - if !found { // No @ is primary + if !found { // No @ is primary rrTabletType = topodatapb.TabletType_PRIMARY.String() } tabletType, err := topoproto.ParseTabletType(rrTabletType) From 0ed696511c23dc084503806c47aee0966c29ad98 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sun, 13 Oct 2024 21:08:01 -0400 Subject: [PATCH 13/29] Incorporate additional tests from Rohit Signed-off-by: Matt Lord --- .../resharding_workflows_v2_test.go | 21 +- .../vreplication_vtctldclient_cli_test.go | 179 ++++++++++++++++++ .../endtoend/vreplication/wrappers_test.go | 76 ++++++-- go/vt/vtctl/workflow/traffic_switcher.go | 17 +- 4 files changed, 270 insertions(+), 23 deletions(-) diff --git a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go index 45c89d86331..128fff4c341 100644 --- a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go +++ b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go @@ -916,7 +916,7 @@ func setupMinimalCluster(t *testing.T) *VitessCluster { zone1 := vc.Cells["zone1"] - vc.AddKeyspace(t, []*Cell{zone1}, "product", "0", initialProductVSchema, initialProductSchema, 0, 0, 100, nil) + vc.AddKeyspace(t, []*Cell{zone1}, "product", "0", initialProductVSchema, initialProductSchema, defaultReplicas, defaultRdonly, 100, nil) verifyClusterHealth(t, vc) insertInitialData(t) @@ -929,7 +929,7 @@ func setupMinimalCluster(t *testing.T) *VitessCluster { func setupMinimalCustomerKeyspace(t *testing.T) map[string]*cluster.VttabletProcess { tablets := make(map[string]*cluster.VttabletProcess) if _, err := vc.AddKeyspace(t, []*Cell{vc.Cells["zone1"]}, "customer", "-80,80-", - customerVSchema, customerSchema, 0, 0, 200, nil); err != nil { + customerVSchema, customerSchema, defaultReplicas, defaultRdonly, 200, nil); err != nil { t.Fatal(err) } defaultCell := vc.Cells[vc.CellNames[0]] @@ -1077,3 +1077,20 @@ func tstApplySchemaOnlineDDL(t *testing.T, sql string, keyspace string) { "--sql", sql, keyspace) require.NoError(t, err, fmt.Sprintf("ApplySchema Error: %s", err)) } + +func validateRoutingRule(t *testing.T, table, tabletType, from, to string) bool { + rr := getRoutingRules(t) + for _, r := range rr.GetRules() { + s := fmt.Sprintf("%s.%s", from, table) + if tabletType != "" && tabletType != "primary" { + s = fmt.Sprintf("%s@%s", s, tabletType) + } + if r.FromTable == s { + toTable := r.ToTables[0] + if toTable == to { + return true + } + } + } + return false +} diff --git a/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go b/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go index 1b26a4b05ba..86b87d52f21 100644 --- a/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go +++ b/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go @@ -64,7 +64,19 @@ func TestVtctldclientCLI(t *testing.T) { targetKeyspaceName := "customer" var mt iMoveTables workflowName := "wf1" + + sourceReplicaTab = vc.Cells["zone1"].Keyspaces[sourceKeyspaceName].Shards["0"].Tablets["zone1-101"].Vttablet + require.NotNil(t, sourceReplicaTab) + sourceTab = vc.Cells["zone1"].Keyspaces[sourceKeyspaceName].Shards["0"].Tablets["zone1-100"].Vttablet + require.NotNil(t, sourceTab) + targetTabs := setupMinimalCustomerKeyspace(t) + targetTab1 = targetTabs["-80"] + require.NotNil(t, targetTab1) + targetTab2 = targetTabs["80-"] + require.NotNil(t, targetTab2) + targetReplicaTab1 = vc.Cells["zone1"].Keyspaces[targetKeyspaceName].Shards["-80"].Tablets["zone1-201"].Vttablet + require.NotNil(t, targetReplicaTab1) t.Run("RoutingRulesApply", func(t *testing.T) { testRoutingRulesApplyCommands(t) @@ -95,6 +107,19 @@ func TestVtctldclientCLI(t *testing.T) { "-40": targetKeyspace.Shards["-40"].Tablets["zone1-400"].Vttablet, "40-80": targetKeyspace.Shards["40-80"].Tablets["zone1-500"].Vttablet, } + + sourceReplicaTab = vc.Cells["zone1"].Keyspaces[targetKeyspaceName].Shards["-80"].Tablets["zone1-201"].Vttablet + require.NotNil(t, sourceReplicaTab) + sourceTab = vc.Cells["zone1"].Keyspaces[targetKeyspaceName].Shards["-80"].Tablets["zone1-200"].Vttablet + require.NotNil(t, sourceTab) + + targetTab1 = tablets["-40"] + require.NotNil(t, targetTab1) + targetTab2 = tablets["40-80"] + require.NotNil(t, targetTab2) + targetReplicaTab1 = vc.Cells["zone1"].Keyspaces[targetKeyspaceName].Shards["-40"].Tablets["zone1-401"].Vttablet + require.NotNil(t, targetReplicaTab1) + splitShard(t, targetKeyspaceName, reshardWorkflowName, sourceShard, newShards, tablets) }) } @@ -163,7 +188,71 @@ func testMoveTablesFlags2(t *testing.T, mt *iMoveTables, sourceKeyspace, targetK for _, tab := range targetTabs { catchup(t, tab, workflowName, "MoveTables") } + + (*mt).SwitchReads() + validateReadsRouteToTarget(t, "replica") + validateRoutingRule(t, "customer", "replica", sourceKs, targetKs) + validateRoutingRule(t, "customer", "", targetKs, sourceKs) + + (*mt).ReverseReads() + validateReadsRouteToSource(t, "replica") + validateRoutingRule(t, "customer", "replica", targetKs, sourceKs) + validateRoutingRule(t, "customer", "", targetKs, sourceKs) + + (*mt).SwitchReadsAndWrites() + validateReadsRouteToTarget(t, "replica") + validateRoutingRule(t, "customer", "replica", sourceKs, targetKs) + validateWritesRouteToTarget(t) + validateRoutingRule(t, "customer", "", sourceKs, targetKs) + + (*mt).ReverseReadsAndWrites() + validateReadsRouteToSource(t, "replica") + validateRoutingRule(t, "customer", "replica", targetKs, sourceKs) + validateWritesRouteToSource(t) + validateRoutingRule(t, "customer", "", targetKs, sourceKs) + + (*mt).SwitchReadsAndWrites() + validateReadsRouteToTarget(t, "replica") + validateRoutingRule(t, "customer", "replica", sourceKs, targetKs) + validateWritesRouteToTarget(t) + validateRoutingRule(t, "customer", "", sourceKs, targetKs) + + (*mt).ReverseReads() + validateReadsRouteToSource(t, "replica") + validateRoutingRule(t, "customer", "replica", targetKs, sourceKs) + validateWritesRouteToTarget(t) + validateRoutingRule(t, "customer", "", sourceKs, targetKs) + + (*mt).ReverseWrites() + validateReadsRouteToSource(t, "replica") + validateRoutingRule(t, "customer", "replica", targetKs, sourceKs) + validateWritesRouteToSource(t) + validateRoutingRule(t, "customer", "", targetKs, sourceKs) + (*mt).SwitchReadsAndWrites() + validateReadsRouteToTarget(t, "replica") + validateRoutingRule(t, "customer", "replica", sourceKs, targetKs) + validateWritesRouteToTarget(t) + validateRoutingRule(t, "customer", "", sourceKs, targetKs) + + (*mt).ReverseWrites() + validateReadsRouteToTarget(t, "replica") + validateRoutingRule(t, "customer", "replica", sourceKs, targetKs) + validateWritesRouteToSource(t) + validateRoutingRule(t, "customer", "", targetKs, sourceKs) + + (*mt).ReverseReads() + validateReadsRouteToSource(t, "replica") + validateRoutingRule(t, "customer", "replica", targetKs, sourceKs) + validateWritesRouteToSource(t) + validateRoutingRule(t, "customer", "", targetKs, sourceKs) + + (*mt).SwitchReadsAndWrites() + validateReadsRouteToTarget(t, "replica") + validateRoutingRule(t, "customer", "replica", sourceKs, targetKs) + validateWritesRouteToTarget(t) + validateRoutingRule(t, "customer", "", sourceKs, targetKs) + (*mt).Complete() confirmRoutingRulesExist(t) // Confirm that --keep-data was honored. @@ -381,17 +470,107 @@ func splitShard(t *testing.T, keyspace, workflowName, sourceShards, targetShards } vdiff(t, keyspace, workflowName, "zone1", false, true, nil) + shardReadsRouteToSource := func() { + require.True(t, getShardRoute(t, keyspace, "-80", "replica")) + } + + shardReadsRouteToTarget := func() { + require.True(t, getShardRoute(t, keyspace, "-40", "replica")) + } + + shardWritesRouteToSource := func() { + require.True(t, getShardRoute(t, keyspace, "-80", "primary")) + } + + shardWritesRouteToTarget := func() { + require.True(t, getShardRoute(t, keyspace, "-40", "primary")) + } + rs.SwitchReadsAndWrites() waitForLowLag(t, keyspace, workflowName+"_reverse") vdiff(t, keyspace, workflowName+"_reverse", "zone1", true, false, nil) + shardReadsRouteToTarget() + shardWritesRouteToTarget() rs.ReverseReadsAndWrites() waitForLowLag(t, keyspace, workflowName) vdiff(t, keyspace, workflowName, "zone1", false, true, nil) + shardReadsRouteToSource() + shardWritesRouteToSource() + + rs.SwitchReads() + shardReadsRouteToTarget() + shardWritesRouteToSource() + + rs.ReverseReads() + shardReadsRouteToSource() + shardWritesRouteToSource() + + rs.SwitchReadsAndWrites() + shardReadsRouteToTarget() + shardWritesRouteToTarget() + + rs.ReverseReadsAndWrites() + shardReadsRouteToSource() + shardWritesRouteToSource() + + rs.SwitchReadsAndWrites() + shardReadsRouteToTarget() + shardWritesRouteToTarget() + + rs.ReverseReads() + shardReadsRouteToSource() + shardWritesRouteToTarget() + + rs.ReverseWrites() + shardReadsRouteToSource() + shardWritesRouteToSource() + + rs.SwitchReadsAndWrites() + shardReadsRouteToTarget() + shardWritesRouteToTarget() + + rs.ReverseWrites() + shardReadsRouteToTarget() + shardWritesRouteToSource() + + rs.ReverseReads() + shardReadsRouteToSource() + shardWritesRouteToSource() + rs.SwitchReadsAndWrites() + shardReadsRouteToTarget() + shardWritesRouteToTarget() + rs.Complete() } +func getSrvKeyspace(t *testing.T, keyspace string) *topodatapb.SrvKeyspace { + output, err := vc.VtctldClient.ExecuteCommandWithOutput("GetSrvKeyspaces", keyspace, "zone1") + require.NoError(t, err) + var srvKeyspaces map[string]*topodatapb.SrvKeyspace + err = json2.Unmarshal([]byte(output), &srvKeyspaces) + require.NoError(t, err) + require.Equal(t, 1, len(srvKeyspaces)) + return srvKeyspaces["zone1"] +} + +func getShardRoute(t *testing.T, keyspace, shard string, tabletType string) bool { + srvKeyspace := getSrvKeyspace(t, keyspace) + for _, partition := range srvKeyspace.Partitions { + tt, err := topoproto.ParseTabletType(tabletType) + require.NoError(t, err) + if partition.ServedType == tt { + for _, shardReference := range partition.ShardReferences { + if shardReference.Name == shard { + return true + } + } + } + } + return false +} + func getReshardShowResponse(rs *iReshard) *vtctldatapb.GetWorkflowsResponse { (*rs).Show() reshardOutput := (*rs).GetLastOutput() diff --git a/go/test/endtoend/vreplication/wrappers_test.go b/go/test/endtoend/vreplication/wrappers_test.go index 96c54b89fe8..9f097e0be69 100644 --- a/go/test/endtoend/vreplication/wrappers_test.go +++ b/go/test/endtoend/vreplication/wrappers_test.go @@ -33,6 +33,8 @@ type iWorkflow interface { SwitchReads() SwitchWrites() SwitchReadsAndWrites() + ReverseReads() + ReverseWrites() ReverseReadsAndWrites() Cancel() Complete() @@ -156,13 +158,26 @@ func (vmt *VtctlMoveTables) exec(action string) { require.NoError(vmt.vc.t, err) } func (vmt *VtctlMoveTables) SwitchReads() { - // TODO implement me - panic("implement me") + err := tstWorkflowExecVtctl(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace, + vmt.tables, workflowActionSwitchTraffic, "replica,rdonly", "", "", defaultWorkflowExecOptions) + require.NoError(vmt.vc.t, err) } func (vmt *VtctlMoveTables) SwitchWrites() { - // TODO implement me - panic("implement me") + err := tstWorkflowExecVtctl(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace, + vmt.tables, workflowActionSwitchTraffic, "primary", "", "", defaultWorkflowExecOptions) + require.NoError(vmt.vc.t, err) +} +func (vmt *VtctlMoveTables) ReverseReads() { + err := tstWorkflowExecVtctl(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace, + vmt.tables, workflowActionReverseTraffic, "replica,rdonly", "", "", defaultWorkflowExecOptions) + require.NoError(vmt.vc.t, err) +} + +func (vmt *VtctlMoveTables) ReverseWrites() { + err := tstWorkflowExecVtctl(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace, + vmt.tables, workflowActionReverseTraffic, "primary", "", "", defaultWorkflowExecOptions) + require.NoError(vmt.vc.t, err) } func (vmt *VtctlMoveTables) Cancel() { @@ -259,6 +274,18 @@ func (v VtctldMoveTables) SwitchWrites() { v.exec(args...) } +func (v VtctldMoveTables) ReverseReads() { + args := []string{"ReverseTraffic", "--tablet-types=rdonly,replica"} + args = append(args, v.switchFlags...) + v.exec(args...) +} + +func (v VtctldMoveTables) ReverseWrites() { + args := []string{"ReverseTraffic", "--tablet-types=primary"} + args = append(args, v.switchFlags...) + v.exec(args...) +} + func (v VtctldMoveTables) Cancel() { v.exec("Cancel") } @@ -323,6 +350,16 @@ type VtctlReshard struct { *reshardWorkflow } +func (vrs *VtctlReshard) ReverseReads() { + //TODO implement me + panic("implement me") +} + +func (vrs *VtctlReshard) ReverseWrites() { + //TODO implement me + panic("implement me") +} + func (vrs *VtctlReshard) Flavor() string { return "vtctl" } @@ -409,9 +446,8 @@ func (v VtctldReshard) exec(args ...string) { args2 := []string{"Reshard", "--workflow=" + v.workflowName, "--target-keyspace=" + v.targetKeyspace} args2 = append(args2, args...) var err error - if v.lastOutput, err = vc.VtctldClient.ExecuteCommandWithOutput(args2...); err != nil { - v.vc.t.Fatalf("failed to create Reshard workflow: %v: %s", err, v.lastOutput) - } + v.lastOutput, err = vc.VtctldClient.ExecuteCommandWithOutput(args2...) + require.NoError(v.vc.t, err, "failed to create Reshard workflow: %v: %s", err, v.lastOutput) } func (v VtctldReshard) Create() { @@ -449,13 +485,31 @@ func (v VtctldReshard) Show() { } func (v VtctldReshard) SwitchReads() { - // TODO implement me - panic("implement me") + args := []string{"SwitchTraffic"} + args = append(args, v.switchFlags...) + args = append(args, "--tablet-types=rdonly,replica") + v.exec(args...) } func (v VtctldReshard) SwitchWrites() { - // TODO implement me - panic("implement me") + args := []string{"SwitchTraffic"} + args = append(args, v.switchFlags...) + args = append(args, "--tablet-types=primary") + v.exec(args...) +} + +func (v VtctldReshard) ReverseReads() { + args := []string{"ReverseTraffic"} + args = append(args, v.switchFlags...) + args = append(args, "--tablet-types=rdonly,replica") + v.exec(args...) +} + +func (v VtctldReshard) ReverseWrites() { + args := []string{"ReverseTraffic"} + args = append(args, v.switchFlags...) + args = append(args, "--tablet-types=primary") + v.exec(args...) } func (v VtctldReshard) Cancel() { diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go index 1c29a060843..971f6e8f323 100644 --- a/go/vt/vtctl/workflow/traffic_switcher.go +++ b/go/vt/vtctl/workflow/traffic_switcher.go @@ -610,34 +610,31 @@ func (ts *trafficSwitcher) switchShardReads(ctx context.Context, cells []string, cellsStr := strings.Join(cells, ",") ts.Logger().Infof("switchShardReads: cells: %s, tablet types: %+v, direction %d", cellsStr, servedTypes, direction) - targetKeyspace, sourceKeyspace := ts.targetKeyspace, ts.sourceKeyspace fromShards, toShards := ts.SourceShards(), ts.TargetShards() if direction == DirectionBackward { - targetKeyspace, sourceKeyspace = ts.sourceKeyspace, ts.targetKeyspace fromShards, toShards = ts.TargetShards(), ts.SourceShards() } - - if err := ts.TopoServer().ValidateSrvKeyspace(ctx, targetKeyspace, cellsStr); err != nil { + if err := ts.TopoServer().ValidateSrvKeyspace(ctx, ts.TargetKeyspaceName(), cellsStr); err != nil { err2 := vterrors.Wrapf(err, "Before switching shard reads, found SrvKeyspace for %s is corrupt in cell %s", - targetKeyspace, cellsStr) + ts.TargetKeyspaceName(), cellsStr) ts.Logger().Errorf("%w", err2) return err2 } for _, servedType := range servedTypes { - if err := ts.ws.updateShardRecords(ctx, sourceKeyspace, fromShards, cells, servedType, true /* isFrom */, false /* clearSourceShards */, ts.Logger()); err != nil { + if err := ts.ws.updateShardRecords(ctx, ts.SourceKeyspaceName(), fromShards, cells, servedType, true /* isFrom */, false /* clearSourceShards */, ts.Logger()); err != nil { return err } - if err := ts.ws.updateShardRecords(ctx, sourceKeyspace, toShards, cells, servedType, false, false, ts.Logger()); err != nil { + if err := ts.ws.updateShardRecords(ctx, ts.SourceKeyspaceName(), toShards, cells, servedType, false, false, ts.Logger()); err != nil { return err } - err := ts.TopoServer().MigrateServedType(ctx, sourceKeyspace, toShards, fromShards, servedType, cells) + err := ts.TopoServer().MigrateServedType(ctx, ts.SourceKeyspaceName(), toShards, fromShards, servedType, cells) if err != nil { return err } } - if err := ts.TopoServer().ValidateSrvKeyspace(ctx, targetKeyspace, cellsStr); err != nil { + if err := ts.TopoServer().ValidateSrvKeyspace(ctx, ts.TargetKeyspaceName(), cellsStr); err != nil { err2 := vterrors.Wrapf(err, "after switching shard reads, found SrvKeyspace for %s is corrupt in cell %s", - targetKeyspace, cellsStr) + ts.TargetKeyspaceName(), cellsStr) ts.Logger().Errorf("%w", err2) return err2 } From c264a25593fcef25b1e386485d4a84c102c69136 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sun, 13 Oct 2024 23:31:05 -0400 Subject: [PATCH 14/29] Fixes from new tests Signed-off-by: Matt Lord --- .../endtoend/vreplication/wrappers_test.go | 2 ++ go/vt/vtctl/workflow/server.go | 25 ++++++++++--------- go/vt/vtctl/workflow/server_test.go | 13 +++++----- 3 files changed, 22 insertions(+), 18 deletions(-) diff --git a/go/test/endtoend/vreplication/wrappers_test.go b/go/test/endtoend/vreplication/wrappers_test.go index 9f097e0be69..32307611e2b 100644 --- a/go/test/endtoend/vreplication/wrappers_test.go +++ b/go/test/endtoend/vreplication/wrappers_test.go @@ -19,6 +19,7 @@ package vreplication import ( "math/rand/v2" "strconv" + "strings" "github.com/stretchr/testify/require" @@ -446,6 +447,7 @@ func (v VtctldReshard) exec(args ...string) { args2 := []string{"Reshard", "--workflow=" + v.workflowName, "--target-keyspace=" + v.targetKeyspace} args2 = append(args2, args...) var err error + v.vc.t.Logf("Executing command: vtctldclient %s", strings.Join(args2, " ")) v.lastOutput, err = vc.VtctldClient.ExecuteCommandWithOutput(args2...) require.NoError(v.vc.t, err, "failed to create Reshard workflow: %v: %s", err, v.lastOutput) } diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 9955719c415..6a1e604c0dc 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -3147,6 +3147,7 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor span.Annotate("tablet-types", req.TabletTypes) span.Annotate("direction", req.Direction) span.Annotate("enable-reverse-replication", req.EnableReverseReplication) + span.Annotate("shards", req.GetShards) span.Annotate("force", req.Force) var ( @@ -3204,13 +3205,18 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor ts.force = req.GetForce() - reason, err := s.canSwitch(ctx, ts, startState, direction, int64(maxReplicationLagAllowed.Seconds()), req.GetShards()) - if err != nil { - return nil, err - } - if reason != "" { - return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "cannot switch traffic for workflow %s at this time: %s", - startState.Workflow, reason) + if onlySwitchingReads { + s.Logger().Infof("Writes already switched no need to check lag for the %s.%s workflow", + ts.targetKeyspace, ts.workflow) + } else { + reason, err := s.canSwitch(ctx, ts, startState, direction, int64(maxReplicationLagAllowed.Seconds()), req.GetShards()) + if err != nil { + return nil, err + } + if reason != "" { + return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "cannot switch traffic for workflow %s at this time: %s", + startState.Workflow, reason) + } } if hasReplica || hasRdonly { // If we're going to switch writes immediately after then we don't need to @@ -3711,11 +3717,6 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit func (s *Server) canSwitch(ctx context.Context, ts *trafficSwitcher, state *State, direction TrafficSwitchDirection, maxAllowedReplLagSecs int64, shards []string) (reason string, err error) { - if direction == DirectionForward && state.WritesSwitched || - direction == DirectionBackward && !state.WritesSwitched { - s.Logger().Infof("writes already switched no need to check lag") - return "", nil - } wf, err := s.GetWorkflow(ctx, state.TargetKeyspace, state.Workflow, false, shards) if err != nil { return "", err diff --git a/go/vt/vtctl/workflow/server_test.go b/go/vt/vtctl/workflow/server_test.go index 2784581f92a..8b42a2ebd97 100644 --- a/go/vt/vtctl/workflow/server_test.go +++ b/go/vt/vtctl/workflow/server_test.go @@ -1186,18 +1186,19 @@ func TestMoveTablesTrafficSwitching(t *testing.T) { env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, journalQR) } } else { - for i := 0; i < len(tc.targetKeyspace.ShardNames); i++ { // Per stream - env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, cutoverQR) - } + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, copyTableQR) for i := 0; i < len(tc.targetKeyspace.ShardNames); i++ { // Per stream env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, journalQR) } for i := 0; i < len(tc.targetKeyspace.ShardNames); i++ { // Per stream env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, lockTableQR) } - env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, deleteWFQR) - env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, createWFQR) - env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, createJournalQR) + for i := 0; i < len(tc.targetKeyspace.ShardNames); i++ { // Per stream + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, cutoverQR) + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, deleteWFQR) + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, createWFQR) + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, createJournalQR) + } env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, freezeReverseWFQR) } } From dec4aa438908c782bda444fe4e9182cbaff1f377 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 14 Oct 2024 02:19:01 -0400 Subject: [PATCH 15/29] Corrections and improvements Signed-off-by: Matt Lord --- .../endtoend/vreplication/wrappers_test.go | 8 ++--- go/vt/vtctl/workflow/server.go | 36 ++++++++++++------- 2 files changed, 27 insertions(+), 17 deletions(-) diff --git a/go/test/endtoend/vreplication/wrappers_test.go b/go/test/endtoend/vreplication/wrappers_test.go index 32307611e2b..e46bc6acda0 100644 --- a/go/test/endtoend/vreplication/wrappers_test.go +++ b/go/test/endtoend/vreplication/wrappers_test.go @@ -219,9 +219,9 @@ func (v VtctldMoveTables) exec(args ...string) { args2 := []string{"MoveTables", "--workflow=" + v.workflowName, "--target-keyspace=" + v.targetKeyspace} args2 = append(args2, args...) var err error - if v.lastOutput, err = vc.VtctldClient.ExecuteCommandWithOutput(args2...); err != nil { - require.FailNowf(v.vc.t, "failed MoveTables action", "%v: %s", err, v.lastOutput) - } + v.vc.t.Logf("Executing command: vtctldclient %s", strings.Join(args2, " ")) + v.lastOutput, err = vc.VtctldClient.ExecuteCommandWithOutput(args2...) + require.NoError(v.vc.t, err, "failed MoveTables action, error: %v: output: %s", err, v.lastOutput) } func (v VtctldMoveTables) Create() { @@ -449,7 +449,7 @@ func (v VtctldReshard) exec(args ...string) { var err error v.vc.t.Logf("Executing command: vtctldclient %s", strings.Join(args2, " ")) v.lastOutput, err = vc.VtctldClient.ExecuteCommandWithOutput(args2...) - require.NoError(v.vc.t, err, "failed to create Reshard workflow: %v: %s", err, v.lastOutput) + require.NoError(v.vc.t, err, "failed Reshard action, error: %v: output: %s", err, v.lastOutput) } func (v VtctldReshard) Create() { diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 6a1e604c0dc..e5226441713 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -3151,9 +3151,9 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor span.Annotate("force", req.Force) var ( - dryRunResults []string - rdDryRunResults, wrDryRunResults *[]string - hasReplica, hasRdonly, hasPrimary bool + dryRunResults []string + rdDryRunResults, wrDryRunResults *[]string + switchReplica, switchRdonly, switchPrimary bool ) timeout, set, err := protoutil.DurationFromProto(req.GetTimeout()) if err != nil { @@ -3178,7 +3178,7 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor maxReplicationLagAllowed = DefaultTimeout } direction := TrafficSwitchDirection(req.Direction) - hasReplica, hasRdonly, hasPrimary, err = parseTabletTypes(req.TabletTypes) + switchReplica, switchRdonly, switchPrimary, err = parseTabletTypes(req.TabletTypes) if err != nil { return nil, err } @@ -3186,12 +3186,21 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor if err != nil { return nil, err } - onlySwitchingReads := !startState.WritesSwitched && !hasPrimary if startState.WorkflowType == TypeMigrate { return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid action for Migrate workflow: SwitchTraffic") } + // We need this to know when there isn't a reverse workflow to use. + onlySwitchingReads := !startState.WritesSwitched && !switchPrimary + + // We need this for idempotency and to avoid unnecessary work and resulting risk. + writesAlreadySwitched := (direction == DirectionForward && startState.WritesSwitched) || + (direction == DirectionBackward && !startState.WritesSwitched) + readsAlreadySwitched := (direction == DirectionForward && len(startState.ReplicaCellsNotSwitched) == 0 && len(startState.RdonlyCellsNotSwitched) == 0) || + (direction == DirectionBackward && len(startState.ReplicaCellsSwitched) == 0 && len(startState.RdonlyCellsSwitched) == 0) + needToSwitchWrites := switchPrimary && !writesAlreadySwitched + if direction == DirectionBackward && !onlySwitchingReads { // This means that the reverse workflow exists. So we update the starting state // so that we're using the reverse workflow and we can move forward with a normal @@ -3205,11 +3214,11 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor ts.force = req.GetForce() - if onlySwitchingReads { + if writesAlreadySwitched { s.Logger().Infof("Writes already switched no need to check lag for the %s.%s workflow", ts.targetKeyspace, ts.workflow) } else { - reason, err := s.canSwitch(ctx, ts, startState, direction, int64(maxReplicationLagAllowed.Seconds()), req.GetShards()) + reason, err := s.canSwitch(ctx, ts, int64(maxReplicationLagAllowed.Seconds()), req.GetShards()) if err != nil { return nil, err } @@ -3218,10 +3227,11 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor startState.Workflow, reason) } } - if hasReplica || hasRdonly { + + if (switchReplica || switchRdonly) && !readsAlreadySwitched { // If we're going to switch writes immediately after then we don't need to // rebuild the SrvVSchema here as we will do it after switching writes. - if rdDryRunResults, err = s.switchReads(ctx, req, ts, startState, !hasPrimary /* rebuildSrvVSchema */, direction); err != nil { + if rdDryRunResults, err = s.switchReads(ctx, req, ts, startState, !needToSwitchWrites /* rebuildSrvVSchema */, direction); err != nil { return nil, err } s.Logger().Infof("Switch Reads done for workflow %s.%s", req.Keyspace, req.Workflow) @@ -3229,7 +3239,8 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor if rdDryRunResults != nil { dryRunResults = append(dryRunResults, *rdDryRunResults...) } - if hasPrimary { + + if needToSwitchWrites { if _, wrDryRunResults, err = s.switchWrites(ctx, req, ts, timeout, false); err != nil { return nil, err } @@ -3715,9 +3726,8 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit return ts.id, sw.logs(), nil } -func (s *Server) canSwitch(ctx context.Context, ts *trafficSwitcher, state *State, direction TrafficSwitchDirection, - maxAllowedReplLagSecs int64, shards []string) (reason string, err error) { - wf, err := s.GetWorkflow(ctx, state.TargetKeyspace, state.Workflow, false, shards) +func (s *Server) canSwitch(ctx context.Context, ts *trafficSwitcher, maxAllowedReplLagSecs int64, shards []string) (reason string, err error) { + wf, err := s.GetWorkflow(ctx, ts.targetKeyspace, ts.workflow, false, shards) if err != nil { return "", err } From 20a6f7ceb59ca08447bce3414eb9224198ee3b03 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 14 Oct 2024 08:58:54 -0400 Subject: [PATCH 16/29] Minimize changes Signed-off-by: Matt Lord --- go/vt/vtctl/workflow/server.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index e5226441713..38a60d07ced 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -3197,9 +3197,6 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor // We need this for idempotency and to avoid unnecessary work and resulting risk. writesAlreadySwitched := (direction == DirectionForward && startState.WritesSwitched) || (direction == DirectionBackward && !startState.WritesSwitched) - readsAlreadySwitched := (direction == DirectionForward && len(startState.ReplicaCellsNotSwitched) == 0 && len(startState.RdonlyCellsNotSwitched) == 0) || - (direction == DirectionBackward && len(startState.ReplicaCellsSwitched) == 0 && len(startState.RdonlyCellsSwitched) == 0) - needToSwitchWrites := switchPrimary && !writesAlreadySwitched if direction == DirectionBackward && !onlySwitchingReads { // This means that the reverse workflow exists. So we update the starting state @@ -3228,10 +3225,10 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor } } - if (switchReplica || switchRdonly) && !readsAlreadySwitched { + if switchReplica || switchRdonly { // If we're going to switch writes immediately after then we don't need to // rebuild the SrvVSchema here as we will do it after switching writes. - if rdDryRunResults, err = s.switchReads(ctx, req, ts, startState, !needToSwitchWrites /* rebuildSrvVSchema */, direction); err != nil { + if rdDryRunResults, err = s.switchReads(ctx, req, ts, startState, !switchPrimary /* rebuildSrvVSchema */, direction); err != nil { return nil, err } s.Logger().Infof("Switch Reads done for workflow %s.%s", req.Keyspace, req.Workflow) @@ -3240,7 +3237,7 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor dryRunResults = append(dryRunResults, *rdDryRunResults...) } - if needToSwitchWrites { + if switchPrimary { if _, wrDryRunResults, err = s.switchWrites(ctx, req, ts, timeout, false); err != nil { return nil, err } From 4dd57d3e0ae91eea3edc09a5adfb1454c6b7ff17 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 14 Oct 2024 09:11:43 -0400 Subject: [PATCH 17/29] Line up wrangler and workflow impls Signed-off-by: Matt Lord --- go/vt/vtctl/workflow/traffic_switcher.go | 28 +++++++++++++++--------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go index 971f6e8f323..137483c2c63 100644 --- a/go/vt/vtctl/workflow/traffic_switcher.go +++ b/go/vt/vtctl/workflow/traffic_switcher.go @@ -610,10 +610,13 @@ func (ts *trafficSwitcher) switchShardReads(ctx context.Context, cells []string, cellsStr := strings.Join(cells, ",") ts.Logger().Infof("switchShardReads: cells: %s, tablet types: %+v, direction %d", cellsStr, servedTypes, direction) - fromShards, toShards := ts.SourceShards(), ts.TargetShards() - if direction == DirectionBackward { + var fromShards, toShards []*topo.ShardInfo + if direction == DirectionForward { + fromShards, toShards = ts.SourceShards(), ts.TargetShards() + } else { fromShards, toShards = ts.TargetShards(), ts.SourceShards() } + if err := ts.TopoServer().ValidateSrvKeyspace(ctx, ts.TargetKeyspaceName(), cellsStr); err != nil { err2 := vterrors.Wrapf(err, "Before switching shard reads, found SrvKeyspace for %s is corrupt in cell %s", ts.TargetKeyspaceName(), cellsStr) @@ -652,20 +655,25 @@ func (ts *trafficSwitcher) switchTableReads(ctx context.Context, cells []string, // targetKeyspace.table -> sourceKeyspace.table // For forward migration, we add tablet type specific rules to redirect traffic to the target. // For backward, we redirect to source. - targetKeyspace := ts.targetKeyspace - if direction == DirectionBackward { - targetKeyspace = ts.sourceKeyspace - } for _, servedType := range servedTypes { if servedType != topodatapb.TabletType_REPLICA && servedType != topodatapb.TabletType_RDONLY { return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid tablet type specified when switching reads: %v", servedType) } tt := strings.ToLower(servedType.String()) for _, table := range ts.Tables() { - toTarget := []string{targetKeyspace + "." + table} - rules[table+"@"+tt] = toTarget - rules[ts.targetKeyspace+"."+table+"@"+tt] = toTarget - rules[ts.sourceKeyspace+"."+table+"@"+tt] = toTarget + if direction == DirectionForward { + log.Infof("Route direction forward") + toTarget := []string{ts.TargetKeyspaceName() + "." + table} + rules[table+"@"+tt] = toTarget + rules[ts.TargetKeyspaceName()+"."+table+"@"+tt] = toTarget + rules[ts.SourceKeyspaceName()+"."+table+"@"+tt] = toTarget + } else { + log.Infof("Route direction backwards") + toSource := []string{ts.SourceKeyspaceName() + "." + table} + rules[table+"@"+tt] = toSource + rules[ts.TargetKeyspaceName()+"."+table+"@"+tt] = toSource + rules[ts.SourceKeyspaceName()+"."+table+"@"+tt] = toSource + } } } if err := topotools.SaveRoutingRules(ctx, ts.TopoServer(), rules); err != nil { From f10bb1e6b472acbfdd4e59a77047dffbc722300a Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 14 Oct 2024 09:25:05 -0400 Subject: [PATCH 18/29] Minor changes Signed-off-by: Matt Lord --- go/vt/vtctl/workflow/traffic_switcher.go | 9 ++++----- go/vt/wrangler/traffic_switcher.go | 2 +- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go index 137483c2c63..af790fee245 100644 --- a/go/vt/vtctl/workflow/traffic_switcher.go +++ b/go/vt/vtctl/workflow/traffic_switcher.go @@ -607,8 +607,7 @@ func (ts *trafficSwitcher) dropSourceShards(ctx context.Context) error { } func (ts *trafficSwitcher) switchShardReads(ctx context.Context, cells []string, servedTypes []topodatapb.TabletType, direction TrafficSwitchDirection) error { - cellsStr := strings.Join(cells, ",") - ts.Logger().Infof("switchShardReads: cells: %s, tablet types: %+v, direction %d", cellsStr, servedTypes, direction) + ts.Logger().Infof("switchShardReads: cells: %v, tablet types: %v, direction %s", cells, servedTypes, direction.String()) var fromShards, toShards []*topo.ShardInfo if direction == DirectionForward { @@ -617,6 +616,7 @@ func (ts *trafficSwitcher) switchShardReads(ctx context.Context, cells []string, fromShards, toShards = ts.TargetShards(), ts.SourceShards() } + cellsStr := strings.Join(cells, ",") if err := ts.TopoServer().ValidateSrvKeyspace(ctx, ts.TargetKeyspaceName(), cellsStr); err != nil { err2 := vterrors.Wrapf(err, "Before switching shard reads, found SrvKeyspace for %s is corrupt in cell %s", ts.TargetKeyspaceName(), cellsStr) @@ -645,7 +645,8 @@ func (ts *trafficSwitcher) switchShardReads(ctx context.Context, cells []string, } func (ts *trafficSwitcher) switchTableReads(ctx context.Context, cells []string, servedTypes []topodatapb.TabletType, rebuildSrvVSchema bool, direction TrafficSwitchDirection) error { - ts.Logger().Infof("switchTableReads: cells: %s, tablet types: %+v, direction: %s", strings.Join(cells, ","), servedTypes, direction) + ts.Logger().Infof("switchTableReads: cells: %v, tablet types: %v, direction: %s", cells, servedTypes, direction.String()) + rules, err := topotools.GetRoutingRules(ctx, ts.TopoServer()) if err != nil { return err @@ -662,13 +663,11 @@ func (ts *trafficSwitcher) switchTableReads(ctx context.Context, cells []string, tt := strings.ToLower(servedType.String()) for _, table := range ts.Tables() { if direction == DirectionForward { - log.Infof("Route direction forward") toTarget := []string{ts.TargetKeyspaceName() + "." + table} rules[table+"@"+tt] = toTarget rules[ts.TargetKeyspaceName()+"."+table+"@"+tt] = toTarget rules[ts.SourceKeyspaceName()+"."+table+"@"+tt] = toTarget } else { - log.Infof("Route direction backwards") toSource := []string{ts.SourceKeyspaceName() + "." + table} rules[table+"@"+tt] = toSource rules[ts.TargetKeyspaceName()+"."+table+"@"+tt] = toSource diff --git a/go/vt/wrangler/traffic_switcher.go b/go/vt/wrangler/traffic_switcher.go index d337c1ee515..68eca1b6782 100644 --- a/go/vt/wrangler/traffic_switcher.go +++ b/go/vt/wrangler/traffic_switcher.go @@ -1117,7 +1117,7 @@ func (ts *trafficSwitcher) validate(ctx context.Context) error { } func (ts *trafficSwitcher) switchTableReads(ctx context.Context, cells []string, servedTypes []topodatapb.TabletType, direction workflow.TrafficSwitchDirection) error { - log.Infof("switchTableReads: servedTypes: %+v, direction %t", servedTypes, direction) + log.Infof("switchTableReads: for tablet types: %+v", servedTypes) rules, err := topotools.GetRoutingRules(ctx, ts.TopoServer()) if err != nil { return err From 4d2a9926d3cbcc15a0617f5b1df6c8bf2d33c1a8 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 14 Oct 2024 09:29:30 -0400 Subject: [PATCH 19/29] Add workflow to log message So that we can see e.g. that we're doing a forward switch for the reverse workflow. Signed-off-by: Matt Lord --- go/vt/vtctl/workflow/traffic_switcher.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go index af790fee245..dd4975f7d43 100644 --- a/go/vt/vtctl/workflow/traffic_switcher.go +++ b/go/vt/vtctl/workflow/traffic_switcher.go @@ -607,7 +607,8 @@ func (ts *trafficSwitcher) dropSourceShards(ctx context.Context) error { } func (ts *trafficSwitcher) switchShardReads(ctx context.Context, cells []string, servedTypes []topodatapb.TabletType, direction TrafficSwitchDirection) error { - ts.Logger().Infof("switchShardReads: cells: %v, tablet types: %v, direction %s", cells, servedTypes, direction.String()) + ts.Logger().Infof("switchShardReads: workflow: %s, direction: %s, cells: %v, tablet types: %v", + ts.workflow, direction.String(), cells, servedTypes) var fromShards, toShards []*topo.ShardInfo if direction == DirectionForward { @@ -645,7 +646,8 @@ func (ts *trafficSwitcher) switchShardReads(ctx context.Context, cells []string, } func (ts *trafficSwitcher) switchTableReads(ctx context.Context, cells []string, servedTypes []topodatapb.TabletType, rebuildSrvVSchema bool, direction TrafficSwitchDirection) error { - ts.Logger().Infof("switchTableReads: cells: %v, tablet types: %v, direction: %s", cells, servedTypes, direction.String()) + ts.Logger().Infof("switchTableReads: workflow: %s, direction: %s, cells: %v, tablet types: %v", + ts.workflow, direction.String(), cells, servedTypes) rules, err := topotools.GetRoutingRules(ctx, ts.TopoServer()) if err != nil { From e5b45dc7697ecc40f77d8666b6d4a150fe903161 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 14 Oct 2024 09:30:40 -0400 Subject: [PATCH 20/29] Remove accidental change Signed-off-by: Matt Lord --- go/vt/wrangler/traffic_switcher.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/wrangler/traffic_switcher.go b/go/vt/wrangler/traffic_switcher.go index 68eca1b6782..d337c1ee515 100644 --- a/go/vt/wrangler/traffic_switcher.go +++ b/go/vt/wrangler/traffic_switcher.go @@ -1117,7 +1117,7 @@ func (ts *trafficSwitcher) validate(ctx context.Context) error { } func (ts *trafficSwitcher) switchTableReads(ctx context.Context, cells []string, servedTypes []topodatapb.TabletType, direction workflow.TrafficSwitchDirection) error { - log.Infof("switchTableReads: for tablet types: %+v", servedTypes) + log.Infof("switchTableReads: servedTypes: %+v, direction %t", servedTypes, direction) rules, err := topotools.GetRoutingRules(ctx, ts.TopoServer()) if err != nil { return err From a629cbbd62d6026ffe8a623dc1489215530fb775 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 14 Oct 2024 10:47:51 -0400 Subject: [PATCH 21/29] Preserve previous TestMoveTablesBuffering behavior Meaning that we continue to use 0 replica and 0 rdonly tablets. Signed-off-by: Matt Lord --- .../vreplication/movetables_buffering_test.go | 14 ++++++++++++-- go/vt/vtctl/workflow/server.go | 2 +- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/go/test/endtoend/vreplication/movetables_buffering_test.go b/go/test/endtoend/vreplication/movetables_buffering_test.go index f456c32bfd5..d7ae8db1e03 100644 --- a/go/test/endtoend/vreplication/movetables_buffering_test.go +++ b/go/test/endtoend/vreplication/movetables_buffering_test.go @@ -12,7 +12,14 @@ import ( ) func TestMoveTablesBuffering(t *testing.T) { - defaultRdonly = 1 + ogReplicas := defaultReplicas + ogRdOnly := defaultRdonly + defer func() { + defaultReplicas = ogReplicas + defaultRdonly = ogRdOnly + }() + defaultRdonly = 0 + defaultReplicas = 0 vc = setupMinimalCluster(t) defer vc.TearDown() @@ -32,11 +39,14 @@ func TestMoveTablesBuffering(t *testing.T) { catchup(t, targetTab1, workflowName, "MoveTables") catchup(t, targetTab2, workflowName, "MoveTables") - vdiffSideBySide(t, ksWorkflow, "") waitForLowLag(t, "customer", workflowName) + vdiffSideBySide(t, ksWorkflow, "") + reverseWorkflowName := workflowName + "_reverse" for i := 0; i < 10; i++ { + waitForLowLag(t, "customer", workflowName) tstWorkflowSwitchReadsAndWrites(t) time.Sleep(loadTestBufferingWindowDuration + 1*time.Second) + waitForLowLag(t, "customer", reverseWorkflowName) tstWorkflowReverseReadsAndWrites(t) time.Sleep(loadTestBufferingWindowDuration + 1*time.Second) } diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 38a60d07ced..aea51898bc8 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -3202,7 +3202,7 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor // This means that the reverse workflow exists. So we update the starting state // so that we're using the reverse workflow and we can move forward with a normal // traffic switch forward operation, from the reverse workflow's perspective. - ts, startState, err = s.getWorkflowState(ctx, startState.SourceKeyspace, ts.reverseWorkflow) + ts, startState, err = s.getWorkflowState(ctx, ts.sourceKeyspace, ts.reverseWorkflow) if err != nil { return nil, err } From 7628c577f8482116b828baa88892a3fa9077197e Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 14 Oct 2024 12:18:02 -0400 Subject: [PATCH 22/29] Remove unnecessary test changes Signed-off-by: Matt Lord --- go/test/endtoend/vreplication/movetables_buffering_test.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/go/test/endtoend/vreplication/movetables_buffering_test.go b/go/test/endtoend/vreplication/movetables_buffering_test.go index d7ae8db1e03..eed96768fc5 100644 --- a/go/test/endtoend/vreplication/movetables_buffering_test.go +++ b/go/test/endtoend/vreplication/movetables_buffering_test.go @@ -39,14 +39,11 @@ func TestMoveTablesBuffering(t *testing.T) { catchup(t, targetTab1, workflowName, "MoveTables") catchup(t, targetTab2, workflowName, "MoveTables") - waitForLowLag(t, "customer", workflowName) vdiffSideBySide(t, ksWorkflow, "") - reverseWorkflowName := workflowName + "_reverse" + waitForLowLag(t, "customer", workflowName) for i := 0; i < 10; i++ { - waitForLowLag(t, "customer", workflowName) tstWorkflowSwitchReadsAndWrites(t) time.Sleep(loadTestBufferingWindowDuration + 1*time.Second) - waitForLowLag(t, "customer", reverseWorkflowName) tstWorkflowReverseReadsAndWrites(t) time.Sleep(loadTestBufferingWindowDuration + 1*time.Second) } From 394ce81a9ac14518b390df7d7fd66483810d56c0 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 14 Oct 2024 12:20:43 -0400 Subject: [PATCH 23/29] Restore globals Signed-off-by: Matt Lord --- .../endtoend/vreplication/resharding_workflows_v2_test.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go index 128fff4c341..f82fbb2b04f 100644 --- a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go +++ b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go @@ -425,6 +425,12 @@ func getCurrentStatus(t *testing.T) string { // but CI currently fails on creating multiple clusters even after the previous ones are torn down func TestBasicV2Workflows(t *testing.T) { + ogReplicas := defaultReplicas + ogRdOnly := defaultRdonly + defer func() { + defaultReplicas = ogReplicas + defaultRdonly = ogRdOnly + }() defaultReplicas = 1 defaultRdonly = 1 extraVTTabletArgs = []string{ From 9dbb24ebe4eb1cc05accdd9b64d5331dda95d02c Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 14 Oct 2024 17:54:57 -0400 Subject: [PATCH 24/29] Check workflow states and do a final vdiff in new tests Signed-off-by: Matt Lord --- .../vreplication_vtctldclient_cli_test.go | 38 +++++++++++++++++++ .../endtoend/vreplication/wrappers_test.go | 19 ++++++++++ 2 files changed, 57 insertions(+) diff --git a/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go b/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go index 86b87d52f21..a379239c246 100644 --- a/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go +++ b/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go @@ -31,6 +31,7 @@ import ( "vitess.io/vitess/go/json2" "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/vt/topo/topoproto" + "vitess.io/vitess/go/vt/wrangler" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" @@ -169,6 +170,7 @@ func getMoveTablesShowResponse(mt *iMoveTables) *vtctldatapb.GetWorkflowsRespons // Validates some of the flags created from the previous test. func testMoveTablesFlags2(t *testing.T, mt *iMoveTables, sourceKeyspace, targetKeyspace, workflowName string, targetTabs map[string]*cluster.VttabletProcess) { ksWorkflow := fmt.Sprintf("%s.%s", targetKeyspace, workflowName) + wf := (*mt).(iWorkflow) (*mt).Start() // Need to start because we set auto-start to false. waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Stopped.String()) confirmNoRoutingRules(t) @@ -193,65 +195,79 @@ func testMoveTablesFlags2(t *testing.T, mt *iMoveTables, sourceKeyspace, targetK validateReadsRouteToTarget(t, "replica") validateRoutingRule(t, "customer", "replica", sourceKs, targetKs) validateRoutingRule(t, "customer", "", targetKs, sourceKs) + confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateReadsSwitched) (*mt).ReverseReads() validateReadsRouteToSource(t, "replica") validateRoutingRule(t, "customer", "replica", targetKs, sourceKs) validateRoutingRule(t, "customer", "", targetKs, sourceKs) + confirmStates(t, &wf, wrangler.WorkflowStateReadsSwitched, wrangler.WorkflowStateNotSwitched) (*mt).SwitchReadsAndWrites() validateReadsRouteToTarget(t, "replica") validateRoutingRule(t, "customer", "replica", sourceKs, targetKs) validateWritesRouteToTarget(t) validateRoutingRule(t, "customer", "", sourceKs, targetKs) + confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateAllSwitched) (*mt).ReverseReadsAndWrites() validateReadsRouteToSource(t, "replica") validateRoutingRule(t, "customer", "replica", targetKs, sourceKs) validateWritesRouteToSource(t) validateRoutingRule(t, "customer", "", targetKs, sourceKs) + confirmStates(t, &wf, wrangler.WorkflowStateAllSwitched, wrangler.WorkflowStateNotSwitched) (*mt).SwitchReadsAndWrites() validateReadsRouteToTarget(t, "replica") validateRoutingRule(t, "customer", "replica", sourceKs, targetKs) validateWritesRouteToTarget(t) validateRoutingRule(t, "customer", "", sourceKs, targetKs) + confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateAllSwitched) (*mt).ReverseReads() validateReadsRouteToSource(t, "replica") validateRoutingRule(t, "customer", "replica", targetKs, sourceKs) validateWritesRouteToTarget(t) validateRoutingRule(t, "customer", "", sourceKs, targetKs) + confirmStates(t, &wf, wrangler.WorkflowStateAllSwitched, wrangler.WorkflowStateWritesSwitched) (*mt).ReverseWrites() validateReadsRouteToSource(t, "replica") validateRoutingRule(t, "customer", "replica", targetKs, sourceKs) validateWritesRouteToSource(t) validateRoutingRule(t, "customer", "", targetKs, sourceKs) + confirmStates(t, &wf, wrangler.WorkflowStateWritesSwitched, wrangler.WorkflowStateNotSwitched) (*mt).SwitchReadsAndWrites() validateReadsRouteToTarget(t, "replica") validateRoutingRule(t, "customer", "replica", sourceKs, targetKs) validateWritesRouteToTarget(t) validateRoutingRule(t, "customer", "", sourceKs, targetKs) + confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateAllSwitched) (*mt).ReverseWrites() validateReadsRouteToTarget(t, "replica") validateRoutingRule(t, "customer", "replica", sourceKs, targetKs) validateWritesRouteToSource(t) validateRoutingRule(t, "customer", "", targetKs, sourceKs) + confirmStates(t, &wf, wrangler.WorkflowStateAllSwitched, wrangler.WorkflowStateReadsSwitched) (*mt).ReverseReads() validateReadsRouteToSource(t, "replica") validateRoutingRule(t, "customer", "replica", targetKs, sourceKs) validateWritesRouteToSource(t) validateRoutingRule(t, "customer", "", targetKs, sourceKs) + confirmStates(t, &wf, wrangler.WorkflowStateReadsSwitched, wrangler.WorkflowStateNotSwitched) + + // Confirm that everything is still in sync after our switch fest. + vdiff(t, targetKeyspace, workflowName, "zone1", false, true, nil) (*mt).SwitchReadsAndWrites() validateReadsRouteToTarget(t, "replica") validateRoutingRule(t, "customer", "replica", sourceKs, targetKs) validateWritesRouteToTarget(t) validateRoutingRule(t, "customer", "", sourceKs, targetKs) + confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateAllSwitched) (*mt).Complete() confirmRoutingRulesExist(t) @@ -443,6 +459,7 @@ func splitShard(t *testing.T, keyspace, workflowName, sourceShards, targetShards }, workflowFlavorVtctld) ksWorkflow := fmt.Sprintf("%s.%s", keyspace, workflowName) + wf := rs.(iWorkflow) rs.Create() validateReshardResponse(rs) validateOverrides(t, targetTabs, overrides) @@ -491,56 +508,72 @@ func splitShard(t *testing.T, keyspace, workflowName, sourceShards, targetShards vdiff(t, keyspace, workflowName+"_reverse", "zone1", true, false, nil) shardReadsRouteToTarget() shardWritesRouteToTarget() + confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateAllSwitched) rs.ReverseReadsAndWrites() waitForLowLag(t, keyspace, workflowName) vdiff(t, keyspace, workflowName, "zone1", false, true, nil) shardReadsRouteToSource() shardWritesRouteToSource() + confirmStates(t, &wf, wrangler.WorkflowStateAllSwitched, wrangler.WorkflowStateNotSwitched) rs.SwitchReads() shardReadsRouteToTarget() shardWritesRouteToSource() + confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateReadsSwitched) rs.ReverseReads() shardReadsRouteToSource() shardWritesRouteToSource() + confirmStates(t, &wf, wrangler.WorkflowStateReadsSwitched, wrangler.WorkflowStateNotSwitched) rs.SwitchReadsAndWrites() shardReadsRouteToTarget() shardWritesRouteToTarget() + confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateAllSwitched) rs.ReverseReadsAndWrites() shardReadsRouteToSource() shardWritesRouteToSource() + confirmStates(t, &wf, wrangler.WorkflowStateAllSwitched, wrangler.WorkflowStateNotSwitched) rs.SwitchReadsAndWrites() shardReadsRouteToTarget() shardWritesRouteToTarget() + confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateAllSwitched) rs.ReverseReads() shardReadsRouteToSource() shardWritesRouteToTarget() + confirmStates(t, &wf, wrangler.WorkflowStateAllSwitched, wrangler.WorkflowStateWritesSwitched) rs.ReverseWrites() shardReadsRouteToSource() shardWritesRouteToSource() + confirmStates(t, &wf, wrangler.WorkflowStateWritesSwitched, wrangler.WorkflowStateNotSwitched) rs.SwitchReadsAndWrites() shardReadsRouteToTarget() shardWritesRouteToTarget() + confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateAllSwitched) rs.ReverseWrites() shardReadsRouteToTarget() shardWritesRouteToSource() + confirmStates(t, &wf, wrangler.WorkflowStateAllSwitched, wrangler.WorkflowStateReadsSwitched) rs.ReverseReads() shardReadsRouteToSource() shardWritesRouteToSource() + confirmStates(t, &wf, wrangler.WorkflowStateReadsSwitched, wrangler.WorkflowStateNotSwitched) + + // Confirm that everything is still in sync after our switch fest. + vdiff(t, keyspace, workflowName, "zone1", false, true, nil) rs.SwitchReadsAndWrites() shardReadsRouteToTarget() shardWritesRouteToTarget() + confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateAllSwitched) rs.Complete() } @@ -910,3 +943,8 @@ func testOneRoutingRulesCommand(t *testing.T, typ string, rules string, validate }) } } + +func confirmStates(t *testing.T, workflow *iWorkflow, startState, endState string) { + require.Contains(t, (*workflow).GetLastOutput(), fmt.Sprintf("Start State: %s", startState)) + require.Contains(t, (*workflow).GetLastOutput(), fmt.Sprintf("Current State: %s", endState)) +} diff --git a/go/test/endtoend/vreplication/wrappers_test.go b/go/test/endtoend/vreplication/wrappers_test.go index e46bc6acda0..c91eeddcfd0 100644 --- a/go/test/endtoend/vreplication/wrappers_test.go +++ b/go/test/endtoend/vreplication/wrappers_test.go @@ -42,6 +42,7 @@ type iWorkflow interface { Flavor() string GetLastOutput() string Start() + Status() Stop() } @@ -149,6 +150,11 @@ func (vmt *VtctlMoveTables) Show() { panic("implement me") } +func (vmt *VtctlMoveTables) Status() { + currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables + vmt.exec("Status") +} + func (vmt *VtctlMoveTables) exec(action string) { options := &workflowExecOptions{ deferSecondaryKeys: false, @@ -263,6 +269,10 @@ func (v VtctldMoveTables) Show() { v.exec(args...) } +func (v VtctldMoveTables) Status() { + v.exec("Status") +} + func (v VtctldMoveTables) SwitchReads() { args := []string{"SwitchTraffic", "--tablet-types=rdonly,replica"} args = append(args, v.switchFlags...) @@ -379,6 +389,11 @@ func (vrs *VtctlReshard) MirrorTraffic() { panic("implement me") } +func (vrs *VtctlReshard) Status() { + currentWorkflowType = binlogdatapb.VReplicationWorkflowType_Reshard + vrs.exec("Status") +} + func (vrs *VtctlReshard) SwitchReadsAndWrites() { vrs.exec(workflowActionSwitchTraffic) } @@ -486,6 +501,10 @@ func (v VtctldReshard) Show() { v.exec("Show") } +func (v *VtctldReshard) Status() { + v.exec("Status") +} + func (v VtctldReshard) SwitchReads() { args := []string{"SwitchTraffic"} args = append(args, v.switchFlags...) From 8f241e8bfcb3015a4aac79f30dc3b4905f8bc45e Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 15 Oct 2024 13:10:49 -0400 Subject: [PATCH 25/29] Update TestBasicV2Workflows so that it fails on main Signed-off-by: Matt Lord --- .../vreplication/resharding_workflows_v2_test.go | 10 +++++++--- go/test/endtoend/vreplication/wrappers_test.go | 2 +- go/vt/vtctl/workflow/server.go | 7 ++++--- 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go index f82fbb2b04f..c1d1c72c315 100644 --- a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go +++ b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go @@ -172,9 +172,7 @@ func tstWorkflowExec(t *testing.T, cells, workflow, sourceKs, targetKs, tables, args = append(args, "--tablet-types", tabletTypes) } args = append(args, "--action_timeout=10m") // At this point something is up so fail the test - if debugMode { - t.Logf("Executing workflow command: vtctldclient %v", strings.Join(args, " ")) - } + t.Logf("Executing workflow command: vtctldclient %s", strings.Join(args, " ")) output, err := vc.VtctldClient.ExecuteCommandWithOutput(args...) lastOutput = output if err != nil { @@ -746,6 +744,12 @@ func testPartialSwitches(t *testing.T) { tstWorkflowSwitchReads(t, "", "") checkStates(t, nextState, nextState) // idempotency + tstWorkflowReverseReads(t, "replica,rdonly", "") + checkStates(t, wrangler.WorkflowStateReadsSwitched, wrangler.WorkflowStateNotSwitched) + + tstWorkflowSwitchReads(t, "", "") + checkStates(t, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateReadsSwitched) + tstWorkflowSwitchWrites(t) currentState = nextState nextState = wrangler.WorkflowStateAllSwitched diff --git a/go/test/endtoend/vreplication/wrappers_test.go b/go/test/endtoend/vreplication/wrappers_test.go index c91eeddcfd0..ab9a8eb9dfb 100644 --- a/go/test/endtoend/vreplication/wrappers_test.go +++ b/go/test/endtoend/vreplication/wrappers_test.go @@ -225,7 +225,7 @@ func (v VtctldMoveTables) exec(args ...string) { args2 := []string{"MoveTables", "--workflow=" + v.workflowName, "--target-keyspace=" + v.targetKeyspace} args2 = append(args2, args...) var err error - v.vc.t.Logf("Executing command: vtctldclient %s", strings.Join(args2, " ")) + v.vc.t.Logf("Executing workflow command: vtctldclient %s", strings.Join(args2, " ")) v.lastOutput, err = vc.VtctldClient.ExecuteCommandWithOutput(args2...) require.NoError(v.vc.t, err, "failed MoveTables action, error: %v: output: %s", err, v.lastOutput) } diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index aea51898bc8..c339dcfbcd9 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -3199,9 +3199,10 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor (direction == DirectionBackward && !startState.WritesSwitched) if direction == DirectionBackward && !onlySwitchingReads { - // This means that the reverse workflow exists. So we update the starting state - // so that we're using the reverse workflow and we can move forward with a normal - // traffic switch forward operation, from the reverse workflow's perspective. + // This means that the main workflow is FROZEN and the reverse workflow + // exists. So we update the starting state so that we're using the reverse + // workflow and we can move forward with a normal traffic switch forward + // operation, from the reverse workflow's perspective. ts, startState, err = s.getWorkflowState(ctx, ts.sourceKeyspace, ts.reverseWorkflow) if err != nil { return nil, err From e8edd10711a8b2c9fee4d065fc6f22715ea0b0ea Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 15 Oct 2024 17:57:39 -0400 Subject: [PATCH 26/29] kick CI Signed-off-by: Matt Lord From 118e0b3b83efec7501cc79bc68b669c8d3ed3367 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 15 Oct 2024 23:30:50 -0400 Subject: [PATCH 27/29] Restore unintentionally deleted condition Signed-off-by: Matt Lord --- go/vt/vtctl/workflow/server.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index c339dcfbcd9..43ce3401fd9 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -3138,10 +3138,6 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor span, ctx := trace.NewSpan(ctx, "workflow.Server.WorkflowSwitchTraffic") defer span.Finish() - if req == nil { - return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid nil request") - } - span.Annotate("keyspace", req.Keyspace) span.Annotate("workflow", req.Workflow) span.Annotate("tablet-types", req.TabletTypes) @@ -3191,6 +3187,14 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid action for Migrate workflow: SwitchTraffic") } + if direction == DirectionBackward && ts.IsMultiTenantMigration() { + // In a multi-tenant migration, multiple migrations would be writing to the same + // table, so we can't stop writes like we do with MoveTables, using denied tables, + // since it would block all other migrations as well as traffic for tenants which + // have already been migrated. + return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "cannot reverse traffic for multi-tenant migrations") + } + // We need this to know when there isn't a reverse workflow to use. onlySwitchingReads := !startState.WritesSwitched && !switchPrimary From 3fa88df44f00e4f20295fd51f39bc47049034bea Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Sat, 19 Oct 2024 23:46:59 +0200 Subject: [PATCH 28/29] Fix validateRoutingRule which had a bug causing the wrong value to be checked for the routed keyspace. Also it not actually failing if route was incorrect. Signed-off-by: Rohit Nayak --- .../resharding_workflows_v2_test.go | 24 +++++++--- .../vreplication_vtctldclient_cli_test.go | 44 +++++++++---------- 2 files changed, 39 insertions(+), 29 deletions(-) diff --git a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go index c1d1c72c315..c68b4f57099 100644 --- a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go +++ b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go @@ -1088,19 +1088,29 @@ func tstApplySchemaOnlineDDL(t *testing.T, sql string, keyspace string) { require.NoError(t, err, fmt.Sprintf("ApplySchema Error: %s", err)) } -func validateRoutingRule(t *testing.T, table, tabletType, from, to string) bool { +func validateTableRoutingRule(t *testing.T, table, tabletType, fromKeyspace, toKeyspace string) { rr := getRoutingRules(t) + // We set matched = true by default because it is possible, if --no-routing-rules is set while creating a workflow, + // that the routing rules are empty when the workflow starts. + // We set it to false below when the rule is found, but before matching the routed keyspace. + matched := true for _, r := range rr.GetRules() { - s := fmt.Sprintf("%s.%s", from, table) + ruleKey := fmt.Sprintf("%s.%s", fromKeyspace, table) if tabletType != "" && tabletType != "primary" { - s = fmt.Sprintf("%s@%s", s, tabletType) + ruleKey = fmt.Sprintf("%s@%s", ruleKey, tabletType) } - if r.FromTable == s { + if r.FromTable == ruleKey { + matched = false // We found the rule, so we can set matched to false here and check for the routed keyspace below. toTable := r.ToTables[0] - if toTable == to { - return true + arr := strings.Split(toTable, ".") // The ToTables value is of the form "routedKeyspace".table". + if len(arr) == 2 { // Should always be true. + routedKeyspace := arr[0] + if routedKeyspace == toKeyspace { + matched = true // We found the rule and the keyspace matches. + break + } } } } - return false + require.Truef(t, matched, "Routing rule for %s.%s from %s to %s not found", fromKeyspace, table, tabletType, toKeyspace) } diff --git a/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go b/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go index a379239c246..1dc07a90abc 100644 --- a/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go +++ b/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go @@ -193,70 +193,70 @@ func testMoveTablesFlags2(t *testing.T, mt *iMoveTables, sourceKeyspace, targetK (*mt).SwitchReads() validateReadsRouteToTarget(t, "replica") - validateRoutingRule(t, "customer", "replica", sourceKs, targetKs) - validateRoutingRule(t, "customer", "", targetKs, sourceKs) + validateTableRoutingRule(t, "customer", "replica", sourceKs, targetKs) + validateTableRoutingRule(t, "customer", "", targetKs, sourceKs) confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateReadsSwitched) (*mt).ReverseReads() validateReadsRouteToSource(t, "replica") - validateRoutingRule(t, "customer", "replica", targetKs, sourceKs) - validateRoutingRule(t, "customer", "", targetKs, sourceKs) + validateTableRoutingRule(t, "customer", "replica", targetKs, sourceKs) + validateTableRoutingRule(t, "customer", "", targetKs, sourceKs) confirmStates(t, &wf, wrangler.WorkflowStateReadsSwitched, wrangler.WorkflowStateNotSwitched) (*mt).SwitchReadsAndWrites() validateReadsRouteToTarget(t, "replica") - validateRoutingRule(t, "customer", "replica", sourceKs, targetKs) + validateTableRoutingRule(t, "customer", "replica", sourceKs, targetKs) validateWritesRouteToTarget(t) - validateRoutingRule(t, "customer", "", sourceKs, targetKs) + validateTableRoutingRule(t, "customer", "", sourceKs, targetKs) confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateAllSwitched) (*mt).ReverseReadsAndWrites() validateReadsRouteToSource(t, "replica") - validateRoutingRule(t, "customer", "replica", targetKs, sourceKs) + validateTableRoutingRule(t, "customer", "replica", targetKs, sourceKs) validateWritesRouteToSource(t) - validateRoutingRule(t, "customer", "", targetKs, sourceKs) + validateTableRoutingRule(t, "customer", "", targetKs, sourceKs) confirmStates(t, &wf, wrangler.WorkflowStateAllSwitched, wrangler.WorkflowStateNotSwitched) (*mt).SwitchReadsAndWrites() validateReadsRouteToTarget(t, "replica") - validateRoutingRule(t, "customer", "replica", sourceKs, targetKs) + validateTableRoutingRule(t, "customer", "replica", sourceKs, targetKs) validateWritesRouteToTarget(t) - validateRoutingRule(t, "customer", "", sourceKs, targetKs) + validateTableRoutingRule(t, "customer", "", sourceKs, targetKs) confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateAllSwitched) (*mt).ReverseReads() validateReadsRouteToSource(t, "replica") - validateRoutingRule(t, "customer", "replica", targetKs, sourceKs) + validateTableRoutingRule(t, "customer", "replica", targetKs, sourceKs) validateWritesRouteToTarget(t) - validateRoutingRule(t, "customer", "", sourceKs, targetKs) + validateTableRoutingRule(t, "customer", "", sourceKs, targetKs) confirmStates(t, &wf, wrangler.WorkflowStateAllSwitched, wrangler.WorkflowStateWritesSwitched) (*mt).ReverseWrites() validateReadsRouteToSource(t, "replica") - validateRoutingRule(t, "customer", "replica", targetKs, sourceKs) + validateTableRoutingRule(t, "customer", "replica", targetKs, sourceKs) validateWritesRouteToSource(t) - validateRoutingRule(t, "customer", "", targetKs, sourceKs) + validateTableRoutingRule(t, "customer", "", targetKs, sourceKs) confirmStates(t, &wf, wrangler.WorkflowStateWritesSwitched, wrangler.WorkflowStateNotSwitched) (*mt).SwitchReadsAndWrites() validateReadsRouteToTarget(t, "replica") - validateRoutingRule(t, "customer", "replica", sourceKs, targetKs) + validateTableRoutingRule(t, "customer", "replica", sourceKs, targetKs) validateWritesRouteToTarget(t) - validateRoutingRule(t, "customer", "", sourceKs, targetKs) + validateTableRoutingRule(t, "customer", "", sourceKs, targetKs) confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateAllSwitched) (*mt).ReverseWrites() validateReadsRouteToTarget(t, "replica") - validateRoutingRule(t, "customer", "replica", sourceKs, targetKs) + validateTableRoutingRule(t, "customer", "replica", sourceKs, targetKs) validateWritesRouteToSource(t) - validateRoutingRule(t, "customer", "", targetKs, sourceKs) + validateTableRoutingRule(t, "customer", "", targetKs, sourceKs) confirmStates(t, &wf, wrangler.WorkflowStateAllSwitched, wrangler.WorkflowStateReadsSwitched) (*mt).ReverseReads() validateReadsRouteToSource(t, "replica") - validateRoutingRule(t, "customer", "replica", targetKs, sourceKs) + validateTableRoutingRule(t, "customer", "replica", targetKs, sourceKs) validateWritesRouteToSource(t) - validateRoutingRule(t, "customer", "", targetKs, sourceKs) + validateTableRoutingRule(t, "customer", "", targetKs, sourceKs) confirmStates(t, &wf, wrangler.WorkflowStateReadsSwitched, wrangler.WorkflowStateNotSwitched) // Confirm that everything is still in sync after our switch fest. @@ -264,9 +264,9 @@ func testMoveTablesFlags2(t *testing.T, mt *iMoveTables, sourceKeyspace, targetK (*mt).SwitchReadsAndWrites() validateReadsRouteToTarget(t, "replica") - validateRoutingRule(t, "customer", "replica", sourceKs, targetKs) + validateTableRoutingRule(t, "customer", "replica", sourceKs, targetKs) validateWritesRouteToTarget(t) - validateRoutingRule(t, "customer", "", sourceKs, targetKs) + validateTableRoutingRule(t, "customer", "", sourceKs, targetKs) confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateAllSwitched) (*mt).Complete() From e9685d504f9d1d3814a1c677765dea688d99fce4 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sun, 20 Oct 2024 13:02:12 -0400 Subject: [PATCH 29/29] Minor changes after final self review Signed-off-by: Matt Lord --- .../resharding_workflows_v2_test.go | 32 +++++++++++-------- go/vt/vtctl/workflow/server.go | 4 +-- 2 files changed, 20 insertions(+), 16 deletions(-) diff --git a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go index c68b4f57099..28ffc762ecd 100644 --- a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go +++ b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go @@ -1089,28 +1089,32 @@ func tstApplySchemaOnlineDDL(t *testing.T, sql string, keyspace string) { } func validateTableRoutingRule(t *testing.T, table, tabletType, fromKeyspace, toKeyspace string) { + tabletType = strings.ToLower(strings.TrimSpace(tabletType)) rr := getRoutingRules(t) - // We set matched = true by default because it is possible, if --no-routing-rules is set while creating a workflow, - // that the routing rules are empty when the workflow starts. + // We set matched = true by default because it is possible, if --no-routing-rules is set while creating + // a workflow, that the routing rules are empty when the workflow starts. // We set it to false below when the rule is found, but before matching the routed keyspace. matched := true for _, r := range rr.GetRules() { - ruleKey := fmt.Sprintf("%s.%s", fromKeyspace, table) + fromRule := fmt.Sprintf("%s.%s", fromKeyspace, table) if tabletType != "" && tabletType != "primary" { - ruleKey = fmt.Sprintf("%s@%s", ruleKey, tabletType) + fromRule = fmt.Sprintf("%s@%s", fromRule, tabletType) } - if r.FromTable == ruleKey { - matched = false // We found the rule, so we can set matched to false here and check for the routed keyspace below. + if r.FromTable == fromRule { + // We found the rule, so we can set matched to false here and check for the routed keyspace below. + matched = false + require.NotEmpty(t, r.ToTables) toTable := r.ToTables[0] - arr := strings.Split(toTable, ".") // The ToTables value is of the form "routedKeyspace".table". - if len(arr) == 2 { // Should always be true. - routedKeyspace := arr[0] - if routedKeyspace == toKeyspace { - matched = true // We found the rule and the keyspace matches. - break - } + // The ToTables value is of the form "routedKeyspace.table". + routedKeyspace, routedTable, ok := strings.Cut(toTable, ".") + require.True(t, ok) + require.Equal(t, table, routedTable) + if routedKeyspace == toKeyspace { + // We found the rule, the table and keyspace matches, so our search is done. + matched = true + break } } } - require.Truef(t, matched, "Routing rule for %s.%s from %s to %s not found", fromKeyspace, table, tabletType, toKeyspace) + require.Truef(t, matched, "routing rule for %s.%s from %s to %s not found", fromKeyspace, table, tabletType, toKeyspace) } diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 43ce3401fd9..b975b98b2d3 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -3143,7 +3143,7 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor span.Annotate("tablet-types", req.TabletTypes) span.Annotate("direction", req.Direction) span.Annotate("enable-reverse-replication", req.EnableReverseReplication) - span.Annotate("shards", req.GetShards) + span.Annotate("shards", req.Shards) span.Annotate("force", req.Force) var ( @@ -3195,7 +3195,7 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "cannot reverse traffic for multi-tenant migrations") } - // We need this to know when there isn't a reverse workflow to use. + // We need this to know when there isn't a (non-FROZEN) reverse workflow to use. onlySwitchingReads := !startState.WritesSwitched && !switchPrimary // We need this for idempotency and to avoid unnecessary work and resulting risk.