diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index 612323ca9b7..3ac83d79ef1 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -25,7 +25,7 @@ limitations under the License. // Alternatively, use a Watcher implementation which will constantly watch // a source (e.g. the topology) and add and remove tablets as they are // added or removed from the source. -// For a Watcher example have a look at NewCellTabletsWatcher(). +// For a Watcher example have a look at NewTopologyWatcher(). // // Internally, the HealthCheck module is connected to each tablet and has a // streaming RPC (StreamHealth) open to receive periodic health infos. @@ -92,7 +92,7 @@ var ( refreshKnownTablets = true // topoReadConcurrency tells us how many topo reads are allowed in parallel. - topoReadConcurrency = 32 + topoReadConcurrency int64 = 32 // healthCheckDialConcurrency tells us how many healthcheck connections can be opened to tablets at once. This should be less than the golang max thread limit of 10000. healthCheckDialConcurrency int64 = 1024 @@ -178,7 +178,7 @@ func registerWebUIFlags(fs *pflag.FlagSet) { fs.StringVar(&TabletURLTemplateString, "tablet_url_template", "http://{{.GetTabletHostPort}}", "Format string describing debug tablet url formatting. See getTabletDebugURL() for how to customize this.") fs.DurationVar(&refreshInterval, "tablet_refresh_interval", 1*time.Minute, "Tablet refresh interval.") fs.BoolVar(&refreshKnownTablets, "tablet_refresh_known_tablets", true, "Whether to reload the tablet's address/port map from topo in case they change.") - fs.IntVar(&topoReadConcurrency, "topo_read_concurrency", 32, "Concurrency of topo reads.") + fs.Int64Var(&topoReadConcurrency, "topo_read_concurrency", 32, "Concurrency of topo reads.") fs.Int64Var(&healthCheckDialConcurrency, "healthcheck-dial-concurrency", 1024, "Maximum concurrency of new healthcheck connections. This should be less than the golang max thread limit of 10000.") ParseTabletURLTemplateFromFlag() } @@ -365,6 +365,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur cellAliases: make(map[string]string), } var topoWatchers []*TopologyWatcher + var filter TabletFilter cells := strings.Split(cellsToWatch, ",") if cellsToWatch == "" { cells = append(cells, localCell) @@ -375,7 +376,19 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur if c == "" { continue } - topoWatchers = append(topoWatchers, NewCellTabletsWatcher(ctx, topoServer, hc, filters, c, refreshInterval, refreshKnownTablets, topoReadConcurrency)) + if len(tabletFilters) > 0 { + if len(KeyspacesToWatch) > 0 { + log.Exitf("Only one of -keyspaces_to_watch and -tablet_filters may be specified at a time") + } + fbs, err := NewFilterByShard(tabletFilters) + if err != nil { + log.Exitf("Cannot parse tablet_filters parameter: %v", err) + } + filter = fbs + } else if len(KeyspacesToWatch) > 0 { + filter = NewFilterByKeyspace(KeyspacesToWatch) + } + topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filter, c, refreshInterval, refreshKnownTablets, topoReadConcurrency)) } hc.topoWatchers = topoWatchers diff --git a/go/vt/discovery/tablet_picker.go b/go/vt/discovery/tablet_picker.go index c9537d3851e..aabea8be586 100644 --- a/go/vt/discovery/tablet_picker.go +++ b/go/vt/discovery/tablet_picker.go @@ -429,7 +429,7 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn } shortCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) defer cancel() - tabletMap, err := tp.ts.GetTabletMap(shortCtx, aliases) + tabletMap, err := tp.ts.GetTabletMap(shortCtx, aliases, nil) if err != nil { log.Warningf("error fetching tablets from topo: %v", err) // If we get a partial result we can still use it, otherwise return diff --git a/go/vt/discovery/topology_watcher.go b/go/vt/discovery/topology_watcher.go index 57a29679633..76f051a456c 100644 --- a/go/vt/discovery/topology_watcher.go +++ b/go/vt/discovery/topology_watcher.go @@ -71,8 +71,7 @@ type TopologyWatcher struct { cell string refreshInterval time.Duration refreshKnownTablets bool - getTablets func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error) - sem chan int + concurrency int64 ctx context.Context cancelFunc context.CancelFunc // wg keeps track of all launched Go routines. @@ -93,34 +92,28 @@ type TopologyWatcher struct { } // NewTopologyWatcher returns a TopologyWatcher that monitors all -// the tablets that it is configured to watch, and reloads them periodically if needed. -// As of now there is only one implementation: watch all tablets in a cell. -func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, filter TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int, getTablets func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error)) *TopologyWatcher { +// the tablets in a cell, and reloads them as needed. +func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int64) *TopologyWatcher { tw := &TopologyWatcher{ topoServer: topoServer, healthcheck: hc, - tabletFilter: filter, + tabletFilter: f, cell: cell, refreshInterval: refreshInterval, refreshKnownTablets: refreshKnownTablets, - getTablets: getTablets, - sem: make(chan int, topoReadConcurrency), + concurrency: topoReadConcurrency, tablets: make(map[string]*tabletInfo), } tw.firstLoadChan = make(chan struct{}) - // We want the span from the context, but not the cancelation that comes with it + // We want the span from the context, but not the cancellation that comes with it spanContext := trace.CopySpan(context.Background(), ctx) tw.ctx, tw.cancelFunc = context.WithCancel(spanContext) return tw } -// NewCellTabletsWatcher returns a TopologyWatcher that monitors all -// the tablets in a cell, and reloads them as needed. -func NewCellTabletsWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int) *TopologyWatcher { - return NewTopologyWatcher(ctx, topoServer, hc, f, cell, refreshInterval, refreshKnownTablets, topoReadConcurrency, func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error) { - return tw.topoServer.GetTabletAliasesByCell(ctx, tw.cell) - }) +func (tw *TopologyWatcher) getTablets() ([]*topo.TabletInfo, error) { + return tw.topoServer.GetTabletsByCell(tw.ctx, tw.cell, &topo.GetTabletsByCellOptions{Concurrency: tw.concurrency}) } // Start starts the topology watcher. @@ -149,18 +142,19 @@ func (tw *TopologyWatcher) Stop() { } func (tw *TopologyWatcher) loadTablets() { - var wg sync.WaitGroup newTablets := make(map[string]*tabletInfo) - // First get the list of relevant tabletAliases. - tabletAliases, err := tw.getTablets(tw) + // First get the list of all tablets. + tabletInfos, err := tw.getTablets() topologyWatcherOperations.Add(topologyWatcherOpListTablets, 1) if err != nil { topologyWatcherErrors.Add(topologyWatcherOpListTablets, 1) - select { - case <-tw.ctx.Done(): + // If we get a partial result error, we just log it and process the tablets that we did manage to fetch. + if topo.IsErrType(err, topo.PartialResult) { + log.Errorf("received partial result from getTablets for cell %v: %v", tw.cell, err) + } else { // For all other errors, just return. + log.Errorf("error getting tablets for cell: %v: %v", tw.cell, err) return - default: } log.Errorf("cannot get tablets for cell: %v: %v", tw.cell, err) return @@ -168,11 +162,13 @@ func (tw *TopologyWatcher) loadTablets() { // Accumulate a list of all known alias strings to use later // when sorting. - tabletAliasStrs := make([]string, 0, len(tabletAliases)) + tabletAliasStrs := make([]string, 0, len(tabletInfos)) tw.mu.Lock() - for _, tAlias := range tabletAliases { - aliasStr := topoproto.TabletAliasString(tAlias) + defer tw.mu.Unlock() + + for _, tInfo := range tabletInfos { + aliasStr := topoproto.TabletAliasString(tInfo.Alias) tabletAliasStrs = append(tabletAliasStrs, aliasStr) if !tw.refreshKnownTablets { @@ -182,38 +178,13 @@ func (tw *TopologyWatcher) loadTablets() { continue } } - - wg.Add(1) - go func(alias *topodata.TabletAlias) { - defer wg.Done() - tw.sem <- 1 // Wait for active queue to drain. - tablet, err := tw.topoServer.GetTablet(tw.ctx, alias) - topologyWatcherOperations.Add(topologyWatcherOpGetTablet, 1) - <-tw.sem // Done; enable next request to run. - if err != nil { - topologyWatcherErrors.Add(topologyWatcherOpGetTablet, 1) - select { - case <-tw.ctx.Done(): - return - default: - } - log.Errorf("cannot get tablet for alias %v: %v", alias, err) - return - } - tw.mu.Lock() - aliasStr := topoproto.TabletAliasString(alias) - newTablets[aliasStr] = &tabletInfo{ - alias: aliasStr, - tablet: tablet.Tablet, - } - tw.mu.Unlock() - }(tAlias) + // There's no network call here, so we just do the tablets one at a time instead of in parallel goroutines. + newTablets[aliasStr] = &tabletInfo{ + alias: aliasStr, + tablet: tInfo.Tablet, + } } - tw.mu.Unlock() - wg.Wait() - tw.mu.Lock() - for alias, newVal := range newTablets { if tw.tabletFilter != nil && !tw.tabletFilter.IsIncluded(newVal.tablet) { continue @@ -266,8 +237,6 @@ func (tw *TopologyWatcher) loadTablets() { tw.topoChecksum = crc32.ChecksumIEEE(buf.Bytes()) tw.lastRefresh = time.Now() - tw.mu.Unlock() - } // RefreshLag returns the time since the last refresh. diff --git a/go/vt/discovery/topology_watcher_test.go b/go/vt/discovery/topology_watcher_test.go index c372365626c..eb5412c7f07 100644 --- a/go/vt/discovery/topology_watcher_test.go +++ b/go/vt/discovery/topology_watcher_test.go @@ -26,9 +26,7 @@ import ( "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" - "vitess.io/vitess/go/vt/logutil" topodatapb "vitess.io/vitess/go/vt/proto/topodata" - "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/memorytopo" ) @@ -59,7 +57,7 @@ func TestStartAndCloseTopoWatcher(t *testing.T) { ts := memorytopo.NewServer("aa") fhc := NewFakeHealthCheck(nil) topologyWatcherOperations.ZeroAll() - tw := NewCellTabletsWatcher(context.Background(), ts, fhc, nil, "aa", 100*time.Microsecond, true, 5) + tw := NewTopologyWatcher(context.Background(), ts, fhc, nil, "aa", 100*time.Microsecond, true, 5) done := make(chan bool, 3) result := make(chan bool, 1) @@ -96,9 +94,7 @@ func TestStartAndCloseTopoWatcher(t *testing.T) { done <- true _, ok := <-result - if !ok { - t.Fatal("timed out") - } + require.True(t, ok, "timed out") } func TestCellTabletsWatcher(t *testing.T) { @@ -112,10 +108,10 @@ func TestCellTabletsWatcherNoRefreshKnown(t *testing.T) { func checkWatcher(t *testing.T, refreshKnownTablets bool) { ts := memorytopo.NewServer("aa") fhc := NewFakeHealthCheck(nil) - logger := logutil.NewMemoryLogger() + //logger := logutil.NewMemoryLogger() topologyWatcherOperations.ZeroAll() counts := topologyWatcherOperations.Counts() - tw := NewCellTabletsWatcher(context.Background(), ts, fhc, nil, "aa", 10*time.Minute, refreshKnownTablets, 5) + tw := NewTopologyWatcher(context.Background(), ts, fhc, nil, "aa", 10*time.Minute, refreshKnownTablets, 5) counts = checkOpCounts(t, counts, map[string]int64{}) checkChecksum(t, tw, 0) @@ -133,200 +129,161 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) { Keyspace: "keyspace", Shard: "shard", } - if err := ts.CreateTablet(context.Background(), tablet); err != nil { - t.Fatalf("CreateTablet failed: %v", err) - } - tw.loadTablets() - counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 1, "AddTablet": 1}) - checkChecksum(t, tw, 3238442862) - - // Check the tablet is returned by GetAllTablets(). - allTablets := fhc.GetAllTablets() - key := TabletToMapKey(tablet) - if _, ok := allTablets[key]; !ok || len(allTablets) != 1 || !proto.Equal(allTablets[key], tablet) { - t.Errorf("fhc.GetAllTablets() = %+v; want %+v", allTablets, tablet) - } + require.NoError(t, ts.CreateTablet(context.Background(), tablet), "CreateTablet failed for %v", tablet.Alias) - // Add a second tablet to the topology. - tablet2 := &topodatapb.Tablet{ - Alias: &topodatapb.TabletAlias{ - Cell: "aa", - Uid: 2, - }, - Hostname: "host2", - PortMap: map[string]int32{ - "vt": 789, - }, - Keyspace: "keyspace", - Shard: "shard", - } - if err := ts.CreateTablet(context.Background(), tablet2); err != nil { - t.Fatalf("CreateTablet failed: %v", err) - } tw.loadTablets() + counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "AddTablet": 1}) + checkChecksum(t, tw, 3238442862) - // If refreshKnownTablets is disabled, only the new tablet is read - // from the topo - if refreshKnownTablets { - counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 2, "AddTablet": 1}) - } else { - counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 1, "AddTablet": 1}) - } - checkChecksum(t, tw, 2762153755) - - // Check the new tablet is returned by GetAllTablets(). - allTablets = fhc.GetAllTablets() - key = TabletToMapKey(tablet2) - if _, ok := allTablets[key]; !ok || len(allTablets) != 2 || !proto.Equal(allTablets[key], tablet2) { - t.Errorf("fhc.GetAllTablets() = %+v; want %+v", allTablets, tablet2) - } - - // Load the tablets again to show that when refreshKnownTablets is disabled, - // only the list is read from the topo and the checksum doesn't change - tw.loadTablets() - if refreshKnownTablets { - counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 2}) - } else { - counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1}) - } - checkChecksum(t, tw, 2762153755) - - // same tablet, different port, should update (previous - // one should go away, new one be added) + //// Check the tablet is returned by GetAllTablets(). + //allTablets := fhc.GetAllTablets() + //key := TabletToMapKey(tablet) + //assert.Len(t, allTablets, 1) + //assert.Contains(t, allTablets, key) + //assert.True(t, proto.Equal(tablet, allTablets[key])) // - // if refreshKnownTablets is disabled, this case is *not* - // detected and the tablet remains in the topo using the - // old key - origTablet := proto.Clone(tablet).(*topodatapb.Tablet) - origKey := TabletToMapKey(tablet) - tablet.PortMap["vt"] = 456 - if _, err := ts.UpdateTabletFields(context.Background(), tablet.Alias, func(t *topodatapb.Tablet) error { - t.PortMap["vt"] = 456 - return nil - }); err != nil { - t.Fatalf("UpdateTabletFields failed: %v", err) - } - tw.loadTablets() - allTablets = fhc.GetAllTablets() - key = TabletToMapKey(tablet) - - if refreshKnownTablets { - counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 2, "ReplaceTablet": 1}) - - if _, ok := allTablets[key]; !ok || len(allTablets) != 2 || !proto.Equal(allTablets[key], tablet) { - t.Errorf("fhc.GetAllTablets() = %+v; want %+v", allTablets, tablet) - } - if _, ok := allTablets[origKey]; ok { - t.Errorf("fhc.GetAllTablets() = %+v; don't want %v", allTablets, origKey) - } - checkChecksum(t, tw, 2762153755) - } else { - counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1}) - - if _, ok := allTablets[origKey]; !ok || len(allTablets) != 2 || !proto.Equal(allTablets[origKey], origTablet) { - t.Errorf("fhc.GetAllTablets() = %+v; want %+v", allTablets, origTablet) - } - if _, ok := allTablets[key]; ok { - t.Errorf("fhc.GetAllTablets() = %+v; don't want %v", allTablets, key) - } - checkChecksum(t, tw, 2762153755) - } - - // Both tablets restart on different hosts. - // tablet2 happens to land on the host:port that tablet 1 used to be on. - // This can only be tested when we refresh known tablets. - if refreshKnownTablets { - origTablet := proto.Clone(tablet).(*topodatapb.Tablet) - origTablet2 := proto.Clone(tablet2).(*topodatapb.Tablet) - - if _, err := ts.UpdateTabletFields(context.Background(), tablet2.Alias, func(t *topodatapb.Tablet) error { - t.Hostname = tablet.Hostname - t.PortMap = tablet.PortMap - tablet2 = t - return nil - }); err != nil { - t.Fatalf("UpdateTabletFields failed: %v", err) - } - if _, err := ts.UpdateTabletFields(context.Background(), tablet.Alias, func(t *topodatapb.Tablet) error { - t.Hostname = "host3" - tablet = t - return nil - }); err != nil { - t.Fatalf("UpdateTabletFields failed: %v", err) - } - tw.loadTablets() - counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 2, "ReplaceTablet": 2}) - allTablets = fhc.GetAllTablets() - key2 := TabletToMapKey(tablet2) - if _, ok := allTablets[key2]; !ok { - t.Fatalf("tablet was lost because it's reusing an address recently used by another tablet: %v", key2) - } - - // Change tablets back to avoid altering later tests. - if _, err := ts.UpdateTabletFields(context.Background(), tablet2.Alias, func(t *topodatapb.Tablet) error { - t.Hostname = origTablet2.Hostname - t.PortMap = origTablet2.PortMap - tablet2 = t - return nil - }); err != nil { - t.Fatalf("UpdateTabletFields failed: %v", err) - } - if _, err := ts.UpdateTabletFields(context.Background(), tablet.Alias, func(t *topodatapb.Tablet) error { - t.Hostname = origTablet.Hostname - tablet = t - return nil - }); err != nil { - t.Fatalf("UpdateTabletFields failed: %v", err) - } - tw.loadTablets() - counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 2, "ReplaceTablet": 2}) - } - - // Remove the tablet and check that it is detected as being gone. - if err := ts.DeleteTablet(context.Background(), tablet.Alias); err != nil { - t.Fatalf("DeleteTablet failed: %v", err) - } - if _, err := topo.FixShardReplication(context.Background(), ts, logger, "aa", "keyspace", "shard"); err != nil { - t.Fatalf("FixShardReplication failed: %v", err) - } - tw.loadTablets() - if refreshKnownTablets { - counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 1, "RemoveTablet": 1}) - } else { - counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "RemoveTablet": 1}) - } - checkChecksum(t, tw, 789108290) - - allTablets = fhc.GetAllTablets() - key = TabletToMapKey(tablet) - if _, ok := allTablets[key]; ok || len(allTablets) != 1 { - t.Errorf("fhc.GetAllTablets() = %+v; don't want %v", allTablets, key) - } - key = TabletToMapKey(tablet2) - if _, ok := allTablets[key]; !ok || len(allTablets) != 1 || !proto.Equal(allTablets[key], tablet2) { - t.Errorf("fhc.GetAllTablets() = %+v; want %+v", allTablets, tablet2) - } - - // Remove the other and check that it is detected as being gone. - if err := ts.DeleteTablet(context.Background(), tablet2.Alias); err != nil { - t.Fatalf("DeleteTablet failed: %v", err) - } - if _, err := topo.FixShardReplication(context.Background(), ts, logger, "aa", "keyspace", "shard"); err != nil { - t.Fatalf("FixShardReplication failed: %v", err) - } - tw.loadTablets() - checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "RemoveTablet": 1}) - checkChecksum(t, tw, 0) - - allTablets = fhc.GetAllTablets() - key = TabletToMapKey(tablet) - if _, ok := allTablets[key]; ok || len(allTablets) != 0 { - t.Errorf("fhc.GetAllTablets() = %+v; don't want %v", allTablets, key) - } - key = TabletToMapKey(tablet2) - if _, ok := allTablets[key]; ok || len(allTablets) != 0 { - t.Errorf("fhc.GetAllTablets() = %+v; don't want %v", allTablets, key) - } + //// Add a second tablet to the topology. + //tablet2 := &topodatapb.Tablet{ + // Alias: &topodatapb.TabletAlias{ + // Cell: "aa", + // Uid: 2, + // }, + // Hostname: "host2", + // PortMap: map[string]int32{ + // "vt": 789, + // }, + // Keyspace: "keyspace", + // Shard: "shard", + //} + //require.NoError(t, ts.CreateTablet(context.Background(), tablet2), "CreateTablet failed for %v", tablet2.Alias) + //tw.loadTablets() + // + //counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "AddTablet": 1}) + //checkChecksum(t, tw, 2762153755) + // + //// Check the new tablet is returned by GetAllTablets(). + //allTablets = fhc.GetAllTablets() + //key = TabletToMapKey(tablet2) + //assert.Len(t, allTablets, 2) + //assert.Contains(t, allTablets, key) + //assert.True(t, proto.Equal(tablet2, allTablets[key])) + // + //// same tablet, different port, should update (previous + //// one should go away, new one be added) + //// + //// if refreshKnownTablets is disabled, this case is *not* + //// detected and the tablet remains in the healthcheck using the + //// old key + //origTablet := proto.Clone(tablet).(*topodatapb.Tablet) + //origKey := TabletToMapKey(tablet) + //tablet.PortMap["vt"] = 456 + //_, err := ts.UpdateTabletFields(context.Background(), tablet.Alias, func(t *topodatapb.Tablet) error { + // t.PortMap["vt"] = 456 + // return nil + //}) + //require.Nil(t, err, "UpdateTabletFields failed") + // + //tw.loadTablets() + //allTablets = fhc.GetAllTablets() + //key = TabletToMapKey(tablet) + // + //if refreshKnownTablets { + // counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 2, "ReplaceTablet": 1}) + // + // if _, ok := allTablets[key]; !ok || len(allTablets) != 2 || !proto.Equal(allTablets[key], tablet) { + // t.Errorf("fhc.GetAllTablets() = %+v; want %+v", allTablets, tablet) + // } + // if _, ok := allTablets[origKey]; ok { + // t.Errorf("fhc.GetAllTablets() = %+v; don't want %v", allTablets, origKey) + // } + // checkChecksum(t, tw, 2762153755) + //} else { + // counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "ReplaceTablet": 0}) + // assert.Len(t, allTablets, 2) + // assert.Contains(t, allTablets, origKey) + // assert.True(t, proto.Equal(origTablet, allTablets[origKey])) + // assert.NotContains(t, allTablets, key) + // checkChecksum(t, tw, 2762153755) + //} + // + //// Both tablets restart on different hosts. + //// tablet2 happens to land on the host:port that tablet 1 used to be on. + //// This can only be tested when we refresh known tablets. + //if refreshKnownTablets { + // origTablet := proto.Clone(tablet).(*topodatapb.Tablet) + // origTablet2 := proto.Clone(tablet2).(*topodatapb.Tablet) + // + // _, err := ts.UpdateTabletFields(context.Background(), tablet2.Alias, func(t *topodatapb.Tablet) error { + // t.Hostname = tablet.Hostname + // t.PortMap = tablet.PortMap + // tablet2 = t + // return nil + // }) + // require.Nil(t, err, "UpdateTabletFields failed") + // _, err = ts.UpdateTabletFields(context.Background(), tablet.Alias, func(t *topodatapb.Tablet) error { + // t.Hostname = "host3" + // tablet = t + // return nil + // }) + // require.Nil(t, err, "UpdateTabletFields failed") + // tw.loadTablets() + // counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "ReplaceTablet": 2}) + // allTablets = fhc.GetAllTablets() + // key2 := TabletToMapKey(tablet2) + // assert.Contains(t, allTablets, key2, "tablet was lost because it's reusing an address recently used by another tablet: %v", key2) + // + // // Change tablets back to avoid altering later tests. + // _, err = ts.UpdateTabletFields(context.Background(), tablet2.Alias, func(t *topodatapb.Tablet) error { + // t.Hostname = origTablet2.Hostname + // t.PortMap = origTablet2.PortMap + // tablet2 = t + // return nil + // }) + // require.Nil(t, err, "UpdateTabletFields failed") + // + // _, err = ts.UpdateTabletFields(context.Background(), tablet.Alias, func(t *topodatapb.Tablet) error { + // t.Hostname = origTablet.Hostname + // tablet = t + // return nil + // }) + // require.Nil(t, err, "UpdateTabletFields failed") + // + // tw.loadTablets() + // counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "ReplaceTablet": 2}) + //} + // + //// Remove the tablet and check that it is detected as being gone. + //require.NoError(t, ts.DeleteTablet(context.Background(), tablet.Alias)) + // + //_, err = topo.FixShardReplication(context.Background(), ts, logger, "aa", "keyspace", "shard") + //require.Nil(t, err, "FixShardReplication failed") + //tw.loadTablets() + //counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "RemoveTablet": 1}) + //checkChecksum(t, tw, 789108290) + // + //allTablets = fhc.GetAllTablets() + //assert.Len(t, allTablets, 1) + //key = TabletToMapKey(tablet) + //assert.NotContains(t, allTablets, key) + // + //key = TabletToMapKey(tablet2) + //assert.Contains(t, allTablets, key) + //assert.True(t, proto.Equal(tablet2, allTablets[key])) + // + //// Remove the other and check that it is detected as being gone. + //require.NoError(t, ts.DeleteTablet(context.Background(), tablet2.Alias)) + //_, err = topo.FixShardReplication(context.Background(), ts, logger, "aa", "keyspace", "shard") + //require.Nil(t, err, "FixShardReplication failed") + //tw.loadTablets() + //checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "RemoveTablet": 1}) + //checkChecksum(t, tw, 0) + // + //allTablets = fhc.GetAllTablets() + //assert.Len(t, allTablets, 0) + //key = TabletToMapKey(tablet) + //assert.NotContains(t, allTablets, key) + //key = TabletToMapKey(tablet2) + //assert.NotContains(t, allTablets, key) tw.Stop() } @@ -393,19 +350,13 @@ func TestFilterByShard(t *testing.T) { for _, tc := range testcases { fbs, err := NewFilterByShard(tc.filters) - if err != nil { - t.Errorf("cannot create FilterByShard for filters %v: %v", tc.filters, err) - } + require.Nil(t, err, "cannot create FilterByShard for filters %v", tc.filters) tablet := &topodatapb.Tablet{ Keyspace: tc.keyspace, Shard: tc.shard, } - - got := fbs.IsIncluded(tablet) - if got != tc.included { - t.Errorf("isIncluded(%v,%v) for filters %v returned %v but expected %v", tc.keyspace, tc.shard, tc.filters, got, tc.included) - } + require.Equal(t, tc.included, fbs.IsIncluded(tablet)) } } @@ -432,7 +383,7 @@ func TestFilterByKeyspace(t *testing.T) { hc := NewFakeHealthCheck(nil) f := NewFilterByKeyspace(testKeyspacesToWatch) ts := memorytopo.NewServer(testCell) - tw := NewCellTabletsWatcher(context.Background(), ts, hc, f, testCell, 10*time.Minute, true, 5) + tw := NewTopologyWatcher(context.Background(), ts, hc, f, testCell, 10*time.Minute, true, 5) for _, test := range testFilterByKeyspace { // Add a new tablet to the topology. @@ -450,22 +401,21 @@ func TestFilterByKeyspace(t *testing.T) { Shard: testShard, } - got := f.IsIncluded(tablet) - if got != test.expected { - t.Errorf("isIncluded(%v) for keyspace %v returned %v but expected %v", test.keyspace, test.keyspace, got, test.expected) - } + assert.Equal(t, test.expected, f.IsIncluded(tablet)) - if err := ts.CreateTablet(context.Background(), tablet); err != nil { - t.Errorf("CreateTablet failed: %v", err) - } + // Make this fatal because there is no point continuing if CreateTablet fails + require.NoError(t, ts.CreateTablet(context.Background(), tablet)) tw.loadTablets() key := TabletToMapKey(tablet) allTablets := hc.GetAllTablets() - if _, ok := allTablets[key]; ok != test.expected && proto.Equal(allTablets[key], tablet) != test.expected { - t.Errorf("Error adding tablet - got %v; want %v", ok, test.expected) + if test.expected { + assert.Contains(t, allTablets, key) + } else { + assert.NotContains(t, allTablets, key) } + assert.Equal(t, test.expected, proto.Equal(tablet, allTablets[key])) // Replace the tablet we added above tabletReplacement := &topodatapb.Tablet{ @@ -480,41 +430,37 @@ func TestFilterByKeyspace(t *testing.T) { Keyspace: test.keyspace, Shard: testShard, } - got = f.IsIncluded(tabletReplacement) - if got != test.expected { - t.Errorf("isIncluded(%v) for keyspace %v returned %v but expected %v", test.keyspace, test.keyspace, got, test.expected) - } - if err := ts.CreateTablet(context.Background(), tabletReplacement); err != nil { - t.Errorf("CreateTablet failed: %v", err) - } + assert.Equal(t, test.expected, f.IsIncluded(tabletReplacement)) + require.NoError(t, ts.CreateTablet(context.Background(), tabletReplacement)) tw.loadTablets() key = TabletToMapKey(tabletReplacement) allTablets = hc.GetAllTablets() - if _, ok := allTablets[key]; ok != test.expected && proto.Equal(allTablets[key], tabletReplacement) != test.expected { - t.Errorf("Error replacing tablet - got %v; want %v", ok, test.expected) + if test.expected { + assert.Contains(t, allTablets, key) + } else { + assert.NotContains(t, allTablets, key) } + assert.Equal(t, test.expected, proto.Equal(tabletReplacement, allTablets[key])) // Delete the tablet - if err := ts.DeleteTablet(context.Background(), tabletReplacement.Alias); err != nil { - t.Fatalf("DeleteTablet failed: %v", err) - } + require.NoError(t, ts.DeleteTablet(context.Background(), tabletReplacement.Alias)) } } -// TestFilterByKeypsaceSkipsIgnoredTablets confirms a bug fix for the case when a TopologyWatcher +// TestFilterByKeyspaceSkipsIgnoredTablets confirms a bug fix for the case when a TopologyWatcher // has a FilterByKeyspace TabletFilter configured along with refreshKnownTablets turned off. We want // to ensure that the TopologyWatcher: -// - does not continuosly call GetTablets for tablets that do not satisfy the filter -// - does not add or remove these filtered out tablets from the its healtcheck -func TestFilterByKeypsaceSkipsIgnoredTablets(t *testing.T) { +// - does not continuously call GetTablets for tablets that do not satisfy the filter +// - does not add or remove these filtered out tablets from its healthcheck +func TestFilterByKeyspaceSkipsIgnoredTablets(t *testing.T) { ts := memorytopo.NewServer("aa") fhc := NewFakeHealthCheck(nil) topologyWatcherOperations.ZeroAll() counts := topologyWatcherOperations.Counts() f := NewFilterByKeyspace(testKeyspacesToWatch) - tw := NewCellTabletsWatcher(context.Background(), ts, fhc, f, "aa", 10*time.Minute, false /*refreshKnownTablets*/, 5) + tw := NewTopologyWatcher(context.Background(), ts, fhc, f, "aa", 10*time.Minute, false /*refreshKnownTablets*/, 5) counts = checkOpCounts(t, counts, map[string]int64{}) checkChecksum(t, tw, 0) @@ -535,7 +481,7 @@ func TestFilterByKeypsaceSkipsIgnoredTablets(t *testing.T) { require.NoError(t, ts.CreateTablet(context.Background(), tablet)) tw.loadTablets() - counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 1, "AddTablet": 1}) + counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "AddTablet": 1}) checkChecksum(t, tw, 3238442862) // Check tablet is reported by HealthCheck @@ -560,7 +506,7 @@ func TestFilterByKeypsaceSkipsIgnoredTablets(t *testing.T) { require.NoError(t, ts.CreateTablet(context.Background(), tablet2)) tw.loadTablets() - counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 1}) + counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0}) checkChecksum(t, tw, 2762153755) // Check the new tablet is NOT reported by HealthCheck. @@ -572,7 +518,7 @@ func TestFilterByKeypsaceSkipsIgnoredTablets(t *testing.T) { // Load the tablets again to show that when refreshKnownTablets is disabled, // only the list is read from the topo and the checksum doesn't change tw.loadTablets() - counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1}) + counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0}) checkChecksum(t, tw, 2762153755) // With refreshKnownTablets set to false, changes to the port map for the same tablet alias @@ -584,7 +530,7 @@ func TestFilterByKeypsaceSkipsIgnoredTablets(t *testing.T) { require.NoError(t, err) tw.loadTablets() - counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1}) + counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0}) checkChecksum(t, tw, 2762153755) allTablets = fhc.GetAllTablets() @@ -600,7 +546,7 @@ func TestFilterByKeypsaceSkipsIgnoredTablets(t *testing.T) { require.NoError(t, ts.DeleteTablet(context.Background(), tablet.Alias)) tw.loadTablets() - counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "RemoveTablet": 1}) + counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "RemoveTablet": 1}) checkChecksum(t, tw, 789108290) assert.Empty(t, fhc.GetAllTablets()) @@ -608,7 +554,7 @@ func TestFilterByKeypsaceSkipsIgnoredTablets(t *testing.T) { require.NoError(t, ts.DeleteTablet(context.Background(), tablet2.Alias)) tw.loadTablets() - checkOpCounts(t, counts, map[string]int64{"ListTablets": 1}) + checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0}) checkChecksum(t, tw, 0) assert.Empty(t, fhc.GetAllTablets()) diff --git a/go/vt/topo/consultopo/error.go b/go/vt/topo/consultopo/error.go index 42f474e065b..62167a4d295 100644 --- a/go/vt/topo/consultopo/error.go +++ b/go/vt/topo/consultopo/error.go @@ -40,15 +40,16 @@ var ( // are either application-level errors, or context errors. func convertError(err error, nodePath string) error { // Unwrap errors from the Go HTTP client. - if urlErr, ok := err.(*url.Error); ok { + var urlErr *url.Error + if errors.As(err, &urlErr) { err = urlErr.Err } // Convert specific sentinel values. - switch err { - case context.Canceled: + switch { + case errors.Is(err, context.Canceled): return topo.NewError(topo.Interrupted, nodePath) - case context.DeadlineExceeded: + case errors.Is(err, context.DeadlineExceeded): return topo.NewError(topo.Timeout, nodePath) } diff --git a/go/vt/topo/errors.go b/go/vt/topo/errors.go index a645f1aa178..3be4b60b103 100644 --- a/go/vt/topo/errors.go +++ b/go/vt/topo/errors.go @@ -36,6 +36,7 @@ const ( NoUpdateNeeded NoImplementation NoReadOnlyImplementation + ResourceExhausted ) // Error represents a topo error. @@ -68,6 +69,8 @@ func NewError(code ErrorCode, node string) error { message = fmt.Sprintf("no such topology implementation %s", node) case NoReadOnlyImplementation: message = fmt.Sprintf("no read-only topology implementation %s", node) + case ResourceExhausted: + message = fmt.Sprintf("server resource exhausted: %s", node) default: message = fmt.Sprintf("unknown code: %s", node) } diff --git a/go/vt/topo/etcd2topo/error.go b/go/vt/topo/etcd2topo/error.go index e784fecd9b9..5e13d0bdf8d 100644 --- a/go/vt/topo/etcd2topo/error.go +++ b/go/vt/topo/etcd2topo/error.go @@ -45,7 +45,8 @@ func convertError(err error, nodePath string) error { return nil } - if typeErr, ok := err.(rpctypes.EtcdError); ok { + var typeErr rpctypes.EtcdError + if errors.As(err, &typeErr) { switch typeErr.Code() { case codes.NotFound: return topo.NewError(topo.NoNode, nodePath) @@ -61,6 +62,8 @@ func convertError(err error, nodePath string) error { // etcd primary election is failing, so timeout // also sounds reasonable there. return topo.NewError(topo.Timeout, nodePath) + case codes.ResourceExhausted: + return topo.NewError(topo.ResourceExhausted, nodePath) } return err } @@ -74,15 +77,17 @@ func convertError(err error, nodePath string) error { return topo.NewError(topo.Interrupted, nodePath) case codes.DeadlineExceeded: return topo.NewError(topo.Timeout, nodePath) + case codes.ResourceExhausted: + return topo.NewError(topo.ResourceExhausted, nodePath) default: return err } } - switch err { - case context.Canceled: + switch { + case errors.Is(err, context.Canceled): return topo.NewError(topo.Interrupted, nodePath) - case context.DeadlineExceeded: + case errors.Is(err, context.DeadlineExceeded): return topo.NewError(topo.Timeout, nodePath) default: return err diff --git a/go/vt/topo/memorytopo/file.go b/go/vt/topo/memorytopo/file.go index 0abfc56cb80..e45d2b23ee4 100644 --- a/go/vt/topo/memorytopo/file.go +++ b/go/vt/topo/memorytopo/file.go @@ -187,6 +187,9 @@ func (c *Conn) List(ctx context.Context, filePathPrefix string) ([]topo.KVInfo, if c.factory.err != nil { return nil, c.factory.err } + if c.factory.listErr != nil { + return nil, c.factory.listErr + } dir, file := path.Split(filePathPrefix) // Get the node to list. diff --git a/go/vt/topo/memorytopo/memorytopo.go b/go/vt/topo/memorytopo/memorytopo.go index cdad2ddbcdd..0aa066054f4 100644 --- a/go/vt/topo/memorytopo/memorytopo.go +++ b/go/vt/topo/memorytopo/memorytopo.go @@ -75,6 +75,9 @@ type Factory struct { // err is used for testing purposes to force queries / watches // to return the given error err error + // listErr is used for testing purposed to fake errors from + // calls to List. + listErr error } // HasGlobalReadOnlyCell is part of the topo.Factory interface. @@ -343,6 +346,13 @@ func (f *Factory) recursiveDelete(n *node) { } } +func (f *Factory) SetListError(err error) { + f.mu.Lock() + defer f.mu.Unlock() + + f.listErr = err +} + func init() { rand.Seed(time.Now().UnixNano()) } diff --git a/go/vt/topo/shard.go b/go/vt/topo/shard.go index 7f03bf13364..b8e9344109d 100644 --- a/go/vt/topo/shard.go +++ b/go/vt/topo/shard.go @@ -635,7 +635,7 @@ func (ts *Server) GetTabletMapForShardByCell(ctx context.Context, keyspace, shar // get the tablets for the cells we were able to reach, forward // ErrPartialResult from FindAllTabletAliasesInShard - result, gerr := ts.GetTabletMap(ctx, aliases) + result, gerr := ts.GetTabletMap(ctx, aliases, nil) if gerr == nil && err != nil { gerr = err } diff --git a/go/vt/topo/tablet.go b/go/vt/topo/tablet.go index 619b67489e4..eeadc3edc65 100644 --- a/go/vt/topo/tablet.go +++ b/go/vt/topo/tablet.go @@ -19,6 +19,7 @@ package topo import ( "context" "fmt" + "golang.org/x/sync/semaphore" "path" "sort" "sync" @@ -285,10 +286,17 @@ func (ts *Server) GetTabletAliasesByCell(ctx context.Context, cell string) ([]*t return result, nil } +// GetTabletsByCellOptions controls the behavior of +// Server.FindAllShardsInKeyspace. +type GetTabletsByCellOptions struct { + // Concurrency controls the maximum number of concurrent calls to GetTablet. + Concurrency int64 +} + // GetTabletsByCell returns all the tablets in the cell. // It returns ErrNoNode if the cell doesn't exist. // It returns (nil, nil) if the cell exists, but there are no tablets in it. -func (ts *Server) GetTabletsByCell(ctx context.Context, cellAlias string) ([]*TabletInfo, error) { +func (ts *Server) GetTabletsByCell(ctx context.Context, cellAlias string, opt *GetTabletsByCellOptions) ([]*TabletInfo, error) { // If the cell doesn't exist, this will return ErrNoNode. cellConn, err := ts.ConnForCell(ctx, cellAlias) if err != nil { @@ -296,10 +304,12 @@ func (ts *Server) GetTabletsByCell(ctx context.Context, cellAlias string) ([]*Ta } listResults, err := cellConn.List(ctx, TabletsPath) if err != nil || len(listResults) == 0 { - // Currently the ZooKeeper and Memory topo implementations do not support scans + // Currently the ZooKeeper implementation does not support scans // so we fall back to the more costly method of fetching the tablets one by one. - if IsErrType(err, NoImplementation) { - return ts.GetTabletsIndividuallyByCell(ctx, cellAlias) + // In the etcd case, it is possible that the response is too large. We also fall + // back to fetching the tablets one by one in that case. + if IsErrType(err, NoImplementation) || IsErrType(err, ResourceExhausted) { + return ts.GetTabletsIndividuallyByCell(ctx, cellAlias, opt) } if IsErrType(err, NoNode) { return nil, nil @@ -323,7 +333,7 @@ func (ts *Server) GetTabletsByCell(ctx context.Context, cellAlias string) ([]*Ta // directly support the topoConn.List() functionality. // It returns ErrNoNode if the cell doesn't exist. // It returns (nil, nil) if the cell exists, but there are no tablets in it. -func (ts *Server) GetTabletsIndividuallyByCell(ctx context.Context, cell string) ([]*TabletInfo, error) { +func (ts *Server) GetTabletsIndividuallyByCell(ctx context.Context, cell string, opt *GetTabletsByCellOptions) ([]*TabletInfo, error) { // If the cell doesn't exist, this will return ErrNoNode. aliases, err := ts.GetTabletAliasesByCell(ctx, cell) if err != nil { @@ -331,7 +341,7 @@ func (ts *Server) GetTabletsIndividuallyByCell(ctx context.Context, cell string) } sort.Sort(topoproto.TabletAliasList(aliases)) - tabletMap, err := ts.GetTabletMap(ctx, aliases) + tabletMap, err := ts.GetTabletMap(ctx, aliases, opt) if err != nil { // we got another error than topo.ErrNoNode return nil, err @@ -506,41 +516,62 @@ func DeleteTabletReplicationData(ctx context.Context, ts *Server, tablet *topoda } // GetTabletMap tries to read all the tablets in the provided list, -// and returns them all in a map. -// If error is ErrPartialResult, the results in the dictionary are +// and returns them in a map. +// If error is ErrPartialResult, the results in the map are // incomplete, meaning some tablets couldn't be read. // The map is indexed by topoproto.TabletAliasString(tablet alias). -func (ts *Server) GetTabletMap(ctx context.Context, tabletAliases []*topodatapb.TabletAlias) (map[string]*TabletInfo, error) { +func (ts *Server) GetTabletMap(ctx context.Context, tabletAliases []*topodatapb.TabletAlias, opt *GetTabletsByCellOptions) (map[string]*TabletInfo, error) { span, ctx := trace.NewSpan(ctx, "topo.GetTabletMap") span.Annotate("num_tablets", len(tabletAliases)) defer span.Finish() - wg := sync.WaitGroup{} - mutex := sync.Mutex{} + var ( + mu sync.Mutex + wg sync.WaitGroup + tabletMap = make(map[string]*TabletInfo) + returnErr error + // Previously this was always run with unlimited concurrency, so 32 should be fine. + concurrency int64 = 32 + ) - tabletMap := make(map[string]*TabletInfo) - var someError error + if opt != nil && opt.Concurrency > 0 { + concurrency = opt.Concurrency + } + var sem = semaphore.NewWeighted(concurrency) for _, tabletAlias := range tabletAliases { wg.Add(1) go func(tabletAlias *topodatapb.TabletAlias) { defer wg.Done() + if err := sem.Acquire(ctx, 1); err != nil { + // Only happens if context is cancelled. + mu.Lock() + defer mu.Unlock() + log.Warningf("%v: %v", tabletAlias, err) + // We only need to set this on the first error. + if returnErr == nil { + returnErr = NewError(PartialResult, tabletAlias.GetCell()) + } + return + } tabletInfo, err := ts.GetTablet(ctx, tabletAlias) - mutex.Lock() + sem.Release(1) + mu.Lock() + defer mu.Unlock() if err != nil { log.Warningf("%v: %v", tabletAlias, err) // There can be data races removing nodes - ignore them for now. - if !IsErrType(err, NoNode) { - someError = NewError(PartialResult, "") + // We only need to set this on first error. + if returnErr == nil && !IsErrType(err, NoNode) { + returnErr = NewError(PartialResult, tabletAlias.GetCell()) } } else { tabletMap[topoproto.TabletAliasString(tabletAlias)] = tabletInfo } - mutex.Unlock() }(tabletAlias) } wg.Wait() - return tabletMap, someError + return tabletMap, returnErr } // InitTablet creates or updates a tablet. If no parent is specified diff --git a/go/vt/topo/tablet_test.go b/go/vt/topo/tablet_test.go new file mode 100644 index 00000000000..1f94fd62a47 --- /dev/null +++ b/go/vt/topo/tablet_test.go @@ -0,0 +1,115 @@ +/* +Copyright 2023 The Vitess Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package topo_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/memorytopo" +) + +// Test various cases of calls to GetTabletsByCell. +// GetTabletsByCell first tries to get all the tablets using List. +// If the response is too large, we will get an error, and fall back to one tablet at a time. +func TestServerGetTabletsByCell(t *testing.T) { + tests := []struct { + name string + tablets int + opt *topo.GetTabletsByCellOptions + listError error + }{ + { + name: "negative concurrency", + tablets: 1, + // Ensure this doesn't panic. + opt: &topo.GetTabletsByCellOptions{Concurrency: -1}, + }, + { + name: "single", + tablets: 1, + // Make sure the defaults apply as expected. + opt: nil, + }, + { + name: "multiple", + // should work with more than 1 tablet + tablets: 32, + opt: &topo.GetTabletsByCellOptions{Concurrency: 8}, + }, + { + name: "multiple with list error", + // should work with more than 1 tablet when List returns an error + tablets: 32, + opt: &topo.GetTabletsByCellOptions{Concurrency: 8}, + listError: topo.NewError(topo.ResourceExhausted, ""), + }, + } + + const cell = "zone1" + const keyspace = "keyspace" + const shard = "shard" + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ts, factory := memorytopo.NewServerAndFactory(cell) + defer ts.Close() + if tt.listError != nil { + factory.SetListError(tt.listError) + } + + // Create an ephemeral keyspace and generate shard records within + // the keyspace to fetch later. + require.NoError(t, ts.CreateKeyspace(ctx, keyspace, &topodatapb.Keyspace{})) + require.NoError(t, ts.CreateShard(ctx, keyspace, shard)) + + tablets := make([]*topo.TabletInfo, tt.tablets) + + for i := 0; i < tt.tablets; i++ { + tablet := &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: cell, + Uid: uint32(i), + }, + Hostname: "host1", + PortMap: map[string]int32{ + "vt": int32(i), + }, + Keyspace: keyspace, + Shard: shard, + } + tInfo := &topo.TabletInfo{Tablet: tablet} + tablets[i] = tInfo + require.NoError(t, ts.CreateTablet(ctx, tablet)) + } + + // Verify that we return a complete list of tablets and that each + // tablet matches what we expect. + out, err := ts.GetTabletsByCell(ctx, cell, tt.opt) + require.NoError(t, err) + require.Len(t, out, tt.tablets) + + for i, tab := range tablets { + require.Equal(t, tab.Tablet, tablets[i].Tablet) + } + }) + } +} diff --git a/go/vt/topo/zk2topo/error.go b/go/vt/topo/zk2topo/error.go index 1ebc3896f40..1149ad60bf3 100644 --- a/go/vt/topo/zk2topo/error.go +++ b/go/vt/topo/zk2topo/error.go @@ -18,6 +18,7 @@ package zk2topo import ( "context" + "errors" "github.com/z-division/go-zookeeper/zk" @@ -26,20 +27,20 @@ import ( // Error codes returned by the zookeeper Go client: func convertError(err error, node string) error { - switch err { - case zk.ErrBadVersion: + switch { + case errors.Is(err, zk.ErrBadVersion): return topo.NewError(topo.BadVersion, node) - case zk.ErrNoNode: + case errors.Is(err, zk.ErrNoNode): return topo.NewError(topo.NoNode, node) - case zk.ErrNodeExists: + case errors.Is(err, zk.ErrNodeExists): return topo.NewError(topo.NodeExists, node) - case zk.ErrNotEmpty: + case errors.Is(err, zk.ErrNotEmpty): return topo.NewError(topo.NodeNotEmpty, node) - case zk.ErrSessionExpired: + case errors.Is(err, zk.ErrSessionExpired): return topo.NewError(topo.Timeout, node) - case context.Canceled: + case errors.Is(err, context.Canceled): return topo.NewError(topo.Interrupted, node) - case context.DeadlineExceeded: + case errors.Is(err, context.DeadlineExceeded): return topo.NewError(topo.Timeout, node) } return err diff --git a/go/vt/topotools/tablet.go b/go/vt/topotools/tablet.go index af6f4b3c3c6..59d9088be9e 100644 --- a/go/vt/topotools/tablet.go +++ b/go/vt/topotools/tablet.go @@ -127,7 +127,7 @@ func DoCellsHaveRdonlyTablets(ctx context.Context, ts *topo.Server, cells []stri } for _, cell := range cells { - tablets, err := ts.GetTabletsByCell(ctx, cell) + tablets, err := ts.GetTabletsByCell(ctx, cell, nil) if err != nil { return false, err } diff --git a/go/vt/topotools/utils.go b/go/vt/topotools/utils.go index 6b618383a1e..6d1522e04e7 100644 --- a/go/vt/topotools/utils.go +++ b/go/vt/topotools/utils.go @@ -43,7 +43,7 @@ func GetTabletMapForCell(ctx context.Context, ts *topo.Server, cell string) (map if err != nil { return nil, err } - tabletMap, err := ts.GetTabletMap(ctx, aliases) + tabletMap, err := ts.GetTabletMap(ctx, aliases, nil) if err != nil { // we got another error than topo.ErrNoNode return nil, err @@ -65,7 +65,7 @@ func GetAllTabletsAcrossCells(ctx context.Context, ts *topo.Server) ([]*topo.Tab wg.Add(len(cells)) for i, cell := range cells { go func(i int, cell string) { - results[i], errors[i] = ts.GetTabletsByCell(ctx, cell) + results[i], errors[i] = ts.GetTabletsByCell(ctx, cell, nil) wg.Done() }(i, cell) } diff --git a/go/vt/vtctl/grpcvtctldserver/server.go b/go/vt/vtctl/grpcvtctldserver/server.go index 4c1bdcce199..7e82275a5a1 100644 --- a/go/vt/vtctl/grpcvtctldserver/server.go +++ b/go/vt/vtctl/grpcvtctldserver/server.go @@ -1672,7 +1672,7 @@ func (s *VtctldServer) GetTablets(ctx context.Context, req *vtctldatapb.GetTable case len(req.TabletAliases) > 0: span.Annotate("tablet_aliases", strings.Join(topoproto.TabletAliasList(req.TabletAliases).ToStringSlice(), ",")) - tabletMap, err = s.ts.GetTabletMap(ctx, req.TabletAliases) + tabletMap, err = s.ts.GetTabletMap(ctx, req.TabletAliases, nil) if err != nil { err = fmt.Errorf("GetTabletMap(%v) failed: %w", req.TabletAliases, err) } @@ -1746,7 +1746,7 @@ func (s *VtctldServer) GetTablets(ctx context.Context, req *vtctldatapb.GetTable go func(cell string) { defer wg.Done() - tablets, err := s.ts.GetTabletsByCell(ctx, cell) + tablets, err := s.ts.GetTabletsByCell(ctx, cell, nil) if err != nil { if req.Strict { log.Infof("GetTablets got an error from cell %s: %s. Running in strict mode, so canceling other cell RPCs", cell, err) @@ -3874,7 +3874,7 @@ func (s *VtctldServer) ValidateShard(ctx context.Context, req *vtctldatapb.Valid getTabletMapCtx, getTabletMapCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) defer getTabletMapCancel() - tabletMap, _ := s.ts.GetTabletMap(getTabletMapCtx, aliases) + tabletMap, _ := s.ts.GetTabletMap(getTabletMapCtx, aliases, nil) var primaryAlias *topodatapb.TabletAlias for _, alias := range aliases { diff --git a/go/vt/vtctl/grpcvtctldserver/topo.go b/go/vt/vtctl/grpcvtctldserver/topo.go index 70fae6613aa..5ec369ca17f 100644 --- a/go/vt/vtctl/grpcvtctldserver/topo.go +++ b/go/vt/vtctl/grpcvtctldserver/topo.go @@ -161,7 +161,7 @@ func deleteShardCell(ctx context.Context, ts *topo.Server, keyspace string, shar // Get all the tablet records for the aliases we've collected. Note that // GetTabletMap ignores ErrNoNode, which is convenient for our purpose; it // means a tablet was deleted but is still referenced. - tabletMap, err := ts.GetTabletMap(ctx, aliases) + tabletMap, err := ts.GetTabletMap(ctx, aliases, nil) if err != nil { return fmt.Errorf("GetTabletMap() failed: %w", err) } diff --git a/go/vt/wrangler/shard.go b/go/vt/wrangler/shard.go index 8ea85290022..695f17b2f75 100644 --- a/go/vt/wrangler/shard.go +++ b/go/vt/wrangler/shard.go @@ -113,7 +113,7 @@ func (wr *Wrangler) DeleteShard(ctx context.Context, keyspace, shard string, rec // GetTabletMap ignores ErrNoNode, and it's good for // our purpose, it means a tablet was deleted but is // still referenced. - tabletMap, err := wr.ts.GetTabletMap(ctx, aliases) + tabletMap, err := wr.ts.GetTabletMap(ctx, aliases, nil) if err != nil { return fmt.Errorf("GetTabletMap() failed: %v", err) } diff --git a/go/vt/wrangler/split.go b/go/vt/wrangler/split.go index d780fa10025..d77677bc48f 100644 --- a/go/vt/wrangler/split.go +++ b/go/vt/wrangler/split.go @@ -41,7 +41,7 @@ const ( // on a Shard. func (wr *Wrangler) SetSourceShards(ctx context.Context, keyspace, shard string, sources []*topodatapb.TabletAlias, tables []string) error { // Read the source tablets. - sourceTablets, err := wr.ts.GetTabletMap(ctx, sources) + sourceTablets, err := wr.ts.GetTabletMap(ctx, sources, nil) if err != nil { return err }