Skip to content

Commit

Permalink
feat: also wait for the keyspace event watcher to have seen the healt…
Browse files Browse the repository at this point in the history
…hcheck updates

Signed-off-by: Manan Gupta <[email protected]>
  • Loading branch information
GuptaManan100 committed Aug 27, 2024
1 parent 5703208 commit 5d60cef
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 17 deletions.
3 changes: 3 additions & 0 deletions go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ var (
// How much to sleep between each check.
waitAvailableTabletInterval = 100 * time.Millisecond

// waitConsistentKeyspacesCheck is the amount of time to wait between successive checks of whether the keyspaces are consistent.
waitConsistentKeyspacesCheck = 100 * time.Millisecond

// HealthCheckCacheTemplate uses healthCheckTemplate with the `HealthCheck Tablet - Cache` title to create the
// HTML code required to render the cache of the HealthCheck.
HealthCheckCacheTemplate = fmt.Sprintf(healthCheckTemplate, "HealthCheck - Cache")
Expand Down
39 changes: 39 additions & 0 deletions go/vt/discovery/keyspace_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"sync"
"time"

"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -703,3 +704,41 @@ func (kew *KeyspaceEventWatcher) GetServingKeyspaces() []string {
}
return servingKeyspaces
}

// WaitForConsistentKeyspaces blocks until every keyspace in the given list has
// been marked consistent by the keyspace event watcher, re-checking once every
// waitConsistentKeyspacesCheck.
//
// It returns nil as soon as all keyspaces are consistent. If ctx is done first,
// it logs each keyspace that is still pending and returns ctx.Err().
// Empty strings in keyspaces are ignored. The slice passed in is never modified.
func (kew *KeyspaceEventWatcher) WaitForConsistentKeyspaces(ctx context.Context, keyspaces []string) error {
	// Work on a private copy: entries are blanked out below as they become
	// consistent, and the caller's slice (which may be a shared, long-lived
	// list of keyspaces to watch) must not be clobbered.
	pending := make([]string, len(keyspaces))
	copy(pending, keyspaces)

	for {
		// We empty entries in pending as we find them to be consistent.
		allConsistent := true
		for i, ks := range pending {
			if ks == "" {
				continue
			}

			// Get the keyspace status and check whether it is consistent yet.
			kss := kew.getKeyspaceStatus(ctx, ks)
			// NOTE(review): a nil status is treated as "nothing to wait for" so
			// that we neither panic nor block until the deadline on a keyspace
			// the watcher has no state for — confirm against getKeyspaceStatus.
			if kss == nil || kss.consistent {
				pending[i] = ""
				continue
			}
			allConsistent = false
		}

		if allConsistent {
			// All the keyspaces are consistent.
			return nil
		}

		// Unblock after the sleep or when the context has expired.
		select {
		case <-ctx.Done():
			for _, ks := range pending {
				if ks != "" {
					log.Infof("keyspace %v didn't become consistent", ks)
				}
			}
			return ctx.Err()
		case <-time.After(waitConsistentKeyspacesCheck):
		}
	}
}
14 changes: 7 additions & 7 deletions go/vt/srvtopo/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ limitations under the License.
package srvtopo

import (
"sync"

"context"
"sync"

"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/log"
Expand All @@ -29,15 +28,16 @@ import (
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

// FindAllTargets goes through all serving shards in the topology for the provided keyspaces
// FindAllTargetsAndKeyspaces goes through all serving shards in the topology for the provided keyspaces
// and tablet types. If no keyspaces are provided all available keyspaces in the topo are
// fetched. It returns one Target object per keyspace/shard/matching TabletType.
func FindAllTargets(ctx context.Context, ts Server, cell string, keyspaces []string, tabletTypes []topodatapb.TabletType) ([]*querypb.Target, error) {
// It also returns all the keyspaces that it found.
func FindAllTargetsAndKeyspaces(ctx context.Context, ts Server, cell string, keyspaces []string, tabletTypes []topodatapb.TabletType) ([]*querypb.Target, []string, error) {
var err error
if len(keyspaces) == 0 {
keyspaces, err = ts.GetSrvKeyspaceNames(ctx, cell, true)
if err != nil {
return nil, err
return nil, nil, err
}
}

Expand Down Expand Up @@ -95,8 +95,8 @@ func FindAllTargets(ctx context.Context, ts Server, cell string, keyspaces []str
}
wg.Wait()
if errRecorder.HasErrors() {
return nil, errRecorder.Error()
return nil, nil, errRecorder.Error()
}

return targets, nil
return targets, keyspaces, nil
}
16 changes: 8 additions & 8 deletions go/vt/srvtopo/discover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (a TargetArray) Less(i, j int) bool {
return a[i].TabletType < a[j].TabletType
}

func TestFindAllTargets(t *testing.T) {
func TestFindAllTargetsAndKeyspaces(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ts := memorytopo.NewServer(ctx, "cell1", "cell2")
Expand All @@ -65,7 +65,7 @@ func TestFindAllTargets(t *testing.T) {
rs := NewResilientServer(ctx, ts, counts)

// No keyspace / shards.
ks, err := FindAllTargets(ctx, rs, "cell1", []string{"test_keyspace"}, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY})
ks, _, err := FindAllTargetsAndKeyspaces(ctx, rs, "cell1", []string{"test_keyspace"}, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY})
assert.NoError(t, err)
assert.Len(t, ks, 0)

Expand All @@ -84,7 +84,7 @@ func TestFindAllTargets(t *testing.T) {
}))

// Get it.
ks, err = FindAllTargets(ctx, rs, "cell1", []string{"test_keyspace"}, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY})
ks, _, err = FindAllTargetsAndKeyspaces(ctx, rs, "cell1", []string{"test_keyspace"}, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY})
assert.NoError(t, err)
assert.EqualValues(t, []*querypb.Target{
{
Expand All @@ -96,7 +96,7 @@ func TestFindAllTargets(t *testing.T) {
}, ks)

// Get any keyspace.
ks, err = FindAllTargets(ctx, rs, "cell1", nil, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY})
ks, _, err = FindAllTargetsAndKeyspaces(ctx, rs, "cell1", nil, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY})
assert.NoError(t, err)
assert.EqualValues(t, []*querypb.Target{
{
Expand Down Expand Up @@ -130,7 +130,7 @@ func TestFindAllTargets(t *testing.T) {
}))

// Get it for any keyspace, all types.
ks, err = FindAllTargets(ctx, rs, "cell1", nil, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY, topodatapb.TabletType_REPLICA})
ks, _, err = FindAllTargetsAndKeyspaces(ctx, rs, "cell1", nil, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY, topodatapb.TabletType_REPLICA})
assert.NoError(t, err)
sort.Sort(TargetArray(ks))
assert.EqualValues(t, []*querypb.Target{
Expand All @@ -155,7 +155,7 @@ func TestFindAllTargets(t *testing.T) {
}, ks)

// Only get 1 keyspace for all types.
ks, err = FindAllTargets(ctx, rs, "cell1", []string{"test_keyspace2"}, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY, topodatapb.TabletType_REPLICA})
ks, _, err = FindAllTargetsAndKeyspaces(ctx, rs, "cell1", []string{"test_keyspace2"}, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY, topodatapb.TabletType_REPLICA})
assert.NoError(t, err)
assert.EqualValues(t, []*querypb.Target{
{
Expand All @@ -173,7 +173,7 @@ func TestFindAllTargets(t *testing.T) {
}, ks)

// Only get the REPLICA targets for any keyspace.
ks, err = FindAllTargets(ctx, rs, "cell1", []string{}, []topodatapb.TabletType{topodatapb.TabletType_REPLICA})
ks, _, err = FindAllTargetsAndKeyspaces(ctx, rs, "cell1", []string{}, []topodatapb.TabletType{topodatapb.TabletType_REPLICA})
assert.NoError(t, err)
assert.Equal(t, []*querypb.Target{
{
Expand All @@ -185,7 +185,7 @@ func TestFindAllTargets(t *testing.T) {
}, ks)

// Get non-existent keyspace.
ks, err = FindAllTargets(ctx, rs, "cell1", []string{"doesnt-exist"}, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY, topodatapb.TabletType_REPLICA})
ks, _, err = FindAllTargetsAndKeyspaces(ctx, rs, "cell1", []string{"doesnt-exist"}, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY, topodatapb.TabletType_REPLICA})
assert.NoError(t, err)
assert.Len(t, ks, 0)
}
17 changes: 15 additions & 2 deletions go/vt/vtgate/tabletgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,24 @@ func (gw *TabletGateway) WaitForTablets(ctx context.Context, tabletTypesToWait [
}

// Finds the targets to look for.
targets, err := srvtopo.FindAllTargets(ctx, gw.srvTopoServer, gw.localCell, discovery.KeyspacesToWatch, tabletTypesToWait)
targets, keyspaces, err := srvtopo.FindAllTargetsAndKeyspaces(ctx, gw.srvTopoServer, gw.localCell, discovery.KeyspacesToWatch, tabletTypesToWait)
if err != nil {
return err
}
return gw.hc.WaitForAllServingTablets(ctx, targets)
err = gw.hc.WaitForAllServingTablets(ctx, targets)
if err != nil {
return err
}
// After having waited for all serving tablets, we should also wait for the keyspace event watcher to have seen
// the updates and marked all the keyspaces as consistent.
// Otherwise, we could be in a situation where even though the healthchecks have arrived, the keyspace event watcher hasn't finished processing them.
// So, if a primary tablet goes non-serving (because of a PRS or some other reason), we won't be able to start buffering.
// Waiting for the keyspaces to become consistent ensures that all the primary tablets for all the shards should be serving as seen by the keyspace event watcher
// and any disruption from now on, will make sure we start buffering properly.
if gw.kev != nil {
return gw.kev.WaitForConsistentKeyspaces(ctx, keyspaces)
}
return nil
}

// Close shuts down underlying connections.
Expand Down

0 comments on commit 5d60cef

Please sign in to comment.