Skip to content

Commit

Permalink
Filter by keyspace earlier in tabletgateways WaitForTablets(...) (v…
Browse files Browse the repository at this point in the history
…itessio#15347)

Signed-off-by: Tim Vaillancourt <[email protected]>
  • Loading branch information
timvaillancourt authored and arthurschreiber committed Nov 19, 2024
1 parent 381cf4d commit 24e0e37
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 94 deletions.
22 changes: 0 additions & 22 deletions go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -788,30 +788,8 @@ func (hc *HealthCheckImpl) WaitForAllServingTablets(ctx context.Context, targets
return hc.waitForTablets(ctx, targets, true)
}

// FilterTargetsByKeyspaces only returns the targets that are part of the provided keyspaces
func FilterTargetsByKeyspaces(keyspaces []string, targets []*query.Target) []*query.Target {
filteredTargets := make([]*query.Target, 0)

// Keep them all if there are no keyspaces to watch
if len(KeyspacesToWatch) == 0 {
return append(filteredTargets, targets...)
}

// Let's remove from the target shards that are not in the keyspaceToWatch list.
for _, target := range targets {
for _, keyspaceToWatch := range keyspaces {
if target.Keyspace == keyspaceToWatch {
filteredTargets = append(filteredTargets, target)
}
}
}
return filteredTargets
}

// waitForTablets is the internal method that polls for tablets.
func (hc *HealthCheckImpl) waitForTablets(ctx context.Context, targets []*query.Target, requireServing bool) error {
targets = FilterTargetsByKeyspaces(KeyspacesToWatch, targets)

for {
// We nil targets as we find them.
allPresent := true
Expand Down
21 changes: 0 additions & 21 deletions go/vt/discovery/healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -728,27 +728,6 @@ func TestWaitForAllServingTablets(t *testing.T) {

err = hc.WaitForAllServingTablets(ctx, targets)
assert.NotNil(t, err, "error should not be nil (there are no tablets on this keyspace")

targets = []*querypb.Target{

{
Keyspace: tablet.Keyspace,
Shard: tablet.Shard,
TabletType: tablet.Type,
},
{
Keyspace: "newkeyspace",
Shard: tablet.Shard,
TabletType: tablet.Type,
},
}

KeyspacesToWatch = []string{tablet.Keyspace}

err = hc.WaitForAllServingTablets(ctx, targets)
assert.Nil(t, err, "error should be nil. Keyspace with no tablets is filtered")

KeyspacesToWatch = []string{}
}

// TestRemoveTablet tests the behavior when a tablet goes away.
Expand Down
19 changes: 11 additions & 8 deletions go/vt/srvtopo/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,23 @@ import (
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

// FindAllTargets goes through all serving shards in the topology
// for the provided tablet types. It returns one Target object per
// keyspace / shard / matching TabletType.
func FindAllTargets(ctx context.Context, ts Server, cell string, tabletTypes []topodatapb.TabletType) ([]*querypb.Target, error) {
ksNames, err := ts.GetSrvKeyspaceNames(ctx, cell, true)
if err != nil {
return nil, err
// FindAllTargets goes through all serving shards in the topology for the provided keyspaces
// and tablet types. If no keyspaces are provided all available keyspaces in the topo are
// fetched. It returns one Target object per keyspace/shard/matching TabletType.
func FindAllTargets(ctx context.Context, ts Server, cell string, keyspaces []string, tabletTypes []topodatapb.TabletType) ([]*querypb.Target, error) {
var err error
if len(keyspaces) == 0 {
keyspaces, err = ts.GetSrvKeyspaceNames(ctx, cell, true)
if err != nil {
return nil, err
}
}

var targets []*querypb.Target
var wg sync.WaitGroup
var mu sync.Mutex
var errRecorder concurrency.AllErrorRecorder
for _, ksName := range ksNames {
for _, ksName := range keyspaces {
wg.Add(1)
go func(keyspace string) {
defer wg.Done()
Expand Down
100 changes: 58 additions & 42 deletions go/vt/srvtopo/discover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ package srvtopo

import (
"context"
"reflect"
"sort"
"testing"
"time"

"github.com/stretchr/testify/assert"

"vitess.io/vitess/go/vt/topo/memorytopo"

querypb "vitess.io/vitess/go/vt/proto/query"
Expand Down Expand Up @@ -62,16 +63,12 @@ func TestFindAllTargets(t *testing.T) {
rs := NewResilientServer(ctx, ts, "TestFindAllKeyspaceShards")

// No keyspace / shards.
ks, err := FindAllTargets(ctx, rs, "cell1", []topodatapb.TabletType{topodatapb.TabletType_PRIMARY})
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if len(ks) > 0 {
t.Errorf("why did I get anything? %v", ks)
}
ks, err := FindAllTargets(ctx, rs, "cell1", []string{"test_keyspace"}, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY})
assert.NoError(t, err)
assert.Len(t, ks, 0)

// Add one.
if err := ts.UpdateSrvKeyspace(ctx, "cell1", "test_keyspace", &topodatapb.SrvKeyspace{
assert.NoError(t, ts.UpdateSrvKeyspace(ctx, "cell1", "test_keyspace", &topodatapb.SrvKeyspace{
Partitions: []*topodatapb.SrvKeyspace_KeyspacePartition{
{
ServedType: topodatapb.TabletType_PRIMARY,
Expand All @@ -82,28 +79,34 @@ func TestFindAllTargets(t *testing.T) {
},
},
},
}); err != nil {
t.Fatalf("can't add srvKeyspace: %v", err)
}
}))

// Get it.
ks, err = FindAllTargets(ctx, rs, "cell1", []topodatapb.TabletType{topodatapb.TabletType_PRIMARY})
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if !reflect.DeepEqual(ks, []*querypb.Target{
ks, err = FindAllTargets(ctx, rs, "cell1", []string{"test_keyspace"}, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY})
assert.NoError(t, err)
assert.EqualValues(t, []*querypb.Target{
{
Cell: "cell1",
Keyspace: "test_keyspace",
Shard: "test_shard0",
TabletType: topodatapb.TabletType_PRIMARY,
},
}) {
t.Errorf("got wrong value: %v", ks)
}
}, ks)

// Get any keyspace.
ks, err = FindAllTargets(ctx, rs, "cell1", nil, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY})
assert.NoError(t, err)
assert.EqualValues(t, []*querypb.Target{
{
Cell: "cell1",
Keyspace: "test_keyspace",
Shard: "test_shard0",
TabletType: topodatapb.TabletType_PRIMARY,
},
}, ks)

// Add another one.
if err := ts.UpdateSrvKeyspace(ctx, "cell1", "test_keyspace2", &topodatapb.SrvKeyspace{
assert.NoError(t, ts.UpdateSrvKeyspace(ctx, "cell1", "test_keyspace2", &topodatapb.SrvKeyspace{
Partitions: []*topodatapb.SrvKeyspace_KeyspacePartition{
{
ServedType: topodatapb.TabletType_PRIMARY,
Expand All @@ -122,17 +125,13 @@ func TestFindAllTargets(t *testing.T) {
},
},
},
}); err != nil {
t.Fatalf("can't add srvKeyspace: %v", err)
}
}))

// Get it for all types.
ks, err = FindAllTargets(ctx, rs, "cell1", []topodatapb.TabletType{topodatapb.TabletType_PRIMARY, topodatapb.TabletType_REPLICA})
if err != nil {
t.Errorf("unexpected error: %v", err)
}
// Get it for any keyspace, all types.
ks, err = FindAllTargets(ctx, rs, "cell1", nil, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY, topodatapb.TabletType_REPLICA})
assert.NoError(t, err)
sort.Sort(TargetArray(ks))
if !reflect.DeepEqual(ks, []*querypb.Target{
assert.EqualValues(t, []*querypb.Target{
{
Cell: "cell1",
Keyspace: "test_keyspace",
Expand All @@ -151,23 +150,40 @@ func TestFindAllTargets(t *testing.T) {
Shard: "test_shard2",
TabletType: topodatapb.TabletType_REPLICA,
},
}) {
t.Errorf("got wrong value: %v", ks)
}
}, ks)

// Only get the REPLICA targets.
ks, err = FindAllTargets(ctx, rs, "cell1", []topodatapb.TabletType{topodatapb.TabletType_REPLICA})
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if !reflect.DeepEqual(ks, []*querypb.Target{
// Only get 1 keyspace for all types.
ks, err = FindAllTargets(ctx, rs, "cell1", []string{"test_keyspace2"}, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY, topodatapb.TabletType_REPLICA})
assert.NoError(t, err)
assert.EqualValues(t, []*querypb.Target{
{
Cell: "cell1",
Keyspace: "test_keyspace2",
Shard: "test_shard1",
TabletType: topodatapb.TabletType_PRIMARY,
},
{
Cell: "cell1",
Keyspace: "test_keyspace2",
Shard: "test_shard2",
TabletType: topodatapb.TabletType_REPLICA,
},
}) {
t.Errorf("got wrong value: %v", ks)
}
}, ks)

// Only get the REPLICA targets for any keyspace.
ks, err = FindAllTargets(ctx, rs, "cell1", []string{}, []topodatapb.TabletType{topodatapb.TabletType_REPLICA})
assert.NoError(t, err)
assert.Equal(t, []*querypb.Target{
{
Cell: "cell1",
Keyspace: "test_keyspace2",
Shard: "test_shard2",
TabletType: topodatapb.TabletType_REPLICA,
},
}, ks)

// Get non-existent keyspace.
ks, err = FindAllTargets(ctx, rs, "cell1", []string{"doesnt-exist"}, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY, topodatapb.TabletType_REPLICA})
assert.NoError(t, err)
assert.Len(t, ks, 0)
}
2 changes: 1 addition & 1 deletion go/vt/vtgate/tabletgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func (gw *TabletGateway) WaitForTablets(ctx context.Context, tabletTypesToWait [
}

// Finds the targets to look for.
targets, err := srvtopo.FindAllTargets(ctx, gw.srvTopoServer, gw.localCell, tabletTypesToWait)
targets, err := srvtopo.FindAllTargets(ctx, gw.srvTopoServer, gw.localCell, discovery.KeyspacesToWatch, tabletTypesToWait)
if err != nil {
return err
}
Expand Down

0 comments on commit 24e0e37

Please sign in to comment.