Skip to content

Commit

Permalink
Add unit test
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Mar 4, 2024
1 parent d967187 commit 055e735
Show file tree
Hide file tree
Showing 2 changed files with 322 additions and 12 deletions.
30 changes: 22 additions & 8 deletions go/vt/vtgate/vstream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -946,17 +946,30 @@ func (vs *vstream) keyspaceHasBeenResharded(ctx context.Context, keyspace string
if err != nil || len(shards) == 0 {
return false, err

Check warning on line 947 in go/vt/vtgate/vstream_manager.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtgate/vstream_manager.go#L947

Added line #L947 was not covered by tests
}

// First check the typical case, where the vgtid shards match the serving shards.
// In that case it's NOT possible that an applicable reshard has happened because
// the vgtid contains shards that are all serving.
reshardPossible := false
ksShardGTIDs := make([]*binlogdatapb.ShardGtid, 0, len(vs.vgtid.ShardGtids))
for _, g := range vs.vgtid.ShardGtids {
if g.GetKeyspace() == keyspace {
// If we already have the correct (serving) shards in our vgtid then the
// keyspace MAY have an active Reshard workflow but the keyspace has not
// been resharded (meaning traffic has been switched in an active Reshard
// workflow) since we were last streaming.
if shards[g.GetShard()].GetIsPrimaryServing() {
return false, nil
}
ksShardGTIDs = append(ksShardGTIDs, g)
}
}
for _, s := range ksShardGTIDs {
if !shards[s.GetShard()].GetIsPrimaryServing() {
reshardPossible = true
break
}
}
log.Errorf("DEBUG: reshard possible: %v", reshardPossible)
if !reshardPossible {
return false, nil
}

// Now that we know there MAY have been an applicable reshard, let's make a
// definitive determination by looking at the shard keyranges.
for _, i := range shards {
for _, j := range shards {
if i.ShardName() == j.ShardName() && key.KeyRangeEqual(i.GetKeyRange(), j.GetKeyRange()) {
Expand All @@ -965,10 +978,11 @@ func (vs *vstream) keyspaceHasBeenResharded(ctx context.Context, keyspace string
}
if key.KeyRangeIntersect(i.GetKeyRange(), j.GetKeyRange()) {
// We have different shards with overlapping keyranges so we know
// that a reshard has occurred.
// that a reshard has happened.
return true, nil
}
}
}

return false, nil

Check warning on line 987 in go/vt/vtgate/vstream_manager.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtgate/vstream_manager.go#L987

Added line #L987 was not covered by tests
}
304 changes: 300 additions & 4 deletions go/vt/vtgate/vstream_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ import (
"testing"
"time"

"google.golang.org/protobuf/proto"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/test/utils"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/srvtopo"
"vitess.io/vitess/go/vt/topo"
Expand All @@ -41,8 +41,6 @@ import (
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"

"vitess.io/vitess/go/test/utils"
)

var mu sync.Mutex
Expand Down Expand Up @@ -1279,6 +1277,304 @@ func TestVStreamIdleHeartbeat(t *testing.T) {
}
}

func TestKeyspaceHasBeenSharded(t *testing.T) {
ctx := utils.LeakCheckContext(t)

cell := "zone1"
ks := "testks"

type testcase struct {
name string
oldshards []string
newshards []string
vgtid *binlogdatapb.VGtid
trafficSwitched bool
want bool
wantErr string
}
testcases := []testcase{
{
name: "2 to 4, split both, traffic not switched",
oldshards: []string{
"-80",
"80-",
},
newshards: []string{
"-40",
"40-80",
"80-c0",
"c0-",
},
vgtid: &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{
{
Keyspace: ks,
Shard: "-80",
},
{
Keyspace: ks,
Shard: "80-",
},
},
},
trafficSwitched: false,
want: false,
},
{
name: "2 to 4, split both, traffic not switched",
oldshards: []string{
"-80",
"80-",
},
newshards: []string{
"-40",
"40-80",
"80-c0",
"c0-",
},
vgtid: &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{
{
Keyspace: ks,
Shard: "-80",
},
{
Keyspace: ks,
Shard: "80-",
},
},
},
trafficSwitched: false,
want: false,
},
{
name: "2 to 8, split both, traffic switched",
oldshards: []string{
"-80",
"80-",
},
newshards: []string{
"-20",
"20-40",
"40-60",
"60-80",
"80-a0",
"a0-c0",
"c0-e0",
"e0-",
},
vgtid: &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{
{
Keyspace: ks,
Shard: "-80",
},
{
Keyspace: ks,
Shard: "80-",
},
},
},
trafficSwitched: true,
want: true,
},
{
name: "2 to 4, split only first shard, traffic switched",
oldshards: []string{
"-80",
"80-",
},
newshards: []string{
"-20",
"20-40",
"40-60",
"60-80",
// -80 is not being resharded
},
vgtid: &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{
{
Keyspace: ks,
Shard: "-80",
},
{
Keyspace: ks,
Shard: "80-",
},
},
},
trafficSwitched: true,
want: true,
},
{
name: "4 to 2, merge both shards, traffic switched",
oldshards: []string{
"-40",
"40-80",
"80-c0",
"c0-",
},
newshards: []string{
"-80",
"80-",
},
vgtid: &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{
{
Keyspace: ks,
Shard: "-40",
},
{
Keyspace: ks,
Shard: "40-80",
},
{
Keyspace: ks,
Shard: "80-c0",
},
{
Keyspace: ks,
Shard: "c0-",
},
},
},
trafficSwitched: true,
want: true,
},
{
name: "4 to 3, merge second half, traffic not switched",
oldshards: []string{
"-40",
"40-80",
"80-c0",
"c0-",
},
newshards: []string{
// -40, 40-80, and 80-c0 are not being resharded
"80-",
},
vgtid: &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{
{
Keyspace: ks,
Shard: "-40",
},
{
Keyspace: ks,
Shard: "40-80",
},
{
Keyspace: ks,
Shard: "80-c0",
},
{
Keyspace: ks,
Shard: "c0-",
},
},
},
trafficSwitched: false,
want: false,
},
{
name: "4 to 3, merge second half, traffic switched",
oldshards: []string{
"-40",
"40-80",
"80-c0",
"c0-",
},
newshards: []string{
// -40, 40-80, and 80-c0 are not being resharded
"80-",
},
vgtid: &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{
{
Keyspace: ks,
Shard: "-40",
},
{
Keyspace: ks,
Shard: "40-80",
},
{
Keyspace: ks,
Shard: "80-c0",
},
{
Keyspace: ks,
Shard: "c0-",
},
},
},
trafficSwitched: true,
want: true,
},
}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
hc := discovery.NewFakeHealthCheck(nil)
_ = createSandbox(ks)
st := getSandboxTopo(ctx, cell, ks, append(tc.oldshards, tc.newshards...))
vsm := newTestVStreamManager(ctx, hc, st, cell)
vs := vstream{
vgtid: tc.vgtid,
tabletType: topodatapb.TabletType_PRIMARY,
optCells: cell,
vsm: vsm,
ts: st.topoServer,
}
for i, shard := range tc.oldshards {
serving := true
if tc.trafficSwitched {
serving = false
}
tabletconn := hc.AddTestTablet(cell, fmt.Sprintf("1.1.0.%d", i), int32(1000+i), ks, shard, topodatapb.TabletType_PRIMARY, serving, 0, nil)
err := st.topoServer.CreateTablet(ctx, tabletconn.Tablet())
require.NoError(t, err)
var alias *topodatapb.TabletAlias
if serving {
alias = tabletconn.Tablet().Alias
}
_, err = st.topoServer.UpdateShardFields(ctx, ks, shard, func(si *topo.ShardInfo) error {
si.PrimaryAlias = alias
si.IsPrimaryServing = serving
return nil
})
require.NoError(t, err)
}
for i, shard := range tc.newshards {
serving := false
if tc.trafficSwitched {
serving = true
}
tabletconn := hc.AddTestTablet(cell, fmt.Sprintf("1.1.1.%d", i), int32(2000+i), ks, shard, topodatapb.TabletType_PRIMARY, serving, 0, nil)
err := st.topoServer.CreateTablet(ctx, tabletconn.Tablet())
require.NoError(t, err)
var alias *topodatapb.TabletAlias
if serving {
alias = tabletconn.Tablet().Alias
}
_, err = st.topoServer.UpdateShardFields(ctx, ks, shard, func(si *topo.ShardInfo) error {
si.PrimaryAlias = alias
si.IsPrimaryServing = serving
return nil
})
require.NoError(t, err)
}
got, err := vs.keyspaceHasBeenResharded(ctx, ks)
if tc.wantErr != "" {
require.EqualError(t, err, tc.wantErr)
} else {
require.NoError(t, err)
}
require.Equal(t, tc.want, got)
})
}
}

func newTestVStreamManager(ctx context.Context, hc discovery.HealthCheck, serv srvtopo.Server, cell string) *vstreamManager {
gw := NewTabletGateway(ctx, hc, serv, cell)
srvResolver := srvtopo.NewResolver(serv, gw, cell)
Expand Down

0 comments on commit 055e735

Please sign in to comment.