From c4e43114e0f4f717f1f2a1062106ba00bdd13305 Mon Sep 17 00:00:00 2001 From: Priya Bibra Date: Tue, 10 Oct 2023 10:52:37 -0700 Subject: [PATCH 01/11] allow tablet picker to exclude specified tablets from its candidate list Signed-off-by: Priya Bibra --- go/vt/discovery/tablet_picker.go | 8 ++++++- go/vt/discovery/tablet_picker_test.go | 23 ++++++++++--------- go/vt/vtgate/vstream_manager.go | 20 ++++++++++++++-- .../tabletmanager/vdiff/table_differ.go | 2 +- .../tabletmanager/vreplication/controller.go | 2 +- go/vt/wrangler/traffic_switcher.go | 2 +- go/vt/wrangler/vdiff.go | 2 +- tools/rowlog/rowlog.go | 1 + 8 files changed, 42 insertions(+), 18 deletions(-) diff --git a/go/vt/discovery/tablet_picker.go b/go/vt/discovery/tablet_picker.go index 99d95848d19..272d765a69e 100644 --- a/go/vt/discovery/tablet_picker.go +++ b/go/vt/discovery/tablet_picker.go @@ -135,6 +135,7 @@ type TabletPicker struct { inOrder bool cellPref TabletPickerCellPreference localCellInfo localCellInfo + ignoreTablets map[string]topodatapb.TabletAlias } // NewTabletPicker returns a TabletPicker. @@ -144,6 +145,7 @@ func NewTabletPicker( cells []string, localCell, keyspace, shard, tabletTypesStr string, options TabletPickerOptions, + ignoreTablets map[string]topodatapb.TabletAlias, ) (*TabletPicker, error) { // Keep inOrder parsing here for backward compatability until TabletPickerTabletOrder is fully adopted. 
if tabletTypesStr == "" { @@ -228,6 +230,7 @@ func NewTabletPicker( tabletTypes: tabletTypes, inOrder: inOrder, cellPref: cellPref, + ignoreTablets: ignoreTablets, }, nil } @@ -433,7 +436,10 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn } return vterrors.New(vtrpcpb.Code_INTERNAL, "tablet is not healthy and serving") }); err == nil || err == io.EOF { - tablets = append(tablets, tabletInfo) + // if this tablet is not in the ignore list, then add it as a candidate + if _, ok := tp.ignoreTablets[tabletInfo.Alias.String()]; !ok { + tablets = append(tablets, tabletInfo) + } } _ = conn.Close(ctx) } diff --git a/go/vt/discovery/tablet_picker_test.go b/go/vt/discovery/tablet_picker_test.go index 2999c251e93..b633e8cc2ca 100644 --- a/go/vt/discovery/tablet_picker_test.go +++ b/go/vt/discovery/tablet_picker_test.go @@ -47,7 +47,7 @@ func TestPickPrimary(t *testing.T) { }) require.NoError(t, err) - tp, err := NewTabletPicker(ctx, te.topoServ, []string{"otherCell"}, "cell", te.keyspace, te.shard, "primary", TabletPickerOptions{}) + tp, err := NewTabletPicker(ctx, te.topoServ, []string{"otherCell"}, "cell", te.keyspace, te.shard, "primary", TabletPickerOptions{}, make(map[string]topodatapb.TabletAlias)) require.NoError(t, err) ctx2, cancel2 := context.WithTimeout(ctx, 200*time.Millisecond) @@ -284,7 +284,7 @@ func TestPickLocalPreferences(t *testing.T) { deleteTablet(t, te, tab) } }() - tp, err := NewTabletPicker(ctx, te.topoServ, tcase.inCells, tcase.localCell, te.keyspace, te.shard, tcase.inTabletTypes, tcase.options) + tp, err := NewTabletPicker(ctx, te.topoServ, tcase.inCells, tcase.localCell, te.keyspace, te.shard, tcase.inTabletTypes, tcase.options, make(map[string]topodatapb.TabletAlias)) require.NoError(t, err) require.Equal(t, tp.localCellInfo.localCell, tcase.localCell) require.ElementsMatch(t, tp.cells, tcase.tpCells) @@ -313,7 +313,7 @@ func TestPickCellPreferenceLocalCell(t *testing.T) { defer deleteTablet(t, te, want1) // 
Local cell preference is default - tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{}) + tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{}, make(map[string]topodatapb.TabletAlias)) require.NoError(t, err) tablet, err := tp.PickForStreaming(ctx) @@ -348,7 +348,7 @@ func TestPickCellPreferenceLocalAlias(t *testing.T) { // test env puts all cells into an alias called "cella" te := newPickerTestEnv(t, ctx, []string{"cell", "otherCell"}) - tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{}) + tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{}, make(map[string]topodatapb.TabletAlias)) require.NoError(t, err) // create a tablet in the other cell, it should be picked @@ -370,7 +370,7 @@ func TestPickUsingCellAsAlias(t *testing.T) { // added to the alias. te := newPickerTestEnv(t, ctx, []string{"cell1", "cell2", "cell3"}, "xtracell") // Specify the alias as the cell. 
- tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell1", te.keyspace, te.shard, "replica", TabletPickerOptions{}) + tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell1", te.keyspace, te.shard, "replica", TabletPickerOptions{}, make(map[string]topodatapb.TabletAlias)) require.NoError(t, err) // Create a tablet in one of the main cells, it should be @@ -399,7 +399,7 @@ func TestPickUsingCellAliasOnlySpecified(t *testing.T) { want1 := addTablet(ctx, te, 100, topodatapb.TabletType_REPLICA, "cell", true, true) defer deleteTablet(t, te, want1) - tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{CellPreference: "OnlySpecified"}) + tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{CellPreference: "OnlySpecified"}, make(map[string]topodatapb.TabletAlias)) require.NoError(t, err) tablet, err := tp.PickForStreaming(ctx) @@ -442,7 +442,7 @@ func TestTabletAppearsDuringSleep(t *testing.T) { ctx := utils.LeakCheckContextTimeout(t, 200*time.Millisecond) te := newPickerTestEnv(t, ctx, []string{"cell"}) - tp, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{}) + tp, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{}, make(map[string]topodatapb.TabletAlias)) require.NoError(t, err) delay := GetTabletPickerRetryDelay() @@ -472,10 +472,11 @@ func TestPickErrorLocalPreferenceDefault(t *testing.T) { ctx := utils.LeakCheckContext(t) te := newPickerTestEnv(t, ctx, []string{"cell"}) - _, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "badtype", TabletPickerOptions{}) + var ignoreTablets map[string]topodatapb.TabletAlias + _, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "badtype", TabletPickerOptions{}, 
ignoreTablets) assert.EqualError(t, err, "failed to parse list of tablet types: badtype") - tp, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{}) + tp, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{}, ignoreTablets) require.NoError(t, err) delay := GetTabletPickerRetryDelay() defer func() { @@ -503,7 +504,7 @@ func TestPickErrorOnlySpecified(t *testing.T) { te := newPickerTestEnv(t, ctx, []string{"cell"}) - tp, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{CellPreference: "OnlySpecified"}) + tp, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{CellPreference: "OnlySpecified"}, make(map[string]topodatapb.TabletAlias)) require.NoError(t, err) delay := GetTabletPickerRetryDelay() defer func() { @@ -559,7 +560,7 @@ func TestPickFallbackType(t *testing.T) { }) require.NoError(t, err) - tp, err := NewTabletPicker(ctx, te.topoServ, cells, localCell, te.keyspace, te.shard, tabletTypes, options) + tp, err := NewTabletPicker(ctx, te.topoServ, cells, localCell, te.keyspace, te.shard, tabletTypes, options, make(map[string]topodatapb.TabletAlias)) require.NoError(t, err) ctx2, cancel2 := context.WithTimeout(ctx, 1*time.Second) defer cancel2() diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index 38706a8fbee..bf9401b5a83 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -473,6 +473,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha // journalDone is assigned a channel when a journal event is encountered. // It will be closed when all journal events converge. 
var journalDone chan struct{} + var ignoreTablets map[string]topodatapb.TabletAlias errCount := 0 for { @@ -490,7 +491,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha var eventss [][]*binlogdatapb.VEvent var err error cells := vs.getCells() - tp, err := discovery.NewTabletPicker(ctx, vs.ts, cells, vs.vsm.cell, sgtid.Keyspace, sgtid.Shard, vs.tabletType.String(), vs.tabletPickerOptions) + tp, err := discovery.NewTabletPicker(ctx, vs.ts, cells, vs.vsm.cell, sgtid.Keyspace, sgtid.Shard, vs.tabletType.String(), vs.tabletPickerOptions, ignoreTablets) if err != nil { log.Errorf(err.Error()) return err @@ -670,10 +671,11 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha // Unreachable. err = vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "vstream ended unexpectedly") } - if vterrors.Code(err) != vtrpcpb.Code_FAILED_PRECONDITION && vterrors.Code(err) != vtrpcpb.Code_UNAVAILABLE { + if !vs.isRetriableError(err) { log.Errorf("vstream for %s/%s error: %v", sgtid.Keyspace, sgtid.Shard, err) return err } + ignoreTablets[tablet.Alias.String()] = *tablet.GetAlias() errCount++ if errCount >= 3 { log.Errorf("vstream for %s/%s had three consecutive failures: %v", sgtid.Keyspace, sgtid.Shard, err) @@ -683,6 +685,20 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha } } +func (vs *vstream) isRetriableError(err error) bool { + errCode := vterrors.Code(err) + + if errCode == vtrpcpb.Code_FAILED_PRECONDITION || errCode == vtrpcpb.Code_UNAVAILABLE || errCode == vtrpcpb.Code_NOT_FOUND { + return true + } + + if errCode == vtrpcpb.Code_INVALID_ARGUMENT && strings.HasPrefix(err.Error(), "GTIDSet Mismatch") { + return true + } + + return false +} + // sendAll sends a group of events together while holding the lock. 
func (vs *vstream) sendAll(ctx context.Context, sgtid *binlogdatapb.ShardGtid, eventss [][]*binlogdatapb.VEvent) error { vs.mu.Lock() diff --git a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go index e65a0bad253..71ef5e29d87 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go @@ -255,7 +255,7 @@ func (td *tableDiffer) selectTablets(ctx context.Context) error { } func pickTablet(ctx context.Context, ts *topo.Server, cells []string, localCell, keyspace, shard, tabletTypes string) (*topodata.Tablet, error) { - tp, err := discovery.NewTabletPicker(ctx, ts, cells, localCell, keyspace, shard, tabletTypes, discovery.TabletPickerOptions{}) + tp, err := discovery.NewTabletPicker(ctx, ts, cells, localCell, keyspace, shard, tabletTypes, discovery.TabletPickerOptions{}, make(map[string]topodata.TabletAlias)) if err != nil { return nil, err } diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go index 94e4741eeee..871e3d69c64 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go @@ -129,7 +129,7 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor return nil, err } } - tp, err := discovery.NewTabletPicker(ctx, sourceTopo, cells, ct.vre.cell, ct.source.Keyspace, ct.source.Shard, tabletTypesStr, discovery.TabletPickerOptions{}) + tp, err := discovery.NewTabletPicker(ctx, sourceTopo, cells, ct.vre.cell, ct.source.Keyspace, ct.source.Shard, tabletTypesStr, discovery.TabletPickerOptions{}, make(map[string]topodatapb.TabletAlias)) if err != nil { return nil, err } diff --git a/go/vt/wrangler/traffic_switcher.go b/go/vt/wrangler/traffic_switcher.go index 654a5bd1588..90ef7116035 100644 --- a/go/vt/wrangler/traffic_switcher.go +++ b/go/vt/wrangler/traffic_switcher.go @@ -453,7 +453,7 
@@ func (wr *Wrangler) areTabletsAvailableToStreamFrom(ctx context.Context, ts *tra if cells == nil { cells = append(cells, shard.PrimaryAlias.Cell) } - tp, err := discovery.NewTabletPicker(ctx, wr.ts, cells, shard.PrimaryAlias.Cell, keyspace, shard.ShardName(), tabletTypes, discovery.TabletPickerOptions{}) + tp, err := discovery.NewTabletPicker(ctx, wr.ts, cells, shard.PrimaryAlias.Cell, keyspace, shard.ShardName(), tabletTypes, discovery.TabletPickerOptions{}, make(map[string]topodatapb.TabletAlias)) if err != nil { allErrors.RecordError(err) return diff --git a/go/vt/wrangler/vdiff.go b/go/vt/wrangler/vdiff.go index 85c82bb3574..9a146e629d0 100644 --- a/go/vt/wrangler/vdiff.go +++ b/go/vt/wrangler/vdiff.go @@ -810,7 +810,7 @@ func (df *vdiff) selectTablets(ctx context.Context, ts *trafficSwitcher) error { if ts.ExternalTopo() != nil { sourceTopo = ts.ExternalTopo() } - tp, err := discovery.NewTabletPicker(ctx, sourceTopo, []string{df.sourceCell}, df.sourceCell, df.ts.SourceKeyspaceName(), shard, df.tabletTypesStr, discovery.TabletPickerOptions{}) + tp, err := discovery.NewTabletPicker(ctx, sourceTopo, []string{df.sourceCell}, df.sourceCell, df.ts.SourceKeyspaceName(), shard, df.tabletTypesStr, discovery.TabletPickerOptions{}, make(map[string]topodatapb.TabletAlias)) if err != nil { return err } diff --git a/tools/rowlog/rowlog.go b/tools/rowlog/rowlog.go index 475006b2b59..92045fcd245 100644 --- a/tools/rowlog/rowlog.go +++ b/tools/rowlog/rowlog.go @@ -389,6 +389,7 @@ func getTablet(ctx context.Context, ts *topo.Server, cells []string, keyspace st discovery.TabletPickerOptions{ CellPreference: "OnlySpecified", }, + make(map[string]topodatapb.TabletAlias), ) if err != nil { return "" From 2ec74d2e0a279052c058d4763d50462cdaa85499 Mon Sep 17 00:00:00 2001 From: Priya Bibra Date: Wed, 11 Oct 2023 11:05:16 -0700 Subject: [PATCH 02/11] fix more tabletpicker invocations Signed-off-by: Priya Bibra --- go/vt/vtctl/workflow/utils.go | 2 +- go/vt/wrangler/vdiff.go | 2 +- 2
files changed, 2 insertions(+), 2 deletions(-) diff --git a/go/vt/vtctl/workflow/utils.go b/go/vt/vtctl/workflow/utils.go index 1a723c6192c..7bd25e2b703 100644 --- a/go/vt/vtctl/workflow/utils.go +++ b/go/vt/vtctl/workflow/utils.go @@ -626,7 +626,7 @@ func areTabletsAvailableToStreamFrom(ctx context.Context, req *vtctldatapb.Workf if cells == nil { cells = append(cells, shard.PrimaryAlias.Cell) } - tp, err := discovery.NewTabletPicker(ctx, ts.ws.ts, cells, shard.PrimaryAlias.Cell, keyspace, shard.ShardName(), tabletTypesStr, discovery.TabletPickerOptions{}) + tp, err := discovery.NewTabletPicker(ctx, ts.ws.ts, cells, shard.PrimaryAlias.Cell, keyspace, shard.ShardName(), tabletTypesStr, discovery.TabletPickerOptions{}, make(map[string]topodatapb.TabletAlias)) if err != nil { allErrors.RecordError(err) return diff --git a/go/vt/wrangler/vdiff.go b/go/vt/wrangler/vdiff.go index 9a146e629d0..356ac329553 100644 --- a/go/vt/wrangler/vdiff.go +++ b/go/vt/wrangler/vdiff.go @@ -828,7 +828,7 @@ func (df *vdiff) selectTablets(ctx context.Context, ts *trafficSwitcher) error { go func() { defer wg.Done() err2 = df.forAll(df.targets, func(shard string, target *shardStreamer) error { - tp, err := discovery.NewTabletPicker(ctx, df.ts.TopoServer(), []string{df.targetCell}, df.targetCell, df.ts.TargetKeyspaceName(), shard, df.tabletTypesStr, discovery.TabletPickerOptions{}) + tp, err := discovery.NewTabletPicker(ctx, df.ts.TopoServer(), []string{df.targetCell}, df.targetCell, df.ts.TargetKeyspaceName(), shard, df.tabletTypesStr, discovery.TabletPickerOptions{}, make(map[string]topodatapb.TabletAlias)) if err != nil { return err } From 652d8e19f12cecb44603f144f1d32fa726e723ec Mon Sep 17 00:00:00 2001 From: Priya Bibra Date: Thu, 12 Oct 2023 17:42:29 -0700 Subject: [PATCH 03/11] add tests Signed-off-by: Priya Bibra --- go/vt/discovery/tablet_picker.go | 6 +- go/vt/discovery/tablet_picker_test.go | 46 +++-- go/vt/vtctl/workflow/utils.go | 2 +- go/vt/vtgate/vstream_manager.go | 7 +- 
go/vt/vtgate/vstream_manager_test.go | 165 ++++++++++++++---- .../tabletmanager/vdiff/table_differ.go | 2 +- .../tabletmanager/vreplication/controller.go | 2 +- go/vt/wrangler/traffic_switcher.go | 2 +- go/vt/wrangler/vdiff.go | 4 +- tools/rowlog/rowlog.go | 2 +- 10 files changed, 181 insertions(+), 57 deletions(-) diff --git a/go/vt/discovery/tablet_picker.go b/go/vt/discovery/tablet_picker.go index 272d765a69e..6eaebdcac1b 100644 --- a/go/vt/discovery/tablet_picker.go +++ b/go/vt/discovery/tablet_picker.go @@ -135,7 +135,7 @@ type TabletPicker struct { inOrder bool cellPref TabletPickerCellPreference localCellInfo localCellInfo - ignoreTablets map[string]topodatapb.TabletAlias + ignoreTablets map[string]*topodatapb.TabletAlias } // NewTabletPicker returns a TabletPicker. @@ -145,7 +145,7 @@ func NewTabletPicker( cells []string, localCell, keyspace, shard, tabletTypesStr string, options TabletPickerOptions, - ignoreTablets map[string]topodatapb.TabletAlias, + ignoreTablets map[string]*topodatapb.TabletAlias, ) (*TabletPicker, error) { // Keep inOrder parsing here for backward compatability until TabletPickerTabletOrder is fully adopted. 
if tabletTypesStr == "" { @@ -437,7 +437,7 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn return vterrors.New(vtrpcpb.Code_INTERNAL, "tablet is not healthy and serving") }); err == nil || err == io.EOF { // if this tablet is not in the ignore list, then add it as a candidate - if _, ok := tp.ignoreTablets[tabletInfo.Alias.String()]; !ok { + if _, ok := tp.ignoreTablets[tabletInfo.GetAlias().String()]; !ok { tablets = append(tablets, tabletInfo) } } diff --git a/go/vt/discovery/tablet_picker_test.go b/go/vt/discovery/tablet_picker_test.go index b633e8cc2ca..af59a3f927e 100644 --- a/go/vt/discovery/tablet_picker_test.go +++ b/go/vt/discovery/tablet_picker_test.go @@ -47,7 +47,7 @@ func TestPickPrimary(t *testing.T) { }) require.NoError(t, err) - tp, err := NewTabletPicker(ctx, te.topoServ, []string{"otherCell"}, "cell", te.keyspace, te.shard, "primary", TabletPickerOptions{}, make(map[string]topodatapb.TabletAlias)) + tp, err := NewTabletPicker(ctx, te.topoServ, []string{"otherCell"}, "cell", te.keyspace, te.shard, "primary", TabletPickerOptions{}, make(map[string]*topodatapb.TabletAlias)) require.NoError(t, err) ctx2, cancel2 := context.WithTimeout(ctx, 200*time.Millisecond) @@ -284,7 +284,7 @@ func TestPickLocalPreferences(t *testing.T) { deleteTablet(t, te, tab) } }() - tp, err := NewTabletPicker(ctx, te.topoServ, tcase.inCells, tcase.localCell, te.keyspace, te.shard, tcase.inTabletTypes, tcase.options, make(map[string]topodatapb.TabletAlias)) + tp, err := NewTabletPicker(ctx, te.topoServ, tcase.inCells, tcase.localCell, te.keyspace, te.shard, tcase.inTabletTypes, tcase.options, make(map[string]*topodatapb.TabletAlias)) require.NoError(t, err) require.Equal(t, tp.localCellInfo.localCell, tcase.localCell) require.ElementsMatch(t, tp.cells, tcase.tpCells) @@ -313,7 +313,7 @@ func TestPickCellPreferenceLocalCell(t *testing.T) { defer deleteTablet(t, te, want1) // Local cell preference is default - tp, err := NewTabletPicker(ctx, 
te.topoServ, []string{"cella"}, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{}, make(map[string]topodatapb.TabletAlias)) + tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{}, make(map[string]*topodatapb.TabletAlias)) require.NoError(t, err) tablet, err := tp.PickForStreaming(ctx) @@ -348,7 +348,7 @@ func TestPickCellPreferenceLocalAlias(t *testing.T) { // test env puts all cells into an alias called "cella" te := newPickerTestEnv(t, ctx, []string{"cell", "otherCell"}) - tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{}, make(map[string]topodatapb.TabletAlias)) + tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{}, make(map[string]*topodatapb.TabletAlias)) require.NoError(t, err) // create a tablet in the other cell, it should be picked @@ -370,7 +370,7 @@ func TestPickUsingCellAsAlias(t *testing.T) { // added to the alias. te := newPickerTestEnv(t, ctx, []string{"cell1", "cell2", "cell3"}, "xtracell") // Specify the alias as the cell. 
- tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell1", te.keyspace, te.shard, "replica", TabletPickerOptions{}, make(map[string]topodatapb.TabletAlias)) + tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell1", te.keyspace, te.shard, "replica", TabletPickerOptions{}, make(map[string]*topodatapb.TabletAlias)) require.NoError(t, err) // Create a tablet in one of the main cells, it should be @@ -391,6 +391,32 @@ func TestPickUsingCellAsAlias(t *testing.T) { } } +func TestPickWithIgnoreList(t *testing.T) { + ctx := utils.LeakCheckContext(t) + + te := newPickerTestEnv(t, ctx, []string{"cell1", "cell2"}) + + want := addTablet(ctx, te, 101, topodatapb.TabletType_REPLICA, "cell1", true, true) + defer deleteTablet(t, te, want) + + noWant := addTablet(ctx, te, 102, topodatapb.TabletType_REPLICA, "cell1", true, true) + defer deleteTablet(t, te, noWant) + + var ignoreTablets map[string]*topodatapb.TabletAlias + ignoreTablets[noWant.Alias.String()] = noWant.GetAlias() + + // Specify the alias as the cell. 
+ tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell1", te.keyspace, te.shard, "replica", TabletPickerOptions{}, ignoreTablets) + require.NoError(t, err) + + // Try it many times to be sure we don't ever pick from the ignore list + for i := 0; i < 100; i++ { + tablet, err := tp.PickForStreaming(ctx) + require.NoError(t, err) + assert.True(t, proto.Equal(want, tablet), "Pick: %v, want %v", tablet, want) + } +} + func TestPickUsingCellAliasOnlySpecified(t *testing.T) { ctx := utils.LeakCheckContextTimeout(t, 200*time.Millisecond) @@ -399,7 +425,7 @@ func TestPickUsingCellAliasOnlySpecified(t *testing.T) { want1 := addTablet(ctx, te, 100, topodatapb.TabletType_REPLICA, "cell", true, true) defer deleteTablet(t, te, want1) - tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{CellPreference: "OnlySpecified"}, make(map[string]topodatapb.TabletAlias)) + tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{CellPreference: "OnlySpecified"}, make(map[string]*topodatapb.TabletAlias)) require.NoError(t, err) tablet, err := tp.PickForStreaming(ctx) @@ -442,7 +468,7 @@ func TestTabletAppearsDuringSleep(t *testing.T) { ctx := utils.LeakCheckContextTimeout(t, 200*time.Millisecond) te := newPickerTestEnv(t, ctx, []string{"cell"}) - tp, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{}, make(map[string]topodatapb.TabletAlias)) + tp, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{}, make(map[string]*topodatapb.TabletAlias)) require.NoError(t, err) delay := GetTabletPickerRetryDelay() @@ -472,7 +498,7 @@ func TestPickErrorLocalPreferenceDefault(t *testing.T) { ctx := utils.LeakCheckContext(t) te := newPickerTestEnv(t, ctx, []string{"cell"}) - var ignoreTablets map[string]topodatapb.TabletAlias 
+ var ignoreTablets map[string]*topodatapb.TabletAlias _, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "badtype", TabletPickerOptions{}, ignoreTablets) assert.EqualError(t, err, "failed to parse list of tablet types: badtype") @@ -504,7 +530,7 @@ func TestPickErrorOnlySpecified(t *testing.T) { te := newPickerTestEnv(t, ctx, []string{"cell"}) - tp, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{CellPreference: "OnlySpecified"}, make(map[string]topodatapb.TabletAlias)) + tp, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{CellPreference: "OnlySpecified"}, make(map[string]*topodatapb.TabletAlias)) require.NoError(t, err) delay := GetTabletPickerRetryDelay() defer func() { @@ -560,7 +586,7 @@ func TestPickFallbackType(t *testing.T) { }) require.NoError(t, err) - tp, err := NewTabletPicker(ctx, te.topoServ, cells, localCell, te.keyspace, te.shard, tabletTypes, options, make(map[string]topodatapb.TabletAlias)) + tp, err := NewTabletPicker(ctx, te.topoServ, cells, localCell, te.keyspace, te.shard, tabletTypes, options, make(map[string]*topodatapb.TabletAlias)) require.NoError(t, err) ctx2, cancel2 := context.WithTimeout(ctx, 1*time.Second) defer cancel2() diff --git a/go/vt/vtctl/workflow/utils.go b/go/vt/vtctl/workflow/utils.go index 7bd25e2b703..1c5c07d90fb 100644 --- a/go/vt/vtctl/workflow/utils.go +++ b/go/vt/vtctl/workflow/utils.go @@ -626,7 +626,7 @@ func areTabletsAvailableToStreamFrom(ctx context.Context, req *vtctldatapb.Workf if cells == nil { cells = append(cells, shard.PrimaryAlias.Cell) } - tp, err := discovery.NewTabletPicker(ctx, ts.ws.ts, cells, shard.PrimaryAlias.Cell, keyspace, shard.ShardName(), tabletTypesStr, discovery.TabletPickerOptions{}, make(map[string]topodatapb.TabletAlias)) + tp, err := discovery.NewTabletPicker(ctx, ts.ws.ts, cells, shard.PrimaryAlias.Cell, keyspace, 
shard.ShardName(), tabletTypesStr, discovery.TabletPickerOptions{}, make(map[string]*topodatapb.TabletAlias)) if err != nil { allErrors.RecordError(err) return diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index bf9401b5a83..33f40b330b7 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -124,6 +124,7 @@ type journalEvent struct { func newVStreamManager(resolver *srvtopo.Resolver, serv srvtopo.Server, cell string) *vstreamManager { exporter := servenv.NewExporter(cell, "VStreamManager") + return &vstreamManager{ resolver: resolver, toposerv: serv, @@ -473,7 +474,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha // journalDone is assigned a channel when a journal event is encountered. // It will be closed when all journal events converge. var journalDone chan struct{} - var ignoreTablets map[string]topodatapb.TabletAlias + ignoreTablets := make(map[string]*topodatapb.TabletAlias) errCount := 0 for { @@ -675,7 +676,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha log.Errorf("vstream for %s/%s error: %v", sgtid.Keyspace, sgtid.Shard, err) return err } - ignoreTablets[tablet.Alias.String()] = *tablet.GetAlias() + ignoreTablets[tablet.Alias.String()] = tablet.GetAlias() errCount++ if errCount >= 3 { log.Errorf("vstream for %s/%s had three consecutive failures: %v", sgtid.Keyspace, sgtid.Shard, err) @@ -692,7 +693,7 @@ func (vs *vstream) isRetriableError(err error) bool { return true } - if errCode == vtrpcpb.Code_INVALID_ARGUMENT && strings.HasPrefix(err.Error(), "GTIDSet Mismatch") { + if errCode == vtrpcpb.Code_INVALID_ARGUMENT && strings.Contains(err.Error(), "GTIDSet Mismatch") { return true } diff --git a/go/vt/vtgate/vstream_manager_test.go b/go/vt/vtgate/vstream_manager_test.go index 3018791964f..5d6b1434340 100644 --- a/go/vt/vtgate/vstream_manager_test.go +++ b/go/vt/vtgate/vstream_manager_test.go @@ -386,46 +386,127 @@ 
func TestVStreamsCreatedAndLagMetrics(t *testing.T) { assert.Equal(t, wantVStreamsLag, vsm.vstreamsLag.Counts(), "vstreamsLag matches") } -func TestVStreamRetry(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() +func TestVStreamRetriableErrors(t *testing.T) { + type testCase struct { + name string + code vtrpcpb.Code + msg string + shouldRetry bool + } - cell := "aa" - ks := "TestVStream" - _ = createSandbox(ks) - hc := discovery.NewFakeHealthCheck(nil) + tcases := []testCase{ + { + name: "failed precondition", + code: vtrpcpb.Code_FAILED_PRECONDITION, + msg: "", + shouldRetry: true, + }, + { + name: "gtid mismatch", + code: vtrpcpb.Code_INVALID_ARGUMENT, + msg: "GTIDSet Mismatch aa", + shouldRetry: true, + }, + { + name: "not found", + code: vtrpcpb.Code_NOT_FOUND, + msg: "", + shouldRetry: true, + }, + { + name: "unavailable", + code: vtrpcpb.Code_UNAVAILABLE, + msg: "", + shouldRetry: true, + }, + { + name: "should not retry", + code: vtrpcpb.Code_INVALID_ARGUMENT, + msg: "final error", + shouldRetry: false, + }, + } - st := getSandboxTopo(ctx, cell, ks, []string{"-20"}) - vsm := newTestVStreamManager(ctx, hc, st, "aa") - sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) - addTabletToSandboxTopo(t, ctx, st, ks, "-20", sbc0.Tablet()) commit := []*binlogdatapb.VEvent{ {Type: binlogdatapb.VEventType_COMMIT}, } - sbc0.AddVStreamEvents(commit, nil) - sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "aa")) - sbc0.AddVStreamEvents(commit, nil) - sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "bb")) - sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "cc")) - sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "final error")) - var count atomic.Int32 - vgtid := &binlogdatapb.VGtid{ - ShardGtids: []*binlogdatapb.ShardGtid{{ - Keyspace: ks, - Shard: "-20", - Gtid: "pos", 
- }}, - } - err := vsm.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{}, func(events []*binlogdatapb.VEvent) error { - count.Add(1) - return nil - }) - wantErr := "final error" - if err == nil || !strings.Contains(err.Error(), wantErr) { - t.Errorf("vstream end: %v, must contain %v", err.Error(), wantErr) + + want := &binlogdatapb.VStreamResponse{Events: commit} + + for _, tcase := range tcases { + t.Run(tcase.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // aa will be the local cell for this test, but that tablet will have a vstream error. + cells := []string{"aa", "ab"} + + ks := "TestVStream" + _ = createSandbox(ks) + hc := discovery.NewFakeHealthCheck(nil) + + st := getSandboxTopoMultiCell(ctx, cells, ks, []string{"-20"}) + + sbc0 := hc.AddTestTablet(cells[0], "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_REPLICA, true, 1, nil) + sbc1 := hc.AddTestTablet(cells[1], "1.1.1.1", 1002, ks, "-20", topodatapb.TabletType_REPLICA, true, 1, nil) + + addTabletToSandboxTopo(t, ctx, st, ks, "-20", sbc0.Tablet()) + addTabletToSandboxTopo(t, ctx, st, ks, "-20", sbc1.Tablet()) + + vsm := newTestVStreamManager(ctx, hc, st, cells[0]) + // always have the local cell tablet error so it's ignored on retry and we pick the other one + sbc0.AddVStreamEvents(nil, vterrors.Errorf(tcase.code, tcase.msg)) + sbc1.AddVStreamEvents(commit, nil) + + vgtid := &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{{ + Keyspace: ks, + Shard: "-20", + Gtid: "pos", + }}, + } + + ch := make(chan *binlogdatapb.VStreamResponse) + done := make(chan struct{}) + go func() { + err := vsm.VStream(ctx, topodatapb.TabletType_REPLICA, vgtid, nil, &vtgatepb.VStreamFlags{Cells: strings.Join(cells, ",")}, func(events []*binlogdatapb.VEvent) error { + ch <- &binlogdatapb.VStreamResponse{Events: events} + return nil + }) + wantErr := "context canceled" + + if !tcase.shouldRetry { + wantErr = tcase.msg + } + + if 
err == nil || !strings.Contains(err.Error(), wantErr) { + t.Errorf("vstream end: %v, must contain %v", err.Error(), wantErr) + } + close(done) + }() + + Loop: + for { + if tcase.shouldRetry { + select { + case event := <-ch: + got := event.CloneVT() + if !proto.Equal(got, want) { + t.Errorf("got different vstream event than expected") + } + cancel() + case <-done: + // The goroutine has completed, so break out of the loop + break Loop + } + } else { + <-done + break Loop + } + } + }) } - time.Sleep(100 * time.Millisecond) // wait for goroutine within VStream to finish - assert.Equal(t, int32(2), count.Load()) + } func TestVStreamShouldNotSendSourceHeartbeats(t *testing.T) { @@ -1266,6 +1347,22 @@ func getSandboxTopo(ctx context.Context, cell string, keyspace string, shards [] return st } +func getSandboxTopoMultiCell(ctx context.Context, cells []string, keyspace string, shards []string) *sandboxTopo { + st := newSandboxForCells(ctx, cells) + ts := st.topoServer + + for _, cell := range cells { + ts.CreateCellInfo(ctx, cell, &topodatapb.CellInfo{}) + } + + ts.CreateKeyspace(ctx, keyspace, &topodatapb.Keyspace{}) + + for _, shard := range shards { + ts.CreateShard(ctx, keyspace, shard) + } + return st +} + func addTabletToSandboxTopo(t *testing.T, ctx context.Context, st *sandboxTopo, ks, shard string, tablet *topodatapb.Tablet) { _, err := st.topoServer.UpdateShardFields(ctx, ks, shard, func(si *topo.ShardInfo) error { si.PrimaryAlias = tablet.Alias diff --git a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go index 71ef5e29d87..e2cad685e2c 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go @@ -255,7 +255,7 @@ func (td *tableDiffer) selectTablets(ctx context.Context) error { } func pickTablet(ctx context.Context, ts *topo.Server, cells []string, localCell, keyspace, shard, tabletTypes string) (*topodata.Tablet, error) { - tp, err := 
discovery.NewTabletPicker(ctx, ts, cells, localCell, keyspace, shard, tabletTypes, discovery.TabletPickerOptions{}, make(map[string]topodata.TabletAlias)) + tp, err := discovery.NewTabletPicker(ctx, ts, cells, localCell, keyspace, shard, tabletTypes, discovery.TabletPickerOptions{}, make(map[string]*topodata.TabletAlias)) if err != nil { return nil, err } diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go index 871e3d69c64..7c72243ab2b 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go @@ -129,7 +129,7 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor return nil, err } } - tp, err := discovery.NewTabletPicker(ctx, sourceTopo, cells, ct.vre.cell, ct.source.Keyspace, ct.source.Shard, tabletTypesStr, discovery.TabletPickerOptions{}, make(map[string]topodatapb.TabletAlias)) + tp, err := discovery.NewTabletPicker(ctx, sourceTopo, cells, ct.vre.cell, ct.source.Keyspace, ct.source.Shard, tabletTypesStr, discovery.TabletPickerOptions{}, make(map[string]*topodatapb.TabletAlias)) if err != nil { return nil, err } diff --git a/go/vt/wrangler/traffic_switcher.go b/go/vt/wrangler/traffic_switcher.go index 90ef7116035..a2ffa1a1675 100644 --- a/go/vt/wrangler/traffic_switcher.go +++ b/go/vt/wrangler/traffic_switcher.go @@ -453,7 +453,7 @@ func (wr *Wrangler) areTabletsAvailableToStreamFrom(ctx context.Context, ts *tra if cells == nil { cells = append(cells, shard.PrimaryAlias.Cell) } - tp, err := discovery.NewTabletPicker(ctx, wr.ts, cells, shard.PrimaryAlias.Cell, keyspace, shard.ShardName(), tabletTypes, discovery.TabletPickerOptions{}, make(map[string]topodatapb.TabletAlias)) + tp, err := discovery.NewTabletPicker(ctx, wr.ts, cells, shard.PrimaryAlias.Cell, keyspace, shard.ShardName(), tabletTypes, discovery.TabletPickerOptions{}, make(map[string]*topodatapb.TabletAlias)) if err != nil { 
allErrors.RecordError(err) return diff --git a/go/vt/wrangler/vdiff.go b/go/vt/wrangler/vdiff.go index 356ac329553..c18e51cb93e 100644 --- a/go/vt/wrangler/vdiff.go +++ b/go/vt/wrangler/vdiff.go @@ -810,7 +810,7 @@ func (df *vdiff) selectTablets(ctx context.Context, ts *trafficSwitcher) error { if ts.ExternalTopo() != nil { sourceTopo = ts.ExternalTopo() } - tp, err := discovery.NewTabletPicker(ctx, sourceTopo, []string{df.sourceCell}, df.sourceCell, df.ts.SourceKeyspaceName(), shard, df.tabletTypesStr, discovery.TabletPickerOptions{}, make(map[string]topodatapb.TabletAlias)) + tp, err := discovery.NewTabletPicker(ctx, sourceTopo, []string{df.sourceCell}, df.sourceCell, df.ts.SourceKeyspaceName(), shard, df.tabletTypesStr, discovery.TabletPickerOptions{}, make(map[string]*topodatapb.TabletAlias)) if err != nil { return err } @@ -828,7 +828,7 @@ func (df *vdiff) selectTablets(ctx context.Context, ts *trafficSwitcher) error { go func() { defer wg.Done() err2 = df.forAll(df.targets, func(shard string, target *shardStreamer) error { - tp, err := discovery.NewTabletPicker(ctx, df.ts.TopoServer(), []string{df.targetCell}, df.targetCell, df.ts.TargetKeyspaceName(), shard, df.tabletTypesStr, discovery.TabletPickerOptions{}, make(map[string]topodatapb.TabletAlias)) + tp, err := discovery.NewTabletPicker(ctx, df.ts.TopoServer(), []string{df.targetCell}, df.targetCell, df.ts.TargetKeyspaceName(), shard, df.tabletTypesStr, discovery.TabletPickerOptions{}, make(map[string]*topodatapb.TabletAlias)) if err != nil { return err } diff --git a/tools/rowlog/rowlog.go b/tools/rowlog/rowlog.go index 92045fcd245..7e11ba0886e 100644 --- a/tools/rowlog/rowlog.go +++ b/tools/rowlog/rowlog.go @@ -389,7 +389,7 @@ func getTablet(ctx context.Context, ts *topo.Server, cells []string, keyspace st discovery.TabletPickerOptions{ CellPreference: "OnlySpecified", }, - make(map[string]topodatapb.TabletAlias), + make(map[string]*topodatapb.TabletAlias), ) if err != nil { return "" From 
ce8797a9a560d4ceacd371656339a98b58d315db Mon Sep 17 00:00:00 2001 From: Priya Bibra Date: Thu, 12 Oct 2023 17:55:21 -0700 Subject: [PATCH 04/11] fix nul ref Signed-off-by: Priya Bibra --- go/vt/discovery/tablet_picker_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/discovery/tablet_picker_test.go b/go/vt/discovery/tablet_picker_test.go index af59a3f927e..0e8d50edee9 100644 --- a/go/vt/discovery/tablet_picker_test.go +++ b/go/vt/discovery/tablet_picker_test.go @@ -402,7 +402,7 @@ func TestPickWithIgnoreList(t *testing.T) { noWant := addTablet(ctx, te, 102, topodatapb.TabletType_REPLICA, "cell1", true, true) defer deleteTablet(t, te, noWant) - var ignoreTablets map[string]*topodatapb.TabletAlias + ignoreTablets := make(map[string]*topodatapb.TabletAlias) ignoreTablets[noWant.Alias.String()] = noWant.GetAlias() // Specify the alias as the cell. From a843d9da64b9162ca82e9d3e2a9848da938e5a27 Mon Sep 17 00:00:00 2001 From: Priya Bibra Date: Thu, 12 Oct 2023 22:08:26 -0700 Subject: [PATCH 05/11] ignore tablet on certain errors only Signed-off-by: Priya Bibra --- go/vt/vtgate/vstream_manager.go | 21 ++++++---- go/vt/vtgate/vstream_manager_test.go | 63 +++++++++++++++++----------- 2 files changed, 51 insertions(+), 33 deletions(-) diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index 33f40b330b7..53f5fcead8d 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -672,11 +672,16 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha // Unreachable. 
err = vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "vstream ended unexpectedly") } - if !vs.isRetriableError(err) { + + retry, ignoreTablet := vs.isRetriableError(err) + if !retry { log.Errorf("vstream for %s/%s error: %v", sgtid.Keyspace, sgtid.Shard, err) return err } - ignoreTablets[tablet.Alias.String()] = tablet.GetAlias() + if ignoreTablet { + ignoreTablets[tablet.Alias.String()] = tablet.GetAlias() + } + errCount++ if errCount >= 3 { log.Errorf("vstream for %s/%s had three consecutive failures: %v", sgtid.Keyspace, sgtid.Shard, err) @@ -686,18 +691,18 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha } } -func (vs *vstream) isRetriableError(err error) bool { +func (vs *vstream) isRetriableError(err error) (bool, bool) { errCode := vterrors.Code(err) - if errCode == vtrpcpb.Code_FAILED_PRECONDITION || errCode == vtrpcpb.Code_UNAVAILABLE || errCode == vtrpcpb.Code_NOT_FOUND { - return true + if errCode == vtrpcpb.Code_FAILED_PRECONDITION || errCode == vtrpcpb.Code_UNAVAILABLE { + return true, false } - if errCode == vtrpcpb.Code_INVALID_ARGUMENT && strings.Contains(err.Error(), "GTIDSet Mismatch") { - return true + if (errCode == vtrpcpb.Code_INVALID_ARGUMENT && strings.Contains(err.Error(), "GTIDSet Mismatch")) || errCode == vtrpcpb.Code_NOT_FOUND { + return true, true } - return false + return false, false } // sendAll sends a group of events together while holding the lock. 
diff --git a/go/vt/vtgate/vstream_manager_test.go b/go/vt/vtgate/vstream_manager_test.go index 5d6b1434340..a26e9fc4db5 100644 --- a/go/vt/vtgate/vstream_manager_test.go +++ b/go/vt/vtgate/vstream_manager_test.go @@ -388,42 +388,48 @@ func TestVStreamsCreatedAndLagMetrics(t *testing.T) { func TestVStreamRetriableErrors(t *testing.T) { type testCase struct { - name string - code vtrpcpb.Code - msg string - shouldRetry bool + name string + code vtrpcpb.Code + msg string + shouldRetry bool + ignoreTablet bool } tcases := []testCase{ { - name: "failed precondition", - code: vtrpcpb.Code_FAILED_PRECONDITION, - msg: "", - shouldRetry: true, + name: "failed precondition", + code: vtrpcpb.Code_FAILED_PRECONDITION, + msg: "", + shouldRetry: true, + ignoreTablet: false, }, { - name: "gtid mismatch", - code: vtrpcpb.Code_INVALID_ARGUMENT, - msg: "GTIDSet Mismatch aa", - shouldRetry: true, + name: "gtid mismatch", + code: vtrpcpb.Code_INVALID_ARGUMENT, + msg: "GTIDSet Mismatch aa", + shouldRetry: true, + ignoreTablet: true, }, { - name: "not found", - code: vtrpcpb.Code_NOT_FOUND, - msg: "", - shouldRetry: true, + name: "not found", + code: vtrpcpb.Code_NOT_FOUND, + msg: "", + shouldRetry: true, + ignoreTablet: true, }, { - name: "unavailable", - code: vtrpcpb.Code_UNAVAILABLE, - msg: "", - shouldRetry: true, + name: "unavailable", + code: vtrpcpb.Code_UNAVAILABLE, + msg: "", + shouldRetry: true, + ignoreTablet: false, }, { - name: "should not retry", - code: vtrpcpb.Code_INVALID_ARGUMENT, - msg: "final error", - shouldRetry: false, + name: "should not retry", + code: vtrpcpb.Code_INVALID_ARGUMENT, + msg: "final error", + shouldRetry: false, + ignoreTablet: false, }, } @@ -454,9 +460,16 @@ func TestVStreamRetriableErrors(t *testing.T) { addTabletToSandboxTopo(t, ctx, st, ks, "-20", sbc1.Tablet()) vsm := newTestVStreamManager(ctx, hc, st, cells[0]) + // always have the local cell tablet error so it's ignored on retry and we pick the other one + // if the error requires ignoring 
the tablet on retry sbc0.AddVStreamEvents(nil, vterrors.Errorf(tcase.code, tcase.msg)) - sbc1.AddVStreamEvents(commit, nil) + + if tcase.ignoreTablet { + sbc1.AddVStreamEvents(commit, nil) + } else { + sbc0.AddVStreamEvents(commit, nil) + } vgtid := &binlogdatapb.VGtid{ ShardGtids: []*binlogdatapb.ShardGtid{{ From a1f1af9ca08a7dae3eb1a4d3dc786c9ca98e3139 Mon Sep 17 00:00:00 2001 From: Priya Bibra Date: Fri, 13 Oct 2023 10:20:29 -0700 Subject: [PATCH 06/11] add comments Signed-off-by: Priya Bibra --- go/vt/vtgate/vstream_manager.go | 6 ++++++ go/vt/vtgate/vstream_manager_test.go | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index 53f5fcead8d..c8694606ec2 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -691,6 +691,10 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha } } +// isRetriable determines whether we should exit immediately or retry the vstream. +// The first return value determines if the error is retriable, the second indicates whether +// the tablet on which the error occurred should be ommitted from the candidate list of tablets +// to choose from on the retry. func (vs *vstream) isRetriableError(err error) (bool, bool) { errCode := vterrors.Code(err) @@ -698,6 +702,8 @@ func (vs *vstream) isRetriableError(err error) (bool, bool) { return true, false } + // If there is a GTIDSet Mismatch on the tablet or if the tablet cannot be found, + // omit it from the candidate list in the TabletPicker on retry. 
if (errCode == vtrpcpb.Code_INVALID_ARGUMENT && strings.Contains(err.Error(), "GTIDSet Mismatch")) || errCode == vtrpcpb.Code_NOT_FOUND { return true, true } diff --git a/go/vt/vtgate/vstream_manager_test.go b/go/vt/vtgate/vstream_manager_test.go index a26e9fc4db5..74c975e58a6 100644 --- a/go/vt/vtgate/vstream_manager_test.go +++ b/go/vt/vtgate/vstream_manager_test.go @@ -461,7 +461,7 @@ func TestVStreamRetriableErrors(t *testing.T) { vsm := newTestVStreamManager(ctx, hc, st, cells[0]) - // always have the local cell tablet error so it's ignored on retry and we pick the other one + // Always have the local cell tablet error so it's ignored on retry and we pick the other one // if the error requires ignoring the tablet on retry sbc0.AddVStreamEvents(nil, vterrors.Errorf(tcase.code, tcase.msg)) From c052bc273280439e8ceb0ab39325f0b38920de9c Mon Sep 17 00:00:00 2001 From: Priya Bibra Date: Mon, 16 Oct 2023 16:13:41 -0700 Subject: [PATCH 07/11] change ignore list to variadic arg, create new child context with shorter timeout Signed-off-by: Priya Bibra --- go/vt/discovery/tablet_picker.go | 12 ++++--- go/vt/discovery/tablet_picker_test.go | 33 +++++++++---------- go/vt/vtctl/workflow/utils.go | 2 +- go/vt/vtgate/vstream_manager.go | 30 +++++++++++------ go/vt/vtgate/vstream_manager_test.go | 7 ---- .../tabletmanager/vdiff/table_differ.go | 2 +- .../tabletmanager/vreplication/controller.go | 2 +- go/vt/wrangler/traffic_switcher.go | 2 +- go/vt/wrangler/vdiff.go | 4 +-- tools/rowlog/rowlog.go | 1 - 10 files changed, 48 insertions(+), 47 deletions(-) diff --git a/go/vt/discovery/tablet_picker.go b/go/vt/discovery/tablet_picker.go index 6eaebdcac1b..b95e08120f2 100644 --- a/go/vt/discovery/tablet_picker.go +++ b/go/vt/discovery/tablet_picker.go @@ -21,6 +21,7 @@ import ( "fmt" "io" "math/rand" + "slices" "sort" "strings" "sync" @@ -135,7 +136,7 @@ type TabletPicker struct { inOrder bool cellPref TabletPickerCellPreference localCellInfo localCellInfo - ignoreTablets 
map[string]*topodatapb.TabletAlias + ignoreTablets []string } // NewTabletPicker returns a TabletPicker. @@ -145,7 +146,7 @@ func NewTabletPicker( cells []string, localCell, keyspace, shard, tabletTypesStr string, options TabletPickerOptions, - ignoreTablets map[string]*topodatapb.TabletAlias, + ignoreTablets ...string, ) (*TabletPicker, error) { // Keep inOrder parsing here for backward compatability until TabletPickerTabletOrder is fully adopted. if tabletTypesStr == "" { @@ -419,7 +420,8 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn tablets := make([]*topo.TabletInfo, 0, len(aliases)) for _, tabletAlias := range aliases { - tabletInfo, ok := tabletMap[topoproto.TabletAliasString(tabletAlias)] + tabletAliasString := topoproto.TabletAliasString(tabletAlias) + tabletInfo, ok := tabletMap[tabletAliasString] if !ok { // Either tablet disappeared on us, or we got a partial result // (GetTabletMap ignores topo.ErrNoNode); just log a warning. @@ -436,8 +438,8 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn } return vterrors.New(vtrpcpb.Code_INTERNAL, "tablet is not healthy and serving") }); err == nil || err == io.EOF { - // if this tablet is not in the ignore list, then add it as a candidate - if _, ok := tp.ignoreTablets[tabletInfo.GetAlias().String()]; !ok { + // If this tablet is not in the ignore list, then add it as a candidate. 
+ if !slices.Contains(tp.ignoreTablets, tabletAliasString) { tablets = append(tablets, tabletInfo) } } diff --git a/go/vt/discovery/tablet_picker_test.go b/go/vt/discovery/tablet_picker_test.go index 0e8d50edee9..9191d161626 100644 --- a/go/vt/discovery/tablet_picker_test.go +++ b/go/vt/discovery/tablet_picker_test.go @@ -26,6 +26,7 @@ import ( "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/memorytopo" + "vitess.io/vitess/go/vt/topo/topoproto" querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" @@ -47,7 +48,7 @@ func TestPickPrimary(t *testing.T) { }) require.NoError(t, err) - tp, err := NewTabletPicker(ctx, te.topoServ, []string{"otherCell"}, "cell", te.keyspace, te.shard, "primary", TabletPickerOptions{}, make(map[string]*topodatapb.TabletAlias)) + tp, err := NewTabletPicker(ctx, te.topoServ, []string{"otherCell"}, "cell", te.keyspace, te.shard, "primary", TabletPickerOptions{}) require.NoError(t, err) ctx2, cancel2 := context.WithTimeout(ctx, 200*time.Millisecond) @@ -284,7 +285,7 @@ func TestPickLocalPreferences(t *testing.T) { deleteTablet(t, te, tab) } }() - tp, err := NewTabletPicker(ctx, te.topoServ, tcase.inCells, tcase.localCell, te.keyspace, te.shard, tcase.inTabletTypes, tcase.options, make(map[string]*topodatapb.TabletAlias)) + tp, err := NewTabletPicker(ctx, te.topoServ, tcase.inCells, tcase.localCell, te.keyspace, te.shard, tcase.inTabletTypes, tcase.options) require.NoError(t, err) require.Equal(t, tp.localCellInfo.localCell, tcase.localCell) require.ElementsMatch(t, tp.cells, tcase.tpCells) @@ -313,7 +314,7 @@ func TestPickCellPreferenceLocalCell(t *testing.T) { defer deleteTablet(t, te, want1) // Local cell preference is default - tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{}, make(map[string]*topodatapb.TabletAlias)) + tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell", te.keyspace, 
te.shard, "replica", TabletPickerOptions{}) require.NoError(t, err) tablet, err := tp.PickForStreaming(ctx) @@ -348,7 +349,7 @@ func TestPickCellPreferenceLocalAlias(t *testing.T) { // test env puts all cells into an alias called "cella" te := newPickerTestEnv(t, ctx, []string{"cell", "otherCell"}) - tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{}, make(map[string]*topodatapb.TabletAlias)) + tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{}) require.NoError(t, err) // create a tablet in the other cell, it should be picked @@ -370,7 +371,7 @@ func TestPickUsingCellAsAlias(t *testing.T) { // added to the alias. te := newPickerTestEnv(t, ctx, []string{"cell1", "cell2", "cell3"}, "xtracell") // Specify the alias as the cell. - tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell1", te.keyspace, te.shard, "replica", TabletPickerOptions{}, make(map[string]*topodatapb.TabletAlias)) + tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell1", te.keyspace, te.shard, "replica", TabletPickerOptions{}) require.NoError(t, err) // Create a tablet in one of the main cells, it should be @@ -399,14 +400,11 @@ func TestPickWithIgnoreList(t *testing.T) { want := addTablet(ctx, te, 101, topodatapb.TabletType_REPLICA, "cell1", true, true) defer deleteTablet(t, te, want) - noWant := addTablet(ctx, te, 102, topodatapb.TabletType_REPLICA, "cell1", true, true) - defer deleteTablet(t, te, noWant) - - ignoreTablets := make(map[string]*topodatapb.TabletAlias) - ignoreTablets[noWant.Alias.String()] = noWant.GetAlias() + dontWant := addTablet(ctx, te, 102, topodatapb.TabletType_REPLICA, "cell1", true, true) + defer deleteTablet(t, te, dontWant) // Specify the alias as the cell. 
- tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell1", te.keyspace, te.shard, "replica", TabletPickerOptions{}, ignoreTablets) + tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell1", te.keyspace, te.shard, "replica", TabletPickerOptions{}, topoproto.TabletAliasString(dontWant.GetAlias())) require.NoError(t, err) // Try it many times to be sure we don't ever pick from the ignore list @@ -425,7 +423,7 @@ func TestPickUsingCellAliasOnlySpecified(t *testing.T) { want1 := addTablet(ctx, te, 100, topodatapb.TabletType_REPLICA, "cell", true, true) defer deleteTablet(t, te, want1) - tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{CellPreference: "OnlySpecified"}, make(map[string]*topodatapb.TabletAlias)) + tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{CellPreference: "OnlySpecified"}) require.NoError(t, err) tablet, err := tp.PickForStreaming(ctx) @@ -468,7 +466,7 @@ func TestTabletAppearsDuringSleep(t *testing.T) { ctx := utils.LeakCheckContextTimeout(t, 200*time.Millisecond) te := newPickerTestEnv(t, ctx, []string{"cell"}) - tp, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{}, make(map[string]*topodatapb.TabletAlias)) + tp, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{}) require.NoError(t, err) delay := GetTabletPickerRetryDelay() @@ -498,11 +496,10 @@ func TestPickErrorLocalPreferenceDefault(t *testing.T) { ctx := utils.LeakCheckContext(t) te := newPickerTestEnv(t, ctx, []string{"cell"}) - var ignoreTablets map[string]*topodatapb.TabletAlias - _, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "badtype", TabletPickerOptions{}, ignoreTablets) + _, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", 
te.keyspace, te.shard, "badtype", TabletPickerOptions{}) assert.EqualError(t, err, "failed to parse list of tablet types: badtype") - tp, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{}, ignoreTablets) + tp, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{}) require.NoError(t, err) delay := GetTabletPickerRetryDelay() defer func() { @@ -530,7 +527,7 @@ func TestPickErrorOnlySpecified(t *testing.T) { te := newPickerTestEnv(t, ctx, []string{"cell"}) - tp, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{CellPreference: "OnlySpecified"}, make(map[string]*topodatapb.TabletAlias)) + tp, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{CellPreference: "OnlySpecified"}) require.NoError(t, err) delay := GetTabletPickerRetryDelay() defer func() { @@ -586,7 +583,7 @@ func TestPickFallbackType(t *testing.T) { }) require.NoError(t, err) - tp, err := NewTabletPicker(ctx, te.topoServ, cells, localCell, te.keyspace, te.shard, tabletTypes, options, make(map[string]*topodatapb.TabletAlias)) + tp, err := NewTabletPicker(ctx, te.topoServ, cells, localCell, te.keyspace, te.shard, tabletTypes, options) require.NoError(t, err) ctx2, cancel2 := context.WithTimeout(ctx, 1*time.Second) defer cancel2() diff --git a/go/vt/vtctl/workflow/utils.go b/go/vt/vtctl/workflow/utils.go index 1c5c07d90fb..1a723c6192c 100644 --- a/go/vt/vtctl/workflow/utils.go +++ b/go/vt/vtctl/workflow/utils.go @@ -626,7 +626,7 @@ func areTabletsAvailableToStreamFrom(ctx context.Context, req *vtctldatapb.Workf if cells == nil { cells = append(cells, shard.PrimaryAlias.Cell) } - tp, err := discovery.NewTabletPicker(ctx, ts.ws.ts, cells, shard.PrimaryAlias.Cell, keyspace, shard.ShardName(), tabletTypesStr, discovery.TabletPickerOptions{}, 
make(map[string]*topodatapb.TabletAlias)) + tp, err := discovery.NewTabletPicker(ctx, ts.ws.ts, cells, shard.PrimaryAlias.Cell, keyspace, shard.ShardName(), tabletTypesStr, discovery.TabletPickerOptions{}) if err != nil { allErrors.RecordError(err) return diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index c8694606ec2..734dfaf6f2e 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -30,6 +30,7 @@ import ( querypb "vitess.io/vitess/go/vt/proto/query" "vitess.io/vitess/go/vt/servenv" "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/log" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" @@ -474,7 +475,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha // journalDone is assigned a channel when a journal event is encountered. // It will be closed when all journal events converge. var journalDone chan struct{} - ignoreTablets := make(map[string]*topodatapb.TabletAlias) + ignoreTablets := []string{} errCount := 0 for { @@ -492,12 +493,17 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha var eventss [][]*binlogdatapb.VEvent var err error cells := vs.getCells() - tp, err := discovery.NewTabletPicker(ctx, vs.ts, cells, vs.vsm.cell, sgtid.Keyspace, sgtid.Shard, vs.tabletType.String(), vs.tabletPickerOptions, ignoreTablets) + + // Create a child context with a stricter timeout. + tpCtx, tpCancel := context.WithTimeout(context.Background(), 60*time.Second) + defer tpCancel() + + tp, err := discovery.NewTabletPicker(tpCtx, vs.ts, cells, vs.vsm.cell, sgtid.Keyspace, sgtid.Shard, vs.tabletType.String(), vs.tabletPickerOptions, ignoreTablets...) 
if err != nil { log.Errorf(err.Error()) return err } - tablet, err := tp.PickForStreaming(ctx) + tablet, err := tp.PickForStreaming(tpCtx) if err != nil { log.Errorf(err.Error()) return err @@ -673,16 +679,17 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha err = vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "vstream ended unexpectedly") } - retry, ignoreTablet := vs.isRetriableError(err) + retry, ignoreTablet := vs.shouldRetry(err) if !retry { log.Errorf("vstream for %s/%s error: %v", sgtid.Keyspace, sgtid.Shard, err) return err } if ignoreTablet { - ignoreTablets[tablet.Alias.String()] = tablet.GetAlias() + ignoreTablets = append(ignoreTablets, topoproto.TabletAliasString(tablet.GetAlias())) } errCount++ + // Retry, at most, 3 times if the error can be retried. if errCount >= 3 { log.Errorf("vstream for %s/%s had three consecutive failures: %v", sgtid.Keyspace, sgtid.Shard, err) return err @@ -691,20 +698,23 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha } } -// isRetriable determines whether we should exit immediately or retry the vstream. -// The first return value determines if the error is retriable, the second indicates whether +// shouldRetry determines whether we should exit immediately or retry the vstream. +// The first return value determines if the error can be retried, the second indicates whether // the tablet on which the error occurred should be ommitted from the candidate list of tablets // to choose from on the retry. -func (vs *vstream) isRetriableError(err error) (bool, bool) { +// +// An error should be retried if it is expected to be transient. +// A tablet should be ignored upon retry if it's likely another tablet will succeed without the same error. 
+func (vs *vstream) shouldRetry(err error) (bool, bool) { errCode := vterrors.Code(err) if errCode == vtrpcpb.Code_FAILED_PRECONDITION || errCode == vtrpcpb.Code_UNAVAILABLE { return true, false } - // If there is a GTIDSet Mismatch on the tablet or if the tablet cannot be found, + // If there is a GTIDSet Mismatch on the tablet, // omit it from the candidate list in the TabletPicker on retry. - if (errCode == vtrpcpb.Code_INVALID_ARGUMENT && strings.Contains(err.Error(), "GTIDSet Mismatch")) || errCode == vtrpcpb.Code_NOT_FOUND { + if errCode == vtrpcpb.Code_INVALID_ARGUMENT && strings.Contains(err.Error(), "GTIDSet Mismatch") { return true, true } diff --git a/go/vt/vtgate/vstream_manager_test.go b/go/vt/vtgate/vstream_manager_test.go index 74c975e58a6..55575be1db4 100644 --- a/go/vt/vtgate/vstream_manager_test.go +++ b/go/vt/vtgate/vstream_manager_test.go @@ -410,13 +410,6 @@ func TestVStreamRetriableErrors(t *testing.T) { shouldRetry: true, ignoreTablet: true, }, - { - name: "not found", - code: vtrpcpb.Code_NOT_FOUND, - msg: "", - shouldRetry: true, - ignoreTablet: true, - }, { name: "unavailable", code: vtrpcpb.Code_UNAVAILABLE, diff --git a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go index e2cad685e2c..e65a0bad253 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go @@ -255,7 +255,7 @@ func (td *tableDiffer) selectTablets(ctx context.Context) error { } func pickTablet(ctx context.Context, ts *topo.Server, cells []string, localCell, keyspace, shard, tabletTypes string) (*topodata.Tablet, error) { - tp, err := discovery.NewTabletPicker(ctx, ts, cells, localCell, keyspace, shard, tabletTypes, discovery.TabletPickerOptions{}, make(map[string]*topodata.TabletAlias)) + tp, err := discovery.NewTabletPicker(ctx, ts, cells, localCell, keyspace, shard, tabletTypes, discovery.TabletPickerOptions{}) if err != nil { return nil, err } diff --git 
a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go index 7c72243ab2b..94e4741eeee 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go @@ -129,7 +129,7 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor return nil, err } } - tp, err := discovery.NewTabletPicker(ctx, sourceTopo, cells, ct.vre.cell, ct.source.Keyspace, ct.source.Shard, tabletTypesStr, discovery.TabletPickerOptions{}, make(map[string]*topodatapb.TabletAlias)) + tp, err := discovery.NewTabletPicker(ctx, sourceTopo, cells, ct.vre.cell, ct.source.Keyspace, ct.source.Shard, tabletTypesStr, discovery.TabletPickerOptions{}) if err != nil { return nil, err } diff --git a/go/vt/wrangler/traffic_switcher.go b/go/vt/wrangler/traffic_switcher.go index a2ffa1a1675..654a5bd1588 100644 --- a/go/vt/wrangler/traffic_switcher.go +++ b/go/vt/wrangler/traffic_switcher.go @@ -453,7 +453,7 @@ func (wr *Wrangler) areTabletsAvailableToStreamFrom(ctx context.Context, ts *tra if cells == nil { cells = append(cells, shard.PrimaryAlias.Cell) } - tp, err := discovery.NewTabletPicker(ctx, wr.ts, cells, shard.PrimaryAlias.Cell, keyspace, shard.ShardName(), tabletTypes, discovery.TabletPickerOptions{}, make(map[string]*topodatapb.TabletAlias)) + tp, err := discovery.NewTabletPicker(ctx, wr.ts, cells, shard.PrimaryAlias.Cell, keyspace, shard.ShardName(), tabletTypes, discovery.TabletPickerOptions{}) if err != nil { allErrors.RecordError(err) return diff --git a/go/vt/wrangler/vdiff.go b/go/vt/wrangler/vdiff.go index c18e51cb93e..85c82bb3574 100644 --- a/go/vt/wrangler/vdiff.go +++ b/go/vt/wrangler/vdiff.go @@ -810,7 +810,7 @@ func (df *vdiff) selectTablets(ctx context.Context, ts *trafficSwitcher) error { if ts.ExternalTopo() != nil { sourceTopo = ts.ExternalTopo() } - tp, err := discovery.NewTabletPicker(ctx, sourceTopo, []string{df.sourceCell}, 
df.sourceCell, df.ts.SourceKeyspaceName(), shard, df.tabletTypesStr, discovery.TabletPickerOptions{}, make(map[string]*topodatapb.TabletAlias)) + tp, err := discovery.NewTabletPicker(ctx, sourceTopo, []string{df.sourceCell}, df.sourceCell, df.ts.SourceKeyspaceName(), shard, df.tabletTypesStr, discovery.TabletPickerOptions{}) if err != nil { return err } @@ -828,7 +828,7 @@ func (df *vdiff) selectTablets(ctx context.Context, ts *trafficSwitcher) error { go func() { defer wg.Done() err2 = df.forAll(df.targets, func(shard string, target *shardStreamer) error { - tp, err := discovery.NewTabletPicker(ctx, df.ts.TopoServer(), []string{df.targetCell}, df.targetCell, df.ts.TargetKeyspaceName(), shard, df.tabletTypesStr, discovery.TabletPickerOptions{}, make(map[string]*topodatapb.TabletAlias)) + tp, err := discovery.NewTabletPicker(ctx, df.ts.TopoServer(), []string{df.targetCell}, df.targetCell, df.ts.TargetKeyspaceName(), shard, df.tabletTypesStr, discovery.TabletPickerOptions{}) if err != nil { return err } diff --git a/tools/rowlog/rowlog.go b/tools/rowlog/rowlog.go index 7e11ba0886e..475006b2b59 100644 --- a/tools/rowlog/rowlog.go +++ b/tools/rowlog/rowlog.go @@ -389,7 +389,6 @@ func getTablet(ctx context.Context, ts *topo.Server, cells []string, keyspace st discovery.TabletPickerOptions{ CellPreference: "OnlySpecified", }, - make(map[string]*topodatapb.TabletAlias), ) if err != nil { return "" From eaad958ed191045859c39f2cd4365d01d813640f Mon Sep 17 00:00:00 2001 From: Priya Bibra Date: Tue, 17 Oct 2023 10:10:50 -0700 Subject: [PATCH 08/11] make list of tablet alias instead of string Signed-off-by: Priya Bibra --- go/vt/discovery/tablet_picker.go | 25 +++++++++++------- go/vt/discovery/tablet_picker_test.go | 7 +++-- go/vt/vtgate/vstream_manager.go | 38 ++++++++++++++------------- 3 files changed, 38 insertions(+), 32 deletions(-) diff --git a/go/vt/discovery/tablet_picker.go b/go/vt/discovery/tablet_picker.go index b95e08120f2..e3d04b3de9f 100644 --- 
a/go/vt/discovery/tablet_picker.go +++ b/go/vt/discovery/tablet_picker.go @@ -21,7 +21,6 @@ import ( "fmt" "io" "math/rand" - "slices" "sort" "strings" "sync" @@ -136,7 +135,8 @@ type TabletPicker struct { inOrder bool cellPref TabletPickerCellPreference localCellInfo localCellInfo - ignoreTablets []string + // This map is keyed on the results of TabletAlias.String(). + ignoreTablets map[string]struct{} } // NewTabletPicker returns a TabletPicker. @@ -146,7 +146,7 @@ func NewTabletPicker( cells []string, localCell, keyspace, shard, tabletTypesStr string, options TabletPickerOptions, - ignoreTablets ...string, + ignoreTablets ...*topodatapb.TabletAlias, ) (*TabletPicker, error) { // Keep inOrder parsing here for backward compatability until TabletPickerTabletOrder is fully adopted. if tabletTypesStr == "" { @@ -222,7 +222,7 @@ func NewTabletPicker( } } - return &TabletPicker{ + tp := &TabletPicker{ ts: ts, cells: dedupeCells(cells), localCellInfo: localCellInfo{localCell: localCell, cellsInAlias: aliasCellMap}, @@ -231,8 +231,15 @@ func NewTabletPicker( tabletTypes: tabletTypes, inOrder: inOrder, cellPref: cellPref, - ignoreTablets: ignoreTablets, - }, nil + ignoreTablets: make(map[string]struct{}, len(ignoreTablets)), + } + + for _, ignoreTablet := range ignoreTablets { + tp.ignoreTablets[ignoreTablet.String()] = struct{}{} + } + + return tp, nil + } // dedupeCells is used to remove duplicates in the cell list in case it is passed in @@ -420,8 +427,7 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn tablets := make([]*topo.TabletInfo, 0, len(aliases)) for _, tabletAlias := range aliases { - tabletAliasString := topoproto.TabletAliasString(tabletAlias) - tabletInfo, ok := tabletMap[tabletAliasString] + tabletInfo, ok := tabletMap[topoproto.TabletAliasString(tabletAlias)] if !ok { // Either tablet disappeared on us, or we got a partial result // (GetTabletMap ignores topo.ErrNoNode); just log a warning. 
@@ -438,8 +444,7 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn } return vterrors.New(vtrpcpb.Code_INTERNAL, "tablet is not healthy and serving") }); err == nil || err == io.EOF { - // If this tablet is not in the ignore list, then add it as a candidate. - if !slices.Contains(tp.ignoreTablets, tabletAliasString) { + if _, ignore := tp.ignoreTablets[tabletAlias.String()]; !ignore { tablets = append(tablets, tabletInfo) } } diff --git a/go/vt/discovery/tablet_picker_test.go b/go/vt/discovery/tablet_picker_test.go index 9191d161626..ac822124d58 100644 --- a/go/vt/discovery/tablet_picker_test.go +++ b/go/vt/discovery/tablet_picker_test.go @@ -26,7 +26,6 @@ import ( "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/memorytopo" - "vitess.io/vitess/go/vt/topo/topoproto" querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" @@ -404,14 +403,14 @@ func TestPickWithIgnoreList(t *testing.T) { defer deleteTablet(t, te, dontWant) // Specify the alias as the cell. - tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell1", te.keyspace, te.shard, "replica", TabletPickerOptions{}, topoproto.TabletAliasString(dontWant.GetAlias())) + tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell1", te.keyspace, te.shard, "replica", TabletPickerOptions{}, dontWant.GetAlias()) require.NoError(t, err) - // Try it many times to be sure we don't ever pick from the ignore list + // Try it many times to be sure we don't ever pick from the ignore list. 
for i := 0; i < 100; i++ { tablet, err := tp.PickForStreaming(ctx) require.NoError(t, err) - assert.True(t, proto.Equal(want, tablet), "Pick: %v, want %v", tablet, want) + require.False(t, proto.Equal(dontWant, tablet), "Picked the tablet we shouldn't have: %v", dontWant) } } diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index 734dfaf6f2e..b31d5e7f408 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -27,18 +27,17 @@ import ( "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/discovery" - querypb "vitess.io/vitess/go/vt/proto/query" + "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/servenv" + "vitess.io/vitess/go/vt/srvtopo" "vitess.io/vitess/go/vt/topo" - "vitess.io/vitess/go/vt/topo/topoproto" + "vitess.io/vitess/go/vt/vterrors" - "vitess.io/vitess/go/vt/log" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" - "vitess.io/vitess/go/vt/srvtopo" - "vitess.io/vitess/go/vt/vterrors" ) // vstreamManager manages vstream requests. @@ -475,7 +474,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha // journalDone is assigned a channel when a journal event is encountered. // It will be closed when all journal events converge. var journalDone chan struct{} - ignoreTablets := []string{} + ignoreTablets := make([]*topodatapb.TabletAlias, 0) errCount := 0 for { @@ -494,15 +493,17 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha var err error cells := vs.getCells() - // Create a child context with a stricter timeout. 
- tpCtx, tpCancel := context.WithTimeout(context.Background(), 60*time.Second) - defer tpCancel() - - tp, err := discovery.NewTabletPicker(tpCtx, vs.ts, cells, vs.vsm.cell, sgtid.Keyspace, sgtid.Shard, vs.tabletType.String(), vs.tabletPickerOptions, ignoreTablets...) + tp, err := discovery.NewTabletPicker(ctx, vs.ts, cells, vs.vsm.cell, sgtid.Keyspace, sgtid.Shard, vs.tabletType.String(), vs.tabletPickerOptions, ignoreTablets...) if err != nil { log.Errorf(err.Error()) return err } + + // Create a child context with a stricter timeout when picking a tablet. + // This will prevent hanging in the case no tablets are found + tpCtx, tpCancel := context.WithTimeout(context.Background(), 60*time.Second) + defer tpCancel() + tablet, err := tp.PickForStreaming(tpCtx) if err != nil { log.Errorf(err.Error()) @@ -685,7 +686,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha return err } if ignoreTablet { - ignoreTablets = append(ignoreTablets, topoproto.TabletAliasString(tablet.GetAlias())) + ignoreTablets = append(ignoreTablets, tablet.GetAlias()) } errCount++ @@ -699,12 +700,13 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha } // shouldRetry determines whether we should exit immediately or retry the vstream. -// The first return value determines if the error can be retried, the second indicates whether -// the tablet on which the error occurred should be ommitted from the candidate list of tablets -// to choose from on the retry. +// The first return value determines if the error can be retried, while the second +// indicates whether the tablet with which the error occurred should be ommitted +// from the candidate list of tablets to choose from on the retry. // // An error should be retried if it is expected to be transient. -// A tablet should be ignored upon retry if it's likely another tablet will succeed without the same error. 
+// A tablet should be ignored upon retry if it's likely another tablet will not +// produce the same error. func (vs *vstream) shouldRetry(err error) (bool, bool) { errCode := vterrors.Code(err) @@ -712,8 +714,8 @@ func (vs *vstream) shouldRetry(err error) (bool, bool) { return true, false } - // If there is a GTIDSet Mismatch on the tablet, - // omit it from the candidate list in the TabletPicker on retry. + // If there is a GTIDSet Mismatch on the tablet, omit it from the candidate + // list in the TabletPicker on retry. if errCode == vtrpcpb.Code_INVALID_ARGUMENT && strings.Contains(err.Error(), "GTIDSet Mismatch") { return true, true } From 0b4f83f27107b2396f992fb7b4bce80a673a84c9 Mon Sep 17 00:00:00 2001 From: Priya Bibra Date: Tue, 17 Oct 2023 10:11:41 -0700 Subject: [PATCH 09/11] 90s ctx deadline Signed-off-by: Priya Bibra --- go/vt/vtgate/vstream_manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index b31d5e7f408..79cd9be587a 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -501,7 +501,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha // Create a child context with a stricter timeout when picking a tablet. 
// This will prevent hanging in the case no tablets are found - tpCtx, tpCancel := context.WithTimeout(context.Background(), 60*time.Second) + tpCtx, tpCancel := context.WithTimeout(context.Background(), 90*time.Second) defer tpCancel() tablet, err := tp.PickForStreaming(tpCtx) From 0bc2ccecf4a2b5b434da77759715a4e228be777f Mon Sep 17 00:00:00 2001 From: Priya Bibra Date: Tue, 17 Oct 2023 10:32:55 -0700 Subject: [PATCH 10/11] refactor ignore check Signed-off-by: Priya Bibra --- go/vt/discovery/tablet_picker.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/go/vt/discovery/tablet_picker.go b/go/vt/discovery/tablet_picker.go index e3d04b3de9f..e5177d81f3f 100644 --- a/go/vt/discovery/tablet_picker.go +++ b/go/vt/discovery/tablet_picker.go @@ -369,7 +369,9 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn log.Errorf("Error getting shard %s/%s: %v", tp.keyspace, tp.shard, err) return nil } - aliases = append(aliases, si.PrimaryAlias) + if _, ignore := tp.ignoreTablets[si.PrimaryAlias.String()]; !ignore { + aliases = append(aliases, si.PrimaryAlias) + } } else { actualCells := make([]string, 0) for _, cell := range tp.cells { @@ -405,7 +407,9 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn continue } for _, node := range sri.Nodes { - aliases = append(aliases, node.TabletAlias) + if _, ignore := tp.ignoreTablets[node.TabletAlias.String()]; !ignore { + aliases = append(aliases, node.TabletAlias) + } } } } @@ -444,9 +448,7 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn } return vterrors.New(vtrpcpb.Code_INTERNAL, "tablet is not healthy and serving") }); err == nil || err == io.EOF { - if _, ignore := tp.ignoreTablets[tabletAlias.String()]; !ignore { - tablets = append(tablets, tabletInfo) - } + tablets = append(tablets, tabletInfo) } _ = conn.Close(ctx) } From ff8bfb415d1f2c5b7b762366d3b86d8161c1eb62 Mon Sep 17 00:00:00 2001 From: Priya 
Bibra Date: Tue, 17 Oct 2023 13:21:28 -0700 Subject: [PATCH 11/11] fix nits Signed-off-by: Priya Bibra --- go/vt/vtgate/vstream_manager.go | 8 ++++++-- go/vt/vtgate/vstream_manager_test.go | 2 +- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index 79cd9be587a..ffb8989ca5d 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -53,6 +53,10 @@ type vstreamManager struct { // maxSkewTimeoutSeconds is the maximum allowed skew between two streams when the MinimizeSkew flag is set const maxSkewTimeoutSeconds = 10 * 60 +// tabletPickerContextTimeout is the timeout for the child context used to select candidate tablets +// for a vstream +const tabletPickerContextTimeout = 90 * time.Second + // vstream contains the metadata for one VStream request. type vstream struct { // mu protects parts of vgtid, the semantics of a send, and journaler. @@ -500,8 +504,8 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha } // Create a child context with a stricter timeout when picking a tablet. - // This will prevent hanging in the case no tablets are found - tpCtx, tpCancel := context.WithTimeout(context.Background(), 90*time.Second) + // This will prevent hanging in the case no tablets are found. 
+ tpCtx, tpCancel := context.WithTimeout(ctx, tabletPickerContextTimeout) defer tpCancel() tablet, err := tp.PickForStreaming(tpCtx) diff --git a/go/vt/vtgate/vstream_manager_test.go b/go/vt/vtgate/vstream_manager_test.go index 55575be1db4..4c1e9ec6764 100644 --- a/go/vt/vtgate/vstream_manager_test.go +++ b/go/vt/vtgate/vstream_manager_test.go @@ -455,7 +455,7 @@ func TestVStreamRetriableErrors(t *testing.T) { vsm := newTestVStreamManager(ctx, hc, st, cells[0]) // Always have the local cell tablet error so it's ignored on retry and we pick the other one - // if the error requires ignoring the tablet on retry + // if the error requires ignoring the tablet on retry. sbc0.AddVStreamEvents(nil, vterrors.Errorf(tcase.code, tcase.msg)) if tcase.ignoreTablet {