diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index 5a73052b58a..900042e4150 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -946,17 +946,30 @@ func (vs *vstream) keyspaceHasBeenResharded(ctx context.Context, keyspace string if err != nil || len(shards) == 0 { return false, err } + + // First check the typical case, where the vgtid shards match the serving shards. + // In that case it's NOT possible that an applicable reshard has happened because + // the vgtid contains shards that are all serving. + reshardPossible := false + ksShardGTIDs := make([]*binlogdatapb.ShardGtid, 0, len(vs.vgtid.ShardGtids)) for _, g := range vs.vgtid.ShardGtids { if g.GetKeyspace() == keyspace { - // If we already have the correct (serving) shards in our vgtid then the - // keyspace MAY have an active Reshard workflow but the keyspace has not - // been resharded (meaning traffic has been switched in an active Reshard - // workflow) since we were last streaming. - if shards[g.GetShard()].GetIsPrimaryServing() { - return false, nil - } + ksShardGTIDs = append(ksShardGTIDs, g) + } + } + for _, s := range ksShardGTIDs { + if !shards[s.GetShard()].GetIsPrimaryServing() { + reshardPossible = true + break } } + log.Errorf("DEBUG: reshard possible: %v", reshardPossible) + if !reshardPossible { + return false, nil + } + + // Now that we know there MAY have been an applicable reshard, let's make a + // definitive determination by looking at the shard keyranges. for _, i := range shards { for _, j := range shards { if i.ShardName() == j.ShardName() && key.KeyRangeEqual(i.GetKeyRange(), j.GetKeyRange()) { @@ -965,10 +978,11 @@ func (vs *vstream) keyspaceHasBeenResharded(ctx context.Context, keyspace string } if key.KeyRangeIntersect(i.GetKeyRange(), j.GetKeyRange()) { // We have different shards with overlapping keyranges so we know - // that a reshard has occurred. + // that a reshard has happened. 
return true, nil } } } + return false, nil } diff --git a/go/vt/vtgate/vstream_manager_test.go b/go/vt/vtgate/vstream_manager_test.go index 4c1e9ec6764..c402dfdce8f 100644 --- a/go/vt/vtgate/vstream_manager_test.go +++ b/go/vt/vtgate/vstream_manager_test.go @@ -25,12 +25,12 @@ import ( "testing" "time" - "google.golang.org/protobuf/proto" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" "vitess.io/vitess/go/stats" + "vitess.io/vitess/go/test/utils" "vitess.io/vitess/go/vt/discovery" "vitess.io/vitess/go/vt/srvtopo" "vitess.io/vitess/go/vt/topo" @@ -41,8 +41,6 @@ import ( topodatapb "vitess.io/vitess/go/vt/proto/topodata" vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" - - "vitess.io/vitess/go/test/utils" ) var mu sync.Mutex @@ -1279,6 +1277,304 @@ func TestVStreamIdleHeartbeat(t *testing.T) { } } +func TestKeyspaceHasBeenSharded(t *testing.T) { + ctx := utils.LeakCheckContext(t) + + cell := "zone1" + ks := "testks" + + type testcase struct { + name string + oldshards []string + newshards []string + vgtid *binlogdatapb.VGtid + trafficSwitched bool + want bool + wantErr string + } + testcases := []testcase{ + { + name: "2 to 4, split both, traffic not switched", + oldshards: []string{ + "-80", + "80-", + }, + newshards: []string{ + "-40", + "40-80", + "80-c0", + "c0-", + }, + vgtid: &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{ + { + Keyspace: ks, + Shard: "-80", + }, + { + Keyspace: ks, + Shard: "80-", + }, + }, + }, + trafficSwitched: false, + want: false, + }, + { + name: "2 to 4, split both, traffic not switched", + oldshards: []string{ + "-80", + "80-", + }, + newshards: []string{ + "-40", + "40-80", + "80-c0", + "c0-", + }, + vgtid: &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{ + { + Keyspace: ks, + Shard: "-80", + }, + { + Keyspace: ks, + Shard: "80-", + }, + }, + }, + trafficSwitched: false, + want: false, + }, + { + 
name: "2 to 8, split both, traffic switched", + oldshards: []string{ + "-80", + "80-", + }, + newshards: []string{ + "-20", + "20-40", + "40-60", + "60-80", + "80-a0", + "a0-c0", + "c0-e0", + "e0-", + }, + vgtid: &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{ + { + Keyspace: ks, + Shard: "-80", + }, + { + Keyspace: ks, + Shard: "80-", + }, + }, + }, + trafficSwitched: true, + want: true, + }, + { + name: "2 to 4, split only first shard, traffic switched", + oldshards: []string{ + "-80", + "80-", + }, + newshards: []string{ + "-20", + "20-40", + "40-60", + "60-80", + // 80- is not being resharded + }, + vgtid: &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{ + { + Keyspace: ks, + Shard: "-80", + }, + { + Keyspace: ks, + Shard: "80-", + }, + }, + }, + trafficSwitched: true, + want: true, + }, + { + name: "4 to 2, merge both shards, traffic switched", + oldshards: []string{ + "-40", + "40-80", + "80-c0", + "c0-", + }, + newshards: []string{ + "-80", + "80-", + }, + vgtid: &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{ + { + Keyspace: ks, + Shard: "-40", + }, + { + Keyspace: ks, + Shard: "40-80", + }, + { + Keyspace: ks, + Shard: "80-c0", + }, + { + Keyspace: ks, + Shard: "c0-", + }, + }, + }, + trafficSwitched: true, + want: true, + }, + { + name: "4 to 3, merge second half, traffic not switched", + oldshards: []string{ + "-40", + "40-80", + "80-c0", + "c0-", + }, + newshards: []string{ + // -40 and 40-80 are not being resharded + "80-", + }, + vgtid: &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{ + { + Keyspace: ks, + Shard: "-40", + }, + { + Keyspace: ks, + Shard: "40-80", + }, + { + Keyspace: ks, + Shard: "80-c0", + }, + { + Keyspace: ks, + Shard: "c0-", + }, + }, + }, + trafficSwitched: false, + want: false, + }, + { + name: "4 to 3, merge second half, traffic switched", + oldshards: []string{ + "-40", + "40-80", + "80-c0", + "c0-", + }, + newshards: []string{ + // -40 and 40-80 are not 
being resharded + "80-", + }, + vgtid: &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{ + { + Keyspace: ks, + Shard: "-40", + }, + { + Keyspace: ks, + Shard: "40-80", + }, + { + Keyspace: ks, + Shard: "80-c0", + }, + { + Keyspace: ks, + Shard: "c0-", + }, + }, + }, + trafficSwitched: true, + want: true, + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + hc := discovery.NewFakeHealthCheck(nil) + _ = createSandbox(ks) + st := getSandboxTopo(ctx, cell, ks, append(tc.oldshards, tc.newshards...)) + vsm := newTestVStreamManager(ctx, hc, st, cell) + vs := vstream{ + vgtid: tc.vgtid, + tabletType: topodatapb.TabletType_PRIMARY, + optCells: cell, + vsm: vsm, + ts: st.topoServer, + } + for i, shard := range tc.oldshards { + serving := true + if tc.trafficSwitched { + serving = false + } + tabletconn := hc.AddTestTablet(cell, fmt.Sprintf("1.1.0.%d", i), int32(1000+i), ks, shard, topodatapb.TabletType_PRIMARY, serving, 0, nil) + err := st.topoServer.CreateTablet(ctx, tabletconn.Tablet()) + require.NoError(t, err) + var alias *topodatapb.TabletAlias + if serving { + alias = tabletconn.Tablet().Alias + } + _, err = st.topoServer.UpdateShardFields(ctx, ks, shard, func(si *topo.ShardInfo) error { + si.PrimaryAlias = alias + si.IsPrimaryServing = serving + return nil + }) + require.NoError(t, err) + } + for i, shard := range tc.newshards { + serving := false + if tc.trafficSwitched { + serving = true + } + tabletconn := hc.AddTestTablet(cell, fmt.Sprintf("1.1.1.%d", i), int32(2000+i), ks, shard, topodatapb.TabletType_PRIMARY, serving, 0, nil) + err := st.topoServer.CreateTablet(ctx, tabletconn.Tablet()) + require.NoError(t, err) + var alias *topodatapb.TabletAlias + if serving { + alias = tabletconn.Tablet().Alias + } + _, err = st.topoServer.UpdateShardFields(ctx, ks, shard, func(si *topo.ShardInfo) error { + si.PrimaryAlias = alias + si.IsPrimaryServing = serving + return nil + }) + require.NoError(t, err) + } + got, err := 
vs.keyspaceHasBeenResharded(ctx, ks) + if tc.wantErr != "" { + require.EqualError(t, err, tc.wantErr) + } else { + require.NoError(t, err) + } + require.Equal(t, tc.want, got) + }) + } +} + func newTestVStreamManager(ctx context.Context, hc discovery.HealthCheck, serv srvtopo.Server, cell string) *vstreamManager { gw := NewTabletGateway(ctx, hc, serv, cell) srvResolver := srvtopo.NewResolver(serv, gw, cell)