Skip to content

Commit

Permalink
Don't fail workflow cancel/cleanup if there's nothing to do
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed May 22, 2024
1 parent 51b75c1 commit 82005fb
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 16 deletions.
3 changes: 2 additions & 1 deletion go/vt/topo/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,8 @@ func (si *ShardInfo) updatePrimaryTabletControl(tc *topodatapb.Shard_TabletContr
}
if remove {
if len(newTables) != 0 {
return vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, dlTablesNotPresent)
// These tables did not exist in the denied list so we don't need to remove them.
log.Warningf("%s:%s", dlTablesNotPresent, strings.Join(newTables, ","))
}
var newDenyList []string
if len(tables) != 0 { // legacy uses
Expand Down
8 changes: 4 additions & 4 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1971,7 +1971,7 @@ func (s *Server) WorkflowDelete(ctx context.Context, req *vtctldatapb.WorkflowDe

ts, state, err := s.getWorkflowState(ctx, req.GetKeyspace(), req.GetWorkflow())
if err != nil {
log.Errorf("Failed to get VReplication workflow state for %s.%s: %v", req.GetKeyspace(), req.GetWorkflow(), err)
log.Errorf("failed to get VReplication workflow state for %s.%s: %v", req.GetKeyspace(), req.GetWorkflow(), err)
return nil, err
}

Expand Down Expand Up @@ -2019,7 +2019,7 @@ func (s *Server) WorkflowDelete(ctx context.Context, req *vtctldatapb.WorkflowDe
// Cleanup related data and artifacts.
if _, err := s.DropTargets(delCtx, ts, req.GetKeepData(), req.GetKeepRoutingRules(), false); err != nil {
if topo.IsErrType(err, topo.NoNode) {
return nil, vterrors.Wrapf(err, "%s keyspace does not exist", req.Keyspace)
return nil, vterrors.Wrapf(err, "%s keyspace does not exist", req.GetKeyspace())
}
return nil, err
}
Expand Down Expand Up @@ -2807,8 +2807,8 @@ func (s *Server) DeleteShard(ctx context.Context, keyspace, shard string, recurs
shardInfo, err := s.ts.GetShard(ctx, keyspace, shard)
if err != nil {
if topo.IsErrType(err, topo.NoNode) {
log.Infof("Shard %v/%v doesn't seem to exist, cleaning up any potential leftover", keyspace, shard)
return s.ts.DeleteShard(ctx, keyspace, shard)
log.Warningf("Shard %v/%v did not exist when attempting to remove it", keyspace, shard)
return nil
}
return err
}
Expand Down
37 changes: 26 additions & 11 deletions go/vt/vtctl/workflow/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,11 @@ import (
"sync"
"time"

vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"

"golang.org/x/exp/maps"
"golang.org/x/sync/errgroup"

"vitess.io/vitess/go/json2"
"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/sqlescape"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
Expand All @@ -53,6 +52,7 @@ import (
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

Expand Down Expand Up @@ -428,6 +428,10 @@ func (ts *trafficSwitcher) deleteShardRoutingRules(ctx context.Context) error {
}
srr, err := topotools.GetShardRoutingRules(ctx, ts.TopoServer())
if err != nil {
if topo.IsErrType(err, topo.NoNode) {
log.Warningf("No shard routing rules found when attempting to delete the ones for the %s keyspace", ts.targetKeyspace)
return nil
}
return err
}
for _, si := range ts.TargetShards() {
Expand Down Expand Up @@ -520,14 +524,14 @@ func (ts *trafficSwitcher) removeSourceTables(ctx context.Context, removalType T
query := fmt.Sprintf("drop table %s.%s", primaryDbName, tableNameEscaped)
if removalType == DropTable {
ts.Logger().Infof("%s: Dropping table %s.%s\n",
source.GetPrimary().String(), source.GetPrimary().DbName(), tableName)
topoproto.TabletAliasString(source.GetPrimary().Alias), source.GetPrimary().DbName(), tableName)
} else {
renameName, err := sqlescape.EnsureEscaped(getRenameFileName(tableName))
if err != nil {
return err
}
ts.Logger().Infof("%s: Renaming table %s.%s to %s.%s\n",
source.GetPrimary().String(), source.GetPrimary().DbName(), tableName, source.GetPrimary().DbName(), renameName)
topoproto.TabletAliasString(source.GetPrimary().Alias), source.GetPrimary().DbName(), tableName, source.GetPrimary().DbName(), renameName)
query = fmt.Sprintf("rename table %s.%s TO %s.%s", primaryDbName, tableNameEscaped, primaryDbName, renameName)
}
_, err = ts.ws.tmc.ExecuteFetchAsDba(ctx, source.GetPrimary().Tablet, false, &tabletmanagerdatapb.ExecuteFetchAsDbaRequest{
Expand All @@ -537,10 +541,14 @@ func (ts *trafficSwitcher) removeSourceTables(ctx context.Context, removalType T
DisableForeignKeyChecks: true,
})
if err != nil {
ts.Logger().Errorf("%s: Error removing table %s: %v", source.GetPrimary().String(), tableName, err)
if mysqlErr, ok := err.(*sqlerror.SQLError); ok && mysqlErr.Num == sqlerror.ERNoSuchTable {
ts.Logger().Warningf("%s: Table %s did not exist when attempting to remove it", topoproto.TabletAliasString(source.GetPrimary().Alias), tableName)
return nil
}
ts.Logger().Errorf("%s: Error removing table %s: %v", topoproto.TabletAliasString(source.GetPrimary().Alias), tableName, err)
return err
}
ts.Logger().Infof("%s: Removed table %s.%s\n", source.GetPrimary().String(), source.GetPrimary().DbName(), tableName)
ts.Logger().Infof("%s: Removed table %s.%s\n", topoproto.TabletAliasString(source.GetPrimary().Alias), source.GetPrimary().DbName(), tableName)

}
return nil
Expand Down Expand Up @@ -833,6 +841,7 @@ func (ts *trafficSwitcher) deleteReverseVReplication(ctx context.Context) error
return ts.ForAllSources(func(source *MigrationSource) error {
query := fmt.Sprintf(sqlDeleteWorkflow, encodeString(source.GetPrimary().DbName()), encodeString(ts.reverseWorkflow))
if _, err := ts.TabletManagerClient().VReplicationExec(ctx, source.GetPrimary().Tablet, query); err != nil {
// vreplication.exec returns no error on delete if the rows do not exist.
return err
}
ts.ws.deleteWorkflowVDiffData(ctx, source.GetPrimary().Tablet, ts.reverseWorkflow)
Expand Down Expand Up @@ -1112,6 +1121,7 @@ func (ts *trafficSwitcher) dropTargetVReplicationStreams(ctx context.Context) er
ts.Logger().Infof("Deleting target streams and related data for workflow %s db_name %s", ts.WorkflowName(), target.GetPrimary().DbName())
query := fmt.Sprintf(sqlDeleteWorkflow, encodeString(target.GetPrimary().DbName()), encodeString(ts.WorkflowName()))
if _, err := ts.TabletManagerClient().VReplicationExec(ctx, target.GetPrimary().Tablet, query); err != nil {
// vreplication.exec returns no error on delete if the rows do not exist.
return err
}
ts.ws.deleteWorkflowVDiffData(ctx, target.GetPrimary().Tablet, ts.WorkflowName())
Expand All @@ -1125,6 +1135,7 @@ func (ts *trafficSwitcher) dropSourceReverseVReplicationStreams(ctx context.Cont
ts.Logger().Infof("Deleting reverse streams and related data for workflow %s db_name %s", ts.WorkflowName(), source.GetPrimary().DbName())
query := fmt.Sprintf(sqlDeleteWorkflow, encodeString(source.GetPrimary().DbName()), encodeString(ReverseWorkflowName(ts.WorkflowName())))
if _, err := ts.TabletManagerClient().VReplicationExec(ctx, source.GetPrimary().Tablet, query); err != nil {
// vreplication.exec returns no error on delete if the rows do not exist.
return err
}
ts.ws.deleteWorkflowVDiffData(ctx, source.GetPrimary().Tablet, ReverseWorkflowName(ts.WorkflowName()))
Expand All @@ -1147,7 +1158,7 @@ func (ts *trafficSwitcher) removeTargetTables(ctx context.Context) error {
}
query := fmt.Sprintf("drop table %s.%s", primaryDbName, tableName)
ts.Logger().Infof("%s: Dropping table %s.%s\n",
target.GetPrimary().String(), target.GetPrimary().DbName(), tableName)
topoproto.TabletAliasString(target.GetPrimary().Alias), target.GetPrimary().DbName(), tableName)
res, err := ts.ws.tmc.ExecuteFetchAsDba(ctx, target.GetPrimary().Tablet, false, &tabletmanagerdatapb.ExecuteFetchAsDbaRequest{
Query: []byte(query),
MaxRows: 1,
Expand All @@ -1156,12 +1167,16 @@ func (ts *trafficSwitcher) removeTargetTables(ctx context.Context) error {
})
log.Infof("Removed target table with result: %+v", res)
if err != nil {
ts.Logger().Errorf("%s: Error removing table %s: %v",
target.GetPrimary().String(), tableName, err)
if mysqlErr, ok := err.(*sqlerror.SQLError); ok && mysqlErr.Num == sqlerror.ERNoSuchTable {
// The table was already gone, so we can ignore the error.
ts.Logger().Warningf("%s: Table %s did not exist when attempting to remove it", topoproto.TabletAliasString(target.GetPrimary().Alias), tableName)
return nil
}
ts.Logger().Errorf("%s: Error removing table %s: %v", topoproto.TabletAliasString(target.GetPrimary().Alias), tableName, err)
return err
}
ts.Logger().Infof("%s: Removed table %s.%s\n",
target.GetPrimary().String(), target.GetPrimary().DbName(), tableName)
topoproto.TabletAliasString(target.GetPrimary().Alias), target.GetPrimary().DbName(), tableName)

}
return nil
Expand Down Expand Up @@ -1626,7 +1641,7 @@ func (ts *trafficSwitcher) resetSequences(ctx context.Context) error {
}
return ts.ForAllSources(func(source *MigrationSource) error {
ts.Logger().Infof("Resetting sequences for source shard %s.%s on tablet %s",
source.GetShard().Keyspace(), source.GetShard().ShardName(), source.GetPrimary().String())
source.GetShard().Keyspace(), source.GetShard().ShardName(), topoproto.TabletAliasString(source.GetPrimary().Alias))
return ts.TabletManagerClient().ResetSequences(ctx, source.GetPrimary().Tablet, ts.Tables())
})
}
Expand Down

0 comments on commit 82005fb

Please sign in to comment.