Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allow tablet picker to exclude specified tablets from its candidate list #14224

Merged
merged 11 commits into from
Oct 23, 2023
8 changes: 7 additions & 1 deletion go/vt/discovery/tablet_picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ type TabletPicker struct {
inOrder bool
cellPref TabletPickerCellPreference
localCellInfo localCellInfo
ignoreTablets map[string]*topodatapb.TabletAlias
}

// NewTabletPicker returns a TabletPicker.
Expand All @@ -144,6 +145,7 @@ func NewTabletPicker(
cells []string,
localCell, keyspace, shard, tabletTypesStr string,
options TabletPickerOptions,
ignoreTablets map[string]*topodatapb.TabletAlias,
mattlord marked this conversation as resolved.
Show resolved Hide resolved
) (*TabletPicker, error) {
// Keep inOrder parsing here for backward compatibility until TabletPickerTabletOrder is fully adopted.
if tabletTypesStr == "" {
Expand Down Expand Up @@ -228,6 +230,7 @@ func NewTabletPicker(
tabletTypes: tabletTypes,
inOrder: inOrder,
cellPref: cellPref,
ignoreTablets: ignoreTablets,
}, nil
}

Expand Down Expand Up @@ -433,7 +436,10 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn
}
return vterrors.New(vtrpcpb.Code_INTERNAL, "tablet is not healthy and serving")
}); err == nil || err == io.EOF {
tablets = append(tablets, tabletInfo)
// if this tablet is not in the ignore list, then add it as a candidate
pbibra marked this conversation as resolved.
Show resolved Hide resolved
if _, ok := tp.ignoreTablets[tabletInfo.GetAlias().String()]; !ok {
tablets = append(tablets, tabletInfo)
}
}
_ = conn.Close(ctx)
}
Expand Down
49 changes: 38 additions & 11 deletions go/vt/discovery/tablet_picker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestPickPrimary(t *testing.T) {
})
require.NoError(t, err)

tp, err := NewTabletPicker(ctx, te.topoServ, []string{"otherCell"}, "cell", te.keyspace, te.shard, "primary", TabletPickerOptions{})
tp, err := NewTabletPicker(ctx, te.topoServ, []string{"otherCell"}, "cell", te.keyspace, te.shard, "primary", TabletPickerOptions{}, make(map[string]*topodatapb.TabletAlias))
require.NoError(t, err)

ctx2, cancel2 := context.WithTimeout(ctx, 200*time.Millisecond)
Expand Down Expand Up @@ -284,7 +284,7 @@ func TestPickLocalPreferences(t *testing.T) {
deleteTablet(t, te, tab)
}
}()
tp, err := NewTabletPicker(ctx, te.topoServ, tcase.inCells, tcase.localCell, te.keyspace, te.shard, tcase.inTabletTypes, tcase.options)
tp, err := NewTabletPicker(ctx, te.topoServ, tcase.inCells, tcase.localCell, te.keyspace, te.shard, tcase.inTabletTypes, tcase.options, make(map[string]*topodatapb.TabletAlias))
require.NoError(t, err)
require.Equal(t, tp.localCellInfo.localCell, tcase.localCell)
require.ElementsMatch(t, tp.cells, tcase.tpCells)
Expand Down Expand Up @@ -313,7 +313,7 @@ func TestPickCellPreferenceLocalCell(t *testing.T) {
defer deleteTablet(t, te, want1)

// Local cell preference is default
tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{})
tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{}, make(map[string]*topodatapb.TabletAlias))
require.NoError(t, err)

tablet, err := tp.PickForStreaming(ctx)
Expand Down Expand Up @@ -348,7 +348,7 @@ func TestPickCellPreferenceLocalAlias(t *testing.T) {

// test env puts all cells into an alias called "cella"
te := newPickerTestEnv(t, ctx, []string{"cell", "otherCell"})
tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{})
tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{}, make(map[string]*topodatapb.TabletAlias))
require.NoError(t, err)

// create a tablet in the other cell, it should be picked
Expand All @@ -370,7 +370,7 @@ func TestPickUsingCellAsAlias(t *testing.T) {
// added to the alias.
te := newPickerTestEnv(t, ctx, []string{"cell1", "cell2", "cell3"}, "xtracell")
// Specify the alias as the cell.
tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell1", te.keyspace, te.shard, "replica", TabletPickerOptions{})
tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell1", te.keyspace, te.shard, "replica", TabletPickerOptions{}, make(map[string]*topodatapb.TabletAlias))
require.NoError(t, err)

// Create a tablet in one of the main cells, it should be
Expand All @@ -391,6 +391,32 @@ func TestPickUsingCellAsAlias(t *testing.T) {
}
}

// TestPickWithIgnoreList checks that a tablet whose alias is present in the
// ignore map handed to NewTabletPicker is never returned by PickForStreaming.
func TestPickWithIgnoreList(t *testing.T) {
	ctx := utils.LeakCheckContext(t)

	te := newPickerTestEnv(t, ctx, []string{"cell1", "cell2"})

	// Two replicas are registered in cell1 with identical settings; only the
	// first one is expected to be picked.
	want := addTablet(ctx, te, 101, topodatapb.TabletType_REPLICA, "cell1", true, true)
	defer deleteTablet(t, te, want)

	noWant := addTablet(ctx, te, 102, topodatapb.TabletType_REPLICA, "cell1", true, true)
	defer deleteTablet(t, te, noWant)

	// The ignore map is keyed by the string form of the tablet alias.
	ignoreTablets := map[string]*topodatapb.TabletAlias{
		noWant.Alias.String(): noWant.GetAlias(),
	}

	// Build the picker with the ignore list containing the second replica.
	tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell1", te.keyspace, te.shard, "replica", TabletPickerOptions{}, ignoreTablets)
	require.NoError(t, err)

	// Pick repeatedly so that a single lucky pick cannot hide an ignored
	// tablet slipping into the candidate list.
	const attempts = 100
	for attempt := 0; attempt < attempts; attempt++ {
		picked, pickErr := tp.PickForStreaming(ctx)
		require.NoError(t, pickErr)
		assert.True(t, proto.Equal(want, picked), "Pick: %v, want %v", picked, want)
	}
}

func TestPickUsingCellAliasOnlySpecified(t *testing.T) {
ctx := utils.LeakCheckContextTimeout(t, 200*time.Millisecond)

Expand All @@ -399,7 +425,7 @@ func TestPickUsingCellAliasOnlySpecified(t *testing.T) {
want1 := addTablet(ctx, te, 100, topodatapb.TabletType_REPLICA, "cell", true, true)
defer deleteTablet(t, te, want1)

tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{CellPreference: "OnlySpecified"})
tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{CellPreference: "OnlySpecified"}, make(map[string]*topodatapb.TabletAlias))
require.NoError(t, err)

tablet, err := tp.PickForStreaming(ctx)
Expand Down Expand Up @@ -442,7 +468,7 @@ func TestTabletAppearsDuringSleep(t *testing.T) {
ctx := utils.LeakCheckContextTimeout(t, 200*time.Millisecond)

te := newPickerTestEnv(t, ctx, []string{"cell"})
tp, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{})
tp, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{}, make(map[string]*topodatapb.TabletAlias))
require.NoError(t, err)

delay := GetTabletPickerRetryDelay()
Expand Down Expand Up @@ -472,10 +498,11 @@ func TestPickErrorLocalPreferenceDefault(t *testing.T) {
ctx := utils.LeakCheckContext(t)

te := newPickerTestEnv(t, ctx, []string{"cell"})
_, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "badtype", TabletPickerOptions{})
var ignoreTablets map[string]*topodatapb.TabletAlias
_, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "badtype", TabletPickerOptions{}, ignoreTablets)
assert.EqualError(t, err, "failed to parse list of tablet types: badtype")

tp, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{})
tp, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{}, ignoreTablets)
require.NoError(t, err)
delay := GetTabletPickerRetryDelay()
defer func() {
Expand Down Expand Up @@ -503,7 +530,7 @@ func TestPickErrorOnlySpecified(t *testing.T) {

te := newPickerTestEnv(t, ctx, []string{"cell"})

tp, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{CellPreference: "OnlySpecified"})
tp, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{CellPreference: "OnlySpecified"}, make(map[string]*topodatapb.TabletAlias))
require.NoError(t, err)
delay := GetTabletPickerRetryDelay()
defer func() {
Expand Down Expand Up @@ -559,7 +586,7 @@ func TestPickFallbackType(t *testing.T) {
})
require.NoError(t, err)

tp, err := NewTabletPicker(ctx, te.topoServ, cells, localCell, te.keyspace, te.shard, tabletTypes, options)
tp, err := NewTabletPicker(ctx, te.topoServ, cells, localCell, te.keyspace, te.shard, tabletTypes, options, make(map[string]*topodatapb.TabletAlias))
require.NoError(t, err)
ctx2, cancel2 := context.WithTimeout(ctx, 1*time.Second)
defer cancel2()
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtctl/workflow/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ func areTabletsAvailableToStreamFrom(ctx context.Context, req *vtctldatapb.Workf
if cells == nil {
cells = append(cells, shard.PrimaryAlias.Cell)
}
tp, err := discovery.NewTabletPicker(ctx, ts.ws.ts, cells, shard.PrimaryAlias.Cell, keyspace, shard.ShardName(), tabletTypesStr, discovery.TabletPickerOptions{})
tp, err := discovery.NewTabletPicker(ctx, ts.ws.ts, cells, shard.PrimaryAlias.Cell, keyspace, shard.ShardName(), tabletTypesStr, discovery.TabletPickerOptions{}, make(map[string]*topodatapb.TabletAlias))
if err != nil {
allErrors.RecordError(err)
return
Expand Down
32 changes: 30 additions & 2 deletions go/vt/vtgate/vstream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ type journalEvent struct {

func newVStreamManager(resolver *srvtopo.Resolver, serv srvtopo.Server, cell string) *vstreamManager {
exporter := servenv.NewExporter(cell, "VStreamManager")

return &vstreamManager{
resolver: resolver,
toposerv: serv,
Expand Down Expand Up @@ -473,6 +474,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
// journalDone is assigned a channel when a journal event is encountered.
// It will be closed when all journal events converge.
var journalDone chan struct{}
ignoreTablets := make(map[string]*topodatapb.TabletAlias)

errCount := 0
for {
Expand All @@ -490,7 +492,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
var eventss [][]*binlogdatapb.VEvent
var err error
cells := vs.getCells()
tp, err := discovery.NewTabletPicker(ctx, vs.ts, cells, vs.vsm.cell, sgtid.Keyspace, sgtid.Shard, vs.tabletType.String(), vs.tabletPickerOptions)
tp, err := discovery.NewTabletPicker(ctx, vs.ts, cells, vs.vsm.cell, sgtid.Keyspace, sgtid.Shard, vs.tabletType.String(), vs.tabletPickerOptions, ignoreTablets)
if err != nil {
log.Errorf(err.Error())
return err
Expand Down Expand Up @@ -670,10 +672,16 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
// Unreachable.
err = vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "vstream ended unexpectedly")
}
if vterrors.Code(err) != vtrpcpb.Code_FAILED_PRECONDITION && vterrors.Code(err) != vtrpcpb.Code_UNAVAILABLE {

retry, ignoreTablet := vs.isRetriableError(err)
if !retry {
log.Errorf("vstream for %s/%s error: %v", sgtid.Keyspace, sgtid.Shard, err)
return err
}
if ignoreTablet {
ignoreTablets[tablet.Alias.String()] = tablet.GetAlias()
}

errCount++
if errCount >= 3 {
log.Errorf("vstream for %s/%s had three consecutive failures: %v", sgtid.Keyspace, sgtid.Shard, err)
Expand All @@ -683,6 +691,26 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
}
}

// isRetriableError determines whether we should exit immediately or retry the vstream.
// The first return value indicates whether the error is retriable; the second indicates
// whether the tablet on which the error occurred should be omitted from the candidate
// list of tablets to choose from on the retry.
pbibra marked this conversation as resolved.
Show resolved Hide resolved
func (vs *vstream) isRetriableError(err error) (bool, bool) {
pbibra marked this conversation as resolved.
Show resolved Hide resolved
errCode := vterrors.Code(err)

if errCode == vtrpcpb.Code_FAILED_PRECONDITION || errCode == vtrpcpb.Code_UNAVAILABLE {
return true, false
}

// If there is a GTIDSet Mismatch on the tablet or if the tablet cannot be found,
// omit it from the candidate list in the TabletPicker on retry.
if (errCode == vtrpcpb.Code_INVALID_ARGUMENT && strings.Contains(err.Error(), "GTIDSet Mismatch")) || errCode == vtrpcpb.Code_NOT_FOUND {
return true, true
}

return false, false
}

// sendAll sends a group of events together while holding the lock.
func (vs *vstream) sendAll(ctx context.Context, sgtid *binlogdatapb.ShardGtid, eventss [][]*binlogdatapb.VEvent) error {
vs.mu.Lock()
Expand Down
Loading
Loading