Skip to content

Commit

Permalink
Add e2e test
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Mar 4, 2024
1 parent 055e735 commit 77894d7
Show file tree
Hide file tree
Showing 3 changed files with 199 additions and 7 deletions.
2 changes: 1 addition & 1 deletion go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1453,7 +1453,7 @@ func reshardAction(t *testing.T, action, workflow, keyspaceName, sourceShards, t
action, workflow, output)
}
if err != nil {
t.Fatalf("Reshard %s command failed with %+v\n", action, err)
t.Fatalf("Reshard %s command failed with %+v\nOutput: %s", action, err, output)
}
}

Expand Down
199 changes: 196 additions & 3 deletions go/test/endtoend/vreplication/vstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ func insertRow(keyspace, table string, id int) {
vtgateConn.ExecuteFetch("begin", 1000, false)
_, err := vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s (name) values ('%s%d')", table, table, id), 1000, false)
if err != nil {
log.Infof("error inserting row %d: %v", id, err)
log.Errorf("error inserting row %d: %v", id, err)
}
vtgateConn.ExecuteFetch("commit", 1000, false)
}
Expand Down Expand Up @@ -387,13 +387,15 @@ func testVStreamCopyMultiKeyspaceReshard(t *testing.T, baseTabletID int) numEven
defer vc.TearDown()

defaultCell := vc.Cells[vc.CellNames[0]]
vc.AddKeyspace(t, []*Cell{defaultCell}, "unsharded", "0", vschemaUnsharded, schemaUnsharded, defaultReplicas, defaultRdonly, baseTabletID+100, nil)
_, err := vc.AddKeyspace(t, []*Cell{defaultCell}, "unsharded", "0", vschemaUnsharded, schemaUnsharded, defaultReplicas, defaultRdonly, baseTabletID+100, nil)
require.NoError(t, err)

vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
defer vtgateConn.Close()
verifyClusterHealth(t, vc)

vc.AddKeyspace(t, []*Cell{defaultCell}, "sharded", "-80,80-", vschemaSharded, schemaSharded, defaultReplicas, defaultRdonly, baseTabletID+200, nil)
_, err = vc.AddKeyspace(t, []*Cell{defaultCell}, "sharded", "-80,80-", vschemaSharded, schemaSharded, defaultReplicas, defaultRdonly, baseTabletID+200, nil)
require.NoError(t, err)

ctx := context.Background()
vstreamConn, err := vtgateconn.Dial(ctx, fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort))
Expand Down Expand Up @@ -512,6 +514,197 @@ func testVStreamCopyMultiKeyspaceReshard(t *testing.T, baseTabletID int) numEven
return ne
}

// Validate that we can resume a VStream when the keyspace has been resharded
// while not streaming. Ensure that there we successfully transition from the
// old shards -- which are in the VGTID from the previous stream -- and that
// we miss no row events during the process.
func TestMultiVStreamsKeyspaceReshard(t *testing.T) {
ctx := context.Background()
ks := "testks"
wf := "multiVStreamsKeyspaceReshard"
baseTabletID := 100
tabletType := topodatapb.TabletType_PRIMARY.String()
oldShards := "-80,80-"
newShards := "-40,40-80,80-c0,c0-"
oldShardRowEvents, newShardRowEvents := 0, 0
vc = NewVitessCluster(t, nil)
defer vc.TearDown()
defaultCell := vc.Cells[vc.CellNames[0]]
ogdr := defaultReplicas
defaultReplicas = 0 // Because of CI resource constraints we can only run this test with primary tablets
defer func(dr int) { defaultReplicas = dr }(ogdr)

// For our sequences etc.
_, err := vc.AddKeyspace(t, []*Cell{defaultCell}, "global", "0", vschemaUnsharded, schemaUnsharded, defaultReplicas, defaultRdonly, baseTabletID, nil)
require.NoError(t, err)

// Setup the keyspace with our old shards.
keyspace, err := vc.AddKeyspace(t, []*Cell{defaultCell}, ks, oldShards, vschemaSharded, schemaSharded, defaultReplicas, defaultRdonly, baseTabletID+1000, nil)
require.NoError(t, err)

// Add the new shards.
err = vc.AddShards(t, []*Cell{defaultCell}, keyspace, newShards, defaultReplicas, defaultRdonly, baseTabletID+2000, targetKsOpts)
require.NoError(t, err)

vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
defer vtgateConn.Close()
verifyClusterHealth(t, vc)

vstreamConn, err := vtgateconn.Dial(ctx, fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort))
require.NoError(t, err)
defer vstreamConn.Close()
vgtid := &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{{
Keyspace: "/.*", // Match all keyspaces just to be more realistic.
}}}

filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
// Only stream the customer table.
Match: "customer",
}},
}
flags := &vtgatepb.VStreamFlags{}

// Ensure that we're starting with a clean slate.
_, err = vtgateConn.ExecuteFetch(fmt.Sprintf("delete from %s.customer", ks), 1000, false)
require.NoError(t, err)

streamCtx, streamCancel := context.WithTimeout(ctx, 1*time.Minute)
defer streamCancel()
done := make(chan struct{})

// First goroutine that keeps inserting rows into the table being streamed until the
// stream context is cancelled.
go func() {
id := 1
for {
select {
case <-streamCtx.Done():
// Give the VStream a little catch-up time before telling it to stop
// via the done channel.
time.Sleep(10 * time.Second)
close(done)
return
default:
insertRow(ks, "customer", id)
time.Sleep(250 * time.Millisecond)
id++
}
}
}()

// Create the Reshard workflow and wait for it to finish the copy phase.
reshardAction(t, "Create", wf, ks, oldShards, newShards, defaultCellName, tabletType)
waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", ks, wf), binlogdatapb.VReplicationWorkflowState_Running.String())

// Stream events but stop once we have a VGTID with positions for the old/original shards.
var newVGTID *binlogdatapb.VGtid
func() {
var reader vtgateconn.VStreamReader
reader, err = vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
require.NoError(t, err)
for {
evs, err := reader.Recv()

switch err {
case nil:
for _, ev := range evs {
switch ev.Type {
case binlogdatapb.VEventType_ROW:
shard := ev.GetRowEvent().GetShard()
switch shard {
case "-80", "80-":
oldShardRowEvents++
case "0":
default:
require.FailNow(t, fmt.Sprintf("received event for unexpected shard: %s", shard))
}
case binlogdatapb.VEventType_JOURNAL:
require.FailNow(t, fmt.Sprintf("received unexpected journal event for keyspace: %s", ev.GetKeyspace()))
case binlogdatapb.VEventType_VGTID:
newVGTID = ev.GetVgtid()
if len(newVGTID.GetShardGtids()) == 3 {
// We want a VGTID with a position for the global shard and the old shards.
canStop := true
for _, sg := range newVGTID.GetShardGtids() {
if sg.GetGtid() == "" {
canStop = false
}
}
if canStop {
return
}
}
}
}
default:
require.FailNow(t, "VStream returned unexpected error: %v", err)
return
}
select {
case <-streamCtx.Done():
return
default:
}
}
}()

require.Len(t, newVGTID.GetShardGtids(), 3)

// Switch the traffic to the new shards.
reshardAction(t, "SwitchTraffic", wf, ks, oldShards, newShards, defaultCellName, tabletType)

// Now start a new VStream from our previous VGTID which only has the old/original shards.
func() {
var reader vtgateconn.VStreamReader
reader, err = vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, newVGTID, filter, flags)
require.NoError(t, err)
for {
evs, err := reader.Recv()

switch err {
case nil:
for _, ev := range evs {
switch ev.Type {
case binlogdatapb.VEventType_ROW:
shard := ev.RowEvent.Shard
switch shard {
case "-80", "80-":
oldShardRowEvents++
case "-40", "40-80", "80-c0", "c0-":
newShardRowEvents++
case "0":
default:
require.FailNow(t, fmt.Sprintf("received event for unexpected shard: %s", shard))
}
}
}
case io.EOF:
log.Infof("Stream Ended")
streamCancel()
default:
log.Errorf("Returned err %v", err)
streamCancel()
}
select {
case <-done:
return
default:
}
}
}()

require.GreaterOrEqual(t, oldShardRowEvents, 1)
require.GreaterOrEqual(t, newShardRowEvents, 1)

// The number of row events streamed by the VStream API should match the number of rows inserted.
customerResult := execVtgateQuery(t, vtgateConn, ks, "select count(*) from customer")
insertedCustomerRows, err := customerResult.Rows[0][0].ToCastInt64()
require.NoError(t, err)
require.Equal(t, insertedCustomerRows, int64(oldShardRowEvents+newShardRowEvents))
}

func TestVStreamFailover(t *testing.T) {
testVStreamWithFailover(t, true)
}
Expand Down
5 changes: 2 additions & 3 deletions go/vt/topo/faketopo/faketopo.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,11 @@ import (
"strings"
"sync"

"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/memorytopo"

"vitess.io/vitess/go/vt/log"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"

"vitess.io/vitess/go/vt/topo"
)

// FakeFactory implements the Factory interface. This is supposed to be used only for testing
Expand Down

0 comments on commit 77894d7

Please sign in to comment.