diff --git a/go/vt/discovery/tablet_picker.go b/go/vt/discovery/tablet_picker.go index a24637a6edc..6bb4d0f65ca 100644 --- a/go/vt/discovery/tablet_picker.go +++ b/go/vt/discovery/tablet_picker.go @@ -137,6 +137,8 @@ type TabletPicker struct { inOrder bool cellPref TabletPickerCellPreference localCellInfo localCellInfo + // This map is keyed on the results of TabletAlias.String(). + ignoreTablets map[string]struct{} } // NewTabletPicker returns a TabletPicker. @@ -146,6 +148,7 @@ func NewTabletPicker( cells []string, localCell, keyspace, shard, tabletTypesStr string, options TabletPickerOptions, + ignoreTablets ...*topodatapb.TabletAlias, ) (*TabletPicker, error) { // Keep inOrder parsing here for backward compatability until TabletPickerTabletOrder is fully adopted. tabletTypes, inOrder, err := ParseTabletTypesAndOrder(tabletTypesStr) @@ -218,7 +221,7 @@ func NewTabletPicker( } } - return &TabletPicker{ + tp := &TabletPicker{ ts: ts, cells: dedupeCells(cells), localCellInfo: localCellInfo{localCell: localCell, cellsInAlias: aliasCellMap}, @@ -227,7 +230,14 @@ func NewTabletPicker( tabletTypes: tabletTypes, inOrder: inOrder, cellPref: cellPref, - }, nil + ignoreTablets: make(map[string]struct{}, len(ignoreTablets)), + } + + for _, ignoreTablet := range ignoreTablets { + tp.ignoreTablets[ignoreTablet.String()] = struct{}{} + } + + return tp, nil } // dedupeCells is used to remove duplicates in the cell list in case it is passed in @@ -369,6 +379,9 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn return nil } aliases = append(aliases, si.PrimaryAlias) + if _, ignore := tp.ignoreTablets[si.PrimaryAlias.String()]; !ignore { + aliases = append(aliases, si.PrimaryAlias) + } } else { actualCells := make([]string, 0) for _, cell := range tp.cells { @@ -404,7 +417,9 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn } for _, node := range sri.Nodes { - aliases = append(aliases, node.TabletAlias) + if _, ignore := tp.ignoreTablets[node.TabletAlias.String()]; !ignore { + aliases = append(aliases, node.TabletAlias) + } } } } diff --git a/go/vt/discovery/tablet_picker_test.go b/go/vt/discovery/tablet_picker_test.go index 88368c02a60..685fd1a8832 100644 --- a/go/vt/discovery/tablet_picker_test.go +++ b/go/vt/discovery/tablet_picker_test.go @@ -350,6 +350,28 @@ func TestPickCellPreferenceLocalAlias(t *testing.T) { assert.True(t, proto.Equal(want, tablet), "Pick: %v, want %v", tablet, want) } +func TestPickWithIgnoreList(t *testing.T) { + ctx := context.Background() + te := newPickerTestEnv(t, []string{"cell1", "cell2"}) + + want := addTablet(te, 101, topodatapb.TabletType_REPLICA, "cell1", true, true) + defer deleteTablet(t, te, want) + + dontWant := addTablet(te, 102, topodatapb.TabletType_REPLICA, "cell1", true, true) + defer deleteTablet(t, te, dontWant) + + // Specify the alias as the cell. + tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell1", te.keyspace, te.shard, "replica", TabletPickerOptions{}, dontWant.GetAlias()) + require.NoError(t, err) + + // Try it many times to be sure we don't ever pick from the ignore list. + for i := 0; i < 100; i++ { + tablet, err := tp.PickForStreaming(ctx) + require.NoError(t, err) + require.False(t, proto.Equal(dontWant, tablet), "Picked the tablet we shouldn't have: %v", dontWant) + } +} + func TestPickUsingCellAliasOnlySpecified(t *testing.T) { // test env puts all cells into an alias called "cella" te := newPickerTestEnv(t, []string{"cell", "otherCell"}) diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index a9e0f651e6a..c19ab82b0ac 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -60,6 +60,10 @@ type vstreamManager struct { // maxSkewTimeoutSeconds is the maximum allowed skew between two streams when the MinimizeSkew flag is set const maxSkewTimeoutSeconds = 10 * 60 +// tabletPickerContextTimeout is the timeout for the child context used to select candidate tablets +// for a vstream +const tabletPickerContextTimeout = 90 * time.Second + // vstream contains the metadata for one VStream request. type vstream struct { // mu protects parts of vgtid, the semantics of a send, and journaler. @@ -130,6 +134,7 @@ type journalEvent struct { func newVStreamManager(resolver *srvtopo.Resolver, serv srvtopo.Server, cell string, allowVstreamCopy bool) *vstreamManager { exporter := servenv.NewExporter(cell, "VStreamManager") + return &vstreamManager{ resolver: resolver, toposerv: serv, @@ -481,6 +486,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha // journalDone is assigned a channel when a journal event is encountered. // It will be closed when all journal events converge. var journalDone chan struct{} + ignoreTablets := make([]*topodatapb.TabletAlias, 0) errCount := 0 for { @@ -498,12 +504,18 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha var eventss [][]*binlogdatapb.VEvent var err error cells := vs.getCells() - tp, err := discovery.NewTabletPicker(ctx, vs.ts, cells, vs.vsm.cell, sgtid.Keyspace, sgtid.Shard, vs.tabletType.String(), vs.tabletPickerOptions) + tp, err := discovery.NewTabletPicker(ctx, vs.ts, cells, vs.vsm.cell, sgtid.Keyspace, sgtid.Shard, vs.tabletType.String(), vs.tabletPickerOptions, ignoreTablets...) if err != nil { log.Errorf(err.Error()) return err } - tablet, err := tp.PickForStreaming(ctx) + + // Create a child context with a stricter timeout when picking a tablet. + // This will prevent hanging in the case no tablets are found. + tpCtx, tpCancel := context.WithTimeout(ctx, tabletPickerContextTimeout) + defer tpCancel() + + tablet, err := tp.PickForStreaming(tpCtx) if err != nil { log.Errorf(err.Error()) return err @@ -678,11 +690,17 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha // Unreachable. err = vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "vstream ended unexpectedly") } - if vterrors.Code(err) != vtrpcpb.Code_FAILED_PRECONDITION && vterrors.Code(err) != vtrpcpb.Code_UNAVAILABLE { + retry, ignoreTablet := vs.shouldRetry(err) + if !retry { log.Errorf("vstream for %s/%s error: %v", sgtid.Keyspace, sgtid.Shard, err) return err } + if ignoreTablet { + ignoreTablets = append(ignoreTablets, tablet.GetAlias()) + } + errCount++ + // Retry, at most, 3 times if the error can be retried. if errCount >= 3 { log.Errorf("vstream for %s/%s had three consecutive failures: %v", sgtid.Keyspace, sgtid.Shard, err) return err @@ -691,6 +709,30 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha } } +// shouldRetry determines whether we should exit immediately or retry the vstream. +// The first return value determines if the error can be retried, while the second +// indicates whether the tablet with which the error occurred should be ommitted +// from the candidate list of tablets to choose from on the retry. +// +// An error should be retried if it is expected to be transient. +// A tablet should be ignored upon retry if it's likely another tablet will not +// produce the same error. +func (vs *vstream) shouldRetry(err error) (bool, bool) { + errCode := vterrors.Code(err) + + if errCode == vtrpcpb.Code_FAILED_PRECONDITION || errCode == vtrpcpb.Code_UNAVAILABLE { + return true, false + } + + // If there is a GTIDSet Mismatch on the tablet, omit it from the candidate + // list in the TabletPicker on retry. + if errCode == vtrpcpb.Code_INVALID_ARGUMENT && strings.Contains(err.Error(), "GTIDSet Mismatch") { + return true, true + } + + return false, false +} + // sendAll sends a group of events together while holding the lock. func (vs *vstream) sendAll(ctx context.Context, sgtid *binlogdatapb.ShardGtid, eventss [][]*binlogdatapb.VEvent) error { vs.mu.Lock() diff --git a/go/vt/vtgate/vstream_manager_test.go b/go/vt/vtgate/vstream_manager_test.go index 754c604c5f5..280665fd37a 100644 --- a/go/vt/vtgate/vstream_manager_test.go +++ b/go/vt/vtgate/vstream_manager_test.go @@ -390,47 +390,132 @@ func TestVStreamsCreatedAndLagMetrics(t *testing.T) { assert.Equal(t, wantVStreamsLag, vsm.vstreamsLag.Counts(), "vstreamsLag matches") } -func TestVStreamRetry(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() +func TestVStreamRetriableErrors(t *testing.T) { + type testCase struct { + name string + code vtrpcpb.Code + msg string + shouldRetry bool + ignoreTablet bool + } - cell := "aa" - ks := "TestVStream" - _ = createSandbox(ks) - hc := discovery.NewFakeHealthCheck(nil) + tcases := []testCase{ + { + name: "failed precondition", + code: vtrpcpb.Code_FAILED_PRECONDITION, + msg: "", + shouldRetry: true, + ignoreTablet: false, + }, + { + name: "gtid mismatch", + code: vtrpcpb.Code_INVALID_ARGUMENT, + msg: "GTIDSet Mismatch aa", + shouldRetry: true, + ignoreTablet: true, + }, + { + name: "unavailable", + code: vtrpcpb.Code_UNAVAILABLE, + msg: "", + shouldRetry: true, + ignoreTablet: false, + }, + { + name: "should not retry", + code: vtrpcpb.Code_INVALID_ARGUMENT, + msg: "final error", + shouldRetry: false, + ignoreTablet: false, + }, + } - st := getSandboxTopo(ctx, cell, ks, []string{"-20"}) - vsm := newTestVStreamManager(hc, st, "aa", true) - sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) - addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet()) commit := []*binlogdatapb.VEvent{ {Type: binlogdatapb.VEventType_COMMIT}, } - sbc0.AddVStreamEvents(commit, nil) - sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "aa")) - sbc0.AddVStreamEvents(commit, nil) - sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "bb")) - sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "cc")) - sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "final error")) - var count sync2.AtomicInt32 - count.Set(0) - vgtid := &binlogdatapb.VGtid{ - ShardGtids: []*binlogdatapb.ShardGtid{{ - Keyspace: ks, - Shard: "-20", - Gtid: "pos", - }}, - } - err := vsm.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{}, func(events []*binlogdatapb.VEvent) error { - count.Add(1) - return nil - }) - wantErr := "final error" - if err == nil || !strings.Contains(err.Error(), wantErr) { - t.Errorf("vstream end: %v, must contain %v", err.Error(), wantErr) + + want := &binlogdatapb.VStreamResponse{Events: commit} + + for _, tcase := range tcases { + t.Run(tcase.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // aa will be the local cell for this test, but that tablet will have a vstream error. + cells := []string{"aa", "ab"} + + ks := "TestVStream" + _ = createSandbox(ks) + hc := discovery.NewFakeHealthCheck(nil) + + st := getSandboxTopoMultiCell(ctx, cells, ks, []string{"-20"}) + + sbc0 := hc.AddTestTablet(cells[0], "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_REPLICA, true, 1, nil) + sbc1 := hc.AddTestTablet(cells[1], "1.1.1.1", 1002, ks, "-20", topodatapb.TabletType_REPLICA, true, 1, nil) + + addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet()) + addTabletToSandboxTopo(t, st, ks, "-20", sbc1.Tablet()) + + vsm := newTestVStreamManager(hc, st, cells[0], false) + + // Always have the local cell tablet error so it's ignored on retry and we pick the other one + // if the error requires ignoring the tablet on retry. + sbc0.AddVStreamEvents(nil, vterrors.Errorf(tcase.code, tcase.msg)) + + if tcase.ignoreTablet { + sbc1.AddVStreamEvents(commit, nil) + } else { + sbc0.AddVStreamEvents(commit, nil) + } + + vgtid := &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{{ + Keyspace: ks, + Shard: "-20", + Gtid: "pos", + }}, + } + + ch := make(chan *binlogdatapb.VStreamResponse) + done := make(chan struct{}) + go func() { + err := vsm.VStream(ctx, topodatapb.TabletType_REPLICA, vgtid, nil, &vtgatepb.VStreamFlags{Cells: strings.Join(cells, ",")}, func(events []*binlogdatapb.VEvent) error { + ch <- &binlogdatapb.VStreamResponse{Events: events} + return nil + }) + wantErr := "context canceled" + + if !tcase.shouldRetry { + wantErr = tcase.msg + } + + if err == nil || !strings.Contains(err.Error(), wantErr) { + t.Errorf("vstream end: %v, must contain %v", err.Error(), wantErr) + } + close(done) + }() + + Loop: + for { + if tcase.shouldRetry { + select { + case event := <-ch: + got := proto.Clone(event).(*binlogdatapb.VStreamResponse) + if !proto.Equal(got, want) { + t.Errorf("got different vstream event than expected") + } + cancel() + case <-done: + // The goroutine has completed, so break out of the loop + break Loop + } + } else { + <-done + break Loop + } + } + }) } - time.Sleep(100 * time.Millisecond) // wait for goroutine within VStream to finish - assert.Equal(t, int32(2), count.Get()) } func TestVStreamShouldNotSendSourceHeartbeats(t *testing.T) { @@ -1315,6 +1400,22 @@ func getSandboxTopo(ctx context.Context, cell string, keyspace string, shards [] return st } +func getSandboxTopoMultiCell(ctx context.Context, cells []string, keyspace string, shards []string) *sandboxTopo { + st := newSandboxForCells(cells) + ts := st.topoServer + + for _, cell := range cells { + ts.CreateCellInfo(ctx, cell, &topodatapb.CellInfo{}) + } + + ts.CreateKeyspace(ctx, keyspace, &topodatapb.Keyspace{}) + + for _, shard := range shards { + ts.CreateShard(ctx, keyspace, shard) + } + return st +} + func addTabletToSandboxTopo(t *testing.T, st *sandboxTopo, ks, shard string, tablet *topodatapb.Tablet) { _, err := st.topoServer.UpdateShardFields(ctx, ks, shard, func(si *topo.ShardInfo) error { si.PrimaryAlias = tablet.Alias