Skip to content

Commit

Permalink
VReplication: Return lock error everywhere that LockName fails (vites…
Browse files Browse the repository at this point in the history
…sio#16560)

Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord authored Aug 13, 2024
1 parent 61959f6 commit cc68dd5
Show file tree
Hide file tree
Showing 4 changed files with 440 additions and 43 deletions.
108 changes: 108 additions & 0 deletions go/vt/vtctl/workflow/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,23 @@ func (env *testEnv) deleteTablet(tablet *topodatapb.Tablet) {
delete(env.tablets[tablet.Keyspace], int(tablet.Alias.Uid))
}

func (env *testEnv) confirmRoutingAllTablesToTarget(t *testing.T) {
t.Helper()
env.tmc.mu.Lock()
defer env.tmc.mu.Unlock()
wantRR := make(map[string][]string)
for _, sd := range env.tmc.schema {
for _, td := range sd.TableDefinitions {
for _, tt := range []string{"", "@rdonly", "@replica"} {
wantRR[td.Name+tt] = []string{fmt.Sprintf("%s.%s", env.targetKeyspace.KeyspaceName, td.Name)}
wantRR[fmt.Sprintf("%s.%s", env.sourceKeyspace.KeyspaceName, td.Name+tt)] = []string{fmt.Sprintf("%s.%s", env.targetKeyspace.KeyspaceName, td.Name)}
wantRR[fmt.Sprintf("%s.%s", env.targetKeyspace.KeyspaceName, td.Name+tt)] = []string{fmt.Sprintf("%s.%s", env.targetKeyspace.KeyspaceName, td.Name)}
}
}
}
checkRouting(t, env.ws, wantRR)
}

type testTMClient struct {
tmclient.TabletManagerClient
schema map[string]*tabletmanagerdatapb.SchemaDefinition
Expand All @@ -240,6 +257,7 @@ type testTMClient struct {

env *testEnv // For access to the env config from tmc methods.
reverse atomic.Bool // Are we reversing traffic?
frozen atomic.Bool // Are the workflows frozen?
}

func newTestTMClient(env *testEnv) *testTMClient {
Expand Down Expand Up @@ -306,6 +324,9 @@ func (tmc *testTMClient) ReadVReplicationWorkflow(ctx context.Context, tablet *t
},
},
}
if tmc.frozen.Load() {
stream.Message = Frozen
}
res.Streams = append(res.Streams, stream)
}

Expand Down Expand Up @@ -503,3 +524,90 @@ func (tmc *testTMClient) WaitForPosition(ctx context.Context, tablet *topodatapb
func (tmc *testTMClient) VReplicationWaitForPos(ctx context.Context, tablet *topodatapb.Tablet, id int32, pos string) error {
return nil
}

//
// Utility / helper functions.
//

func checkRouting(t *testing.T, ws *Server, want map[string][]string) {
t.Helper()
ctx := context.Background()
got, err := topotools.GetRoutingRules(ctx, ws.ts)
require.NoError(t, err)
require.EqualValues(t, got, want, "routing rules don't match: got: %v, want: %v", got, want)
cells, err := ws.ts.GetCellInfoNames(ctx)
require.NoError(t, err)
for _, cell := range cells {
checkCellRouting(t, ws, cell, want)
}
}

func checkCellRouting(t *testing.T, ws *Server, cell string, want map[string][]string) {
t.Helper()
ctx := context.Background()
svs, err := ws.ts.GetSrvVSchema(ctx, cell)
require.NoError(t, err)
got := make(map[string][]string, len(svs.RoutingRules.Rules))
for _, rr := range svs.RoutingRules.Rules {
got[rr.FromTable] = append(got[rr.FromTable], rr.ToTables...)
}
require.EqualValues(t, got, want, "routing rules don't match for cell %s: got: %v, want: %v", cell, got, want)
}

func checkDenyList(t *testing.T, ts *topo.Server, keyspace, shard string, want []string) {
t.Helper()
ctx := context.Background()
si, err := ts.GetShard(ctx, keyspace, shard)
require.NoError(t, err)
tc := si.GetTabletControl(topodatapb.TabletType_PRIMARY)
var got []string
if tc != nil {
got = tc.DeniedTables
}
require.EqualValues(t, got, want, "denied tables for %s/%s: got: %v, want: %v", keyspace, shard, got, want)
}

func checkServedTypes(t *testing.T, ts *topo.Server, keyspace, shard string, want int) {
t.Helper()
ctx := context.Background()
si, err := ts.GetShard(ctx, keyspace, shard)
require.NoError(t, err)
servedTypes, err := ts.GetShardServingTypes(ctx, si)
require.NoError(t, err)
require.Equal(t, want, len(servedTypes), "shard %s/%s has wrong served types: got: %v, want: %v",
keyspace, shard, len(servedTypes), want)
}

func checkCellServedTypes(t *testing.T, ts *topo.Server, keyspace, shard, cell string, want int) {
t.Helper()
ctx := context.Background()
srvKeyspace, err := ts.GetSrvKeyspace(ctx, cell, keyspace)
require.NoError(t, err)
count := 0
outer:
for _, partition := range srvKeyspace.GetPartitions() {
for _, ref := range partition.ShardReferences {
if ref.Name == shard {
count++
continue outer
}
}
}
require.Equal(t, want, count, "serving types for %s/%s in cell %s: got: %d, want: %d", keyspace, shard, cell, count, want)
}

func checkIfPrimaryServing(t *testing.T, ts *topo.Server, keyspace, shard string, want bool) {
t.Helper()
ctx := context.Background()
si, err := ts.GetShard(ctx, keyspace, shard)
require.NoError(t, err)
require.Equal(t, want, si.IsPrimaryServing, "primary serving for %s/%s: got: %v, want: %v", keyspace, shard, si.IsPrimaryServing, want)
}

func checkIfTableExistInVSchema(ctx context.Context, t *testing.T, ts *topo.Server, keyspace, table string) bool {
vschema, err := ts.GetVSchema(ctx, keyspace)
require.NoError(t, err)
require.NotNil(t, vschema)
_, ok := vschema.Tables[table]
return ok
}
63 changes: 28 additions & 35 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2581,20 +2581,18 @@ func (s *Server) DropTargets(ctx context.Context, ts *trafficSwitcher, keepData,
lockName := fmt.Sprintf("%s/%s", ts.TargetKeyspaceName(), ts.WorkflowName())
ctx, workflowUnlock, lockErr := s.ts.LockName(ctx, lockName, "DropTargets")
if lockErr != nil {
ts.Logger().Errorf("Locking the workflow %s failed: %v", lockName, lockErr)
return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to lock the %s workflow", lockName), lockErr)
}
defer workflowUnlock(&err)
ctx, sourceUnlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "DropTargets")
if lockErr != nil {
ts.Logger().Errorf("Source LockKeyspace failed: %v", lockErr)
return nil, lockErr
return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to lock the %s keyspace", ts.SourceKeyspaceName()), lockErr)
}
defer sourceUnlock(&err)
if ts.TargetKeyspaceName() != ts.SourceKeyspaceName() {
lockCtx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "DropTargets")
if lockErr != nil {
ts.Logger().Errorf("Target LockKeyspace failed: %v", lockErr)
return nil, lockErr
return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to lock the %s keyspace", ts.TargetKeyspaceName()), lockErr)
}
defer targetUnlock(&err)
ctx = lockCtx
Expand Down Expand Up @@ -2779,20 +2777,18 @@ func (s *Server) dropSources(ctx context.Context, ts *trafficSwitcher, removalTy
lockName := fmt.Sprintf("%s/%s", ts.TargetKeyspaceName(), ts.WorkflowName())
ctx, workflowUnlock, lockErr := s.ts.LockName(ctx, lockName, "DropSources")
if lockErr != nil {
ts.Logger().Errorf("Locking the workflow %s failed: %v", lockName, lockErr)
return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to lock the %s workflow", lockName), lockErr)
}
defer workflowUnlock(&err)
ctx, sourceUnlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "DropSources")
if lockErr != nil {
ts.Logger().Errorf("Source LockKeyspace failed: %v", lockErr)
return nil, lockErr
return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to lock the %s keyspace", ts.SourceKeyspaceName()), lockErr)
}
defer sourceUnlock(&err)
if ts.TargetKeyspaceName() != ts.SourceKeyspaceName() {
lockCtx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "DropSources")
if lockErr != nil {
ts.Logger().Errorf("Target LockKeyspace failed: %v", lockErr)
return nil, lockErr
return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to lock the %s keyspace", ts.TargetKeyspaceName()), lockErr)
}
defer targetUnlock(&err)
ctx = lockCtx
Expand Down Expand Up @@ -3020,13 +3016,12 @@ func (s *Server) finalizeMigrateWorkflow(ctx context.Context, ts *trafficSwitche
lockName := fmt.Sprintf("%s/%s", ts.TargetKeyspaceName(), ts.WorkflowName())
ctx, workflowUnlock, lockErr := s.ts.LockName(ctx, lockName, "completeMigrateWorkflow")
if lockErr != nil {
ts.Logger().Errorf("Locking the workflow %s failed: %v", lockName, lockErr)
return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to lock the %s workflow", lockName), lockErr)
}
defer workflowUnlock(&err)
ctx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "completeMigrateWorkflow")
if lockErr != nil {
ts.Logger().Errorf("Target LockKeyspace failed: %v", lockErr)
return nil, lockErr
return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to lock the %s keyspace", ts.TargetKeyspaceName()), lockErr)
}
defer targetUnlock(&err)

Expand Down Expand Up @@ -3193,16 +3188,10 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc

cellsStr := strings.Join(req.Cells, ",")

// Consistently handle errors by logging and returning them.
handleError := func(message string, err error) (*[]string, error) {
werr := vterrors.Wrapf(err, message)
ts.Logger().Error(werr)
return nil, werr
}

log.Infof("Switching reads: %s.%s tablet types: %s, cells: %s, workflow state: %s", ts.targetKeyspace, ts.workflow, roTypesToSwitchStr, cellsStr, state.String())
if !switchReplica && !switchRdonly {
return handleError("invalid tablet types", vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "tablet types must be REPLICA or RDONLY: %s", roTypesToSwitchStr))
return defaultErrorHandler(ts.Logger(), "invalid tablet types",
vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "tablet types must be REPLICA or RDONLY: %s", roTypesToSwitchStr))
}
// For partial (shard-by-shard migrations) or multi-tenant migrations, traffic for all tablet types
// is expected to be switched at once. For other MoveTables migrations where we use table routing rules
Expand All @@ -3214,24 +3203,28 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc
trafficSwitchingIsAllOrNothing = true
case ts.MigrationType() == binlogdatapb.MigrationType_TABLES && ts.IsMultiTenantMigration():
if direction == DirectionBackward {
return handleError("invalid request", vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "requesting reversal of read traffic for multi-tenant migrations is not supported"))
return defaultErrorHandler(ts.Logger(), "invalid request", vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT,
"requesting reversal of read traffic for multi-tenant migrations is not supported"))
}
// For multi-tenant migrations, we only support switching traffic to all cells at once
allCells, err := ts.TopoServer().GetCellInfoNames(ctx)
if err != nil {
return nil, err
}
if len(req.GetCells()) != 0 && len(req.GetCells()) != len(allCells) {
return handleError("invalid request", vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "requesting read traffic for multi-tenant migrations must include all cells"))
return defaultErrorHandler(ts.Logger(), "invalid request", vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT,
"requesting read traffic for multi-tenant migrations must include all cells"))
}
}

if !trafficSwitchingIsAllOrNothing {
if direction == DirectionBackward && switchReplica && len(state.ReplicaCellsSwitched) == 0 {
return handleError("invalid request", vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "requesting reversal of read traffic for REPLICAs but REPLICA reads have not been switched"))
return defaultErrorHandler(ts.Logger(), "invalid request", vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION,
"requesting reversal of read traffic for REPLICAs but REPLICA reads have not been switched"))
}
if direction == DirectionBackward && switchRdonly && len(state.RdonlyCellsSwitched) == 0 {
return handleError("invalid request", vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "requesting reversal of SwitchReads for RDONLYs but RDONLY reads have not been switched"))
return defaultErrorHandler(ts.Logger(), "invalid request", vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION,
"requesting reversal of SwitchReads for RDONLYs but RDONLY reads have not been switched"))
}
}

Expand All @@ -3253,7 +3246,7 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc
// If journals exist notify user and fail.
journalsExist, _, err := ts.checkJournals(ctx)
if err != nil {
return handleError(fmt.Sprintf("failed to read journal in the %s keyspace", ts.SourceKeyspaceName()), err)
return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to read journal in the %s keyspace", ts.SourceKeyspaceName()), err)
}
if journalsExist {
log.Infof("Found a previous journal entry for %d", ts.id)
Expand All @@ -3266,7 +3259,7 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc
}

if err := ts.validate(ctx); err != nil {
return handleError("workflow validation failed", err)
return defaultErrorHandler(ts.Logger(), "workflow validation failed", err)
}

// For switching reads, locking the source keyspace is sufficient.
Expand All @@ -3282,7 +3275,7 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc
// For reads, locking the source keyspace is sufficient.
ctx, unlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "SwitchReads", topo.WithTTL(ksLockTTL))
if lockErr != nil {
return handleError(fmt.Sprintf("failed to lock the %s keyspace", ts.SourceKeyspaceName()), lockErr)
return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to lock the %s keyspace", ts.SourceKeyspaceName()), lockErr)
}
defer unlock(&err)
confirmKeyspaceLocksHeld := func() error {
Expand All @@ -3297,7 +3290,7 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc

// Remove mirror rules for the specified tablet types.
if err := sw.mirrorTableTraffic(ctx, roTabletTypes, 0); err != nil {
return handleError(fmt.Sprintf("failed to remove mirror rules from source keyspace %s to target keyspace %s, workflow %s, for read-only tablet types",
return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to remove mirror rules from source keyspace %s to target keyspace %s, workflow %s, for read-only tablet types",
ts.SourceKeyspaceName(), ts.TargetKeyspaceName(), ts.WorkflowName()), err)
}

Expand All @@ -3306,36 +3299,36 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc
case ts.IsMultiTenantMigration():
err := sw.switchKeyspaceReads(ctx, roTabletTypes)
if err != nil {
return handleError(fmt.Sprintf("failed to switch read traffic, from source keyspace %s to target keyspace %s, workflow %s",
return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to switch read traffic, from source keyspace %s to target keyspace %s, workflow %s",
ts.SourceKeyspaceName(), ts.TargetKeyspaceName(), ts.WorkflowName()), err)
}
case ts.isPartialMigration:
ts.Logger().Infof("Partial migration, skipping switchTableReads as traffic is all or nothing per shard and overridden for reads AND writes in the ShardRoutingRule created when switching writes.")
default:
err := sw.switchTableReads(ctx, req.Cells, roTabletTypes, rebuildSrvVSchema, direction)
if err != nil {
return handleError("failed to switch read traffic for the tables", err)
return defaultErrorHandler(ts.Logger(), "failed to switch read traffic for the tables", err)
}
}
return sw.logs(), nil
}

if err := confirmKeyspaceLocksHeld(); err != nil {
return handleError("locks were lost", err)
return defaultErrorHandler(ts.Logger(), "locks were lost", err)
}
ts.Logger().Infof("About to switchShardReads: cells: %s, tablet types: %s, direction: %d", cellsStr, roTypesToSwitchStr, direction)
if err := sw.switchShardReads(ctx, req.Cells, roTabletTypes, direction); err != nil {
return handleError("failed to switch read traffic for the shards", err)
return defaultErrorHandler(ts.Logger(), "failed to switch read traffic for the shards", err)
}

if err := confirmKeyspaceLocksHeld(); err != nil {
return handleError("locks were lost", err)
return defaultErrorHandler(ts.Logger(), "locks were lost", err)
}
ts.Logger().Infof("switchShardReads Completed: cells: %s, tablet types: %s, direction: %d", cellsStr, roTypesToSwitchStr, direction)
if err := s.ts.ValidateSrvKeyspace(ctx, ts.targetKeyspace, cellsStr); err != nil {
err2 := vterrors.Wrapf(err, "after switching shard reads, found SrvKeyspace for %s is corrupt in cell %s",
ts.targetKeyspace, cellsStr)
return handleError("failed to validate SrvKeyspace record", err2)
return defaultErrorHandler(ts.Logger(), "failed to validate SrvKeyspace record", err2)
}
return sw.logs(), nil
}
Expand Down
Loading

0 comments on commit cc68dd5

Please sign in to comment.