Skip to content

Commit

Permalink
Backport Use GetTabletsByCell in healthcheck
Browse files Browse the repository at this point in the history
This backports upstream PR vitessio#14693, with a few minor changes to make it work with the Go
version we are using and a small change to topology_watcher.go so that test cases reflect
and test for the same behavior as the upstream code. The description of the original PR
follows:

VTGate's healthcheck module currently calls GetTablet for each tablet alias that it discovers
in a cell. Instead we can use GetTabletsByCell to fetch all tablets for a cell at once.

This PR does a few more things:

* GetTabletsByCell now handles the case where the response size violates gRPC limits by
  falling back to one tablet at a time in case of error.
* Previously, the one tablet at a time method had unlimited concurrency. In this PR we
  introduce a configuration option for concurrency.
* We pass topoReadConcurrency from healthcheck into GetTabletsByCell.
* The behavior of the --tablet_refresh_known_tablets flag is different now. Previously we
  would not read those tablets at all; now we do read them, but ignore any changes if they
  are already known.

The basic fix has already been tried in production and shown to reduce the number of Get
calls from vtgate -> topo from O(n) to O(1).

We can consider deprecating and deleting --tablet_refresh_known_tablets in a future release.
The concerns that originally motivated adding that flag in vitessio#3965 are alleviated by fetching
all tablets in one call to the topo.
  • Loading branch information
ejortegau committed Sep 16, 2024
1 parent bd70d86 commit 64e4bde
Show file tree
Hide file tree
Showing 19 changed files with 441 additions and 344 deletions.
21 changes: 17 additions & 4 deletions go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ limitations under the License.
// Alternatively, use a Watcher implementation which will constantly watch
// a source (e.g. the topology) and add and remove tablets as they are
// added or removed from the source.
// For a Watcher example have a look at NewCellTabletsWatcher().
// For a Watcher example have a look at NewTopologyWatcher().
//
// Internally, the HealthCheck module is connected to each tablet and has a
// streaming RPC (StreamHealth) open to receive periodic health infos.
Expand Down Expand Up @@ -92,7 +92,7 @@ var (
refreshKnownTablets = true

// topoReadConcurrency tells us how many topo reads are allowed in parallel.
topoReadConcurrency = 32
topoReadConcurrency int64 = 32

// healthCheckDialConcurrency tells us how many healthcheck connections can be opened to tablets at once. This should be less than the golang max thread limit of 10000.
healthCheckDialConcurrency int64 = 1024
Expand Down Expand Up @@ -178,7 +178,7 @@ func registerWebUIFlags(fs *pflag.FlagSet) {
fs.StringVar(&TabletURLTemplateString, "tablet_url_template", "http://{{.GetTabletHostPort}}", "Format string describing debug tablet url formatting. See getTabletDebugURL() for how to customize this.")
fs.DurationVar(&refreshInterval, "tablet_refresh_interval", 1*time.Minute, "Tablet refresh interval.")
fs.BoolVar(&refreshKnownTablets, "tablet_refresh_known_tablets", true, "Whether to reload the tablet's address/port map from topo in case they change.")
fs.IntVar(&topoReadConcurrency, "topo_read_concurrency", 32, "Concurrency of topo reads.")
fs.Int64Var(&topoReadConcurrency, "topo_read_concurrency", 32, "Concurrency of topo reads.")
fs.Int64Var(&healthCheckDialConcurrency, "healthcheck-dial-concurrency", 1024, "Maximum concurrency of new healthcheck connections. This should be less than the golang max thread limit of 10000.")
ParseTabletURLTemplateFromFlag()
}
Expand Down Expand Up @@ -365,6 +365,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
cellAliases: make(map[string]string),
}
var topoWatchers []*TopologyWatcher
var filter TabletFilter
cells := strings.Split(cellsToWatch, ",")
if cellsToWatch == "" {
cells = append(cells, localCell)
Expand All @@ -375,7 +376,19 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
if c == "" {
continue
}
topoWatchers = append(topoWatchers, NewCellTabletsWatcher(ctx, topoServer, hc, filters, c, refreshInterval, refreshKnownTablets, topoReadConcurrency))
if len(tabletFilters) > 0 {
if len(KeyspacesToWatch) > 0 {
log.Exitf("Only one of -keyspaces_to_watch and -tablet_filters may be specified at a time")
}
fbs, err := NewFilterByShard(tabletFilters)
if err != nil {
log.Exitf("Cannot parse tablet_filters parameter: %v", err)
}
filter = fbs
} else if len(KeyspacesToWatch) > 0 {
filter = NewFilterByKeyspace(KeyspacesToWatch)
}
topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filter, c, refreshInterval, refreshKnownTablets, topoReadConcurrency))
}

hc.topoWatchers = topoWatchers
Expand Down
2 changes: 1 addition & 1 deletion go/vt/discovery/tablet_picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn
}
shortCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
tabletMap, err := tp.ts.GetTabletMap(shortCtx, aliases)
tabletMap, err := tp.ts.GetTabletMap(shortCtx, aliases, nil)
if err != nil {
log.Warningf("error fetching tablets from topo: %v", err)
// If we get a partial result we can still use it, otherwise return
Expand Down
81 changes: 25 additions & 56 deletions go/vt/discovery/topology_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ type TopologyWatcher struct {
cell string
refreshInterval time.Duration
refreshKnownTablets bool
getTablets func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error)
sem chan int
concurrency int64
ctx context.Context
cancelFunc context.CancelFunc
// wg keeps track of all launched Go routines.
Expand All @@ -93,34 +92,28 @@ type TopologyWatcher struct {
}

// NewTopologyWatcher returns a TopologyWatcher that monitors all
// the tablets that it is configured to watch, and reloads them periodically if needed.
// As of now there is only one implementation: watch all tablets in a cell.
func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, filter TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int, getTablets func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error)) *TopologyWatcher {
// the tablets in a cell, and reloads them as needed.
func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int64) *TopologyWatcher {
tw := &TopologyWatcher{
topoServer: topoServer,
healthcheck: hc,
tabletFilter: filter,
tabletFilter: f,
cell: cell,
refreshInterval: refreshInterval,
refreshKnownTablets: refreshKnownTablets,
getTablets: getTablets,
sem: make(chan int, topoReadConcurrency),
concurrency: topoReadConcurrency,
tablets: make(map[string]*tabletInfo),
}
tw.firstLoadChan = make(chan struct{})

// We want the span from the context, but not the cancelation that comes with it
// We want the span from the context, but not the cancellation that comes with it
spanContext := trace.CopySpan(context.Background(), ctx)
tw.ctx, tw.cancelFunc = context.WithCancel(spanContext)
return tw
}

// NewCellTabletsWatcher returns a TopologyWatcher that monitors all
// the tablets in a cell, and reloads them as needed.
func NewCellTabletsWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int) *TopologyWatcher {
return NewTopologyWatcher(ctx, topoServer, hc, f, cell, refreshInterval, refreshKnownTablets, topoReadConcurrency, func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error) {
return tw.topoServer.GetTabletAliasesByCell(ctx, tw.cell)
})
// getTablets fetches the tablets of the watcher's cell (tw.cell) from the
// topology server with a single GetTabletsByCell call. The call is made with
// the watcher's own context (tw.ctx) and passes tw.concurrency as the
// Concurrency option; the result and error are returned unchanged.
// NOTE(review): Concurrency presumably bounds the one-tablet-at-a-time
// fallback reads inside GetTabletsByCell — confirm against the topo package.
func (tw *TopologyWatcher) getTablets() ([]*topo.TabletInfo, error) {
return tw.topoServer.GetTabletsByCell(tw.ctx, tw.cell, &topo.GetTabletsByCellOptions{Concurrency: tw.concurrency})
}

// Start starts the topology watcher.
Expand Down Expand Up @@ -149,30 +142,33 @@ func (tw *TopologyWatcher) Stop() {
}

func (tw *TopologyWatcher) loadTablets() {
var wg sync.WaitGroup
newTablets := make(map[string]*tabletInfo)

// First get the list of relevant tabletAliases.
tabletAliases, err := tw.getTablets(tw)
// First get the list of all tablets.
tabletInfos, err := tw.getTablets()
topologyWatcherOperations.Add(topologyWatcherOpListTablets, 1)
if err != nil {
topologyWatcherErrors.Add(topologyWatcherOpListTablets, 1)
select {
case <-tw.ctx.Done():
// If we get a partial result error, we just log it and process the tablets that we did manage to fetch.
if topo.IsErrType(err, topo.PartialResult) {
log.Errorf("received partial result from getTablets for cell %v: %v", tw.cell, err)
} else { // For all other errors, just return.
log.Errorf("error getting tablets for cell: %v: %v", tw.cell, err)
return
default:
}
log.Errorf("cannot get tablets for cell: %v: %v", tw.cell, err)
return
}

// Accumulate a list of all known alias strings to use later
// when sorting.
tabletAliasStrs := make([]string, 0, len(tabletAliases))
tabletAliasStrs := make([]string, 0, len(tabletInfos))

tw.mu.Lock()
for _, tAlias := range tabletAliases {
aliasStr := topoproto.TabletAliasString(tAlias)
defer tw.mu.Unlock()

for _, tInfo := range tabletInfos {
aliasStr := topoproto.TabletAliasString(tInfo.Alias)
tabletAliasStrs = append(tabletAliasStrs, aliasStr)

if !tw.refreshKnownTablets {
Expand All @@ -182,38 +178,13 @@ func (tw *TopologyWatcher) loadTablets() {
continue
}
}

wg.Add(1)
go func(alias *topodata.TabletAlias) {
defer wg.Done()
tw.sem <- 1 // Wait for active queue to drain.
tablet, err := tw.topoServer.GetTablet(tw.ctx, alias)
topologyWatcherOperations.Add(topologyWatcherOpGetTablet, 1)
<-tw.sem // Done; enable next request to run.
if err != nil {
topologyWatcherErrors.Add(topologyWatcherOpGetTablet, 1)
select {
case <-tw.ctx.Done():
return
default:
}
log.Errorf("cannot get tablet for alias %v: %v", alias, err)
return
}
tw.mu.Lock()
aliasStr := topoproto.TabletAliasString(alias)
newTablets[aliasStr] = &tabletInfo{
alias: aliasStr,
tablet: tablet.Tablet,
}
tw.mu.Unlock()
}(tAlias)
// There's no network call here, so we just do the tablets one at a time instead of in parallel goroutines.
newTablets[aliasStr] = &tabletInfo{
alias: aliasStr,
tablet: tInfo.Tablet,
}
}

tw.mu.Unlock()
wg.Wait()
tw.mu.Lock()

for alias, newVal := range newTablets {
if tw.tabletFilter != nil && !tw.tabletFilter.IsIncluded(newVal.tablet) {
continue
Expand Down Expand Up @@ -266,8 +237,6 @@ func (tw *TopologyWatcher) loadTablets() {
tw.topoChecksum = crc32.ChecksumIEEE(buf.Bytes())
tw.lastRefresh = time.Now()

tw.mu.Unlock()

}

// RefreshLag returns the time since the last refresh.
Expand Down
Loading

0 comments on commit 64e4bde

Please sign in to comment.