Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix v18 tablets removed from healthcheck when topo server GetTablet call fails #201

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions go/vt/discovery/topology_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,17 @@ func (tw *TopologyWatcher) loadTablets() {
topologyWatcherOperations.Add(topologyWatcherOpGetTablet, 1)
<-tw.sem // Done; enable next request to run
if err != nil {
if !topo.IsErrType(err, topo.NoNode) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would the error be a NoNode even when it's a network partition?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, the NoNode error means the path does not exist in zookeeper. We would get a NoNode error if the tablet was removed from the topo server between the call to get all the tablet aliases (tw.getTablets(tw)) and the call to get that tablet (tw.topoServer.GetTablet(tw.ctx, alias)). So we allow the tablet to be removed from the healthcheck on a NoNode error.

// We failed to get the tablet, but it may still exist.
// We don't want this tablet to be removed from the tw.tablets map or the healthcheck,
// so we fill the gap in the newTablets map using the existing tablet.
tw.mu.Lock()
aliasStr := topoproto.TabletAliasString(alias)
if val, ok := tw.tablets[aliasStr]; ok {
newTablets[aliasStr] = val
}
tw.mu.Unlock()
}
topologyWatcherErrors.Add(topologyWatcherOpGetTablet, 1)
select {
case <-tw.ctx.Done():
Expand Down
143 changes: 143 additions & 0 deletions go/vt/discovery/topology_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package discovery

import (
"context"
"errors"
"math/rand"
"testing"
"time"
Expand Down Expand Up @@ -630,3 +631,145 @@ func TestFilterByKeypsaceSkipsIgnoredTablets(t *testing.T) {

tw.Stop()
}

func TestGetTabletErrorDoesNotRemoveFromHealthcheck(t *testing.T) {
ctx := utils.LeakCheckContext(t)

ts, factory := memorytopo.NewServerAndFactory(ctx, "aa")
defer ts.Close()
fhc := NewFakeHealthCheck(nil)
defer fhc.Close()
topologyWatcherOperations.ZeroAll()
counts := topologyWatcherOperations.Counts()
tw := NewCellTabletsWatcher(context.Background(), ts, fhc, nil, "aa", 10*time.Minute, true, 5)
defer tw.Stop()

counts = checkOpCounts(t, counts, map[string]int64{})
checkChecksum(t, tw, 0)

// Add a tablet to the topology.
tablet1 := &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: "aa",
Uid: 0,
},
Hostname: "host1",
PortMap: map[string]int32{
"vt": 123,
},
Keyspace: "keyspace",
Shard: "shard",
}
require.NoError(t, ts.CreateTablet(ctx, tablet1), "CreateTablet failed for %v", tablet1.Alias)

tw.loadTablets()
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 1, "AddTablet": 1})
checkChecksum(t, tw, 3238442862)

// Check the tablet is returned by GetAllTablets().
allTablets := fhc.GetAllTablets()
key1 := TabletToMapKey(tablet1)
assert.Len(t, allTablets, 1)
assert.Contains(t, allTablets, key1)
assert.True(t, proto.Equal(tablet1, allTablets[key1]))

// Add a second tablet to the topology.
tablet2 := &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: "aa",
Uid: 2,
},
Hostname: "host2",
PortMap: map[string]int32{
"vt": 789,
},
Keyspace: "keyspace",
Shard: "shard",
}
require.NoError(t, ts.CreateTablet(ctx, tablet2), "CreateTablet failed for %v", tablet2.Alias)

// Cause the Get for the first tablet to fail.
factory.AddOperationError(memorytopo.Get, "tablets/aa-0000000000/Tablet", errors.New("fake error"))

// Ensure that a topo GetTablet error fails. If not, the rest of this test is invalid.
_, err := ts.GetTablet(ctx, tablet1.Alias)
require.ErrorContains(t, err, "fake error")

// Now force the error during loadTablets.
tw.loadTablets()
checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 2, "AddTablet": 1})
checkChecksum(t, tw, 2762153755)

// Ensure the first tablet is still returned by GetAllTablets() and the second tablet has been added.
allTablets = fhc.GetAllTablets()
key2 := TabletToMapKey(tablet2)
assert.Len(t, allTablets, 2)
assert.Contains(t, allTablets, key1)
assert.Contains(t, allTablets, key2)
assert.True(t, proto.Equal(tablet1, allTablets[key1]))
assert.True(t, proto.Equal(tablet2, allTablets[key2]))
}

func TestGetTabletNoNodeErrorRemovesFromHealthcheck(t *testing.T) {
ctx := utils.LeakCheckContext(t)

ts, factory := memorytopo.NewServerAndFactory(ctx, "aa")
defer ts.Close()
fhc := NewFakeHealthCheck(nil)
defer fhc.Close()
topologyWatcherOperations.ZeroAll()
counts := topologyWatcherOperations.Counts()
tw := NewCellTabletsWatcher(context.Background(), ts, fhc, nil, "aa", 10*time.Minute, true, 5)
defer tw.Stop()

counts = checkOpCounts(t, counts, map[string]int64{})
checkChecksum(t, tw, 0)

// Add a tablet to the topology.
tablet1 := &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: "aa",
Uid: 0,
},
Hostname: "host1",
PortMap: map[string]int32{
"vt": 123,
},
Keyspace: "keyspace",
Shard: "shard",
}
require.NoError(t, ts.CreateTablet(ctx, tablet1), "CreateTablet failed for %v", tablet1.Alias)

tw.loadTablets()
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 1, "AddTablet": 1})
checkChecksum(t, tw, 3238442862)

// Check the tablet is returned by GetAllTablets().
allTablets := fhc.GetAllTablets()
key1 := TabletToMapKey(tablet1)
assert.Len(t, allTablets, 1)
assert.Contains(t, allTablets, key1)
assert.True(t, proto.Equal(tablet1, allTablets[key1]))

// Cause the Get for the tablet to fail with a NoNode error. This simulates a race condition where
// the tablet is removed from the topo after the ListTablets call but before the GetTablet call.
factory.AddOperationError(
memorytopo.Get,
"tablets/aa-0000000000/Tablet",
topo.NewError(topo.NoNode, "tablets/aa-0000000000/Tablet"),
)

// Ensure that a topo GetTablet error fails. If not, the rest of this test is invalid.
_, err := ts.GetTablet(ctx, tablet1.Alias)
require.Error(t, err)
require.True(t, topo.IsErrType(err, topo.NoNode))

// Now force the error during loadTablets.
tw.loadTablets()
checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 1, "RemoveTablet": 1})
checkChecksum(t, tw, 0)

// Ensure the tablet is no longer returned by GetAllTablets()
allTablets = fhc.GetAllTablets()
assert.Len(t, allTablets, 0)
}
3 changes: 3 additions & 0 deletions go/vt/topo/memorytopo/directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ func (c *Conn) ListDir(ctx context.Context, dirPath string, full bool) ([]topo.D
if c.factory.err != nil {
return nil, c.factory.err
}
if err := c.factory.getOperationError(ListDir, dirPath); err != nil {
return nil, err
}

isRoot := false
if dirPath == "" || dirPath == "/" {
Expand Down
4 changes: 4 additions & 0 deletions go/vt/topo/memorytopo/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ func (c *Conn) NewLeaderParticipation(name, id string) (topo.LeaderParticipation
c.factory.mu.Lock()
defer c.factory.mu.Unlock()

if err := c.factory.getOperationError(NewLeaderParticipation, id); err != nil {
return nil, err
}

// Make sure the global path exists.
electionPath := path.Join(electionsPath, name)
if n := c.factory.getOrCreatePath(c.cell, electionPath); n == nil {
Expand Down
15 changes: 15 additions & 0 deletions go/vt/topo/memorytopo/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ func (c *Conn) Create(ctx context.Context, filePath string, contents []byte) (to
if c.factory.err != nil {
return nil, c.factory.err
}
if err := c.factory.getOperationError(Create, filePath); err != nil {
return nil, err
}

// Get the parent dir.
dir, file := path.Split(filePath)
Expand Down Expand Up @@ -88,6 +91,9 @@ func (c *Conn) Update(ctx context.Context, filePath string, contents []byte, ver
if c.factory.err != nil {
return nil, c.factory.err
}
if err := c.factory.getOperationError(Update, filePath); err != nil {
return nil, err
}

// Get the parent dir, we'll need it in case of creation.
dir, file := path.Split(filePath)
Expand Down Expand Up @@ -162,6 +168,9 @@ func (c *Conn) Get(ctx context.Context, filePath string) ([]byte, topo.Version,
if c.factory.err != nil {
return nil, nil, c.factory.err
}
if err := c.factory.getOperationError(Get, filePath); err != nil {
return nil, nil, err
}

// Get the node.
n := c.factory.nodeByPath(c.cell, filePath)
Expand All @@ -187,6 +196,9 @@ func (c *Conn) List(ctx context.Context, filePathPrefix string) ([]topo.KVInfo,
if c.factory.err != nil {
return nil, c.factory.err
}
if err := c.factory.getOperationError(List, filePathPrefix); err != nil {
return nil, err
}

dir, file := path.Split(filePathPrefix)
// Get the node to list.
Expand Down Expand Up @@ -246,6 +258,9 @@ func (c *Conn) Delete(ctx context.Context, filePath string, version topo.Version
if c.factory.err != nil {
return c.factory.err
}
if err := c.factory.getOperationError(Delete, filePath); err != nil {
return err
}

// Get the parent dir.
dir, file := path.Split(filePath)
Expand Down
14 changes: 14 additions & 0 deletions go/vt/topo/memorytopo/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,25 @@ type memoryTopoLockDescriptor struct {

// TryLock is part of the topo.Conn interface. Its implementation is same as Lock
func (c *Conn) TryLock(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) {
c.factory.mu.Lock()
err := c.factory.getOperationError(TryLock, dirPath)
c.factory.mu.Unlock()
if err != nil {
return nil, err
}

return c.Lock(ctx, dirPath, contents)
}

// Lock is part of the topo.Conn interface.
func (c *Conn) Lock(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) {
c.factory.mu.Lock()
err := c.factory.getOperationError(Lock, dirPath)
c.factory.mu.Unlock()
if err != nil {
return nil, err
}

return c.lock(ctx, dirPath, contents)
}

Expand Down
55 changes: 53 additions & 2 deletions go/vt/topo/memorytopo/memorytopo.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"context"
"errors"
"math/rand"
"regexp"
"strings"
"sync"
"sync/atomic"
Expand All @@ -49,6 +50,25 @@ const (
UnreachableServerAddr = "unreachable"
)

// Operation is one of the operations defined by topo.Conn
type Operation int

// The following is the list of topo.Conn operations
const (
ListDir = Operation(iota)
Create
Update
Get
List
Delete
Lock
TryLock
Watch
WatchRecursive
NewLeaderParticipation
Close
)

// Factory is a memory-based implementation of topo.Factory. It
// takes a file-system like approach, with directories at each level
// being an actual directory node. This is meant to be closer to
Expand All @@ -71,6 +91,15 @@ type Factory struct {
// err is used for testing purposes to force queries / watches
// to return the given error
err error
// operationErrors is used for testing purposes to fake errors from
// operations and paths matching the spec
operationErrors map[Operation][]errorSpec
}

type errorSpec struct {
op Operation
pathPattern *regexp.Regexp
err error
}

// HasGlobalReadOnlyCell is part of the topo.Factory interface.
Expand Down Expand Up @@ -236,8 +265,9 @@ func (n *node) PropagateWatchError(err error) {
// in case of a problem.
func NewServerAndFactory(ctx context.Context, cells ...string) (*topo.Server, *Factory) {
f := &Factory{
cells: make(map[string]*node),
generation: uint64(rand.Int63n(1 << 60)),
cells: make(map[string]*node),
generation: uint64(rand.Int63n(1 << 60)),
operationErrors: make(map[Operation][]errorSpec),
}
f.cells[topo.GlobalCell] = f.newDirectory(topo.GlobalCell, nil)

Expand Down Expand Up @@ -349,3 +379,24 @@ func (f *Factory) recursiveDelete(n *node) {
f.recursiveDelete(parent)
}
}

func (f *Factory) AddOperationError(op Operation, pathPattern string, err error) {
austenLacy marked this conversation as resolved.
Show resolved Hide resolved
f.mu.Lock()
defer f.mu.Unlock()

f.operationErrors[op] = append(f.operationErrors[op], errorSpec{
op: op,
pathPattern: regexp.MustCompile(pathPattern),
err: err,
})
}

func (f *Factory) getOperationError(op Operation, path string) error {
specs := f.operationErrors[op]
for _, spec := range specs {
if spec.pathPattern.MatchString(path) {
return spec.err
}
}
return nil
}
6 changes: 6 additions & 0 deletions go/vt/topo/memorytopo/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ func (c *Conn) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-c
if c.factory.err != nil {
return nil, nil, c.factory.err
}
if err := c.factory.getOperationError(Watch, filePath); err != nil {
return nil, nil, err
}

n := c.factory.nodeByPath(c.cell, filePath)
if n == nil {
Expand Down Expand Up @@ -85,6 +88,9 @@ func (c *Conn) WatchRecursive(ctx context.Context, dirpath string) ([]*topo.Watc
if c.factory.err != nil {
return nil, nil, c.factory.err
}
if err := c.factory.getOperationError(WatchRecursive, dirpath); err != nil {
return nil, nil, err
}

n := c.factory.getOrCreatePath(c.cell, dirpath)
if n == nil {
Expand Down
Loading