diff --git a/go/vt/vtctl/workflow/framework_test.go b/go/vt/vtctl/workflow/framework_test.go index 16bacc5f266..1d25aafa75f 100644 --- a/go/vt/vtctl/workflow/framework_test.go +++ b/go/vt/vtctl/workflow/framework_test.go @@ -254,6 +254,7 @@ type testTMClient struct { vrQueries map[int][]*queryResult createVReplicationWorkflowRequests map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest readVReplicationWorkflowRequests map[uint32]*tabletmanagerdatapb.ReadVReplicationWorkflowRequest + primaryPositions map[uint32]string env *testEnv // For access to the env config from tmc methods. reverse atomic.Bool // Are we reversing traffic? @@ -266,6 +267,7 @@ func newTestTMClient(env *testEnv) *testTMClient { vrQueries: make(map[int][]*queryResult), createVReplicationWorkflowRequests: make(map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest), readVReplicationWorkflowRequests: make(map[uint32]*tabletmanagerdatapb.ReadVReplicationWorkflowRequest), + primaryPositions: make(map[uint32]string), env: env, } } @@ -513,7 +515,21 @@ func (tmc *testTMClient) UpdateVReplicationWorkflow(ctx context.Context, tablet }, nil } +func (tmc *testTMClient) setPrimaryPosition(tablet *topodatapb.Tablet, position string) { + tmc.mu.Lock() + defer tmc.mu.Unlock() + if tmc.primaryPositions == nil { + tmc.primaryPositions = make(map[uint32]string) + } + tmc.primaryPositions[tablet.Alias.Uid] = position +} + func (tmc *testTMClient) PrimaryPosition(ctx context.Context, tablet *topodatapb.Tablet) (string, error) { + tmc.mu.Lock() + defer tmc.mu.Unlock() + if tmc.primaryPositions != nil && tmc.primaryPositions[tablet.Alias.Uid] != "" { + return tmc.primaryPositions[tablet.Alias.Uid], nil + } return position, nil } diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index d0645a3a578..e1b9859e897 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -3497,6 +3497,13 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit } } + // Get the source positions now that writes are stopped, the streams were stopped (e.g. + // intra-keyspace materializations that write on the source), and we know for certain + // that any in progress writes are done. + if err := ts.gatherSourcePositions(ctx); err != nil { + return handleError("failed to gather replication positions on migration sources", err) + } + if err := confirmKeyspaceLocksHeld(); err != nil { return handleError("locks were lost", err) } @@ -3725,7 +3732,7 @@ func (s *Server) CopySchemaShard(ctx context.Context, sourceTabletAlias *topodat // Notify Replicas to reload schema. This is best-effort. reloadCtx, cancel := context.WithTimeout(ctx, waitReplicasTimeout) defer cancel() - _, ok := schematools.ReloadShard(reloadCtx, s.ts, s.tmc, logutil.NewMemoryLogger(), destKeyspace, destShard, destPrimaryPos, nil, true) + _, ok := schematools.ReloadShard(reloadCtx, s.ts, s.tmc, s.Logger(), destKeyspace, destShard, destPrimaryPos, nil, true) if !ok { s.Logger().Error(vterrors.Errorf(vtrpcpb.Code_INTERNAL, "CopySchemaShard: failed to reload schema on all replicas")) } diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go index 044efd0d9cf..34e1e4e4329 100644 --- a/go/vt/vtctl/workflow/traffic_switcher.go +++ b/go/vt/vtctl/workflow/traffic_switcher.go @@ -1016,19 +1016,10 @@ func (ts *trafficSwitcher) stopSourceWrites(ctx context.Context) error { err = ts.changeShardsAccess(ctx, ts.SourceKeyspaceName(), ts.SourceShards(), disallowWrites) } if err != nil { - ts.Logger().Warningf("Error: %s", err) + ts.Logger().Warningf("Error stopping writes on migration sources: %v", err) return err } - return ts.ForAllSources(func(source *MigrationSource) error { - var err error - source.Position, err = ts.TabletManagerClient().PrimaryPosition(ctx, source.GetPrimary().Tablet) - ts.Logger().Infof("Stopped Source Writes. Position for source %v:%v: %v", - ts.SourceKeyspaceName(), source.GetShard().ShardName(), source.Position) - if err != nil { - ts.Logger().Warningf("Error: %s", err) - } - return err - }) + return nil } // switchDeniedTables switches the denied tables rules for the traffic switch. @@ -1318,6 +1309,24 @@ func (ts *trafficSwitcher) gatherPositions(ctx context.Context) error { }) } +// gatherSourcePositions will get the current replication position for all +// migration sources. +func (ts *trafficSwitcher) gatherSourcePositions(ctx context.Context) error { + return ts.ForAllSources(func(source *MigrationSource) error { + var err error + tablet := source.GetPrimary().Tablet + tabletAlias := topoproto.TabletAliasString(tablet.Alias) + source.Position, err = ts.TabletManagerClient().PrimaryPosition(ctx, tablet) + if err != nil { + ts.Logger().Errorf("Error getting migration source position on %s: %s", tabletAlias, err) + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get position on migration source %s: %v", + tabletAlias, err) + } + ts.Logger().Infof("Position on migration source %s after having stopped writes: %s", tabletAlias, source.Position) + return nil + }) +} + func (ts *trafficSwitcher) isSequenceParticipating(ctx context.Context) (bool, error) { vschema, err := ts.TopoServer().GetVSchema(ctx, ts.targetKeyspace) if err != nil { diff --git a/go/vt/vtctl/workflow/traffic_switcher_test.go b/go/vt/vtctl/workflow/traffic_switcher_test.go index 5c0b2aba682..dfe394b2608 100644 --- a/go/vt/vtctl/workflow/traffic_switcher_test.go +++ b/go/vt/vtctl/workflow/traffic_switcher_test.go @@ -20,12 +20,15 @@ import ( "context" "fmt" "reflect" + "strconv" + "strings" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" "vitess.io/vitess/go/vt/proto/vschema" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/vtgate/vindexes" @@ -361,3 +364,72 @@ func TestGetTargetSequenceMetadata(t *testing.T) { }) } } + +// TestSwitchTrafficPositionHandling confirms that if any writes are somehow +// executed against the source between the stop source writes and wait for +// catchup steps, that we have the correct position and do not lose the write(s). +func TestTrafficSwitchPositionHandling(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + workflowName := "wf1" + tableName := "t1" + sourceKeyspaceName := "sourceks" + targetKeyspaceName := "targetks" + + schema := map[string]*tabletmanagerdatapb.SchemaDefinition{ + tableName: { + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{ + { + Name: tableName, + Schema: fmt.Sprintf("CREATE TABLE %s (id BIGINT, name VARCHAR(64), PRIMARY KEY (id))", tableName), + }, + }, + }, + } + + sourceKeyspace := &testKeyspace{ + KeyspaceName: sourceKeyspaceName, + ShardNames: []string{"0"}, + } + targetKeyspace := &testKeyspace{ + KeyspaceName: targetKeyspaceName, + ShardNames: []string{"0"}, + } + + env := newTestEnv(t, ctx, defaultCellName, sourceKeyspace, targetKeyspace) + defer env.close() + env.tmc.schema = schema + + ts, _, err := env.ws.getWorkflowState(ctx, targetKeyspaceName, workflowName) + require.NoError(t, err) + sw := &switcher{ts: ts, s: env.ws} + + lockCtx, sourceUnlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "test") + require.NoError(t, lockErr) + ctx = lockCtx + defer sourceUnlock(&err) + lockCtx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "test") + require.NoError(t, lockErr) + ctx = lockCtx + defer targetUnlock(&err) + + err = ts.stopSourceWrites(ctx) + require.NoError(t, err) + + // Now we simulate a write on the source. + newPosition := position[:strings.LastIndex(position, "-")+1] + oldSeqNo, err := strconv.Atoi(position[strings.LastIndex(position, "-")+1:]) + require.NoError(t, err) + newPosition = fmt.Sprintf("%s%d", newPosition, oldSeqNo+1) + env.tmc.setPrimaryPosition(env.tablets[sourceKeyspaceName][startingSourceTabletUID], newPosition) + + // And confirm that we picked up the new position. + err = ts.gatherSourcePositions(ctx) + require.NoError(t, err) + err = ts.ForAllSources(func(ms *MigrationSource) error { + require.Equal(t, newPosition, ms.Position) + return nil + }) + require.NoError(t, err) +} diff --git a/go/vt/vtctl/workflow/utils.go b/go/vt/vtctl/workflow/utils.go index 1e317b5c69a..374d96396f2 100644 --- a/go/vt/vtctl/workflow/utils.go +++ b/go/vt/vtctl/workflow/utils.go @@ -398,8 +398,7 @@ func BuildTargets(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManag optionsJSON := wf.GetOptions() if optionsJSON != "" { if err := json.Unmarshal([]byte(optionsJSON), &options); err != nil { - log.Errorf("failed to unmarshal options: %v %s", err, optionsJSON) - return nil, err + return nil, vterrors.Wrapf(err, "failed to unmarshal options: %s", optionsJSON) } } @@ -671,7 +670,7 @@ func areTabletsAvailableToStreamFrom(ctx context.Context, req *vtctldatapb.Workf wg.Wait() if allErrors.HasErrors() { - log.Errorf("%s", allErrors.Error()) + ts.Logger().Errorf("%s", allErrors.Error()) return allErrors.Error() } return nil