Skip to content

Commit

Permalink
Fix v18 tablets removed from healthcheck when topo server GetTablet c…
Browse files Browse the repository at this point in the history
…all fails

Signed-off-by: Brendan Dougherty <[email protected]>
  • Loading branch information
brendar committed Nov 6, 2024
1 parent 17603cc commit 8c3d28d
Show file tree
Hide file tree
Showing 8 changed files with 249 additions and 2 deletions.
11 changes: 11 additions & 0 deletions go/vt/discovery/topology_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,17 @@ func (tw *TopologyWatcher) loadTablets() {
topologyWatcherOperations.Add(topologyWatcherOpGetTablet, 1)
<-tw.sem // Done; enable next request to run
if err != nil {
if !topo.IsErrType(err, topo.NoNode) {
// We failed to get the tablet, but it may still exist.
// We don't want this tablet to be removed from the tw.tablets map or the healthcheck,
// so we fill the gap in the newTablets map using the existing tablet.
tw.mu.Lock()
aliasStr := topoproto.TabletAliasString(alias)
if val, ok := tw.tablets[aliasStr]; ok {
newTablets[aliasStr] = val
}
tw.mu.Unlock()
}
topologyWatcherErrors.Add(topologyWatcherOpGetTablet, 1)
select {
case <-tw.ctx.Done():
Expand Down
143 changes: 143 additions & 0 deletions go/vt/discovery/topology_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package discovery

import (
"context"
"errors"
"math/rand"
"testing"
"time"
Expand Down Expand Up @@ -630,3 +631,145 @@ func TestFilterByKeypsaceSkipsIgnoredTablets(t *testing.T) {

tw.Stop()
}

func TestGetTabletErrorDoesNotRemoveFromHealthcheck(t *testing.T) {
ctx := utils.LeakCheckContext(t)

ts, factory := memorytopo.NewServerAndFactory(ctx, "aa")
defer ts.Close()
fhc := NewFakeHealthCheck(nil)
defer fhc.Close()
topologyWatcherOperations.ZeroAll()
counts := topologyWatcherOperations.Counts()
tw := NewCellTabletsWatcher(context.Background(), ts, fhc, nil, "aa", 10*time.Minute, true, 5)
defer tw.Stop()

counts = checkOpCounts(t, counts, map[string]int64{})
checkChecksum(t, tw, 0)

// Add a tablet to the topology.
tablet1 := &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: "aa",
Uid: 0,
},
Hostname: "host1",
PortMap: map[string]int32{
"vt": 123,
},
Keyspace: "keyspace",
Shard: "shard",
}
require.NoError(t, ts.CreateTablet(ctx, tablet1), "CreateTablet failed for %v", tablet1.Alias)

tw.loadTablets()
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 1, "AddTablet": 1})
checkChecksum(t, tw, 3238442862)

// Check the tablet is returned by GetAllTablets().
allTablets := fhc.GetAllTablets()
key1 := TabletToMapKey(tablet1)
assert.Len(t, allTablets, 1)
assert.Contains(t, allTablets, key1)
assert.True(t, proto.Equal(tablet1, allTablets[key1]))

// Add a second tablet to the topology.
tablet2 := &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: "aa",
Uid: 2,
},
Hostname: "host2",
PortMap: map[string]int32{
"vt": 789,
},
Keyspace: "keyspace",
Shard: "shard",
}
require.NoError(t, ts.CreateTablet(ctx, tablet2), "CreateTablet failed for %v", tablet2.Alias)

// Cause the Get for the first tablet to fail.
factory.AddOperationError(memorytopo.Get, "tablets/aa-0000000000/Tablet", errors.New("fake error"))

// Ensure that a topo GetTablet error fails. If not, the rest of this test is invalid.
_, err := ts.GetTablet(ctx, tablet1.Alias)
require.ErrorContains(t, err, "fake error")

// Now force the error during loadTablets.
tw.loadTablets()
checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 2, "AddTablet": 1})
checkChecksum(t, tw, 2762153755)

// Ensure the first tablet is still returned by GetAllTablets() and the second tablet has been added.
allTablets = fhc.GetAllTablets()
key2 := TabletToMapKey(tablet2)
assert.Len(t, allTablets, 2)
assert.Contains(t, allTablets, key1)
assert.Contains(t, allTablets, key2)
assert.True(t, proto.Equal(tablet1, allTablets[key1]))
assert.True(t, proto.Equal(tablet2, allTablets[key2]))
}

func TestGetTabletNoNodeErrorRemovesFromHealthcheck(t *testing.T) {
ctx := utils.LeakCheckContext(t)

ts, factory := memorytopo.NewServerAndFactory(ctx, "aa")
defer ts.Close()
fhc := NewFakeHealthCheck(nil)
defer fhc.Close()
topologyWatcherOperations.ZeroAll()
counts := topologyWatcherOperations.Counts()
tw := NewCellTabletsWatcher(context.Background(), ts, fhc, nil, "aa", 10*time.Minute, true, 5)
defer tw.Stop()

counts = checkOpCounts(t, counts, map[string]int64{})
checkChecksum(t, tw, 0)

// Add a tablet to the topology.
tablet1 := &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: "aa",
Uid: 0,
},
Hostname: "host1",
PortMap: map[string]int32{
"vt": 123,
},
Keyspace: "keyspace",
Shard: "shard",
}
require.NoError(t, ts.CreateTablet(ctx, tablet1), "CreateTablet failed for %v", tablet1.Alias)

tw.loadTablets()
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 1, "AddTablet": 1})
checkChecksum(t, tw, 3238442862)

// Check the tablet is returned by GetAllTablets().
allTablets := fhc.GetAllTablets()
key1 := TabletToMapKey(tablet1)
assert.Len(t, allTablets, 1)
assert.Contains(t, allTablets, key1)
assert.True(t, proto.Equal(tablet1, allTablets[key1]))

// Cause the Get for the tablet to fail with a NoNode error. This simulates a race condition where
// the tablet is removed from the topo after the ListTablets call but before the GetTablet call.
factory.AddOperationError(
memorytopo.Get,
"tablets/aa-0000000000/Tablet",
topo.NewError(topo.NoNode, "tablets/aa-0000000000/Tablet"),
)

// Ensure that a topo GetTablet error fails. If not, the rest of this test is invalid.
_, err := ts.GetTablet(ctx, tablet1.Alias)
require.Error(t, err)
require.True(t, topo.IsErrType(err, topo.NoNode))

// Now force the error during loadTablets.
tw.loadTablets()
checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 1, "RemoveTablet": 1})
checkChecksum(t, tw, 0)

// Ensure the tablet is no longer returned by GetAllTablets()
allTablets = fhc.GetAllTablets()
assert.Len(t, allTablets, 0)
}
3 changes: 3 additions & 0 deletions go/vt/topo/memorytopo/directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ func (c *Conn) ListDir(ctx context.Context, dirPath string, full bool) ([]topo.D
if c.factory.err != nil {
return nil, c.factory.err
}
if err := c.factory.getOperationError(ListDir, dirPath); err != nil {
return nil, err
}

isRoot := false
if dirPath == "" || dirPath == "/" {
Expand Down
4 changes: 4 additions & 0 deletions go/vt/topo/memorytopo/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ func (c *Conn) NewLeaderParticipation(name, id string) (topo.LeaderParticipation
c.factory.mu.Lock()
defer c.factory.mu.Unlock()

if err := c.factory.getOperationError(NewLeaderParticipation, id); err != nil {
return nil, err
}

// Make sure the global path exists.
electionPath := path.Join(electionsPath, name)
if n := c.factory.getOrCreatePath(c.cell, electionPath); n == nil {
Expand Down
15 changes: 15 additions & 0 deletions go/vt/topo/memorytopo/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ func (c *Conn) Create(ctx context.Context, filePath string, contents []byte) (to
if c.factory.err != nil {
return nil, c.factory.err
}
if err := c.factory.getOperationError(Create, filePath); err != nil {
return nil, err
}

// Get the parent dir.
dir, file := path.Split(filePath)
Expand Down Expand Up @@ -88,6 +91,9 @@ func (c *Conn) Update(ctx context.Context, filePath string, contents []byte, ver
if c.factory.err != nil {
return nil, c.factory.err
}
if err := c.factory.getOperationError(Update, filePath); err != nil {
return nil, err
}

// Get the parent dir, we'll need it in case of creation.
dir, file := path.Split(filePath)
Expand Down Expand Up @@ -162,6 +168,9 @@ func (c *Conn) Get(ctx context.Context, filePath string) ([]byte, topo.Version,
if c.factory.err != nil {
return nil, nil, c.factory.err
}
if err := c.factory.getOperationError(Get, filePath); err != nil {
return nil, nil, err
}

// Get the node.
n := c.factory.nodeByPath(c.cell, filePath)
Expand All @@ -187,6 +196,9 @@ func (c *Conn) List(ctx context.Context, filePathPrefix string) ([]topo.KVInfo,
if c.factory.err != nil {
return nil, c.factory.err
}
if err := c.factory.getOperationError(List, filePathPrefix); err != nil {
return nil, err
}

dir, file := path.Split(filePathPrefix)
// Get the node to list.
Expand Down Expand Up @@ -246,6 +258,9 @@ func (c *Conn) Delete(ctx context.Context, filePath string, version topo.Version
if c.factory.err != nil {
return c.factory.err
}
if err := c.factory.getOperationError(Delete, filePath); err != nil {
return err
}

// Get the parent dir.
dir, file := path.Split(filePath)
Expand Down
14 changes: 14 additions & 0 deletions go/vt/topo/memorytopo/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,25 @@ type memoryTopoLockDescriptor struct {

// TryLock is part of the topo.Conn interface. Its implementation is same as Lock
func (c *Conn) TryLock(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) {
c.factory.mu.Lock()
err := c.factory.getOperationError(TryLock, dirPath)
c.factory.mu.Unlock()
if err != nil {
return nil, err
}

return c.Lock(ctx, dirPath, contents)
}

// Lock is part of the topo.Conn interface.
func (c *Conn) Lock(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) {
c.factory.mu.Lock()
err := c.factory.getOperationError(Lock, dirPath)
c.factory.mu.Unlock()
if err != nil {
return nil, err
}

return c.lock(ctx, dirPath, contents)
}

Expand Down
55 changes: 53 additions & 2 deletions go/vt/topo/memorytopo/memorytopo.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"context"
"errors"
"math/rand"
"regexp"
"strings"
"sync"
"sync/atomic"
Expand All @@ -49,6 +50,25 @@ const (
UnreachableServerAddr = "unreachable"
)

// Operation is one of the operations defined by topo.Conn
type Operation int

// The following is the list of topo.Conn operations
const (
ListDir = Operation(iota)
Create
Update
Get
List
Delete
Lock
TryLock
Watch
WatchRecursive
NewLeaderParticipation
Close
)

// Factory is a memory-based implementation of topo.Factory. It
// takes a file-system like approach, with directories at each level
// being an actual directory node. This is meant to be closer to
Expand All @@ -71,6 +91,15 @@ type Factory struct {
// err is used for testing purposes to force queries / watches
// to return the given error
err error
// operationErrors is used for testing purposes to fake errors from
// operations and paths matching the spec
operationErrors map[Operation][]errorSpec
}

type errorSpec struct {
op Operation
pathPattern *regexp.Regexp
err error
}

// HasGlobalReadOnlyCell is part of the topo.Factory interface.
Expand Down Expand Up @@ -236,8 +265,9 @@ func (n *node) PropagateWatchError(err error) {
// in case of a problem.
func NewServerAndFactory(ctx context.Context, cells ...string) (*topo.Server, *Factory) {
f := &Factory{
cells: make(map[string]*node),
generation: uint64(rand.Int63n(1 << 60)),
cells: make(map[string]*node),
generation: uint64(rand.Int63n(1 << 60)),
operationErrors: make(map[Operation][]errorSpec),
}
f.cells[topo.GlobalCell] = f.newDirectory(topo.GlobalCell, nil)

Expand Down Expand Up @@ -349,3 +379,24 @@ func (f *Factory) recursiveDelete(n *node) {
f.recursiveDelete(parent)
}
}

func (f *Factory) AddOperationError(op Operation, pathPattern string, err error) {
f.mu.Lock()
defer f.mu.Unlock()

f.operationErrors[op] = append(f.operationErrors[op], errorSpec{
op: op,
pathPattern: regexp.MustCompile(pathPattern),
err: err,
})
}

func (f *Factory) getOperationError(op Operation, path string) error {
specs := f.operationErrors[op]
for _, spec := range specs {
if spec.pathPattern.MatchString(path) {
return spec.err
}
}
return nil
}
6 changes: 6 additions & 0 deletions go/vt/topo/memorytopo/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ func (c *Conn) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-c
if c.factory.err != nil {
return nil, nil, c.factory.err
}
if err := c.factory.getOperationError(Watch, filePath); err != nil {
return nil, nil, err
}

n := c.factory.nodeByPath(c.cell, filePath)
if n == nil {
Expand Down Expand Up @@ -85,6 +88,9 @@ func (c *Conn) WatchRecursive(ctx context.Context, dirpath string) ([]*topo.Watc
if c.factory.err != nil {
return nil, nil, c.factory.err
}
if err := c.factory.getOperationError(WatchRecursive, dirpath); err != nil {
return nil, nil, err
}

n := c.factory.getOrCreatePath(c.cell, dirpath)
if n == nil {
Expand Down

0 comments on commit 8c3d28d

Please sign in to comment.