diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index c28118a97cc..4341a9602ea 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -1453,7 +1453,7 @@ func reshardAction(t *testing.T, action, workflow, keyspaceName, sourceShards, t action, workflow, output) } if err != nil { - t.Fatalf("Reshard %s command failed with %+v\n", action, err) + t.Fatalf("Reshard %s command failed with %+v\nOutput: %s", action, err, output) } } diff --git a/go/test/endtoend/vreplication/vstream_test.go b/go/test/endtoend/vreplication/vstream_test.go index dee8243d5e9..ff118f0a407 100644 --- a/go/test/endtoend/vreplication/vstream_test.go +++ b/go/test/endtoend/vreplication/vstream_test.go @@ -223,7 +223,7 @@ func insertRow(keyspace, table string, id int) { vtgateConn.ExecuteFetch("begin", 1000, false) _, err := vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s (name) values ('%s%d')", table, table, id), 1000, false) if err != nil { - log.Infof("error inserting row %d: %v", id, err) + log.Errorf("error inserting row %d: %v", id, err) } vtgateConn.ExecuteFetch("commit", 1000, false) } @@ -387,13 +387,15 @@ func testVStreamCopyMultiKeyspaceReshard(t *testing.T, baseTabletID int) numEven defer vc.TearDown() defaultCell := vc.Cells[vc.CellNames[0]] - vc.AddKeyspace(t, []*Cell{defaultCell}, "unsharded", "0", vschemaUnsharded, schemaUnsharded, defaultReplicas, defaultRdonly, baseTabletID+100, nil) + _, err := vc.AddKeyspace(t, []*Cell{defaultCell}, "unsharded", "0", vschemaUnsharded, schemaUnsharded, defaultReplicas, defaultRdonly, baseTabletID+100, nil) + require.NoError(t, err) vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) defer vtgateConn.Close() verifyClusterHealth(t, vc) - vc.AddKeyspace(t, []*Cell{defaultCell}, "sharded", "-80,80-", vschemaSharded, schemaSharded, defaultReplicas, defaultRdonly, 
baseTabletID+200, nil) + _, err = vc.AddKeyspace(t, []*Cell{defaultCell}, "sharded", "-80,80-", vschemaSharded, schemaSharded, defaultReplicas, defaultRdonly, baseTabletID+200, nil) + require.NoError(t, err) ctx := context.Background() vstreamConn, err := vtgateconn.Dial(ctx, fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort)) @@ -512,6 +514,197 @@ func testVStreamCopyMultiKeyspaceReshard(t *testing.T, baseTabletID int) numEven return ne } +// Validate that we can resume a VStream when the keyspace has been resharded +// while not streaming. Ensure that we successfully transition from the +// old shards -- which are in the VGTID from the previous stream -- and that +// we miss no row events during the process. +func TestMultiVStreamsKeyspaceReshard(t *testing.T) { + ctx := context.Background() + ks := "testks" + wf := "multiVStreamsKeyspaceReshard" + baseTabletID := 100 + tabletType := topodatapb.TabletType_PRIMARY.String() + oldShards := "-80,80-" + newShards := "-40,40-80,80-c0,c0-" + oldShardRowEvents, newShardRowEvents := 0, 0 + vc = NewVitessCluster(t, nil) + defer vc.TearDown() + defaultCell := vc.Cells[vc.CellNames[0]] + ogdr := defaultReplicas + defaultReplicas = 0 // Because of CI resource constraints we can only run this test with primary tablets + defer func(dr int) { defaultReplicas = dr }(ogdr) + + // For our sequences etc. + _, err := vc.AddKeyspace(t, []*Cell{defaultCell}, "global", "0", vschemaUnsharded, schemaUnsharded, defaultReplicas, defaultRdonly, baseTabletID, nil) + require.NoError(t, err) + + // Set up the keyspace with our old shards. + keyspace, err := vc.AddKeyspace(t, []*Cell{defaultCell}, ks, oldShards, vschemaSharded, schemaSharded, defaultReplicas, defaultRdonly, baseTabletID+1000, nil) + require.NoError(t, err) + + // Add the new shards.
+ err = vc.AddShards(t, []*Cell{defaultCell}, keyspace, newShards, defaultReplicas, defaultRdonly, baseTabletID+2000, targetKsOpts) + require.NoError(t, err) + + vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) + defer vtgateConn.Close() + verifyClusterHealth(t, vc) + + vstreamConn, err := vtgateconn.Dial(ctx, fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort)) + require.NoError(t, err) + defer vstreamConn.Close() + vgtid := &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{{ + Keyspace: "/.*", // Match all keyspaces just to be more realistic. + }}} + + filter := &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + // Only stream the customer table. + Match: "customer", + }}, + } + flags := &vtgatepb.VStreamFlags{} + + // Ensure that we're starting with a clean slate. + _, err = vtgateConn.ExecuteFetch(fmt.Sprintf("delete from %s.customer", ks), 1000, false) + require.NoError(t, err) + + streamCtx, streamCancel := context.WithTimeout(ctx, 1*time.Minute) + defer streamCancel() + done := make(chan struct{}) + + // Background goroutine that keeps inserting rows into the table being streamed until the + // stream context is cancelled. + go func() { + id := 1 + for { + select { + case <-streamCtx.Done(): + // Give the VStream a little catch-up time before telling it to stop + // via the done channel. + time.Sleep(10 * time.Second) + close(done) + return + default: + insertRow(ks, "customer", id) + time.Sleep(250 * time.Millisecond) + id++ + } + } + }() + + // Create the Reshard workflow and wait for it to finish the copy phase. + reshardAction(t, "Create", wf, ks, oldShards, newShards, defaultCellName, tabletType) + waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", ks, wf), binlogdatapb.VReplicationWorkflowState_Running.String()) + + // Stream events but stop once we have a VGTID with positions for the old/original shards.
+ var newVGTID *binlogdatapb.VGtid + func() { + var reader vtgateconn.VStreamReader + reader, err = vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags) + require.NoError(t, err) + for { + evs, err := reader.Recv() + + switch err { + case nil: + for _, ev := range evs { + switch ev.Type { + case binlogdatapb.VEventType_ROW: + shard := ev.GetRowEvent().GetShard() + switch shard { + case "-80", "80-": + oldShardRowEvents++ + case "0": // Shard of the unsharded "global" keyspace: tolerated but not counted. + default: + require.FailNow(t, fmt.Sprintf("received event for unexpected shard: %s", shard)) + } + case binlogdatapb.VEventType_JOURNAL: + require.FailNow(t, fmt.Sprintf("received unexpected journal event for keyspace: %s", ev.GetKeyspace())) + case binlogdatapb.VEventType_VGTID: + newVGTID = ev.GetVgtid() + if len(newVGTID.GetShardGtids()) == 3 { + // We want a VGTID with a non-empty position for the global shard and both old shards. + canStop := true + for _, sg := range newVGTID.GetShardGtids() { + if sg.GetGtid() == "" { + canStop = false + } + } + if canStop { + return + } + } + } + } + default: + require.FailNow(t, fmt.Sprintf("VStream returned unexpected error: %v", err)) + return + } + select { + case <-streamCtx.Done(): + return + default: + } + } + }() + + require.Len(t, newVGTID.GetShardGtids(), 3) + + // Switch the traffic to the new shards. + reshardAction(t, "SwitchTraffic", wf, ks, oldShards, newShards, defaultCellName, tabletType) + + // Now start a new VStream from our previous VGTID which only has the old/original shards.
+ func() { + var reader vtgateconn.VStreamReader + reader, err = vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, newVGTID, filter, flags) + require.NoError(t, err) + for { + evs, err := reader.Recv() + + switch err { + case nil: + for _, ev := range evs { + switch ev.Type { + case binlogdatapb.VEventType_ROW: + shard := ev.GetRowEvent().GetShard() + switch shard { + case "-80", "80-": + oldShardRowEvents++ + case "-40", "40-80", "80-c0", "c0-": + newShardRowEvents++ + case "0": // Shard of the unsharded "global" keyspace: tolerated but not counted. + default: + require.FailNow(t, fmt.Sprintf("received event for unexpected shard: %s", shard)) + } + } + } + case io.EOF: + log.Infof("Stream Ended") + streamCancel() + default: + log.Errorf("Returned err %v", err) + streamCancel() + } + select { + case <-done: + return + default: + } + } + }() + + require.GreaterOrEqual(t, oldShardRowEvents, 1) + require.GreaterOrEqual(t, newShardRowEvents, 1) + + // The number of row events streamed by the VStream API should match the number of rows inserted. + customerResult := execVtgateQuery(t, vtgateConn, ks, "select count(*) from customer") + insertedCustomerRows, err := customerResult.Rows[0][0].ToCastInt64() + require.NoError(t, err) + require.Equal(t, insertedCustomerRows, int64(oldShardRowEvents+newShardRowEvents)) +} + func TestVStreamFailover(t *testing.T) { testVStreamWithFailover(t, true) } diff --git a/go/vt/topo/faketopo/faketopo.go b/go/vt/topo/faketopo/faketopo.go index 8601d28f5b6..69ccf08a969 100644 --- a/go/vt/topo/faketopo/faketopo.go +++ b/go/vt/topo/faketopo/faketopo.go @@ -21,12 +21,11 @@ import ( "strings" "sync" + "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/memorytopo" - "vitess.io/vitess/go/vt/log" topodatapb "vitess.io/vitess/go/vt/proto/topodata" - - "vitess.io/vitess/go/vt/topo" ) // FakeFactory implements the Factory interface. This is supposed to be used only for testing