Skip to content

Commit

Permalink
Support passing filters to discovery.NewHealthCheck(...) (#16170)
Browse files Browse the repository at this point in the history
Signed-off-by: Tim Vaillancourt <[email protected]>
  • Loading branch information
timvaillancourt authored Sep 25, 2024
1 parent c4a9d39 commit 4058966
Show file tree
Hide file tree
Showing 8 changed files with 186 additions and 58 deletions.
52 changes: 32 additions & 20 deletions go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"hash/crc32"
"net/http"
Expand Down Expand Up @@ -105,6 +106,9 @@ var (
// HealthCheckHealthyTemplate uses healthCheckTemplate with the `HealthCheck - Healthy Tablets` title to
// create the HTML code required to render the list of healthy tablets from the HealthCheck.
HealthCheckHealthyTemplate = fmt.Sprintf(healthCheckTemplate, "HealthCheck - Healthy Tablets")

// errKeyspacesToWatchAndTabletFilters is an error for cases where incompatible filters are defined.
errKeyspacesToWatchAndTabletFilters = errors.New("only one of --keyspaces_to_watch and --tablet_filters may be specified at a time")
)

// See the documentation for NewHealthCheck below for an explanation of these parameters.
Expand Down Expand Up @@ -301,6 +305,27 @@ type HealthCheckImpl struct {
healthCheckDialSem *semaphore.Weighted
}

// NewVTGateHealthCheckFilters returns the healthcheck filters for vtgate,
// built from the package-level tabletFilters, KeyspacesToWatch and
// tabletFilterTags settings.
//
// tabletFilters and KeyspacesToWatch are mutually exclusive: when both are
// non-empty, errKeyspacesToWatchAndTabletFilters is returned. When
// NewFilterByShard rejects the tabletFilters value, its error is wrapped (%w)
// so callers can still inspect the cause with errors.Is / errors.As.
//
// On error the returned filters are nil. When none of the settings are
// populated, both return values are nil.
func NewVTGateHealthCheckFilters() (filters TabletFilters, err error) {
	switch {
	case len(tabletFilters) > 0:
		if len(KeyspacesToWatch) > 0 {
			return nil, errKeyspacesToWatchAndTabletFilters
		}

		// Use a distinct name so the named result err is not shadowed.
		fbs, parseErr := NewFilterByShard(tabletFilters)
		if parseErr != nil {
			return nil, fmt.Errorf("failed to parse tablet_filters value %q: %w", strings.Join(tabletFilters, ","), parseErr)
		}
		filters = append(filters, fbs)
	case len(KeyspacesToWatch) > 0:
		filters = append(filters, NewFilterByKeyspace(KeyspacesToWatch))
	}

	// The tag filter is independent of the shard/keyspace filters and is
	// appended after whichever of them (if any) was selected above.
	if len(tabletFilterTags) > 0 {
		filters = append(filters, NewFilterByTabletTags(tabletFilterTags))
	}
	return filters, nil
}

// NewHealthCheck creates a new HealthCheck object.
// Parameters:
// retryDelay.
Expand All @@ -322,10 +347,14 @@ type HealthCheckImpl struct {
//
// The localCell for this healthcheck
//
// callback.
// cellsToWatch.
//
// A function to call when there is a primary change. Used to notify vtgate's buffer to stop buffering.
func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Duration, topoServer *topo.Server, localCell, cellsToWatch string) *HealthCheckImpl {
// Is a list of cells to watch for tablets.
//
// filters.
//
// Is one or more filters to apply when determining what tablets we want to stream healthchecks from.
func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Duration, topoServer *topo.Server, localCell, cellsToWatch string, filters TabletFilter) *HealthCheckImpl {
log.Infof("loading tablets for cells: %v", cellsToWatch)

hc := &HealthCheckImpl{
Expand All @@ -348,27 +377,10 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
}

for _, c := range cells {
var filters TabletFilters
log.Infof("Setting up healthcheck for cell: %v", c)
if c == "" {
continue
}
if len(tabletFilters) > 0 {
if len(KeyspacesToWatch) > 0 {
log.Exitf("Only one of -keyspaces_to_watch and -tablet_filters may be specified at a time")
}

fbs, err := NewFilterByShard(tabletFilters)
if err != nil {
log.Exitf("Cannot parse tablet_filters parameter: %v", err)
}
filters = append(filters, fbs)
} else if len(KeyspacesToWatch) > 0 {
filters = append(filters, NewFilterByKeyspace(KeyspacesToWatch))
}
if len(tabletFilterTags) > 0 {
filters = append(filters, NewFilterByTabletTags(tabletFilterTags))
}
topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filters, c, refreshInterval, refreshKnownTablets, topo.DefaultConcurrency))
}

Expand Down
79 changes: 75 additions & 4 deletions go/vt/discovery/healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,77 @@ func init() {
refreshInterval = time.Minute
}

// TestNewVTGateHealthCheckFilters verifies that NewVTGateHealthCheckFilters
// builds the expected filter types, in order, for each combination of the
// package-level KeyspacesToWatch, tabletFilters and tabletFilterTags settings,
// and that it returns the expected error for invalid combinations.
func TestNewVTGateHealthCheckFilters(t *testing.T) {
	// The subtests mutate package-level settings; put back whatever was
	// configured before this test ran so other tests are unaffected.
	origKeyspacesToWatch := KeyspacesToWatch
	origTabletFilters := tabletFilters
	origTabletFilterTags := tabletFilterTags
	defer func() {
		KeyspacesToWatch = origKeyspacesToWatch
		tabletFilters = origTabletFilters
		tabletFilterTags = origTabletFilterTags
	}()

	testCases := []struct {
		name                string
		keyspacesToWatch    []string
		tabletFilters       []string
		tabletFilterTags    map[string]string
		expectedError       string
		expectedFilterTypes []any
	}{
		{
			name: "noFilters",
		},
		{
			name:                "tabletFilters",
			tabletFilters:       []string{"ks1|-80"},
			expectedFilterTypes: []any{&FilterByShard{}},
		},
		{
			name:                "keyspacesToWatch",
			keyspacesToWatch:    []string{"ks1"},
			expectedFilterTypes: []any{&FilterByKeyspace{}},
		},
		{
			name:                "tabletFiltersAndTags",
			tabletFilters:       []string{"ks1|-80"},
			tabletFilterTags:    map[string]string{"test": "true"},
			expectedFilterTypes: []any{&FilterByShard{}, &FilterByTabletTags{}},
		},
		{
			name:                "keyspacesToWatchAndTags",
			tabletFilterTags:    map[string]string{"test": "true"},
			keyspacesToWatch:    []string{"ks1"},
			expectedFilterTypes: []any{&FilterByKeyspace{}, &FilterByTabletTags{}},
		},
		{
			name:             "failKeyspacesToWatchAndFilters",
			tabletFilters:    []string{"ks1|-80"},
			keyspacesToWatch: []string{"ks1"},
			expectedError:    errKeyspacesToWatchAndTabletFilters.Error(),
		},
		{
			name:          "failInvalidTabletFilters",
			tabletFilters: []string{"shouldfail|"},
			expectedError: "failed to parse tablet_filters value \"shouldfail|\": error parsing shard name : Code: INVALID_ARGUMENT\nempty name\n",
		},
	}

	for _, testCase := range testCases {
		t.Run(testCase.name, func(t *testing.T) {
			KeyspacesToWatch = testCase.keyspacesToWatch
			tabletFilters = testCase.tabletFilters
			tabletFilterTags = testCase.tabletFilterTags

			filters, err := NewVTGateHealthCheckFilters()
			if testCase.expectedError != "" {
				assert.EqualError(t, err, testCase.expectedError)
			} else {
				// An unexpected error must fail the case rather than be ignored.
				assert.NoError(t, err)
			}

			// Stop on a length mismatch: indexing expectedFilterTypes below
			// would otherwise panic instead of reporting a clean failure.
			if !assert.Len(t, filters, len(testCase.expectedFilterTypes)) {
				return
			}
			for i, filter := range filters {
				assert.IsType(t, testCase.expectedFilterTypes[i], filter)
			}
		})
	}
}

func TestHealthCheck(t *testing.T) {
ctx := utils.LeakCheckContext(t)
// reset error counters
Expand Down Expand Up @@ -1100,7 +1171,7 @@ func TestPrimaryInOtherCell(t *testing.T) {

ts := memorytopo.NewServer(ctx, "cell1", "cell2")
defer ts.Close()
hc := NewHealthCheck(ctx, 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2")
hc := NewHealthCheck(ctx, 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2", nil)
defer hc.Close()

// add a tablet as primary in different cell
Expand Down Expand Up @@ -1160,7 +1231,7 @@ func TestReplicaInOtherCell(t *testing.T) {

ts := memorytopo.NewServer(ctx, "cell1", "cell2")
defer ts.Close()
hc := NewHealthCheck(ctx, 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2")
hc := NewHealthCheck(ctx, 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2", nil)
defer hc.Close()

// add a tablet as replica
Expand Down Expand Up @@ -1265,7 +1336,7 @@ func TestCellAliases(t *testing.T) {

ts := memorytopo.NewServer(ctx, "cell1", "cell2")
defer ts.Close()
hc := NewHealthCheck(ctx, 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2")
hc := NewHealthCheck(ctx, 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2", nil)
defer hc.Close()

cellsAlias := &topodatapb.CellsAlias{
Expand Down Expand Up @@ -1416,7 +1487,7 @@ func tabletDialer(ctx context.Context, tablet *topodatapb.Tablet, _ grpcclient.F
}

func createTestHc(ctx context.Context, ts *topo.Server) *HealthCheckImpl {
return NewHealthCheck(ctx, 1*time.Millisecond, time.Hour, ts, "cell", "")
return NewHealthCheck(ctx, 1*time.Millisecond, time.Hour, ts, "cell", "", nil)
}

type fakeConn struct {
Expand Down
4 changes: 2 additions & 2 deletions go/vt/discovery/keyspace_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestSrvKeyspaceWithNilNewKeyspace(t *testing.T) {
factory.AddCell(cell)
ts := faketopo.NewFakeTopoServer(ctx, factory)
ts2 := &fakeTopoServer{}
hc := NewHealthCheck(ctx, 1*time.Millisecond, time.Hour, ts, cell, "")
hc := NewHealthCheck(ctx, 1*time.Millisecond, time.Hour, ts, cell, "", nil)
defer hc.Close()
kew := NewKeyspaceEventWatcher(ctx, ts2, hc, cell)
kss := &keyspaceState{
Expand Down Expand Up @@ -143,7 +143,7 @@ func TestKeyspaceEventTypes(t *testing.T) {
factory.AddCell(cell)
ts := faketopo.NewFakeTopoServer(ctx, factory)
ts2 := &fakeTopoServer{}
hc := NewHealthCheck(ctx, 1*time.Millisecond, time.Hour, ts, cell, "")
hc := NewHealthCheck(ctx, 1*time.Millisecond, time.Hour, ts, cell, "", nil)
defer hc.Close()
kew := NewKeyspaceEventWatcher(ctx, ts2, hc, cell)

Expand Down
36 changes: 30 additions & 6 deletions go/vt/discovery/topology_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,11 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) {
defer ts.Close()
fhc := NewFakeHealthCheck(nil)
defer fhc.Close()
filter := NewFilterByKeyspace([]string{"keyspace"})
logger := logutil.NewMemoryLogger()
topologyWatcherOperations.ZeroAll()
counts := topologyWatcherOperations.Counts()
tw := NewTopologyWatcher(context.Background(), ts, fhc, nil, "aa", 10*time.Minute, refreshKnownTablets, 5)
tw := NewTopologyWatcher(context.Background(), ts, fhc, filter, "aa", 10*time.Minute, refreshKnownTablets, 5)

counts = checkOpCounts(t, counts, map[string]int64{})
checkChecksum(t, tw, 0)
Expand Down Expand Up @@ -172,10 +173,31 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) {
require.NoError(t, ts.CreateTablet(context.Background(), tablet2), "CreateTablet failed for %v", tablet2.Alias)
tw.loadTablets()

// Confirm second tablet triggers ListTablets + AddTablet calls.
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "AddTablet": 1})
checkChecksum(t, tw, 2762153755)

// Check the new tablet is returned by GetAllTablets().
// Add a third tablet in a filtered keyspace to the topology.
tablet3 := &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: "aa",
Uid: 3,
},
Hostname: "host3",
PortMap: map[string]int32{
"vt": 789,
},
Keyspace: "excluded",
Shard: "shard",
}
require.NoError(t, ts.CreateTablet(context.Background(), tablet3), "CreateTablet failed for %v", tablet3.Alias)
tw.loadTablets()

// Confirm filtered tablet did not trigger an AddTablet call.
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "AddTablet": 0})
checkChecksum(t, tw, 3177315266)

// Check the second tablet is returned by GetAllTablets(). This should not contain the filtered tablet.
allTablets = fhc.GetAllTablets()
key = TabletToMapKey(tablet2)
assert.Len(t, allTablets, 2)
Expand Down Expand Up @@ -207,14 +229,14 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) {
assert.Contains(t, allTablets, key)
assert.True(t, proto.Equal(tablet, allTablets[key]))
assert.NotContains(t, allTablets, origKey)
checkChecksum(t, tw, 2762153755)
checkChecksum(t, tw, 3177315266)
} else {
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "ReplaceTablet": 0})
assert.Len(t, allTablets, 2)
assert.Contains(t, allTablets, origKey)
assert.True(t, proto.Equal(origTablet, allTablets[origKey]))
assert.NotContains(t, allTablets, key)
checkChecksum(t, tw, 2762153755)
checkChecksum(t, tw, 3177315266)
}

// Both tablets restart on different hosts.
Expand Down Expand Up @@ -269,7 +291,7 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) {
require.Nil(t, err, "FixShardReplication failed")
tw.loadTablets()
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "RemoveTablet": 1})
checkChecksum(t, tw, 789108290)
checkChecksum(t, tw, 852159264)

allTablets = fhc.GetAllTablets()
assert.Len(t, allTablets, 1)
Expand All @@ -280,8 +302,10 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) {
assert.Contains(t, allTablets, key)
assert.True(t, proto.Equal(tablet2, allTablets[key]))

// Remove the other and check that it is detected as being gone.
// Remove the other tablets and check that it is detected as being gone.
// Deleting the filtered tablet should not trigger a RemoveTablet call.
require.NoError(t, ts.DeleteTablet(context.Background(), tablet2.Alias))
require.NoError(t, ts.DeleteTablet(context.Background(), tablet3.Alias))
_, err = topo.FixShardReplication(context.Background(), ts, logger, "aa", "keyspace", "shard")
require.Nil(t, err, "FixShardReplication failed")
tw.loadTablets()
Expand Down
2 changes: 1 addition & 1 deletion go/vt/throttler/demo/throttler_demo.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func newClient(ctx context.Context, primary *primary, replica *replica, ts *topo
log.Fatal(err)
}

healthCheck := discovery.NewHealthCheck(ctx, 5*time.Second, 1*time.Minute, ts, "cell1", "")
healthCheck := discovery.NewHealthCheck(ctx, 5*time.Second, 1*time.Minute, ts, "cell1", "", nil)
c := &client{
primary: primary,
healthCheck: healthCheck,
Expand Down
6 changes: 5 additions & 1 deletion go/vt/vtgate/tabletgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,11 @@ type TabletGateway struct {
}

// createHealthCheck builds the vtgate tablet filters with
// discovery.NewVTGateHealthCheckFilters and returns a discovery.HealthCheck
// created by discovery.NewHealthCheck with those filters applied.
//
// If the filters cannot be built, the error is handed to log.Exit.
func createHealthCheck(ctx context.Context, retryDelay, timeout time.Duration, ts *topo.Server, cell, cellsToWatch string) discovery.HealthCheck {
	filters, err := discovery.NewVTGateHealthCheckFilters()
	if err != nil {
		log.Exit(err)
	}
	return discovery.NewHealthCheck(ctx, retryDelay, timeout, ts, cell, cellsToWatch, filters)
}

// NewTabletGateway creates and returns a new TabletGateway
Expand Down
Loading

0 comments on commit 4058966

Please sign in to comment.