Skip to content

Commit

Permalink
Add ForAllUIDs to interface and export
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Mason <[email protected]>
Signed-off-by: Vilius Okockis <[email protected]>
  • Loading branch information
ajm188 authored and DeathBorn committed Apr 12, 2024
1 parent b7a666b commit ae1c8e8
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 22 deletions.
1 change: 1 addition & 0 deletions go/vt/vtctl/workflow/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ type ITrafficSwitcher interface {

ForAllSources(f func(source *MigrationSource) error) error
ForAllTargets(f func(target *MigrationTarget) error) error
ForAllUIDs(f func(target *MigrationTarget, uid uint32) error) error
SourceShards() []*topo.ShardInfo
TargetShards() []*topo.ShardInfo
}
Expand Down
42 changes: 21 additions & 21 deletions go/vt/wrangler/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,25 @@ func (ts *trafficSwitcher) ForAllTargets(f func(source *workflow.MigrationTarget
return allErrors.AggrError(vterrors.Aggregate)
}

func (ts *trafficSwitcher) ForAllUIDs(f func(target *workflow.MigrationTarget, uid uint32) error) error {
var wg sync.WaitGroup
allErrors := &concurrency.AllErrorRecorder{}
for _, target := range ts.Targets() {
for uid := range target.Sources {
wg.Add(1)
go func(target *workflow.MigrationTarget, uid uint32) {
defer wg.Done()

if err := f(target, uid); err != nil {
allErrors.RecordError(err)
}
}(target, uid)
}
}
wg.Wait()
return allErrors.AggrError(vterrors.Aggregate)
}

/* end: implementation of workflow.ITrafficSwitcher */

// For a Reshard, to check whether we have switched reads for a tablet type, we check if any one of the source shards has
Expand Down Expand Up @@ -1087,7 +1106,7 @@ func (ts *trafficSwitcher) waitForCatchup(ctx context.Context, filteredReplicati
ctx, cancel := context.WithTimeout(ctx, filteredReplicationWaitTime)
defer cancel()
// source writes have been stopped, wait for all streams on targets to catch up
if err := ts.forAllUids(func(target *workflow.MigrationTarget, uid uint32) error {
if err := ts.ForAllUIDs(func(target *workflow.MigrationTarget, uid uint32) error {
ts.Logger().Infof("Before Catchup: uid: %d, target master %s, target position %s, shard %s", uid,
target.GetPrimary().AliasString(), target.Position, target.GetShard().String())
bls := target.Sources[uid]
Expand Down Expand Up @@ -1168,7 +1187,7 @@ func (ts *trafficSwitcher) createReverseVReplication(ctx context.Context) error
if err := ts.deleteReverseVReplication(ctx); err != nil {
return err
}
err := ts.forAllUids(func(target *workflow.MigrationTarget, uid uint32) error {
err := ts.ForAllUIDs(func(target *workflow.MigrationTarget, uid uint32) error {
bls := target.Sources[uid]
source := ts.Sources()[bls.Shard]
reverseBls := &binlogdatapb.BinlogSource{
Expand Down Expand Up @@ -1396,25 +1415,6 @@ func (ts *trafficSwitcher) changeShardsAccess(ctx context.Context, keyspace stri
return ts.wr.refreshMasters(ctx, shards)
}

func (ts *trafficSwitcher) forAllUids(f func(target *workflow.MigrationTarget, uid uint32) error) error {
var wg sync.WaitGroup
allErrors := &concurrency.AllErrorRecorder{}
for _, target := range ts.targets {
for uid := range target.Sources {
wg.Add(1)
go func(target *workflow.MigrationTarget, uid uint32) {
defer wg.Done()

if err := f(target, uid); err != nil {
allErrors.RecordError(err)
}
}(target, uid)
}
}
wg.Wait()
return allErrors.AggrError(vterrors.Aggregate)
}

func (ts *trafficSwitcher) SourceShards() []*topo.ShardInfo {
shards := make([]*topo.ShardInfo, 0, len(ts.Sources()))
for _, source := range ts.Sources() {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/wrangler/vdiff.go
Original file line number Diff line number Diff line change
Expand Up @@ -748,7 +748,7 @@ func (df *vdiff) streamOne(ctx context.Context, keyspace, shard string, particip
func (df *vdiff) syncTargets(ctx context.Context, filteredReplicationWaitTime time.Duration) error {
waitCtx, cancel := context.WithTimeout(ctx, filteredReplicationWaitTime)
defer cancel()
err := df.ts.forAllUids(func(target *workflow.MigrationTarget, uid uint32) error {
err := df.ts.ForAllUIDs(func(target *workflow.MigrationTarget, uid uint32) error {
bls := target.Sources[uid]
pos := df.sources[bls.Shard].snapshotPosition
query := fmt.Sprintf("update _vt.vreplication set state='Running', stop_pos='%s', message='synchronizing for vdiff' where id=%d", pos, uid)
Expand Down

0 comments on commit ae1c8e8

Please sign in to comment.