Skip to content

Commit

Permalink
discovery: Fix tablets removed from healthcheck when topo server GetT…
Browse files Browse the repository at this point in the history
…ablet call fails (#15633)

Signed-off-by: Brendan Dougherty <[email protected]>
  • Loading branch information
brendar authored Apr 8, 2024
1 parent cc0748d commit 4caa8d5
Show file tree
Hide file tree
Showing 11 changed files with 252 additions and 15 deletions.
14 changes: 14 additions & 0 deletions go/vt/discovery/topology_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ func (tw *TopologyWatcher) Stop() {

func (tw *TopologyWatcher) loadTablets() {
newTablets := make(map[string]*tabletInfo)
var partialResult bool

// First get the list of all tablets.
tabletInfos, err := tw.getTablets()
Expand All @@ -152,6 +153,7 @@ func (tw *TopologyWatcher) loadTablets() {
// If we get a partial result error, we just log it and process the tablets that we did manage to fetch.
if topo.IsErrType(err, topo.PartialResult) {
log.Errorf("received partial result from getTablets for cell %v: %v", tw.cell, err)
partialResult = true
} else { // For all other errors, just return.
log.Errorf("error getting tablets for cell: %v: %v", tw.cell, err)
return
Expand Down Expand Up @@ -183,6 +185,18 @@ func (tw *TopologyWatcher) loadTablets() {
}
}

if partialResult {
// We don't want to remove any tablets from the tablets map or the healthcheck if we got a partial result
// because we don't know if they were actually deleted or if we simply failed to fetch them.
// Fill any gaps in the newTablets map using the existing tablets.
for alias, val := range tw.tablets {
if _, ok := newTablets[alias]; !ok {
tabletAliasStrs = append(tabletAliasStrs, alias)
newTablets[alias] = val
}
}
}

for alias, newVal := range newTablets {
if tw.tabletFilter != nil && !tw.tabletFilter.IsIncluded(newVal.tablet) {
continue
Expand Down
82 changes: 82 additions & 0 deletions go/vt/discovery/topology_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package discovery

import (
"context"
"errors"
"math/rand/v2"
"testing"
"time"
Expand Down Expand Up @@ -576,3 +577,84 @@ func TestFilterByKeyspaceSkipsIgnoredTablets(t *testing.T) {

tw.Stop()
}

func TestGetTabletErrorDoesNotRemoveFromHealthcheck(t *testing.T) {
ctx := utils.LeakCheckContext(t)

ts, factory := memorytopo.NewServerAndFactory(ctx, "aa")
defer ts.Close()
fhc := NewFakeHealthCheck(nil)
defer fhc.Close()
topologyWatcherOperations.ZeroAll()
counts := topologyWatcherOperations.Counts()
tw := NewTopologyWatcher(context.Background(), ts, fhc, nil, "aa", 10*time.Minute, true, 5)
defer tw.Stop()

// Force fallback to getting tablets individually.
factory.AddOperationError(memorytopo.List, ".*", topo.NewError(topo.NoImplementation, "List not supported"))

counts = checkOpCounts(t, counts, map[string]int64{})
checkChecksum(t, tw, 0)

// Add a tablet to the topology.
tablet1 := &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: "aa",
Uid: 0,
},
Hostname: "host1",
PortMap: map[string]int32{
"vt": 123,
},
Keyspace: "keyspace",
Shard: "shard",
}
require.NoError(t, ts.CreateTablet(ctx, tablet1), "CreateTablet failed for %v", tablet1.Alias)

tw.loadTablets()
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "AddTablet": 1})
checkChecksum(t, tw, 3238442862)

// Check the tablet is returned by GetAllTablets().
allTablets := fhc.GetAllTablets()
key1 := TabletToMapKey(tablet1)
assert.Len(t, allTablets, 1)
assert.Contains(t, allTablets, key1)
assert.True(t, proto.Equal(tablet1, allTablets[key1]))

// Add a second tablet to the topology.
tablet2 := &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: "aa",
Uid: 2,
},
Hostname: "host2",
PortMap: map[string]int32{
"vt": 789,
},
Keyspace: "keyspace",
Shard: "shard",
}
require.NoError(t, ts.CreateTablet(ctx, tablet2), "CreateTablet failed for %v", tablet2.Alias)

// Cause the Get for the first tablet to fail.
factory.AddOperationError(memorytopo.Get, "tablets/aa-0000000000/Tablet", errors.New("fake error"))

// Ensure that a topo Get error results in a partial results error. If not, the rest of this test is invalid.
_, err := ts.GetTabletsByCell(ctx, "aa", &topo.GetTabletsByCellOptions{})
require.ErrorContains(t, err, "partial result")

// Now force the error during loadTablets.
tw.loadTablets()
checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "AddTablet": 1})
checkChecksum(t, tw, 2762153755)

// Ensure the first tablet is still returned by GetAllTablets() and the second tablet has been added.
allTablets = fhc.GetAllTablets()
key2 := TabletToMapKey(tablet2)
assert.Len(t, allTablets, 2)
assert.Contains(t, allTablets, key1)
assert.Contains(t, allTablets, key2)
assert.True(t, proto.Equal(tablet1, allTablets[key1]))
assert.True(t, proto.Equal(tablet2, allTablets[key2]))
}
2 changes: 1 addition & 1 deletion go/vt/topo/keyspace_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func TestServerGetServingShards(t *testing.T) {
require.NotNil(t, stats)

if tt.fallback {
factory.SetListError(errNoListImpl)
factory.AddOperationError(memorytopo.List, ".*", errNoListImpl)
}

err := ts.CreateKeyspace(ctx, keyspace, &topodatapb.Keyspace{})
Expand Down
3 changes: 3 additions & 0 deletions go/vt/topo/memorytopo/directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ func (c *Conn) ListDir(ctx context.Context, dirPath string, full bool) ([]topo.D
if c.factory.err != nil {
return nil, c.factory.err
}
if err := c.factory.getOperationError(ListDir, dirPath); err != nil {
return nil, err
}

isRoot := false
if dirPath == "" || dirPath == "/" {
Expand Down
4 changes: 4 additions & 0 deletions go/vt/topo/memorytopo/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ func (c *Conn) NewLeaderParticipation(name, id string) (topo.LeaderParticipation
c.factory.mu.Lock()
defer c.factory.mu.Unlock()

if err := c.factory.getOperationError(NewLeaderParticipation, id); err != nil {
return nil, err
}

// Make sure the global path exists.
electionPath := path.Join(electionsPath, name)
if n := c.factory.getOrCreatePath(c.cell, electionPath); n == nil {
Expand Down
16 changes: 14 additions & 2 deletions go/vt/topo/memorytopo/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ func (c *Conn) Create(ctx context.Context, filePath string, contents []byte) (to
if c.factory.err != nil {
return nil, c.factory.err
}
if err := c.factory.getOperationError(Create, filePath); err != nil {
return nil, err
}

// Get the parent dir.
dir, file := path.Split(filePath)
Expand Down Expand Up @@ -92,6 +95,9 @@ func (c *Conn) Update(ctx context.Context, filePath string, contents []byte, ver
if c.factory.err != nil {
return nil, c.factory.err
}
if err := c.factory.getOperationError(Update, filePath); err != nil {
return nil, err
}

// Get the parent dir, we'll need it in case of creation.
dir, file := path.Split(filePath)
Expand Down Expand Up @@ -168,6 +174,9 @@ func (c *Conn) Get(ctx context.Context, filePath string) ([]byte, topo.Version,
if c.factory.err != nil {
return nil, nil, c.factory.err
}
if err := c.factory.getOperationError(Get, filePath); err != nil {
return nil, nil, err
}

// Get the node.
n := c.factory.nodeByPath(c.cell, filePath)
Expand Down Expand Up @@ -195,8 +204,8 @@ func (c *Conn) List(ctx context.Context, filePathPrefix string) ([]topo.KVInfo,
if c.factory.err != nil {
return nil, c.factory.err
}
if c.factory.listErr != nil {
return nil, c.factory.listErr
if err := c.factory.getOperationError(List, filePathPrefix); err != nil {
return nil, err
}

dir, file := path.Split(filePathPrefix)
Expand Down Expand Up @@ -259,6 +268,9 @@ func (c *Conn) Delete(ctx context.Context, filePath string, version topo.Version
if c.factory.err != nil {
return c.factory.err
}
if err := c.factory.getOperationError(Delete, filePath); err != nil {
return err
}

// Get the parent dir.
dir, file := path.Split(filePath)
Expand Down
14 changes: 14 additions & 0 deletions go/vt/topo/memorytopo/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,27 @@ type memoryTopoLockDescriptor struct {
func (c *Conn) TryLock(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) {
c.factory.callstats.Add([]string{"TryLock"}, 1)

c.factory.mu.Lock()
err := c.factory.getOperationError(TryLock, dirPath)
c.factory.mu.Unlock()
if err != nil {
return nil, err
}

return c.Lock(ctx, dirPath, contents)
}

// Lock is part of the topo.Conn interface.
func (c *Conn) Lock(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) {
c.factory.callstats.Add([]string{"Lock"}, 1)

c.factory.mu.Lock()
err := c.factory.getOperationError(Lock, dirPath)
c.factory.mu.Unlock()
if err != nil {
return nil, err
}

return c.lock(ctx, dirPath, contents)
}

Expand Down
57 changes: 49 additions & 8 deletions go/vt/topo/memorytopo/memorytopo.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"context"
"errors"
"math/rand/v2"
"regexp"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -50,6 +51,25 @@ const (
UnreachableServerAddr = "unreachable"
)

// Operation is one of the operations defined by topo.Conn
type Operation int

// The following is the list of topo.Conn operations
const (
ListDir = Operation(iota)
Create
Update
Get
List
Delete
Lock
TryLock
Watch
WatchRecursive
NewLeaderParticipation
Close
)

// Factory is a memory-based implementation of topo.Factory. It
// takes a file-system like approach, with directories at each level
// being an actual directory node. This is meant to be closer to
Expand All @@ -72,14 +92,20 @@ type Factory struct {
// err is used for testing purposes to force queries / watches
// to return the given error
err error
// listErr is used for testing purposed to fake errors from
// calls to List.
listErr error
// operationErrors is used for testing purposes to fake errors from
// operations and paths matching the spec
operationErrors map[Operation][]errorSpec
// callstats allows us to keep track of how many topo.Conn calls
// we make (Create, Get, Update, Delete, List, ListDir, etc).
callstats *stats.CountersWithMultiLabels
}

type errorSpec struct {
op Operation
pathPattern *regexp.Regexp
err error
}

// HasGlobalReadOnlyCell is part of the topo.Factory interface.
func (f *Factory) HasGlobalReadOnlyCell(serverAddr, root string) bool {
return false
Expand Down Expand Up @@ -248,9 +274,10 @@ func (n *node) PropagateWatchError(err error) {
// in case of a problem.
func NewServerAndFactory(ctx context.Context, cells ...string) (*topo.Server, *Factory) {
f := &Factory{
cells: make(map[string]*node),
generation: uint64(rand.Int64N(1 << 60)),
callstats: stats.NewCountersWithMultiLabels("", "", []string{"Call"}),
cells: make(map[string]*node),
generation: uint64(rand.Int64N(1 << 60)),
callstats: stats.NewCountersWithMultiLabels("", "", []string{"Call"}),
operationErrors: make(map[Operation][]errorSpec),
}
f.cells[topo.GlobalCell] = f.newDirectory(topo.GlobalCell, nil)

Expand Down Expand Up @@ -363,9 +390,23 @@ func (f *Factory) recursiveDelete(n *node) {
}
}

func (f *Factory) SetListError(err error) {
func (f *Factory) AddOperationError(op Operation, pathPattern string, err error) {
f.mu.Lock()
defer f.mu.Unlock()

f.listErr = err
f.operationErrors[op] = append(f.operationErrors[op], errorSpec{
op: op,
pathPattern: regexp.MustCompile(pathPattern),
err: err,
})
}

func (f *Factory) getOperationError(op Operation, path string) error {
specs := f.operationErrors[op]
for _, spec := range specs {
if spec.pathPattern.MatchString(path) {
return spec.err
}
}
return nil
}
6 changes: 6 additions & 0 deletions go/vt/topo/memorytopo/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ func (c *Conn) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-c
if c.factory.err != nil {
return nil, nil, c.factory.err
}
if err := c.factory.getOperationError(Watch, filePath); err != nil {
return nil, nil, err
}

n := c.factory.nodeByPath(c.cell, filePath)
if n == nil {
Expand Down Expand Up @@ -89,6 +92,9 @@ func (c *Conn) WatchRecursive(ctx context.Context, dirpath string) ([]*topo.Watc
if c.factory.err != nil {
return nil, nil, c.factory.err
}
if err := c.factory.getOperationError(WatchRecursive, dirpath); err != nil {
return nil, nil, err
}

n := c.factory.getOrCreatePath(c.cell, dirpath)
if n == nil {
Expand Down
12 changes: 9 additions & 3 deletions go/vt/topo/tablet.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ type GetTabletsByCellOptions struct {

// GetTabletsByCell returns all the tablets in the cell.
// It returns ErrNoNode if the cell doesn't exist.
// It returns ErrPartialResult if some tablets couldn't be read. The results in the slice are incomplete.
// It returns (nil, nil) if the cell exists, but there are no tablets in it.
func (ts *Server) GetTabletsByCell(ctx context.Context, cellAlias string, opt *GetTabletsByCellOptions) ([]*TabletInfo, error) {
// If the cell doesn't exist, this will return ErrNoNode.
Expand Down Expand Up @@ -277,6 +278,7 @@ func (ts *Server) GetTabletsByCell(ctx context.Context, cellAlias string, opt *G
// GetTabletsIndividuallyByCell returns a sorted list of tablets for topo servers that do not
// directly support the topoConn.List() functionality.
// It returns ErrNoNode if the cell doesn't exist.
// It returns ErrPartialResult if some tablets couldn't be read. The results in the slice are incomplete.
// It returns (nil, nil) if the cell exists, but there are no tablets in it.
func (ts *Server) GetTabletsIndividuallyByCell(ctx context.Context, cell string, opt *GetTabletsByCellOptions) ([]*TabletInfo, error) {
// If the cell doesn't exist, this will return ErrNoNode.
Expand All @@ -286,10 +288,14 @@ func (ts *Server) GetTabletsIndividuallyByCell(ctx context.Context, cell string,
}
sort.Sort(topoproto.TabletAliasList(aliases))

var partialResultErr error
tabletMap, err := ts.GetTabletMap(ctx, aliases, opt)
if err != nil {
// we got another error than topo.ErrNoNode
return nil, err
if IsErrType(err, PartialResult) {
partialResultErr = err
} else {
return nil, err
}
}
tablets := make([]*TabletInfo, 0, len(aliases))
for _, tabletAlias := range aliases {
Expand All @@ -303,7 +309,7 @@ func (ts *Server) GetTabletsIndividuallyByCell(ctx context.Context, cell string,
}
}

return tablets, nil
return tablets, partialResultErr
}

// UpdateTablet updates the tablet data only - not associated replication paths.
Expand Down
Loading

0 comments on commit 4caa8d5

Please sign in to comment.