diff --git a/go/vt/discovery/topology_watcher.go b/go/vt/discovery/topology_watcher.go index d1bd2d3acf8..105741aaac1 100644 --- a/go/vt/discovery/topology_watcher.go +++ b/go/vt/discovery/topology_watcher.go @@ -190,6 +190,17 @@ func (tw *TopologyWatcher) loadTablets() { topologyWatcherOperations.Add(topologyWatcherOpGetTablet, 1) <-tw.sem // Done; enable next request to run if err != nil { + if !topo.IsErrType(err, topo.NoNode) { + // We failed to get the tablet, but it may still exist. + // We don't want this tablet to be removed from the tw.tablets map or the healthcheck, + // so we fill the gap in the newTablets map using the existing tablet. + tw.mu.Lock() + aliasStr := topoproto.TabletAliasString(alias) + if val, ok := tw.tablets[aliasStr]; ok { + newTablets[aliasStr] = val + } + tw.mu.Unlock() + } topologyWatcherErrors.Add(topologyWatcherOpGetTablet, 1) select { case <-tw.ctx.Done(): diff --git a/go/vt/discovery/topology_watcher_test.go b/go/vt/discovery/topology_watcher_test.go index 3ac567acef8..254943b07bd 100644 --- a/go/vt/discovery/topology_watcher_test.go +++ b/go/vt/discovery/topology_watcher_test.go @@ -18,6 +18,7 @@ package discovery import ( "context" + "errors" "math/rand" "testing" "time" @@ -630,3 +631,145 @@ func TestFilterByKeypsaceSkipsIgnoredTablets(t *testing.T) { tw.Stop() } + +func TestGetTabletErrorDoesNotRemoveFromHealthcheck(t *testing.T) { + ctx := utils.LeakCheckContext(t) + + ts, factory := memorytopo.NewServerAndFactory(ctx, "aa") + defer ts.Close() + fhc := NewFakeHealthCheck(nil) + defer fhc.Close() + topologyWatcherOperations.ZeroAll() + counts := topologyWatcherOperations.Counts() + tw := NewCellTabletsWatcher(context.Background(), ts, fhc, nil, "aa", 10*time.Minute, true, 5) + defer tw.Stop() + + counts = checkOpCounts(t, counts, map[string]int64{}) + checkChecksum(t, tw, 0) + + // Add a tablet to the topology. + tablet1 := &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "aa", + Uid: 0, + }, + Hostname: "host1", + PortMap: map[string]int32{ + "vt": 123, + }, + Keyspace: "keyspace", + Shard: "shard", + } + require.NoError(t, ts.CreateTablet(ctx, tablet1), "CreateTablet failed for %v", tablet1.Alias) + + tw.loadTablets() + counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 1, "AddTablet": 1}) + checkChecksum(t, tw, 3238442862) + + // Check the tablet is returned by GetAllTablets(). + allTablets := fhc.GetAllTablets() + key1 := TabletToMapKey(tablet1) + assert.Len(t, allTablets, 1) + assert.Contains(t, allTablets, key1) + assert.True(t, proto.Equal(tablet1, allTablets[key1])) + + // Add a second tablet to the topology. + tablet2 := &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "aa", + Uid: 2, + }, + Hostname: "host2", + PortMap: map[string]int32{ + "vt": 789, + }, + Keyspace: "keyspace", + Shard: "shard", + } + require.NoError(t, ts.CreateTablet(ctx, tablet2), "CreateTablet failed for %v", tablet2.Alias) + + // Cause the Get for the first tablet to fail. + factory.AddOperationError(memorytopo.Get, "tablets/aa-0000000000/Tablet", errors.New("fake error")) + + // Ensure that a topo GetTablet error fails. If not, the rest of this test is invalid. + _, err := ts.GetTablet(ctx, tablet1.Alias) + require.ErrorContains(t, err, "fake error") + + // Now force the error during loadTablets. + tw.loadTablets() + checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 2, "AddTablet": 1}) + checkChecksum(t, tw, 2762153755) + + // Ensure the first tablet is still returned by GetAllTablets() and the second tablet has been added. + allTablets = fhc.GetAllTablets() + key2 := TabletToMapKey(tablet2) + assert.Len(t, allTablets, 2) + assert.Contains(t, allTablets, key1) + assert.Contains(t, allTablets, key2) + assert.True(t, proto.Equal(tablet1, allTablets[key1])) + assert.True(t, proto.Equal(tablet2, allTablets[key2])) +} + +func TestGetTabletNoNodeErrorRemovesFromHealthcheck(t *testing.T) { + ctx := utils.LeakCheckContext(t) + + ts, factory := memorytopo.NewServerAndFactory(ctx, "aa") + defer ts.Close() + fhc := NewFakeHealthCheck(nil) + defer fhc.Close() + topologyWatcherOperations.ZeroAll() + counts := topologyWatcherOperations.Counts() + tw := NewCellTabletsWatcher(context.Background(), ts, fhc, nil, "aa", 10*time.Minute, true, 5) + defer tw.Stop() + + counts = checkOpCounts(t, counts, map[string]int64{}) + checkChecksum(t, tw, 0) + + // Add a tablet to the topology. + tablet1 := &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "aa", + Uid: 0, + }, + Hostname: "host1", + PortMap: map[string]int32{ + "vt": 123, + }, + Keyspace: "keyspace", + Shard: "shard", + } + require.NoError(t, ts.CreateTablet(ctx, tablet1), "CreateTablet failed for %v", tablet1.Alias) + + tw.loadTablets() + counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 1, "AddTablet": 1}) + checkChecksum(t, tw, 3238442862) + + // Check the tablet is returned by GetAllTablets(). + allTablets := fhc.GetAllTablets() + key1 := TabletToMapKey(tablet1) + assert.Len(t, allTablets, 1) + assert.Contains(t, allTablets, key1) + assert.True(t, proto.Equal(tablet1, allTablets[key1])) + + // Cause the Get for the tablet to fail with a NoNode error. This simulates a race condition where + // the tablet is removed from the topo after the ListTablets call but before the GetTablet call. + factory.AddOperationError( + memorytopo.Get, + "tablets/aa-0000000000/Tablet", + topo.NewError(topo.NoNode, "tablets/aa-0000000000/Tablet"), + ) + + // Ensure that a topo GetTablet error fails. If not, the rest of this test is invalid. + _, err := ts.GetTablet(ctx, tablet1.Alias) + require.Error(t, err) + require.True(t, topo.IsErrType(err, topo.NoNode)) + + // Now force the error during loadTablets. + tw.loadTablets() + checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 1, "RemoveTablet": 1}) + checkChecksum(t, tw, 0) + + // Ensure the tablet is no longer returned by GetAllTablets() + allTablets = fhc.GetAllTablets() + assert.Len(t, allTablets, 0) +} diff --git a/go/vt/topo/memorytopo/directory.go b/go/vt/topo/memorytopo/directory.go index f68c87a2166..a3e28999fb1 100644 --- a/go/vt/topo/memorytopo/directory.go +++ b/go/vt/topo/memorytopo/directory.go @@ -37,6 +37,9 @@ func (c *Conn) ListDir(ctx context.Context, dirPath string, full bool) ([]topo.D if c.factory.err != nil { return nil, c.factory.err } + if err := c.factory.getOperationError(ListDir, dirPath); err != nil { + return nil, err + } isRoot := false if dirPath == "" || dirPath == "/" { diff --git a/go/vt/topo/memorytopo/election.go b/go/vt/topo/memorytopo/election.go index ad173695099..a979dd306a5 100644 --- a/go/vt/topo/memorytopo/election.go +++ b/go/vt/topo/memorytopo/election.go @@ -33,6 +33,10 @@ func (c *Conn) NewLeaderParticipation(name, id string) (topo.LeaderParticipation c.factory.mu.Lock() defer c.factory.mu.Unlock() + if err := c.factory.getOperationError(NewLeaderParticipation, id); err != nil { + return nil, err + } + // Make sure the global path exists. electionPath := path.Join(electionsPath, name) if n := c.factory.getOrCreatePath(c.cell, electionPath); n == nil { diff --git a/go/vt/topo/memorytopo/file.go b/go/vt/topo/memorytopo/file.go index 0007203799f..f114d1baf03 100644 --- a/go/vt/topo/memorytopo/file.go +++ b/go/vt/topo/memorytopo/file.go @@ -44,6 +44,9 @@ func (c *Conn) Create(ctx context.Context, filePath string, contents []byte) (to if c.factory.err != nil { return nil, c.factory.err } + if err := c.factory.getOperationError(Create, filePath); err != nil { + return nil, err + } // Get the parent dir. dir, file := path.Split(filePath) @@ -88,6 +91,9 @@ func (c *Conn) Update(ctx context.Context, filePath string, contents []byte, ver if c.factory.err != nil { return nil, c.factory.err } + if err := c.factory.getOperationError(Update, filePath); err != nil { + return nil, err + } // Get the parent dir, we'll need it in case of creation. dir, file := path.Split(filePath) @@ -162,6 +168,9 @@ func (c *Conn) Get(ctx context.Context, filePath string) ([]byte, topo.Version, if c.factory.err != nil { return nil, nil, c.factory.err } + if err := c.factory.getOperationError(Get, filePath); err != nil { + return nil, nil, err + } // Get the node. n := c.factory.nodeByPath(c.cell, filePath) @@ -187,6 +196,9 @@ func (c *Conn) List(ctx context.Context, filePathPrefix string) ([]topo.KVInfo, if c.factory.err != nil { return nil, c.factory.err } + if err := c.factory.getOperationError(List, filePathPrefix); err != nil { + return nil, err + } dir, file := path.Split(filePathPrefix) // Get the node to list. @@ -246,6 +258,9 @@ func (c *Conn) Delete(ctx context.Context, filePath string, version topo.Version if c.factory.err != nil { return c.factory.err } + if err := c.factory.getOperationError(Delete, filePath); err != nil { + return err + } // Get the parent dir. dir, file := path.Split(filePath) diff --git a/go/vt/topo/memorytopo/lock.go b/go/vt/topo/memorytopo/lock.go index 5c2a2462495..106a158aad5 100644 --- a/go/vt/topo/memorytopo/lock.go +++ b/go/vt/topo/memorytopo/lock.go @@ -42,11 +42,25 @@ type memoryTopoLockDescriptor struct { // TryLock is part of the topo.Conn interface. Its implementation is same as Lock func (c *Conn) TryLock(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) { + c.factory.mu.Lock() + err := c.factory.getOperationError(TryLock, dirPath) + c.factory.mu.Unlock() + if err != nil { + return nil, err + } + return c.Lock(ctx, dirPath, contents) } // Lock is part of the topo.Conn interface. func (c *Conn) Lock(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) { + c.factory.mu.Lock() + err := c.factory.getOperationError(Lock, dirPath) + c.factory.mu.Unlock() + if err != nil { + return nil, err + } + return c.lock(ctx, dirPath, contents) } diff --git a/go/vt/topo/memorytopo/memorytopo.go b/go/vt/topo/memorytopo/memorytopo.go index b881be1b785..35c1f2fad49 100644 --- a/go/vt/topo/memorytopo/memorytopo.go +++ b/go/vt/topo/memorytopo/memorytopo.go @@ -23,6 +23,7 @@ import ( "context" "errors" "math/rand" + "regexp" "strings" "sync" "sync/atomic" @@ -49,6 +50,25 @@ const ( UnreachableServerAddr = "unreachable" ) +// Operation is one of the operations defined by topo.Conn +type Operation int + +// The following is the list of topo.Conn operations +const ( + ListDir = Operation(iota) + Create + Update + Get + List + Delete + Lock + TryLock + Watch + WatchRecursive + NewLeaderParticipation + Close +) + // Factory is a memory-based implementation of topo.Factory. It // takes a file-system like approach, with directories at each level // being an actual directory node. This is meant to be closer to @@ -71,6 +91,15 @@ type Factory struct { // err is used for testing purposes to force queries / watches // to return the given error err error + // operationErrors is used for testing purposes to fake errors from + // operations and paths matching the spec + operationErrors map[Operation][]errorSpec +} + +type errorSpec struct { + op Operation + pathPattern *regexp.Regexp + err error } // HasGlobalReadOnlyCell is part of the topo.Factory interface. @@ -236,8 +265,9 @@ func (n *node) PropagateWatchError(err error) { // in case of a problem. func NewServerAndFactory(ctx context.Context, cells ...string) (*topo.Server, *Factory) { f := &Factory{ - cells: make(map[string]*node), - generation: uint64(rand.Int63n(1 << 60)), + cells: make(map[string]*node), + generation: uint64(rand.Int63n(1 << 60)), + operationErrors: make(map[Operation][]errorSpec), } f.cells[topo.GlobalCell] = f.newDirectory(topo.GlobalCell, nil) @@ -349,3 +379,24 @@ func (f *Factory) recursiveDelete(n *node) { f.recursiveDelete(parent) } } + +func (f *Factory) AddOperationError(op Operation, pathPattern string, err error) { + f.mu.Lock() + defer f.mu.Unlock() + + f.operationErrors[op] = append(f.operationErrors[op], errorSpec{ + op: op, + pathPattern: regexp.MustCompile(pathPattern), + err: err, + }) +} + +func (f *Factory) getOperationError(op Operation, path string) error { + specs := f.operationErrors[op] + for _, spec := range specs { + if spec.pathPattern.MatchString(path) { + return spec.err + } + } + return nil +} diff --git a/go/vt/topo/memorytopo/watch.go b/go/vt/topo/memorytopo/watch.go index 0f245c95b5f..f0760a7a773 100644 --- a/go/vt/topo/memorytopo/watch.go +++ b/go/vt/topo/memorytopo/watch.go @@ -35,6 +35,9 @@ func (c *Conn) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-c if c.factory.err != nil { return nil, nil, c.factory.err } + if err := c.factory.getOperationError(Watch, filePath); err != nil { + return nil, nil, err + } n := c.factory.nodeByPath(c.cell, filePath) if n == nil { @@ -85,6 +88,9 @@ func (c *Conn) WatchRecursive(ctx context.Context, dirpath string) ([]*topo.Watc if c.factory.err != nil { return nil, nil, c.factory.err } + if err := c.factory.getOperationError(WatchRecursive, dirpath); err != nil { + return nil, nil, err + } n := c.factory.getOrCreatePath(c.cell, dirpath) if n == nil {