Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allow tablet picker to exclude specified tablets from its candidate list #14224

Merged
merged 11 commits into from
Oct 23, 2023
23 changes: 19 additions & 4 deletions go/vt/discovery/tablet_picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ type TabletPicker struct {
inOrder bool
cellPref TabletPickerCellPreference
localCellInfo localCellInfo
// This map is keyed on the results of TabletAlias.String().
ignoreTablets map[string]struct{}
}

// NewTabletPicker returns a TabletPicker.
Expand All @@ -144,6 +146,7 @@ func NewTabletPicker(
cells []string,
localCell, keyspace, shard, tabletTypesStr string,
options TabletPickerOptions,
ignoreTablets ...*topodatapb.TabletAlias,
) (*TabletPicker, error) {
// Keep inOrder parsing here for backward compatability until TabletPickerTabletOrder is fully adopted.
if tabletTypesStr == "" {
Expand Down Expand Up @@ -219,7 +222,7 @@ func NewTabletPicker(
}
}

return &TabletPicker{
tp := &TabletPicker{
ts: ts,
cells: dedupeCells(cells),
localCellInfo: localCellInfo{localCell: localCell, cellsInAlias: aliasCellMap},
Expand All @@ -228,7 +231,15 @@ func NewTabletPicker(
tabletTypes: tabletTypes,
inOrder: inOrder,
cellPref: cellPref,
}, nil
ignoreTablets: make(map[string]struct{}, len(ignoreTablets)),
}

for _, ignoreTablet := range ignoreTablets {
tp.ignoreTablets[ignoreTablet.String()] = struct{}{}
}

return tp, nil

}

// dedupeCells is used to remove duplicates in the cell list in case it is passed in
Expand Down Expand Up @@ -358,7 +369,9 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn
log.Errorf("Error getting shard %s/%s: %v", tp.keyspace, tp.shard, err)
return nil
}
aliases = append(aliases, si.PrimaryAlias)
if _, ignore := tp.ignoreTablets[si.PrimaryAlias.String()]; !ignore {
aliases = append(aliases, si.PrimaryAlias)
}
} else {
actualCells := make([]string, 0)
for _, cell := range tp.cells {
Expand Down Expand Up @@ -394,7 +407,9 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn
continue
}
for _, node := range sri.Nodes {
aliases = append(aliases, node.TabletAlias)
if _, ignore := tp.ignoreTablets[node.TabletAlias.String()]; !ignore {
aliases = append(aliases, node.TabletAlias)
}
}
}
}
Expand Down
23 changes: 23 additions & 0 deletions go/vt/discovery/tablet_picker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,29 @@ func TestPickUsingCellAsAlias(t *testing.T) {
}
}

func TestPickWithIgnoreList(t *testing.T) {
ctx := utils.LeakCheckContext(t)

te := newPickerTestEnv(t, ctx, []string{"cell1", "cell2"})

want := addTablet(ctx, te, 101, topodatapb.TabletType_REPLICA, "cell1", true, true)
defer deleteTablet(t, te, want)

dontWant := addTablet(ctx, te, 102, topodatapb.TabletType_REPLICA, "cell1", true, true)
defer deleteTablet(t, te, dontWant)

// Specify the alias as the cell.
tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell1", te.keyspace, te.shard, "replica", TabletPickerOptions{}, dontWant.GetAlias())
require.NoError(t, err)

// Try it many times to be sure we don't ever pick from the ignore list.
for i := 0; i < 100; i++ {
tablet, err := tp.PickForStreaming(ctx)
require.NoError(t, err)
require.False(t, proto.Equal(dontWant, tablet), "Picked the tablet we shouldn't have: %v", dontWant)
}
}

func TestPickUsingCellAliasOnlySpecified(t *testing.T) {
ctx := utils.LeakCheckContextTimeout(t, 200*time.Millisecond)

Expand Down
54 changes: 47 additions & 7 deletions go/vt/vtgate/vstream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,17 @@ import (

"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/discovery"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/srvtopo"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vterrors"

"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/srvtopo"
"vitess.io/vitess/go/vt/vterrors"
)

// vstreamManager manages vstream requests.
Expand Down Expand Up @@ -124,6 +124,7 @@ type journalEvent struct {

func newVStreamManager(resolver *srvtopo.Resolver, serv srvtopo.Server, cell string) *vstreamManager {
exporter := servenv.NewExporter(cell, "VStreamManager")

return &vstreamManager{
resolver: resolver,
toposerv: serv,
Expand Down Expand Up @@ -473,6 +474,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
// journalDone is assigned a channel when a journal event is encountered.
// It will be closed when all journal events converge.
var journalDone chan struct{}
ignoreTablets := make([]*topodatapb.TabletAlias, 0)

errCount := 0
for {
Expand All @@ -490,12 +492,19 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
var eventss [][]*binlogdatapb.VEvent
var err error
cells := vs.getCells()
tp, err := discovery.NewTabletPicker(ctx, vs.ts, cells, vs.vsm.cell, sgtid.Keyspace, sgtid.Shard, vs.tabletType.String(), vs.tabletPickerOptions)

tp, err := discovery.NewTabletPicker(ctx, vs.ts, cells, vs.vsm.cell, sgtid.Keyspace, sgtid.Shard, vs.tabletType.String(), vs.tabletPickerOptions, ignoreTablets...)
if err != nil {
log.Errorf(err.Error())
return err
}
tablet, err := tp.PickForStreaming(ctx)

// Create a child context with a stricter timeout when picking a tablet.
// This will prevent hanging in the case no tablets are found
tpCtx, tpCancel := context.WithTimeout(context.Background(), 90*time.Second)
pbibra marked this conversation as resolved.
Show resolved Hide resolved
defer tpCancel()

tablet, err := tp.PickForStreaming(tpCtx)
if err != nil {
log.Errorf(err.Error())
return err
Expand Down Expand Up @@ -670,11 +679,18 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
// Unreachable.
err = vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "vstream ended unexpectedly")
}
if vterrors.Code(err) != vtrpcpb.Code_FAILED_PRECONDITION && vterrors.Code(err) != vtrpcpb.Code_UNAVAILABLE {

retry, ignoreTablet := vs.shouldRetry(err)
if !retry {
log.Errorf("vstream for %s/%s error: %v", sgtid.Keyspace, sgtid.Shard, err)
return err
}
if ignoreTablet {
ignoreTablets = append(ignoreTablets, tablet.GetAlias())
}

errCount++
// Retry, at most, 3 times if the error can be retried.
if errCount >= 3 {
log.Errorf("vstream for %s/%s had three consecutive failures: %v", sgtid.Keyspace, sgtid.Shard, err)
return err
Expand All @@ -683,6 +699,30 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
}
}

// shouldRetry determines whether we should exit immediately or retry the vstream.
// The first return value determines if the error can be retried, while the second
// indicates whether the tablet with which the error occurred should be ommitted
// from the candidate list of tablets to choose from on the retry.
//
// An error should be retried if it is expected to be transient.
// A tablet should be ignored upon retry if it's likely another tablet will not
// produce the same error.
func (vs *vstream) shouldRetry(err error) (bool, bool) {
errCode := vterrors.Code(err)

if errCode == vtrpcpb.Code_FAILED_PRECONDITION || errCode == vtrpcpb.Code_UNAVAILABLE {
return true, false
}

// If there is a GTIDSet Mismatch on the tablet, omit it from the candidate
// list in the TabletPicker on retry.
if errCode == vtrpcpb.Code_INVALID_ARGUMENT && strings.Contains(err.Error(), "GTIDSet Mismatch") {
return true, true
}

return false, false
}

// sendAll sends a group of events together while holding the lock.
func (vs *vstream) sendAll(ctx context.Context, sgtid *binlogdatapb.ShardGtid, eventss [][]*binlogdatapb.VEvent) error {
vs.mu.Lock()
Expand Down
171 changes: 137 additions & 34 deletions go/vt/vtgate/vstream_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,46 +386,133 @@ func TestVStreamsCreatedAndLagMetrics(t *testing.T) {
assert.Equal(t, wantVStreamsLag, vsm.vstreamsLag.Counts(), "vstreamsLag matches")
}

func TestVStreamRetry(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
func TestVStreamRetriableErrors(t *testing.T) {
type testCase struct {
name string
code vtrpcpb.Code
msg string
shouldRetry bool
ignoreTablet bool
}

cell := "aa"
ks := "TestVStream"
_ = createSandbox(ks)
hc := discovery.NewFakeHealthCheck(nil)
tcases := []testCase{
{
name: "failed precondition",
code: vtrpcpb.Code_FAILED_PRECONDITION,
msg: "",
shouldRetry: true,
ignoreTablet: false,
},
{
name: "gtid mismatch",
code: vtrpcpb.Code_INVALID_ARGUMENT,
msg: "GTIDSet Mismatch aa",
shouldRetry: true,
ignoreTablet: true,
},
{
name: "unavailable",
code: vtrpcpb.Code_UNAVAILABLE,
msg: "",
shouldRetry: true,
ignoreTablet: false,
},
{
name: "should not retry",
code: vtrpcpb.Code_INVALID_ARGUMENT,
msg: "final error",
shouldRetry: false,
ignoreTablet: false,
},
}

st := getSandboxTopo(ctx, cell, ks, []string{"-20"})
vsm := newTestVStreamManager(ctx, hc, st, "aa")
sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil)
addTabletToSandboxTopo(t, ctx, st, ks, "-20", sbc0.Tablet())
commit := []*binlogdatapb.VEvent{
{Type: binlogdatapb.VEventType_COMMIT},
}
sbc0.AddVStreamEvents(commit, nil)
sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "aa"))
sbc0.AddVStreamEvents(commit, nil)
sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "bb"))
sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "cc"))
sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "final error"))
var count atomic.Int32
vgtid := &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{{
Keyspace: ks,
Shard: "-20",
Gtid: "pos",
}},
}
err := vsm.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{}, func(events []*binlogdatapb.VEvent) error {
count.Add(1)
return nil
})
wantErr := "final error"
if err == nil || !strings.Contains(err.Error(), wantErr) {
t.Errorf("vstream end: %v, must contain %v", err.Error(), wantErr)

want := &binlogdatapb.VStreamResponse{Events: commit}

for _, tcase := range tcases {
t.Run(tcase.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// aa will be the local cell for this test, but that tablet will have a vstream error.
cells := []string{"aa", "ab"}

ks := "TestVStream"
_ = createSandbox(ks)
hc := discovery.NewFakeHealthCheck(nil)

st := getSandboxTopoMultiCell(ctx, cells, ks, []string{"-20"})

sbc0 := hc.AddTestTablet(cells[0], "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_REPLICA, true, 1, nil)
sbc1 := hc.AddTestTablet(cells[1], "1.1.1.1", 1002, ks, "-20", topodatapb.TabletType_REPLICA, true, 1, nil)

addTabletToSandboxTopo(t, ctx, st, ks, "-20", sbc0.Tablet())
addTabletToSandboxTopo(t, ctx, st, ks, "-20", sbc1.Tablet())

vsm := newTestVStreamManager(ctx, hc, st, cells[0])

// Always have the local cell tablet error so it's ignored on retry and we pick the other one
// if the error requires ignoring the tablet on retry
pbibra marked this conversation as resolved.
Show resolved Hide resolved
sbc0.AddVStreamEvents(nil, vterrors.Errorf(tcase.code, tcase.msg))

if tcase.ignoreTablet {
sbc1.AddVStreamEvents(commit, nil)
} else {
sbc0.AddVStreamEvents(commit, nil)
}

vgtid := &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{{
Keyspace: ks,
Shard: "-20",
Gtid: "pos",
}},
}

ch := make(chan *binlogdatapb.VStreamResponse)
done := make(chan struct{})
go func() {
err := vsm.VStream(ctx, topodatapb.TabletType_REPLICA, vgtid, nil, &vtgatepb.VStreamFlags{Cells: strings.Join(cells, ",")}, func(events []*binlogdatapb.VEvent) error {
ch <- &binlogdatapb.VStreamResponse{Events: events}
return nil
})
wantErr := "context canceled"

if !tcase.shouldRetry {
wantErr = tcase.msg
}

if err == nil || !strings.Contains(err.Error(), wantErr) {
t.Errorf("vstream end: %v, must contain %v", err.Error(), wantErr)
}
close(done)
}()

Loop:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that this is needed, is it? Since there's only one for loop.

Copy link
Contributor Author

@pbibra pbibra Oct 17, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The break statement breaks out of the switch instead of the loop by default, so we had to label the loop

for {
if tcase.shouldRetry {
select {
case event := <-ch:
got := event.CloneVT()
if !proto.Equal(got, want) {
t.Errorf("got different vstream event than expected")
}
cancel()
case <-done:
// The goroutine has completed, so break out of the loop
break Loop
}
} else {
<-done
break Loop
}
}
})
}
time.Sleep(100 * time.Millisecond) // wait for goroutine within VStream to finish
assert.Equal(t, int32(2), count.Load())

}

func TestVStreamShouldNotSendSourceHeartbeats(t *testing.T) {
Expand Down Expand Up @@ -1266,6 +1353,22 @@ func getSandboxTopo(ctx context.Context, cell string, keyspace string, shards []
return st
}

func getSandboxTopoMultiCell(ctx context.Context, cells []string, keyspace string, shards []string) *sandboxTopo {
st := newSandboxForCells(ctx, cells)
ts := st.topoServer

for _, cell := range cells {
ts.CreateCellInfo(ctx, cell, &topodatapb.CellInfo{})
}

ts.CreateKeyspace(ctx, keyspace, &topodatapb.Keyspace{})

for _, shard := range shards {
ts.CreateShard(ctx, keyspace, shard)
}
return st
}

func addTabletToSandboxTopo(t *testing.T, ctx context.Context, st *sandboxTopo, ks, shard string, tablet *topodatapb.Tablet) {
_, err := st.topoServer.UpdateShardFields(ctx, ks, shard, func(si *topo.ShardInfo) error {
si.PrimaryAlias = tablet.Alias
Expand Down
Loading