Skip to content

Commit

Permalink
Get e2e test working
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Dec 21, 2023
1 parent 9cd3cb3 commit e75bad7
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 26 deletions.
65 changes: 53 additions & 12 deletions go/test/endtoend/vreplication/vdiff2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ import (

"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vttablet"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
)

Expand All @@ -51,6 +53,7 @@ type testCase struct {
testCLIErrors bool // test CLI errors against this workflow (only needs to be done once)
testCLICreateWait bool // test CLI create and wait until done against this workflow (only needs to be done once)
testCLIFlagHandling bool // test vtctldclient flag handling from end-to-end
extraVDiffFlags map[string]string
}

const (
Expand Down Expand Up @@ -78,6 +81,10 @@ var testCases = []*testCase{
testCLIErrors: true, // test for errors in the simplest workflow
testCLICreateWait: true, // test wait on create feature against simplest workflow
testCLIFlagHandling: true, // test flag handling end-to-end against simplest workflow
// Uncomment this to test the max-diff-duration flag.
extraVDiffFlags: map[string]string{
"--max-diff-duration": "1s",
},
},
{
name: "Reshard Merge/split 2 to 3",
Expand Down Expand Up @@ -117,8 +124,13 @@ func TestVDiff2(t *testing.T) {
sourceShards := []string{"0"}
targetKs := "customer"
targetShards := []string{"-80", "80-"}
// This forces us to use multiple vstream packets even with small test tables.
extraVTTabletArgs = []string{"--vstream_packet_size=1"}
extraVTTabletArgs = []string{
// This forces us to use multiple vstream packets even with small test tables.
"--vstream_packet_size=1",
// Test VPlayer batching mode.
fmt.Sprintf("--vreplication_experimental_flags=%d",
vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage|vttablet.VReplicationExperimentalFlagOptimizeInserts|vttablet.VReplicationExperimentalFlagVPlayerBatching),
}

vc = NewVitessCluster(t, "TestVDiff2", strings.Split(allCellNames, ","), mainClusterConfig)
require.NotNil(t, vc)
Expand Down Expand Up @@ -194,22 +206,51 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, tks *Keyspace,
err := vc.VtctlClient.ExecuteCommand(args...)
require.NoError(t, err)

for _, shard := range arrTargetShards {
tab := vc.getPrimaryTablet(t, tc.targetKs, shard)
catchup(t, tab, tc.workflow, tc.typ)
}
if diffDuration, ok := tc.extraVDiffFlags["--max-diff-duration"]; ok {
if !strings.Contains(tc.tables, "customer") {
require.Fail(t, "customer table must be included in the table list to test --max-diff-duration")
}
// Generate enough customer table data so that the table diff gets restarted.
dur, err := time.ParseDuration(diffDuration)
require.NoError(t, err, "could not parse --max-diff-duration %q: %v", diffDuration, err)
seconds := int64(dur.Seconds())
chunkSize := int64(100000)
perSecondCount := int64(500000)
totalRowsToCreate := seconds * perSecondCount
for i := int64(0); i < totalRowsToCreate; i += chunkSize {
generateMoreCustomers(t, sourceKs, chunkSize)
}

vdiff(t, tc.targetKs, tc.workflow, allCellNames, true, true, nil)
// Wait for the workflow to finish the copy phase and catch up.
waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())
for _, shard := range arrTargetShards {
tab := vc.getPrimaryTablet(t, tc.targetKs, shard)
catchup(t, tab, tc.workflow, tc.typ)
}

/*
doVtctldclientVDiff(t, tc.targetKs, tc.workflow, allCellNames, nil, "--max-diff-time=50ns")
// This flag is only implemented in vtctldclient.
doVtctldclientVDiff(t, tc.targetKs, tc.workflow, allCellNames, nil, "--max-diff-duration", diffDuration)

// Confirm that the customer table diff was restarted but not others.
tablet := vc.getPrimaryTablet(t, tc.targetKs, arrTargetShards[0])
stat, err := getDebugVar(t, tablet.Port, []string{"VDiffRestartedTableDiffsCount"})
require.NoError(t, err, "failed to get VDiffRestartedTableDiffsCount stat: %v", err)
restarts := gjson.Parse(stat).Get("customer").Int()
require.Greater(t, restarts, int64(0), "expected VDiffRestartedTableDiffsCount stat to be greater than 0, got %d", restarts)
*/
customerRestarts := gjson.Parse(stat).Get("customer").Int()
require.Greater(t, customerRestarts, int64(0), "expected VDiffRestartedTableDiffsCount stat to be greater than 0 for the customer table, got %d", customerRestarts)
leadRestarts := gjson.Parse(stat).Get("lead").Int()
require.Equal(t, int64(0), leadRestarts, "expected VDiffRestartedTableDiffsCount stat to be 0 for the Lead table, got %d", leadRestarts)

// Cleanup the created customer records so as not to slow down the rest of the test.
_, err = vtgateConn.ExecuteFetch(fmt.Sprintf("delete from %s.customer order by cid desc limit %d", sourceKs, totalRowsToCreate), -1, false)
require.NoError(t, err, "failed to cleanup added customer records: %v", err)
} else {
waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())
for _, shard := range arrTargetShards {
tab := vc.getPrimaryTablet(t, tc.targetKs, shard)
catchup(t, tab, tc.workflow, tc.typ)
}
vdiff(t, tc.targetKs, tc.workflow, allCellNames, true, true, nil)
}

if tc.autoRetryError {
testAutoRetryError(t, tc, allCellNames)
Expand Down
6 changes: 4 additions & 2 deletions go/test/endtoend/vreplication/vdiff_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
)

const (
vdiffTimeout = 90 * time.Second // we can leverage auto retry on error with this longer-than-usual timeout
vdiffTimeout = 120 * time.Second // We can leverage auto retry on error with this longer-than-usual timeout
vdiffRetryTimeout = 30 * time.Second
vdiffStatusCheckInterval = 1 * time.Second
vdiffRetryInterval = 5 * time.Second
Expand Down Expand Up @@ -159,7 +159,9 @@ func doVtctldclientVDiff(t *testing.T, keyspace, workflow, cells string, want *e
t.Run(fmt.Sprintf("vtctldclient vdiff %s", ksWorkflow), func(t *testing.T) {
// update-table-stats is needed in order to test progress reports.
flags := []string{"--auto-retry", "--update-table-stats"}
flags = append(flags, extraFlags...)
if len(extraFlags) > 0 {
flags = append(flags, extraFlags...)
}
uuid, _ := performVDiff2Action(t, false, ksWorkflow, cells, "create", "", false, flags...)
info := waitForVDiff2ToComplete(t, false, ksWorkflow, cells, uuid, time.Time{})
require.NotNil(t, info)
Expand Down
32 changes: 21 additions & 11 deletions go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,17 @@ import (
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vtctl/schematools"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/vindexes"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

// workflowDiffer has metadata and state for the vdiff of a single workflow on this tablet
Expand Down Expand Up @@ -156,7 +157,12 @@ func (wd *workflowDiffer) diffTable(ctx context.Context, dbClient binlogplayer.D
)
defer func() {
if diffTimer != nil {
diffTimer.Stop()
if !diffTimer.Stop() {
select {
case <-diffTimer.C:
default:
}
}
}
}()

Expand All @@ -181,24 +187,28 @@ func (wd *workflowDiffer) diffTable(ctx context.Context, dbClient binlogplayer.D
}

if diffTimer != nil { // We're restarting the diff
diffTimer.Stop()
if !diffTimer.Stop() {
select {
case <-diffTimer.C:
default:
}
}
diffTimer = nil
cancelShardStreams()
// Give the underlying resources (mainly MySQL) a moment to catch up
// before we pick up where we left off (but with a new database snapshot).
// before we pick up where we left off (but with new database snapshots).
time.Sleep(30 * time.Second)
}
diffTimer = time.NewTimer(maxDiffRuntime)

if err := td.initialize(ctx); err != nil {
if err := td.initialize(ctx); err != nil { // Setup the consistent snapshots
return err
}
log.Infof("Table initialization done on table %s for vdiff %s", td.table.Name, wd.ct.uuid)
diffTimer = time.NewTimer(maxDiffRuntime)
diffReport, diffErr = td.diff(ctx, wd.opts.CoreOptions.MaxRows, wd.opts.ReportOptions.DebugQuery, wd.opts.ReportOptions.OnlyPks, wd.opts.CoreOptions.MaxExtraRowsToCompare, wd.opts.ReportOptions.MaxSampleRows, diffTimer.C)
log.Errorf("Encountered an error diffing table %s for vdiff %s: %v", td.table.Name, wd.ct.uuid, diffErr)
if diffErr == nil { // We finished the diff successfully
break
}
log.Errorf("Encountered an error diffing table %s for vdiff %s: %v", td.table.Name, wd.ct.uuid, diffErr)
if !errors.Is(diffErr, ErrMaxDiffDurationExceeded) { // We only want to retry if we hit the max-diff-duration
return diffErr
}
Expand Down
2 changes: 1 addition & 1 deletion test/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -1267,7 +1267,7 @@
},
"vdiff2": {
"File": "unused.go",
"Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestVDiff2", "-timeout", "20m"],
"Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestVDiff2", "-timeout", "30m"],
"Command": [],
"Manual": false,
"Shard": "vreplication_migrate_vdiff2_convert_tz",
Expand Down

0 comments on commit e75bad7

Please sign in to comment.