From d96718767e5bc57fdbce3581539fda2a079fb0f8 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 1 Mar 2024 13:51:22 -0500 Subject: [PATCH 01/10] Update tablet picker options if keyspace was resharded. Signed-off-by: Matt Lord --- examples/local/vstream_client.go | 4 +- go/vt/vtgate/vstream_manager.go | 69 +++++++++++++++++++++++++++----- 2 files changed, 62 insertions(+), 11 deletions(-) diff --git a/examples/local/vstream_client.go b/examples/local/vstream_client.go index 98d2129f898..45c33835181 100644 --- a/examples/local/vstream_client.go +++ b/examples/local/vstream_client.go @@ -38,7 +38,7 @@ import ( */ func main() { ctx := context.Background() - streamCustomer := true + streamCustomer := false var vgtid *binlogdatapb.VGtid if streamCustomer { vgtid = &binlogdatapb.VGtid{ @@ -56,7 +56,7 @@ func main() { } else { vgtid = &binlogdatapb.VGtid{ ShardGtids: []*binlogdatapb.ShardGtid{{ - Keyspace: "commerce", + Keyspace: "customer", Shard: "0", Gtid: "", }}} diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index 08553969a50..5a73052b58a 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -27,6 +27,7 @@ import ( "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/discovery" + "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/servenv" "vitess.io/vitess/go/vt/srvtopo" @@ -497,24 +498,40 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha var err error cells := vs.getCells() - tp, err := discovery.NewTabletPicker(ctx, vs.ts, cells, vs.vsm.cell, sgtid.Keyspace, sgtid.Shard, vs.tabletType.String(), vs.tabletPickerOptions, ignoreTablets...) + tpo := vs.tabletPickerOptions + resharded, err := vs.keyspaceHasBeenResharded(ctx, sgtid.Keyspace) if err != nil { - log.Errorf(err.Error()) - return err + return vterrors.Wrapf(err, "failed to determine if keyspace %s has been resharded", sgtid.Keyspace) + } + if resharded { + // The non-serving tablet in the old / non-serving shard will contain all of + // the GTIDs that we need before transitioning to the new shards along with + // the journal event that will then allow us to automatically transition to + // the new shards (provided the stop_on_reshard option is not set). + tpo.IncludeNonServingTablets = true } + tabletPickerErr := func(err error) error { + tperr := vterrors.Wrapf(err, "failed to find a %s tablet for VStream in %s/%s within the %s cell(s)", + vs.tabletType.String(), sgtid.GetKeyspace(), sgtid.GetShard(), strings.Join(cells, ",")) + log.Errorf("%v", tperr) + return tperr + } + tp, err := discovery.NewTabletPicker(ctx, vs.ts, cells, vs.vsm.cell, sgtid.GetKeyspace(), sgtid.GetShard(), vs.tabletType.String(), tpo, ignoreTablets...) + if err != nil { + return tabletPickerErr(err) + } // Create a child context with a stricter timeout when picking a tablet. // This will prevent hanging in the case no tablets are found. tpCtx, tpCancel := context.WithTimeout(ctx, tabletPickerContextTimeout) defer tpCancel() - tablet, err := tp.PickForStreaming(tpCtx) if err != nil { - log.Errorf(err.Error()) - return err + return tabletPickerErr(err) } - log.Infof("Picked tablet %s for for %s/%s/%s/%s", tablet.Alias.String(), strings.Join(cells, ","), - sgtid.Keyspace, sgtid.Shard, vs.tabletType.String()) + log.Infof("Picked a %s tablet for VStream in %s/%s within the %s cell(s)", + vs.tabletType.String(), sgtid.GetKeyspace(), sgtid.GetShard(), strings.Join(cells, ",")) + target := &querypb.Target{ Keyspace: sgtid.Keyspace, Shard: sgtid.Shard, @@ -737,7 +754,7 @@ func (vs *vstream) sendAll(ctx context.Context, sgtid *binlogdatapb.ShardGtid, e if err := vs.getError(); err != nil { return err } - // convert all gtids to vgtids. This should be done here while holding the lock. + // Convert all gtids to vgtids. This should be done here while holding the lock. for j, event := range events { if event.Type == binlogdatapb.VEventType_GTID { // Update the VGtid and send that instead. @@ -921,3 +938,37 @@ func (vs *vstream) getJournalEvent(ctx context.Context, sgtid *binlogdatapb.Shar close(je.done) return je, nil } + +// keyspaceHasBeenResharded returns true if the keyspace's serving shard set has changed +// since the last VStream as indicated by the shard definitions provided in the vgtid. +func (vs *vstream) keyspaceHasBeenResharded(ctx context.Context, keyspace string) (bool, error) { + shards, err := vs.ts.FindAllShardsInKeyspace(ctx, keyspace, nil) + if err != nil || len(shards) == 0 { + return false, err + } + for _, g := range vs.vgtid.ShardGtids { + if g.GetKeyspace() == keyspace { + // If we already have the correct (serving) shards in our vgtid then the + // keyspace MAY have an active Reshard workflow but the keyspace has not + // been resharded (meaning traffic has been switched in an active Reshard + // workflow) since we were last streaming. + if shards[g.GetShard()].GetIsPrimaryServing() { + return false, nil + } + } + } + for _, i := range shards { + for _, j := range shards { + if i.ShardName() == j.ShardName() && key.KeyRangeEqual(i.GetKeyRange(), j.GetKeyRange()) { + // It's the same shard so skip it. + continue + } + if key.KeyRangeIntersect(i.GetKeyRange(), j.GetKeyRange()) { + // We have different shards with overlapping keyranges so we know + // that a reshard has occurred. + return true, nil + } + } + } + return false, nil +} From 055e735920fed814765941168750749d45f653dd Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sun, 3 Mar 2024 18:18:04 -0500 Subject: [PATCH 02/10] Add unit test Signed-off-by: Matt Lord --- go/vt/vtgate/vstream_manager.go | 30 ++- go/vt/vtgate/vstream_manager_test.go | 304 ++++++++++++++++++++++++++- 2 files changed, 322 insertions(+), 12 deletions(-) diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index 5a73052b58a..900042e4150 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -946,17 +946,30 @@ func (vs *vstream) keyspaceHasBeenResharded(ctx context.Context, keyspace string if err != nil || len(shards) == 0 { return false, err } + + // First check the typical case, where the vgtid shards match the serving shards. + // In that case it's NOT possible that an applicable reshard has happened because + // the vgtid contains shards that are all serving. + reshardPossible := false + ksShardGTIDs := make([]*binlogdatapb.ShardGtid, 0, len(vs.vgtid.ShardGtids)) for _, g := range vs.vgtid.ShardGtids { if g.GetKeyspace() == keyspace { - // If we already have the correct (serving) shards in our vgtid then the - // keyspace MAY have an active Reshard workflow but the keyspace has not - // been resharded (meaning traffic has been switched in an active Reshard - // workflow) since we were last streaming. - if shards[g.GetShard()].GetIsPrimaryServing() { - return false, nil - } + ksShardGTIDs = append(ksShardGTIDs, g) + } + } + for _, s := range ksShardGTIDs { + if !shards[s.GetShard()].GetIsPrimaryServing() { + reshardPossible = true + break } } + log.Errorf("DEBUG: reshard possible: %v", reshardPossible) + if !reshardPossible { + return false, nil + } + + // Now that we know there MAY have been an applicable reshard, let's make a + // definitive determination by looking at the shard keyranges. for _, i := range shards { for _, j := range shards { if i.ShardName() == j.ShardName() && key.KeyRangeEqual(i.GetKeyRange(), j.GetKeyRange()) { @@ -965,10 +978,11 @@ func (vs *vstream) keyspaceHasBeenResharded(ctx context.Context, keyspace string } if key.KeyRangeIntersect(i.GetKeyRange(), j.GetKeyRange()) { // We have different shards with overlapping keyranges so we know - // that a reshard has occurred. + // that a reshard has happened. return true, nil } } } + return false, nil } diff --git a/go/vt/vtgate/vstream_manager_test.go b/go/vt/vtgate/vstream_manager_test.go index 4c1e9ec6764..c402dfdce8f 100644 --- a/go/vt/vtgate/vstream_manager_test.go +++ b/go/vt/vtgate/vstream_manager_test.go @@ -25,12 +25,12 @@ import ( "testing" "time" - "google.golang.org/protobuf/proto" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" "vitess.io/vitess/go/stats" + "vitess.io/vitess/go/test/utils" "vitess.io/vitess/go/vt/discovery" "vitess.io/vitess/go/vt/srvtopo" "vitess.io/vitess/go/vt/topo" @@ -41,8 +41,6 @@ import ( topodatapb "vitess.io/vitess/go/vt/proto/topodata" vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" - - "vitess.io/vitess/go/test/utils" ) var mu sync.Mutex @@ -1279,6 +1277,304 @@ func TestVStreamIdleHeartbeat(t *testing.T) { } } +func TestKeyspaceHasBeenSharded(t *testing.T) { + ctx := utils.LeakCheckContext(t) + + cell := "zone1" + ks := "testks" + + type testcase struct { + name string + oldshards []string + newshards []string + vgtid *binlogdatapb.VGtid + trafficSwitched bool + want bool + wantErr string + } + testcases := []testcase{ + { + name: "2 to 4, split both, traffic not switched", + oldshards: []string{ + "-80", + "80-", + }, + newshards: []string{ + "-40", + "40-80", + "80-c0", + "c0-", + }, + vgtid: &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{ + { + Keyspace: ks, + Shard: "-80", + }, + { + Keyspace: ks, + Shard: "80-", + }, + }, + }, + trafficSwitched: false, + want: false, + }, + { + name: "2 to 4, split both, traffic not switched", + oldshards: []string{ + "-80", + "80-", + }, + newshards: []string{ + "-40", + "40-80", + "80-c0", + "c0-", + }, + vgtid: &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{ + { + Keyspace: ks, + Shard: "-80", + }, + { + Keyspace: ks, + Shard: "80-", + }, + }, + }, + trafficSwitched: false, + want: false, + }, + { + name: "2 to 8, split both, traffic switched", + oldshards: []string{ + "-80", + "80-", + }, + newshards: []string{ + "-20", + "20-40", + "40-60", + "60-80", + "80-a0", + "a0-c0", + "c0-e0", + "e0-", + }, + vgtid: &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{ + { + Keyspace: ks, + Shard: "-80", + }, + { + Keyspace: ks, + Shard: "80-", + }, + }, + }, + trafficSwitched: true, + want: true, + }, + { + name: "2 to 4, split only first shard, traffic switched", + oldshards: []string{ + "-80", + "80-", + }, + newshards: []string{ + "-20", + "20-40", + "40-60", + "60-80", + // -80 is not being resharded + }, + vgtid: &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{ + { + Keyspace: ks, + Shard: "-80", + }, + { + Keyspace: ks, + Shard: "80-", + }, + }, + }, + trafficSwitched: true, + want: true, + }, + { + name: "4 to 2, merge both shards, traffic switched", + oldshards: []string{ + "-40", + "40-80", + "80-c0", + "c0-", + }, + newshards: []string{ + "-80", + "80-", + }, + vgtid: &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{ + { + Keyspace: ks, + Shard: "-40", + }, + { + Keyspace: ks, + Shard: "40-80", + }, + { + Keyspace: ks, + Shard: "80-c0", + }, + { + Keyspace: ks, + Shard: "c0-", + }, + }, + }, + trafficSwitched: true, + want: true, + }, + { + name: "4 to 3, merge second half, traffic not switched", + oldshards: []string{ + "-40", + "40-80", + "80-c0", + "c0-", + }, + newshards: []string{ + // -40, 40-80, and 80-c0 are not being resharded + "80-", + }, + vgtid: &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{ + { + Keyspace: ks, + Shard: "-40", + }, + { + Keyspace: ks, + Shard: "40-80", + }, + { + Keyspace: ks, + Shard: "80-c0", + }, + { + Keyspace: ks, + Shard: "c0-", + }, + }, + }, + trafficSwitched: false, + want: false, + }, + { + name: "4 to 3, merge second half, traffic switched", + oldshards: []string{ + "-40", + "40-80", + "80-c0", + "c0-", + }, + newshards: []string{ + // -40, 40-80, and 80-c0 are not being resharded + "80-", + }, + vgtid: &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{ + { + Keyspace: ks, + Shard: "-40", + }, + { + Keyspace: ks, + Shard: "40-80", + }, + { + Keyspace: ks, + Shard: "80-c0", + }, + { + Keyspace: ks, + Shard: "c0-", + }, + }, + }, + trafficSwitched: true, + want: true, + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + hc := discovery.NewFakeHealthCheck(nil) + _ = createSandbox(ks) + st := getSandboxTopo(ctx, cell, ks, append(tc.oldshards, tc.newshards...)) + vsm := newTestVStreamManager(ctx, hc, st, cell) + vs := vstream{ + vgtid: tc.vgtid, + tabletType: topodatapb.TabletType_PRIMARY, + optCells: cell, + vsm: vsm, + ts: st.topoServer, + } + for i, shard := range tc.oldshards { + serving := true + if tc.trafficSwitched { + serving = false + } + tabletconn := hc.AddTestTablet(cell, fmt.Sprintf("1.1.0.%d", i), int32(1000+i), ks, shard, topodatapb.TabletType_PRIMARY, serving, 0, nil) + err := st.topoServer.CreateTablet(ctx, tabletconn.Tablet()) + require.NoError(t, err) + var alias *topodatapb.TabletAlias + if serving { + alias = tabletconn.Tablet().Alias + } + _, err = st.topoServer.UpdateShardFields(ctx, ks, shard, func(si *topo.ShardInfo) error { + si.PrimaryAlias = alias + si.IsPrimaryServing = serving + return nil + }) + require.NoError(t, err) + } + for i, shard := range tc.newshards { + serving := false + if tc.trafficSwitched { + serving = true + } + tabletconn := hc.AddTestTablet(cell, fmt.Sprintf("1.1.1.%d", i), int32(2000+i), ks, shard, topodatapb.TabletType_PRIMARY, serving, 0, nil) + err := st.topoServer.CreateTablet(ctx, tabletconn.Tablet()) + require.NoError(t, err) + var alias *topodatapb.TabletAlias + if serving { + alias = tabletconn.Tablet().Alias + } + _, err = st.topoServer.UpdateShardFields(ctx, ks, shard, func(si *topo.ShardInfo) error { + si.PrimaryAlias = alias + si.IsPrimaryServing = serving + return nil + }) + require.NoError(t, err) + } + got, err := vs.keyspaceHasBeenResharded(ctx, ks) + if tc.wantErr != "" { + require.EqualError(t, err, tc.wantErr) + } else { + require.NoError(t, err) + } + require.Equal(t, tc.want, got) + }) + } +} + func newTestVStreamManager(ctx context.Context, hc discovery.HealthCheck, serv srvtopo.Server, cell string) *vstreamManager { gw := NewTabletGateway(ctx, hc, serv, cell) srvResolver := srvtopo.NewResolver(serv, gw, cell) From 77894d7dc5b7c64863f8227d39b9232aad1d788c Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sun, 3 Mar 2024 21:15:30 -0500 Subject: [PATCH 03/10] Add e2e test Signed-off-by: Matt Lord --- .../vreplication/vreplication_test.go | 2 +- go/test/endtoend/vreplication/vstream_test.go | 199 +++++++++++++++++- go/vt/topo/faketopo/faketopo.go | 5 +- 3 files changed, 199 insertions(+), 7 deletions(-) diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index c28118a97cc..4341a9602ea 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -1453,7 +1453,7 @@ func reshardAction(t *testing.T, action, workflow, keyspaceName, sourceShards, t action, workflow, output) } if err != nil { - t.Fatalf("Reshard %s command failed with %+v\n", action, err) + t.Fatalf("Reshard %s command failed with %+v\nOutput: %s", action, err, output) } } diff --git a/go/test/endtoend/vreplication/vstream_test.go b/go/test/endtoend/vreplication/vstream_test.go index dee8243d5e9..ff118f0a407 100644 --- a/go/test/endtoend/vreplication/vstream_test.go +++ b/go/test/endtoend/vreplication/vstream_test.go @@ -223,7 +223,7 @@ func insertRow(keyspace, table string, id int) { vtgateConn.ExecuteFetch("begin", 1000, false) _, err := vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s (name) values ('%s%d')", table, table, id), 1000, false) if err != nil { - log.Infof("error inserting row %d: %v", id, err) + log.Errorf("error inserting row %d: %v", id, err) } vtgateConn.ExecuteFetch("commit", 1000, false) } @@ -387,13 +387,15 @@ func testVStreamCopyMultiKeyspaceReshard(t *testing.T, baseTabletID int) numEven defer vc.TearDown() defaultCell := vc.Cells[vc.CellNames[0]] - vc.AddKeyspace(t, []*Cell{defaultCell}, "unsharded", "0", vschemaUnsharded, schemaUnsharded, defaultReplicas, defaultRdonly, baseTabletID+100, nil) + _, err := vc.AddKeyspace(t, []*Cell{defaultCell}, "unsharded", "0", vschemaUnsharded, schemaUnsharded, defaultReplicas, defaultRdonly, baseTabletID+100, nil) + require.NoError(t, err) vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) defer vtgateConn.Close() verifyClusterHealth(t, vc) - vc.AddKeyspace(t, []*Cell{defaultCell}, "sharded", "-80,80-", vschemaSharded, schemaSharded, defaultReplicas, defaultRdonly, baseTabletID+200, nil) + _, err = vc.AddKeyspace(t, []*Cell{defaultCell}, "sharded", "-80,80-", vschemaSharded, schemaSharded, defaultReplicas, defaultRdonly, baseTabletID+200, nil) + require.NoError(t, err) ctx := context.Background() vstreamConn, err := vtgateconn.Dial(ctx, fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort)) @@ -512,6 +514,197 @@ func testVStreamCopyMultiKeyspaceReshard(t *testing.T, baseTabletID int) numEven return ne } +// Validate that we can resume a VStream when the keyspace has been resharded +// while not streaming. Ensure that there we successfully transition from the +// old shards -- which are in the VGTID from the previous stream -- and that +// we miss no row events during the process. +func TestMultiVStreamsKeyspaceReshard(t *testing.T) { + ctx := context.Background() + ks := "testks" + wf := "multiVStreamsKeyspaceReshard" + baseTabletID := 100 + tabletType := topodatapb.TabletType_PRIMARY.String() + oldShards := "-80,80-" + newShards := "-40,40-80,80-c0,c0-" + oldShardRowEvents, newShardRowEvents := 0, 0 + vc = NewVitessCluster(t, nil) + defer vc.TearDown() + defaultCell := vc.Cells[vc.CellNames[0]] + ogdr := defaultReplicas + defaultReplicas = 0 // Because of CI resource constraints we can only run this test with primary tablets + defer func(dr int) { defaultReplicas = dr }(ogdr) + + // For our sequences etc. + _, err := vc.AddKeyspace(t, []*Cell{defaultCell}, "global", "0", vschemaUnsharded, schemaUnsharded, defaultReplicas, defaultRdonly, baseTabletID, nil) + require.NoError(t, err) + + // Setup the keyspace with our old shards. + keyspace, err := vc.AddKeyspace(t, []*Cell{defaultCell}, ks, oldShards, vschemaSharded, schemaSharded, defaultReplicas, defaultRdonly, baseTabletID+1000, nil) + require.NoError(t, err) + + // Add the new shards. + err = vc.AddShards(t, []*Cell{defaultCell}, keyspace, newShards, defaultReplicas, defaultRdonly, baseTabletID+2000, targetKsOpts) + require.NoError(t, err) + + vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) + defer vtgateConn.Close() + verifyClusterHealth(t, vc) + + vstreamConn, err := vtgateconn.Dial(ctx, fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort)) + require.NoError(t, err) + defer vstreamConn.Close() + vgtid := &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{{ + Keyspace: "/.*", // Match all keyspaces just to be more realistic. + }}} + + filter := &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + // Only stream the customer table. + Match: "customer", + }}, + } + flags := &vtgatepb.VStreamFlags{} + + // Ensure that we're starting with a clean slate. + _, err = vtgateConn.ExecuteFetch(fmt.Sprintf("delete from %s.customer", ks), 1000, false) + require.NoError(t, err) + + streamCtx, streamCancel := context.WithTimeout(ctx, 1*time.Minute) + defer streamCancel() + done := make(chan struct{}) + + // First goroutine that keeps inserting rows into the table being streamed until the + // stream context is cancelled. + go func() { + id := 1 + for { + select { + case <-streamCtx.Done(): + // Give the VStream a little catch-up time before telling it to stop + // via the done channel. + time.Sleep(10 * time.Second) + close(done) + return + default: + insertRow(ks, "customer", id) + time.Sleep(250 * time.Millisecond) + id++ + } + } + }() + + // Create the Reshard workflow and wait for it to finish the copy phase. + reshardAction(t, "Create", wf, ks, oldShards, newShards, defaultCellName, tabletType) + waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", ks, wf), binlogdatapb.VReplicationWorkflowState_Running.String()) + + // Stream events but stop once we have a VGTID with positions for the old/original shards. + var newVGTID *binlogdatapb.VGtid + func() { + var reader vtgateconn.VStreamReader + reader, err = vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags) + require.NoError(t, err) + for { + evs, err := reader.Recv() + + switch err { + case nil: + for _, ev := range evs { + switch ev.Type { + case binlogdatapb.VEventType_ROW: + shard := ev.GetRowEvent().GetShard() + switch shard { + case "-80", "80-": + oldShardRowEvents++ + case "0": + default: + require.FailNow(t, fmt.Sprintf("received event for unexpected shard: %s", shard)) + } + case binlogdatapb.VEventType_JOURNAL: + require.FailNow(t, fmt.Sprintf("received unexpected journal event for keyspace: %s", ev.GetKeyspace())) + case binlogdatapb.VEventType_VGTID: + newVGTID = ev.GetVgtid() + if len(newVGTID.GetShardGtids()) == 3 { + // We want a VGTID with a position for the global shard and the old shards. + canStop := true + for _, sg := range newVGTID.GetShardGtids() { + if sg.GetGtid() == "" { + canStop = false + } + } + if canStop { + return + } + } + } + } + default: + require.FailNow(t, "VStream returned unexpected error: %v", err) + return + } + select { + case <-streamCtx.Done(): + return + default: + } + } + }() + + require.Len(t, newVGTID.GetShardGtids(), 3) + + // Switch the traffic to the new shards. + reshardAction(t, "SwitchTraffic", wf, ks, oldShards, newShards, defaultCellName, tabletType) + + // Now start a new VStream from our previous VGTID which only has the old/original shards. + func() { + var reader vtgateconn.VStreamReader + reader, err = vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, newVGTID, filter, flags) + require.NoError(t, err) + for { + evs, err := reader.Recv() + + switch err { + case nil: + for _, ev := range evs { + switch ev.Type { + case binlogdatapb.VEventType_ROW: + shard := ev.RowEvent.Shard + switch shard { + case "-80", "80-": + oldShardRowEvents++ + case "-40", "40-80", "80-c0", "c0-": + newShardRowEvents++ + case "0": + default: + require.FailNow(t, fmt.Sprintf("received event for unexpected shard: %s", shard)) + } + } + } + case io.EOF: + log.Infof("Stream Ended") + streamCancel() + default: + log.Errorf("Returned err %v", err) + streamCancel() + } + select { + case <-done: + return + default: + } + } + }() + + require.GreaterOrEqual(t, oldShardRowEvents, 1) + require.GreaterOrEqual(t, newShardRowEvents, 1) + + // The number of row events streamed by the VStream API should match the number of rows inserted. + customerResult := execVtgateQuery(t, vtgateConn, ks, "select count(*) from customer") + insertedCustomerRows, err := customerResult.Rows[0][0].ToCastInt64() + require.NoError(t, err) + require.Equal(t, insertedCustomerRows, int64(oldShardRowEvents+newShardRowEvents)) +} + func TestVStreamFailover(t *testing.T) { testVStreamWithFailover(t, true) } diff --git a/go/vt/topo/faketopo/faketopo.go b/go/vt/topo/faketopo/faketopo.go index 8601d28f5b6..69ccf08a969 100644 --- a/go/vt/topo/faketopo/faketopo.go +++ b/go/vt/topo/faketopo/faketopo.go @@ -21,12 +21,11 @@ import ( "strings" "sync" + "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/memorytopo" - "vitess.io/vitess/go/vt/log" topodatapb "vitess.io/vitess/go/vt/proto/topodata" - - "vitess.io/vitess/go/vt/topo" ) // FakeFactory implements the Factory interface. This is supposed to be used only for testing From 342968e6c3d1ba99ea068a1f6c35f4fa315f9152 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 4 Mar 2024 12:33:55 -0500 Subject: [PATCH 04/10] Improve unit test Signed-off-by: Matt Lord --- go/vt/vtgate/vstream_manager_test.go | 62 +++++++++++----------------- 1 file changed, 23 insertions(+), 39 deletions(-) diff --git a/go/vt/vtgate/vstream_manager_test.go b/go/vt/vtgate/vstream_manager_test.go index c402dfdce8f..6eec06a1bac 100644 --- a/go/vt/vtgate/vstream_manager_test.go +++ b/go/vt/vtgate/vstream_manager_test.go @@ -1389,7 +1389,7 @@ func TestKeyspaceHasBeenSharded(t *testing.T) { "20-40", "40-60", "60-80", - // -80 is not being resharded + // 80- is not being resharded. }, vgtid: &binlogdatapb.VGtid{ ShardGtids: []*binlogdatapb.ShardGtid{ @@ -1450,8 +1450,8 @@ func TestKeyspaceHasBeenSharded(t *testing.T) { "c0-", }, newshards: []string{ - // -40, 40-80, and 80-c0 are not being resharded - "80-", + // -40 and 40-80 are not being resharded. + "80-", // Merge of 80-c0 and c0- }, vgtid: &binlogdatapb.VGtid{ ShardGtids: []*binlogdatapb.ShardGtid{ @@ -1485,8 +1485,8 @@ func TestKeyspaceHasBeenSharded(t *testing.T) { "c0-", }, newshards: []string{ - // -40, 40-80, and 80-c0 are not being resharded - "80-", + // -40 and 40-80 are not being resharded. + "80-", // Merge of 80-c0 and c0- }, vgtid: &binlogdatapb.VGtid{ ShardGtids: []*binlogdatapb.ShardGtid{ @@ -1513,6 +1513,22 @@ func TestKeyspaceHasBeenSharded(t *testing.T) { }, } + addTablet := func(t *testing.T, ctx context.Context, host string, port int32, cell, ks, shard string, ts *topo.Server, hc *discovery.FakeHealthCheck, serving bool) { + tabletconn := hc.AddTestTablet(cell, host, port, ks, shard, topodatapb.TabletType_PRIMARY, serving, 0, nil) + err := ts.CreateTablet(ctx, tabletconn.Tablet()) + require.NoError(t, err) + var alias *topodatapb.TabletAlias + if serving { + alias = tabletconn.Tablet().Alias + } + _, err = ts.UpdateShardFields(ctx, ks, shard, func(si *topo.ShardInfo) error { + si.PrimaryAlias = alias + si.IsPrimaryServing = serving + return nil + }) + require.NoError(t, err) + } + for _, tc := range testcases { t.Run(tc.name, func(t *testing.T) { hc := discovery.NewFakeHealthCheck(nil) @@ -1527,42 +1543,10 @@ func TestKeyspaceHasBeenSharded(t *testing.T) { ts: st.topoServer, } for i, shard := range tc.oldshards { - serving := true - if tc.trafficSwitched { - serving = false - } - tabletconn := hc.AddTestTablet(cell, fmt.Sprintf("1.1.0.%d", i), int32(1000+i), ks, shard, topodatapb.TabletType_PRIMARY, serving, 0, nil) - err := st.topoServer.CreateTablet(ctx, tabletconn.Tablet()) - require.NoError(t, err) - var alias *topodatapb.TabletAlias - if serving { - alias = tabletconn.Tablet().Alias - } - _, err = st.topoServer.UpdateShardFields(ctx, ks, shard, func(si *topo.ShardInfo) error { - si.PrimaryAlias = alias - si.IsPrimaryServing = serving - return nil - }) - require.NoError(t, err) + addTablet(t, ctx, fmt.Sprintf("1.1.0.%d", i), int32(1000+i), cell, ks, shard, st.topoServer, hc, !tc.trafficSwitched) } for i, shard := range tc.newshards { - serving := false - if tc.trafficSwitched { - serving = true - } - tabletconn := hc.AddTestTablet(cell, fmt.Sprintf("1.1.1.%d", i), int32(2000+i), ks, shard, topodatapb.TabletType_PRIMARY, serving, 0, nil) - err := st.topoServer.CreateTablet(ctx, tabletconn.Tablet()) - require.NoError(t, err) - var alias *topodatapb.TabletAlias - if serving { - alias = tabletconn.Tablet().Alias - } - _, err = st.topoServer.UpdateShardFields(ctx, ks, shard, func(si *topo.ShardInfo) error { - si.PrimaryAlias = alias - si.IsPrimaryServing = serving - return nil - }) - require.NoError(t, err) + addTablet(t, ctx, fmt.Sprintf("1.1.1.%d", i), int32(2000+i), cell, ks, shard, st.topoServer, hc, tc.trafficSwitched) } got, err := vs.keyspaceHasBeenResharded(ctx, ks) if tc.wantErr != "" { From e03b3f08fb0fb097f5025878119f7949442fb8e0 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 4 Mar 2024 12:34:42 -0500 Subject: [PATCH 05/10] Remove local testing changes Signed-off-by: Matt Lord --- examples/local/vstream_client.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/local/vstream_client.go b/examples/local/vstream_client.go index 45c33835181..98d2129f898 100644 --- a/examples/local/vstream_client.go +++ b/examples/local/vstream_client.go @@ -38,7 +38,7 @@ import ( */ func main() { ctx := context.Background() - streamCustomer := false + streamCustomer := true var vgtid *binlogdatapb.VGtid if streamCustomer { vgtid = &binlogdatapb.VGtid{ @@ -56,7 +56,7 @@ func main() { } else { vgtid = &binlogdatapb.VGtid{ ShardGtids: []*binlogdatapb.ShardGtid{{ - Keyspace: "customer", + Keyspace: "commerce", Shard: "0", Gtid: "", }}} From 51ed45a63e0772b168bd498e02e5a7323993dbab Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 4 Mar 2024 12:53:40 -0500 Subject: [PATCH 06/10] Changes after self review Signed-off-by: Matt Lord --- go/test/endtoend/vreplication/vstream_test.go | 40 ++++++++++--------- go/vt/vtgate/vstream_manager.go | 19 +++++---- 2 files changed, 33 insertions(+), 26 deletions(-) diff --git a/go/test/endtoend/vreplication/vstream_test.go b/go/test/endtoend/vreplication/vstream_test.go index ff118f0a407..59a21bd0836 100644 --- a/go/test/endtoend/vreplication/vstream_test.go +++ b/go/test/endtoend/vreplication/vstream_test.go @@ -538,7 +538,7 @@ func TestMultiVStreamsKeyspaceReshard(t *testing.T) { _, err := vc.AddKeyspace(t, []*Cell{defaultCell}, "global", "0", vschemaUnsharded, schemaUnsharded, defaultReplicas, defaultRdonly, baseTabletID, nil) require.NoError(t, err) - // Setup the keyspace with our old shards. + // Setup the keyspace with our old/original shards. keyspace, err := vc.AddKeyspace(t, []*Cell{defaultCell}, ks, oldShards, vschemaSharded, schemaSharded, defaultReplicas, defaultRdonly, baseTabletID+1000, nil) require.NoError(t, err) @@ -553,23 +553,12 @@ func TestMultiVStreamsKeyspaceReshard(t *testing.T) { vstreamConn, err := vtgateconn.Dial(ctx, fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort)) require.NoError(t, err) defer vstreamConn.Close() - vgtid := &binlogdatapb.VGtid{ - ShardGtids: []*binlogdatapb.ShardGtid{{ - Keyspace: "/.*", // Match all keyspaces just to be more realistic. - }}} - - filter := &binlogdatapb.Filter{ - Rules: []*binlogdatapb.Rule{{ - // Only stream the customer table. - Match: "customer", - }}, - } - flags := &vtgatepb.VStreamFlags{} // Ensure that we're starting with a clean slate. _, err = vtgateConn.ExecuteFetch(fmt.Sprintf("delete from %s.customer", ks), 1000, false) require.NoError(t, err) + // Coordinate go-routines. streamCtx, streamCancel := context.WithTimeout(ctx, 1*time.Minute) defer streamCancel() done := make(chan struct{}) @@ -598,6 +587,19 @@ func TestMultiVStreamsKeyspaceReshard(t *testing.T) { reshardAction(t, "Create", wf, ks, oldShards, newShards, defaultCellName, tabletType) waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", ks, wf), binlogdatapb.VReplicationWorkflowState_Running.String()) + vgtid := &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{{ + Keyspace: "/.*", // Match all keyspaces just to be more realistic. + }}} + + filter := &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + // Only stream the customer table and its sequence backing table. + Match: "/customer.*", + }}, + } + flags := &vtgatepb.VStreamFlags{} + // Stream events but stop once we have a VGTID with positions for the old/original shards. var newVGTID *binlogdatapb.VGtid func() { @@ -639,7 +641,7 @@ func TestMultiVStreamsKeyspaceReshard(t *testing.T) { } } default: - require.FailNow(t, "VStream returned unexpected error: %v", err) + require.FailNow(t, fmt.Sprintf("VStream returned unexpected error: %v", err)) return } select { @@ -650,6 +652,7 @@ func TestMultiVStreamsKeyspaceReshard(t *testing.T) { } }() + // Confirm that we have shard GTIDs for the global shard and the old/original shards. require.Len(t, newVGTID.GetShardGtids(), 3) // Switch the traffic to the new shards. @@ -695,14 +698,15 @@ func TestMultiVStreamsKeyspaceReshard(t *testing.T) { } }() - require.GreaterOrEqual(t, oldShardRowEvents, 1) - require.GreaterOrEqual(t, newShardRowEvents, 1) + // We should have a mix of events across the old and new shards. + require.NotZero(t, oldShardRowEvents) + require.NotZero(t, newShardRowEvents) // The number of row events streamed by the VStream API should match the number of rows inserted. customerResult := execVtgateQuery(t, vtgateConn, ks, "select count(*) from customer") - insertedCustomerRows, err := customerResult.Rows[0][0].ToCastInt64() + customerCount, err := customerResult.Rows[0][0].ToInt64() require.NoError(t, err) - require.Equal(t, insertedCustomerRows, int64(oldShardRowEvents+newShardRowEvents)) + require.Equal(t, customerCount, int64(oldShardRowEvents+newShardRowEvents)) } func TestVStreamFailover(t *testing.T) { diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index 900042e4150..41d3833f6f6 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -940,30 +940,33 @@ func (vs *vstream) getJournalEvent(ctx context.Context, sgtid *binlogdatapb.Shar } // keyspaceHasBeenResharded returns true if the keyspace's serving shard set has changed -// since the last VStream as indicated by the shard definitions provided in the vgtid. +// since the last VStream as indicated by the shard definitions provided in the VGTID. func (vs *vstream) keyspaceHasBeenResharded(ctx context.Context, keyspace string) (bool, error) { shards, err := vs.ts.FindAllShardsInKeyspace(ctx, keyspace, nil) if err != nil || len(shards) == 0 { return false, err } - // First check the typical case, where the vgtid shards match the serving shards. + // First check the typical case, where the VGTID shards match the serving shards. // In that case it's NOT possible that an applicable reshard has happened because - // the vgtid contains shards that are all serving. + // the VGTID contains shards that are all serving. reshardPossible := false ksShardGTIDs := make([]*binlogdatapb.ShardGtid, 0, len(vs.vgtid.ShardGtids)) - for _, g := range vs.vgtid.ShardGtids { - if g.GetKeyspace() == keyspace { - ksShardGTIDs = append(ksShardGTIDs, g) + for _, s := range vs.vgtid.ShardGtids { + if s.GetKeyspace() == keyspace { + ksShardGTIDs = append(ksShardGTIDs, s) } } for _, s := range ksShardGTIDs { - if !shards[s.GetShard()].GetIsPrimaryServing() { + shard := shards[s.GetShard()] + if shard == nil { + return false, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "shard provided in VGTID, %s, not found in keyspace %s", s.GetShard(), keyspace) + } + if !shard.GetIsPrimaryServing() { reshardPossible = true break } } - log.Errorf("DEBUG: reshard possible: %v", reshardPossible) if !reshardPossible { return false, nil } From 3ed17b00021a25689bf6b67647c079c4ee484542 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 4 Mar 2024 23:30:56 -0500 Subject: [PATCH 07/10] Nitty test improvements Signed-off-by: Matt Lord --- go/test/endtoend/vreplication/vstream_test.go | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/go/test/endtoend/vreplication/vstream_test.go b/go/test/endtoend/vreplication/vstream_test.go index 59a21bd0836..e13c3e24e80 100644 --- a/go/test/endtoend/vreplication/vstream_test.go +++ b/go/test/endtoend/vreplication/vstream_test.go @@ -619,11 +619,10 @@ func TestMultiVStreamsKeyspaceReshard(t *testing.T) { case "-80", "80-": oldShardRowEvents++ case "0": + // We expect some for the sequence backing table, but don't care. default: require.FailNow(t, fmt.Sprintf("received event for unexpected shard: %s", shard)) } - case binlogdatapb.VEventType_JOURNAL: - require.FailNow(t, fmt.Sprintf("received unexpected journal event for keyspace: %s", ev.GetKeyspace())) case binlogdatapb.VEventType_VGTID: newVGTID = ev.GetVgtid() if len(newVGTID.GetShardGtids()) == 3 { @@ -642,7 +641,6 @@ func TestMultiVStreamsKeyspaceReshard(t *testing.T) { } default: require.FailNow(t, fmt.Sprintf("VStream returned unexpected error: %v", err)) - return } select { case <-streamCtx.Done(): @@ -678,17 +676,14 @@ func TestMultiVStreamsKeyspaceReshard(t *testing.T) { case "-40", "40-80", "80-c0", "c0-": newShardRowEvents++ case "0": + // Again, we expect some for the sequence backing table, but don't care. default: require.FailNow(t, fmt.Sprintf("received event for unexpected shard: %s", shard)) } } } - case io.EOF: - log.Infof("Stream Ended") - streamCancel() default: - log.Errorf("Returned err %v", err) - streamCancel() + require.FailNow(t, fmt.Sprintf("VStream returned unexpected error: %v", err)) } select { case <-done: From c772c66f5bbf2897a8c3d7c91c9cf4832d5d069f Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 5 Mar 2024 09:56:45 -0500 Subject: [PATCH 08/10] Nitty nit Signed-off-by: Matt Lord --- go/vt/vtgate/vstream_manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index 41d3833f6f6..5cd0edcf2a1 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -960,7 +960,7 @@ func (vs *vstream) keyspaceHasBeenResharded(ctx context.Context, keyspace string for _, s := range ksShardGTIDs { shard := shards[s.GetShard()] if shard == nil { - return false, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "shard provided in VGTID, %s, not found in keyspace %s", s.GetShard(), keyspace) + return false, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "shard provided in VGTID, %s, not found in the %s keyspace", s.GetShard(), keyspace) } if !shard.GetIsPrimaryServing() { reshardPossible = true From 86749b775db2158e5733baddd9b4c3560d13470c Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sat, 9 Mar 2024 20:21:49 -0500 Subject: [PATCH 09/10] Execute the new e2e test in the CI Signed-off-by: Matt Lord --- test/config.json | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/test/config.json b/test/config.json index 14c05cb8df6..7d429ce6978 100644 --- a/test/config.json +++ b/test/config.json @@ -1121,6 +1121,15 @@ "RetryMax": 1, "Tags": [] }, + "multi_vstreams_keyspace_reshard": { + "File": "unused.go", + "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestMultiVStreamsKeyspaceReshard", "-timeout", "15m"], + "Command": [], + "Manual": false, + "Shard": "vstream", + "RetryMax": 1, + "Tags": [] + }, "vstream_failover": { "File": "unused.go", "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "VStreamFailover"], From 2ca07bbc53c032a2463374d9c704710e4ce4c75d Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 18 Mar 2024 09:06:10 -0400 Subject: [PATCH 10/10] Slight performance optimization Signed-off-by: Matt Lord --- go/vt/vtgate/vstream_manager.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index 5cd0edcf2a1..d520bec1dbf 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -25,6 +25,8 @@ import ( "sync" "time" + "golang.org/x/exp/maps" + "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/discovery" "vitess.io/vitess/go/vt/key" @@ -973,13 +975,15 @@ func (vs *vstream) keyspaceHasBeenResharded(ctx context.Context, keyspace string // Now that we know there MAY have been an applicable reshard, let's make a // definitive determination by looking at the shard keyranges. - for _, i := range shards { - for _, j := range shards { - if i.ShardName() == j.ShardName() && key.KeyRangeEqual(i.GetKeyRange(), j.GetKeyRange()) { + // All we care about are the shard info records now. + sis := maps.Values(shards) + for i := range sis { + for j := range sis { + if sis[i].ShardName() == sis[j].ShardName() && key.KeyRangeEqual(sis[i].GetKeyRange(), sis[j].GetKeyRange()) { // It's the same shard so skip it. continue } - if key.KeyRangeIntersect(i.GetKeyRange(), j.GetKeyRange()) { + if key.KeyRangeIntersect(sis[i].GetKeyRange(), sis[j].GetKeyRange()) { // We have different shards with overlapping keyranges so we know // that a reshard has happened. return true, nil