diff --git a/go/vt/discovery/tablet_picker.go b/go/vt/discovery/tablet_picker.go index e5177d81f3f..a507528d3a2 100644 --- a/go/vt/discovery/tablet_picker.go +++ b/go/vt/discovery/tablet_picker.go @@ -90,8 +90,9 @@ func SetTabletPickerRetryDelay(delay time.Duration) { } type TabletPickerOptions struct { - CellPreference string - TabletOrder string + CellPreference string + TabletOrder string + IncludeNonServingTablets bool } func parseTabletPickerCellPreferenceString(str string) (TabletPickerCellPreference, error) { @@ -137,6 +138,7 @@ type TabletPicker struct { localCellInfo localCellInfo // This map is keyed on the results of TabletAlias.String(). ignoreTablets map[string]struct{} + options TabletPickerOptions } // NewTabletPicker returns a TabletPicker. @@ -232,6 +234,7 @@ func NewTabletPicker( inOrder: inOrder, cellPref: cellPref, ignoreTablets: make(map[string]struct{}, len(ignoreTablets)), + options: options, } for _, ignoreTablet := range ignoreTablets { @@ -292,6 +295,40 @@ func (tp *TabletPicker) orderByTabletType(candidates []*topo.TabletInfo) []*topo return candidates } +func (tp *TabletPicker) sortCandidates(ctx context.Context, candidates []*topo.TabletInfo) []*topo.TabletInfo { + if tp.cellPref == TabletPickerCellPreference_PreferLocalWithAlias { + sameCellCandidates, sameAliasCandidates, allOtherCandidates := tp.prioritizeTablets(candidates) + + if tp.inOrder { + sameCellCandidates = tp.orderByTabletType(sameCellCandidates) + sameAliasCandidates = tp.orderByTabletType(sameAliasCandidates) + allOtherCandidates = tp.orderByTabletType(allOtherCandidates) + } else { + // Randomize candidates + rand.Shuffle(len(sameCellCandidates), func(i, j int) { + sameCellCandidates[i], sameCellCandidates[j] = sameCellCandidates[j], sameCellCandidates[i] + }) + rand.Shuffle(len(sameAliasCandidates), func(i, j int) { + sameAliasCandidates[i], sameAliasCandidates[j] = sameAliasCandidates[j], sameAliasCandidates[i] + }) + rand.Shuffle(len(allOtherCandidates), func(i, j int) { + allOtherCandidates[i], allOtherCandidates[j] = allOtherCandidates[j], allOtherCandidates[i] + }) + } + + candidates = append(sameCellCandidates, sameAliasCandidates...) + candidates = append(candidates, allOtherCandidates...) + } else if tp.inOrder { + candidates = tp.orderByTabletType(candidates) + } else { + // Randomize candidates. + rand.Shuffle(len(candidates), func(i, j int) { + candidates[i], candidates[j] = candidates[j], candidates[i] + }) + } + return candidates +} + // PickForStreaming picks a tablet that is healthy and serving. // Selection is based on CellPreference. // See prioritizeTablets for prioritization logic. @@ -305,36 +342,7 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table default: } candidates := tp.GetMatchingTablets(ctx) - if tp.cellPref == TabletPickerCellPreference_PreferLocalWithAlias { - sameCellCandidates, sameAliasCandidates, allOtherCandidates := tp.prioritizeTablets(candidates) - - if tp.inOrder { - sameCellCandidates = tp.orderByTabletType(sameCellCandidates) - sameAliasCandidates = tp.orderByTabletType(sameAliasCandidates) - allOtherCandidates = tp.orderByTabletType(allOtherCandidates) - } else { - // Randomize candidates - rand.Shuffle(len(sameCellCandidates), func(i, j int) { - sameCellCandidates[i], sameCellCandidates[j] = sameCellCandidates[j], sameCellCandidates[i] - }) - rand.Shuffle(len(sameAliasCandidates), func(i, j int) { - sameAliasCandidates[i], sameAliasCandidates[j] = sameAliasCandidates[j], sameAliasCandidates[i] - }) - rand.Shuffle(len(allOtherCandidates), func(i, j int) { - allOtherCandidates[i], allOtherCandidates[j] = allOtherCandidates[j], allOtherCandidates[i] - }) - } - - candidates = append(sameCellCandidates, sameAliasCandidates...) - candidates = append(candidates, allOtherCandidates...) - } else if tp.inOrder { - candidates = tp.orderByTabletType(candidates) - } else { - // Randomize candidates. - rand.Shuffle(len(candidates), func(i, j int) { - candidates[i], candidates[j] = candidates[j], candidates[i] - }) - } + candidates = tp.sortCandidates(ctx, candidates) if len(candidates) == 0 { // If no viable candidates were found, sleep and try again. tp.incNoTabletFoundStat() @@ -349,7 +357,7 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table } continue } - log.Infof("Tablet picker found a healthy serving tablet for streaming: %s", candidates[0].Tablet.String()) + log.Infof("Tablet picker found a healthy tablet for streaming: %s", candidates[0].Tablet.String()) return candidates[0].Tablet, nil } } @@ -443,7 +451,10 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn shortCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) defer cancel() if err := conn.StreamHealth(shortCtx, func(shr *querypb.StreamHealthResponse) error { - if shr != nil && shr.Serving && shr.RealtimeStats != nil && shr.RealtimeStats.HealthError == "" { + if shr != nil && + (shr.Serving || tp.options.IncludeNonServingTablets) && + shr.RealtimeStats != nil && + shr.RealtimeStats.HealthError == "" { return io.EOF // End the stream } return vterrors.New(vtrpcpb.Code_INTERNAL, "tablet is not healthy and serving") diff --git a/go/vt/discovery/tablet_picker_test.go b/go/vt/discovery/tablet_picker_test.go index ac822124d58..76a8828afec 100644 --- a/go/vt/discovery/tablet_picker_test.go +++ b/go/vt/discovery/tablet_picker_test.go @@ -31,6 +31,11 @@ import ( topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) +const ( + contextTimeout = 5 * time.Second + numTestIterations = 50 +) + func TestPickPrimary(t *testing.T) { defer utils.EnsureNoLeaks(t) ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) @@ -591,6 +596,72 @@ func TestPickFallbackType(t *testing.T) { assert.True(t, proto.Equal(primaryTablet, tablet), "Pick: %v, want %v", tablet, primaryTablet) } +// TestPickNonServingTablets validates that non serving tablets are included when the +// IncludeNonServingTablets option is set. Unhealthy tablets should not be picked, irrespective of this option. +func TestPickNonServingTablets(t *testing.T) { + ctx := utils.LeakCheckContext(t) + + cells := []string{"cell1", "cell2"} + localCell := cells[0] + tabletTypes := "replica,primary" + options := TabletPickerOptions{} + te := newPickerTestEnv(t, ctx, cells) + + // Tablet should be selected as it is healthy and serving. + primaryTablet := addTablet(ctx, te, 100, topodatapb.TabletType_PRIMARY, localCell, true, true) + defer deleteTablet(t, te, primaryTablet) + + // Tablet should not be selected as it is unhealthy. + replicaTablet := addTablet(ctx, te, 200, topodatapb.TabletType_REPLICA, localCell, false, false) + defer deleteTablet(t, te, replicaTablet) + + // Tablet should be selected because the IncludeNonServingTablets option is set and it is healthy. + replicaTablet2 := addTablet(ctx, te, 300, topodatapb.TabletType_REPLICA, localCell, false, true) + defer deleteTablet(t, te, replicaTablet2) + + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, contextTimeout) + defer cancel() + _, err := te.topoServ.UpdateShardFields(ctx, te.keyspace, te.shard, func(si *topo.ShardInfo) error { + si.PrimaryAlias = primaryTablet.Alias + return nil + }) + require.NoError(t, err) + + tp, err := NewTabletPicker(ctx, te.topoServ, cells, localCell, te.keyspace, te.shard, tabletTypes, options) + require.NoError(t, err) + ctx2, cancel2 := context.WithTimeout(ctx, contextTimeout) + defer cancel2() + tablet, err := tp.PickForStreaming(ctx2) + require.NoError(t, err) + // IncludeNonServingTablets is false: only the healthy serving tablet should be picked. + assert.True(t, proto.Equal(primaryTablet, tablet), "Pick: %v, want %v", tablet, primaryTablet) + + options.IncludeNonServingTablets = true + tp, err = NewTabletPicker(ctx, te.topoServ, cells, localCell, te.keyspace, te.shard, tabletTypes, options) + require.NoError(t, err) + ctx3, cancel3 := context.WithTimeout(ctx, contextTimeout) + defer cancel3() + var picked1, picked2, picked3 bool + // IncludeNonServingTablets is true: both the healthy tablets should be picked even though one is not serving. + for i := 0; i < numTestIterations; i++ { + tablet, err := tp.PickForStreaming(ctx3) + require.NoError(t, err) + if proto.Equal(tablet, primaryTablet) { + picked1 = true + } + if proto.Equal(tablet, replicaTablet) { + picked2 = true + } + if proto.Equal(tablet, replicaTablet2) { + picked3 = true + } + } + assert.True(t, picked1) + assert.False(t, picked2) + assert.True(t, picked3) +} + type pickerTestEnv struct { t *testing.T keyspace string diff --git a/go/vt/vttablet/tabletmanager/vdiff/controller.go b/go/vt/vttablet/tabletmanager/vdiff/controller.go index de93895a4eb..22b1d3f5374 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/controller.go +++ b/go/vt/vttablet/tabletmanager/vdiff/controller.go @@ -58,6 +58,7 @@ type controller struct { id int64 // id from row in _vt.vdiff uuid string workflow string + workflowType binlogdatapb.VReplicationWorkflowType cancel context.CancelFunc dbClientFactory func() binlogplayer.DBClient ts *topo.Server @@ -227,6 +228,12 @@ func (ct *controller) start(ctx context.Context, dbClient binlogplayer.DBClient) ct.sourceKeyspace = bls.Keyspace ct.filter = bls.Filter } + + workflowType, err := row["workflow_type"].ToInt64() + if err != nil { + return err + } + ct.workflowType = binlogdatapb.VReplicationWorkflowType(workflowType) } if err := ct.validate(); err != nil { diff --git a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go index d3761436285..c0cba599bdd 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go @@ -229,11 +229,13 @@ func (td *tableDiffer) selectTablets(ctx context.Context) error { } sourceTopoServer = extTS } + tabletPickerOptions := discovery.TabletPickerOptions{} wg.Add(1) go func() { defer wg.Done() sourceErr = td.forEachSource(func(source *migrationSource) error { - sourceTablet, err := pickTablet(ctx, sourceTopoServer, sourceCells, td.wd.ct.vde.thisTablet.Alias.Cell, td.wd.ct.sourceKeyspace, source.shard, td.wd.opts.PickerOptions.TabletTypes) + sourceTablet, err := td.pickTablet(ctx, sourceTopoServer, sourceCells, td.wd.ct.sourceKeyspace, + source.shard, td.wd.opts.PickerOptions.TabletTypes, tabletPickerOptions) if err != nil { return err } @@ -245,8 +247,15 @@ func (td *tableDiffer) selectTablets(ctx context.Context) error { wg.Add(1) go func() { defer wg.Done() - targetTablet, targetErr = pickTablet(ctx, td.wd.ct.ts, targetCells, td.wd.ct.vde.thisTablet.Alias.Cell, td.wd.ct.vde.thisTablet.Keyspace, - td.wd.ct.vde.thisTablet.Shard, td.wd.opts.PickerOptions.TabletTypes) + if td.wd.ct.workflowType == binlogdatapb.VReplicationWorkflowType_Reshard { + // For resharding, the target shards could be non-serving if traffic has already been switched once. + // When shards are created their IsPrimaryServing attribute is set to true. However, when the traffic is switched + // it is set to false for the shards we are switching from. We don't have a way to know if we have + // switched or not, so we just include non-serving tablets for all reshards. + tabletPickerOptions.IncludeNonServingTablets = true + } + targetTablet, targetErr = td.pickTablet(ctx, td.wd.ct.ts, targetCells, td.wd.ct.vde.thisTablet.Keyspace, + td.wd.ct.vde.thisTablet.Shard, td.wd.opts.PickerOptions.TabletTypes, tabletPickerOptions) if targetErr != nil { return } @@ -263,8 +272,11 @@ func (td *tableDiffer) selectTablets(ctx context.Context) error { return targetErr } -func pickTablet(ctx context.Context, ts *topo.Server, cells []string, localCell, keyspace, shard, tabletTypes string) (*topodatapb.Tablet, error) { - tp, err := discovery.NewTabletPicker(ctx, ts, cells, localCell, keyspace, shard, tabletTypes, discovery.TabletPickerOptions{}) +func (td *tableDiffer) pickTablet(ctx context.Context, ts *topo.Server, cells []string, keyspace, + shard, tabletTypes string, options discovery.TabletPickerOptions) (*topodatapb.Tablet, error) { + + tp, err := discovery.NewTabletPicker(ctx, ts, cells, td.wd.ct.vde.thisTablet.Alias.Cell, keyspace, + shard, tabletTypes, options) if err != nil { return nil, err } diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go index d38b18e33bf..b9aad39fe6c 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go @@ -268,11 +268,11 @@ func (ct *controller) runBlp(ctx context.Context) (err error) { isUnrecoverableError(err) || !ct.lastWorkflowError.ShouldRetry() { - log.Errorf("vreplication stream %d going into error state due to %+v", ct.id, err) if errSetState := vr.setState(binlogdatapb.VReplicationWorkflowState_Error, err.Error()); errSetState != nil { - log.Errorf("INTERNAL: unable to setState() in controller. Attempting to set error text: [%v]; setState() error is: %v", err, errSetState) + log.Errorf("INTERNAL: unable to setState() in controller: %v. Could not set error text to: %v.", errSetState, err) return err // yes, err and not errSetState. } + log.Errorf("vreplication stream %d going into error state due to %+v", ct.id, err) return nil // this will cause vreplicate to quit the workflow } return err diff --git a/go/vt/wrangler/vdiff.go b/go/vt/wrangler/vdiff.go index 3311d376431..d09232e8997 100644 --- a/go/vt/wrangler/vdiff.go +++ b/go/vt/wrangler/vdiff.go @@ -810,7 +810,8 @@ func (df *vdiff) selectTablets(ctx context.Context, ts *trafficSwitcher) error { if ts.ExternalTopo() != nil { sourceTopo = ts.ExternalTopo() } - tp, err := discovery.NewTabletPicker(ctx, sourceTopo, []string{df.sourceCell}, df.sourceCell, df.ts.SourceKeyspaceName(), shard, df.tabletTypesStr, discovery.TabletPickerOptions{}) + tp, err := discovery.NewTabletPicker(ctx, sourceTopo, []string{df.sourceCell}, df.sourceCell, + df.ts.SourceKeyspaceName(), shard, df.tabletTypesStr, discovery.TabletPickerOptions{}) if err != nil { return err } @@ -827,8 +828,18 @@ func (df *vdiff) selectTablets(ctx context.Context, ts *trafficSwitcher) error { wg.Add(1) go func() { defer wg.Done() + includeNonServingTablets := false + if df.ts.workflowType == binlogdatapb.VReplicationWorkflowType_Reshard { + // For resharding, the target shards could be non-serving if traffic has already been switched once. + // When shards are created their IsPrimaryServing attribute is set to true. However, when the traffic is switched + // it is set to false for the shards we are switching from. We don't have a way to know if we have + // switched or not, so we just include non-serving tablets for all reshards. + includeNonServingTablets = true + } err2 = df.forAll(df.targets, func(shard string, target *shardStreamer) error { - tp, err := discovery.NewTabletPicker(ctx, df.ts.TopoServer(), []string{df.targetCell}, df.targetCell, df.ts.TargetKeyspaceName(), shard, df.tabletTypesStr, discovery.TabletPickerOptions{}) + tp, err := discovery.NewTabletPicker(ctx, df.ts.TopoServer(), []string{df.targetCell}, df.targetCell, + df.ts.TargetKeyspaceName(), shard, df.tabletTypesStr, + discovery.TabletPickerOptions{IncludeNonServingTablets: includeNonServingTablets}) if err != nil { return err }