Skip to content

Commit

Permalink
vtgate: support filtering tablets by tablet-tags
Browse files Browse the repository at this point in the history
Signed-off-by: Tim Vaillancourt <[email protected]>
  • Loading branch information
timvaillancourt committed May 9, 2024
1 parent f27d287 commit b580794
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 13 deletions.
16 changes: 12 additions & 4 deletions go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
"github.com/spf13/pflag"
"golang.org/x/sync/semaphore"

"vitess.io/vitess/go/flagutil"
"vitess.io/vitess/go/netutil"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/log"
Expand Down Expand Up @@ -82,6 +83,9 @@ var (
// tabletFilters are the keyspace|shard or keyrange filters to apply to the full set of tablets.
tabletFilters []string

// tabletFilterTags are the key/values filters to apply to the full set of tablets.
tabletFilterTags flagutil.StringMapValue

// refreshInterval is the interval at which healthcheck refreshes its list of tablets from topo.
refreshInterval = 1 * time.Minute

Expand Down Expand Up @@ -164,6 +168,7 @@ func init() {

func registerDiscoveryFlags(fs *pflag.FlagSet) {
fs.StringSliceVar(&tabletFilters, "tablet_filters", []string{}, "Specifies a comma-separated list of 'keyspace|shard_name or keyrange' values to filter the tablets to watch.")
fs.Var(&tabletFilterTags, "tablet-filter-tags", "Specifies a comma-separated list of tablet tags (as key:value pairs) to filter the tablets to watch.")
fs.Var((*topoproto.TabletTypeListFlag)(&AllowedTabletTypes), "allowed_tablet_types", "Specifies the tablet types this vtgate is allowed to route queries to. Should be provided as a comma-separated set of tablet types.")
fs.StringSliceVar(&KeyspacesToWatch, "keyspaces_to_watch", []string{}, "Specifies which keyspaces this vtgate should have access to while routing queries or accessing the vschema.")
}
Expand Down Expand Up @@ -337,7 +342,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
loadTabletsTrigger: make(chan struct{}),
}
var topoWatchers []*TopologyWatcher
var filter TabletFilter
var filters []TabletFilter
cells := strings.Split(cellsToWatch, ",")
if cellsToWatch == "" {
cells = append(cells, localCell)
Expand All @@ -357,11 +362,14 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
if err != nil {
log.Exitf("Cannot parse tablet_filters parameter: %v", err)
}
filter = fbs
filters = []TabletFilter{fbs}
} else if len(KeyspacesToWatch) > 0 {
filter = NewFilterByKeyspace(KeyspacesToWatch)
filters = []TabletFilter{NewFilterByKeyspace(KeyspacesToWatch)}
}
if len(tabletFilterTags) > 0 {
filters = append(filters, NewFilterByTabletTags(tabletFilterTags))
}
topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filter, c, refreshInterval, refreshKnownTablets, topo.DefaultConcurrency))
topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filters, c, refreshInterval, refreshKnownTablets, topo.DefaultConcurrency))
}

hc.topoWatchers = topoWatchers
Expand Down
42 changes: 37 additions & 5 deletions go/vt/discovery/topology_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ type TopologyWatcher struct {
// set at construction time
topoServer *topo.Server
healthcheck HealthCheck
tabletFilter TabletFilter
tabletFilters []TabletFilter
cell string
refreshInterval time.Duration
refreshKnownTablets bool
Expand All @@ -92,11 +92,11 @@ type TopologyWatcher struct {

// NewTopologyWatcher returns a TopologyWatcher that monitors all
// the tablets in a cell, and reloads them as needed.
func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int) *TopologyWatcher {
func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, f []TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int) *TopologyWatcher {
tw := &TopologyWatcher{
topoServer: topoServer,
healthcheck: hc,
tabletFilter: f,
tabletFilters: f,
cell: cell,
refreshInterval: refreshInterval,
refreshKnownTablets: refreshKnownTablets,
Expand Down Expand Up @@ -141,6 +141,16 @@ func (tw *TopologyWatcher) Stop() {
tw.wg.Wait()
}

// hasTabletFiltersMatch returns true if a tablet matches all tablet filters.
func (tw *TopologyWatcher) hasTabletFiltersMatch(tablet *topodata.Tablet) bool {
for _, tabletFilter := range tw.tabletFilters {
if !tabletFilter.IsIncluded(tablet) {
return false
}
}
return true
}

func (tw *TopologyWatcher) loadTablets() {
newTablets := make(map[string]*tabletInfo)
var partialResult bool
Expand Down Expand Up @@ -198,7 +208,7 @@ func (tw *TopologyWatcher) loadTablets() {
}

for alias, newVal := range newTablets {
if tw.tabletFilter != nil && !tw.tabletFilter.IsIncluded(newVal.tablet) {
if tw.tabletFilters != nil && !tw.hasTabletFiltersMatch(newVal.tablet) {
continue
}

Expand All @@ -221,7 +231,7 @@ func (tw *TopologyWatcher) loadTablets() {
}

for _, val := range tw.tablets {
if tw.tabletFilter != nil && !tw.tabletFilter.IsIncluded(val.tablet) {
if tw.tabletFilters != nil && !tw.hasTabletFiltersMatch(val.tablet) {
continue
}

Expand Down Expand Up @@ -375,3 +385,25 @@ func (fbk *FilterByKeyspace) IsIncluded(tablet *topodata.Tablet) bool {
_, exist := fbk.keyspaces[tablet.Keyspace]
return exist
}

// TODO:
type FilterByTabletTags struct {
tags map[string]string
}

func NewFilterByTabletTags(tabletTags map[string]string) *FilterByTabletTags {
return &FilterByTabletTags{tags: tabletTags}
}

// IsIncluded returns true if the tablet's tags match what we expect.
func (fbtg *FilterByTabletTags) IsIncluded(tablet *topodata.Tablet) bool {
if tablet.Tags == nil {
return false
}
for key, val := range fbtg.tags {
if tabletVal, found := tablet.Tags[key]; !found || tabletVal != val {
return false
}
}
return true
}
25 changes: 21 additions & 4 deletions go/vt/discovery/topology_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ func TestFilterByKeyspace(t *testing.T) {
ctx := utils.LeakCheckContext(t)

hc := NewFakeHealthCheck(nil)
f := NewFilterByKeyspace(testKeyspacesToWatch)
f := []TabletFilter{NewFilterByKeyspace(testKeyspacesToWatch)}
ts := memorytopo.NewServer(ctx, testCell)
defer ts.Close()
tw := NewTopologyWatcher(context.Background(), ts, hc, f, testCell, 10*time.Minute, true, 5)
Expand All @@ -414,7 +414,7 @@ func TestFilterByKeyspace(t *testing.T) {
Shard: testShard,
}

assert.Equal(t, test.expected, f.IsIncluded(tablet))
assert.Equal(t, test.expected, tw.hasTabletFiltersMatch(tablet))

// Make this fatal because there is no point continuing if CreateTablet fails
require.NoError(t, ts.CreateTablet(context.Background(), tablet))
Expand Down Expand Up @@ -443,7 +443,7 @@ func TestFilterByKeyspace(t *testing.T) {
Keyspace: test.keyspace,
Shard: testShard,
}
assert.Equal(t, test.expected, f.IsIncluded(tabletReplacement))
assert.Equal(t, test.expected, tw.hasTabletFiltersMatch(tabletReplacement))
require.NoError(t, ts.CreateTablet(context.Background(), tabletReplacement))

tw.loadTablets()
Expand Down Expand Up @@ -476,7 +476,7 @@ func TestFilterByKeyspaceSkipsIgnoredTablets(t *testing.T) {
defer fhc.Close()
topologyWatcherOperations.ZeroAll()
counts := topologyWatcherOperations.Counts()
f := NewFilterByKeyspace(testKeyspacesToWatch)
f := []TabletFilter{NewFilterByKeyspace(testKeyspacesToWatch)}
tw := NewTopologyWatcher(context.Background(), ts, fhc, f, "aa", 10*time.Minute, false /*refreshKnownTablets*/, 5)

counts = checkOpCounts(t, counts, map[string]int64{})
Expand Down Expand Up @@ -578,6 +578,23 @@ func TestFilterByKeyspaceSkipsIgnoredTablets(t *testing.T) {
tw.Stop()
}

func TestNewFilterByTabletTags(t *testing.T) {
tags := map[string]string{
"instance_type": "i3.xlarge",
"some_key": "some_value",
}
filter := NewFilterByTabletTags(tags)
assert.False(t, filter.IsIncluded(&topodatapb.Tablet{
Tags: nil,
}))
assert.False(t, filter.IsIncluded(&topodatapb.Tablet{
Tags: map[string]string{},
}))
assert.True(t, filter.IsIncluded(&topodatapb.Tablet{
Tags: tags,
}))
}

func TestGetTabletErrorDoesNotRemoveFromHealthcheck(t *testing.T) {
ctx := utils.LeakCheckContext(t)

Expand Down

0 comments on commit b580794

Please sign in to comment.