diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 17b01736a77..7af3a3982ea 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -1616,7 +1616,7 @@ func (s *Server) MoveTablesComplete(ctx context.Context, req *vtctldatapb.MoveTa span, ctx := trace.NewSpan(ctx, "workflow.Server.MoveTablesComplete") defer span.Finish() - ts, state, err := s.getWorkflowState(ctx, req.TargetKeyspace, req.Workflow) + ts, state, err := s.getWorkflowState(ctx, req.GetTargetKeyspace(), req.GetWorkflow()) if err != nil { return nil, err } @@ -1630,8 +1630,7 @@ func (s *Server) MoveTablesComplete(ctx context.Context, req *vtctldatapb.MoveTa var dryRunResults *[]string if state.WorkflowType == TypeMigrate { - dryRunResults, err = s.finalizeMigrateWorkflow(ctx, req.TargetKeyspace, req.Workflow, strings.Join(ts.tables, ","), - false, req.KeepData, req.KeepRoutingRules, req.DryRun) + dryRunResults, err = s.finalizeMigrateWorkflow(ctx, ts, strings.Join(ts.tables, ","), false, req.KeepData, req.KeepRoutingRules, req.DryRun) if err != nil { return nil, vterrors.Wrapf(err, "failed to finalize the %s workflow in the %s keyspace", req.Workflow, req.TargetKeyspace) @@ -1970,11 +1969,24 @@ func (s *Server) WorkflowDelete(ctx context.Context, req *vtctldatapb.WorkflowDe span.Annotate("keep_routing_rules", req.KeepRoutingRules) span.Annotate("shards", req.Shards) - // Cleanup related data and artifacts. - if _, err := s.DropTargets(ctx, req.Keyspace, req.Workflow, req.KeepData, req.KeepRoutingRules, false); err != nil { - if topo.IsErrType(err, topo.NoNode) { - return nil, vterrors.Wrapf(err, "%s keyspace does not exist", req.Keyspace) - } + ts, state, err := s.getWorkflowState(ctx, req.GetKeyspace(), req.GetWorkflow()) + if err != nil { + log.Errorf("Failed to get VReplication workflow state for %s.%s: %v", req.GetKeyspace(), req.GetWorkflow(), err) + return nil, err + } + + // There is nothing to drop for a LookupVindex workflow. + if ts.workflowType == binlogdatapb.VReplicationWorkflowType_CreateLookupIndex { + return nil, nil + } + + // Return an error if the workflow traffic is partially switched. + if state.WritesSwitched || len(state.ReplicaCellsSwitched) > 0 || len(state.RdonlyCellsSwitched) > 0 { + return nil, ErrWorkflowPartiallySwitched + } + + if state.WorkflowType == TypeMigrate { + _, err := s.finalizeMigrateWorkflow(ctx, ts, "", true, req.GetKeepData(), req.GetKeepRoutingRules(), false) return nil, err } @@ -2002,6 +2014,14 @@ func (s *Server) WorkflowDelete(ctx context.Context, req *vtctldatapb.WorkflowDe return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "the %s workflow does not exist in the %s keyspace", req.Workflow, req.Keyspace) } + // Cleanup related data and artifacts. + if _, err := s.DropTargets(ctx, ts, req.GetKeepData(), req.GetKeepRoutingRules(), false); err != nil { + if topo.IsErrType(err, topo.NoNode) { + return nil, vterrors.Wrapf(err, "%s keyspace does not exist", req.Keyspace) + } + return nil, err + } + response := &vtctldatapb.WorkflowDeleteResponse{} response.Summary = fmt.Sprintf("Successfully cancelled the %s workflow in the %s keyspace", req.Workflow, req.Keyspace) details := make([]*vtctldatapb.WorkflowDeleteResponse_TabletInfo, 0, len(res)) @@ -2497,28 +2517,8 @@ func (s *Server) optimizeCopyStateTable(tablet *topodatapb.Tablet) { // DropTargets cleans up target tables, shards and denied tables if a MoveTables/Reshard // is cancelled. -func (s *Server) DropTargets(ctx context.Context, targetKeyspace, workflow string, keepData, keepRoutingRules, dryRun bool) (*[]string, error) { - ts, state, err := s.getWorkflowState(ctx, targetKeyspace, workflow) - if err != nil { - log.Errorf("Failed to get VReplication workflow state for %s.%s: %v", targetKeyspace, workflow, err) - return nil, err - } - - // There is nothing to drop for a LookupVindex workflow. - if ts.workflowType == binlogdatapb.VReplicationWorkflowType_CreateLookupIndex { - return nil, nil - } - - // Return an error if the workflow traffic is partially switched. - if state.WritesSwitched || len(state.ReplicaCellsSwitched) > 0 || len(state.RdonlyCellsSwitched) > 0 { - return nil, ErrWorkflowPartiallySwitched - } - - if state.WorkflowType == TypeMigrate { - _, err := s.finalizeMigrateWorkflow(ctx, targetKeyspace, workflow, "", true, keepData, keepRoutingRules, dryRun) - return nil, err - } - +func (s *Server) DropTargets(ctx context.Context, ts *trafficSwitcher, keepData, keepRoutingRules, dryRun bool) (*[]string, error) { + var err error ts.keepRoutingRules = keepRoutingRules var sw iswitcher if dryRun { @@ -2942,13 +2942,11 @@ func (s *Server) refreshPrimaryTablets(ctx context.Context, shards []*topo.Shard // finalizeMigrateWorkflow deletes the streams for the Migrate workflow. // We only cleanup the target for external sources. -func (s *Server) finalizeMigrateWorkflow(ctx context.Context, targetKeyspace, workflow, tableSpecs string, cancel, keepData, keepRoutingRules, dryRun bool) (*[]string, error) { - ts, err := s.buildTrafficSwitcher(ctx, targetKeyspace, workflow) - if err != nil { - ts.Logger().Errorf("buildTrafficSwitcher failed: %v", err) - return nil, err - } - var sw iswitcher +func (s *Server) finalizeMigrateWorkflow(ctx context.Context, ts *trafficSwitcher, tableSpecs string, cancel, keepData, keepRoutingRules, dryRun bool) (*[]string, error) { + var ( + sw iswitcher + err error + ) if dryRun { sw = &switcherDryRun{ts: ts, drLog: NewLogRecorder()} } else { @@ -2966,7 +2964,7 @@ func (s *Server) finalizeMigrateWorkflow(ctx context.Context, targetKeyspace, wo return nil, err } if !cancel { - if err := sw.addParticipatingTablesToKeyspace(ctx, targetKeyspace, tableSpecs); err != nil { + if err := sw.addParticipatingTablesToKeyspace(ctx, ts.targetKeyspace, tableSpecs); err != nil { return nil, err } if err := ts.TopoServer().RebuildSrvVSchema(ctx, nil); err != nil {