Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VDiff: Support a max diff time for tables #14786

Merged
merged 22 commits into from
Dec 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions go/cmd/vtctldclient/command/vreplication/vdiff/vdiff.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ var (
Wait bool
WaitUpdateInterval time.Duration
AutoRetry bool
MaxDiffDuration time.Duration
}{}

deleteOptions = struct {
Expand Down Expand Up @@ -291,6 +292,7 @@ func commandCreate(cmd *cobra.Command, args []string) error {
WaitUpdateInterval: protoutil.DurationToProto(createOptions.WaitUpdateInterval),
AutoRetry: createOptions.AutoRetry,
MaxReportSampleRows: createOptions.MaxReportSampleRows,
MaxDiffDuration: protoutil.DurationToProto(createOptions.MaxDiffDuration),
})

if err != nil {
Expand Down Expand Up @@ -883,6 +885,7 @@ func registerCommands(root *cobra.Command) {
create.Flags().DurationVar(&createOptions.WaitUpdateInterval, "wait-update-interval", time.Duration(1*time.Minute), "When waiting on a vdiff to finish, check and display the current status this often.")
create.Flags().BoolVar(&createOptions.AutoRetry, "auto-retry", true, "Should this vdiff automatically retry and continue in case of recoverable errors.")
create.Flags().BoolVar(&createOptions.UpdateTableStats, "update-table-stats", false, "Update the table statistics, using ANALYZE TABLE, on each table involved in the VDiff during initialization. This will ensure that progress estimates are as accurate as possible -- but it does involve locks and can potentially impact query processing on the target keyspace.")
create.Flags().DurationVar(&createOptions.MaxDiffDuration, "max-diff-duration", 0, "How long should an individual table diff run before being stopped and restarted in order to lessen the impact on tablets due to holding open database snapshots for long periods of time (0 is the default and means no time limit).")
base.AddCommand(create)

base.AddCommand(delete)
Expand Down
113 changes: 86 additions & 27 deletions go/test/endtoend/vreplication/vdiff2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ import (

"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vttablet"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
)

Expand All @@ -51,6 +53,7 @@ type testCase struct {
testCLIErrors bool // test CLI errors against this workflow (only needs to be done once)
testCLICreateWait bool // test CLI create and wait until done against this workflow (only needs to be done once)
testCLIFlagHandling bool // test vtctldclient flag handling from end-to-end
extraVDiffFlags map[string]string
}

const (
Expand All @@ -72,12 +75,15 @@ var testCases = []*testCase{
tabletBaseID: 200,
tables: "customer,Lead,Lead-1,nopk",
autoRetryError: true,
retryInsert: `insert into customer(cid, name, typ) values(1991234, 'Testy McTester', 'soho')`,
retryInsert: `insert into customer(cid, name, typ) values(2005149100, 'Testy McTester', 'soho')`,
resume: true,
resumeInsert: `insert into customer(cid, name, typ) values(1992234, 'Testy McTester (redux)', 'enterprise')`,
resumeInsert: `insert into customer(cid, name, typ) values(2005149200, 'Testy McTester (redux)', 'enterprise')`,
testCLIErrors: true, // test for errors in the simplest workflow
testCLICreateWait: true, // test wait on create feature against simplest workflow
testCLIFlagHandling: true, // test flag handling end-to-end against simplest workflow
extraVDiffFlags: map[string]string{
"--max-diff-duration": "2s",
},
},
{
name: "Reshard Merge/split 2 to 3",
Expand All @@ -89,9 +95,9 @@ var testCases = []*testCase{
targetShards: "-40,40-a0,a0-",
tabletBaseID: 400,
autoRetryError: true,
retryInsert: `insert into customer(cid, name, typ) values(1993234, 'Testy McTester Jr', 'enterprise'), (1993235, 'Testy McTester II', 'enterprise')`,
retryInsert: `insert into customer(cid, name, typ) values(2005149300, 'Testy McTester Jr', 'enterprise'), (2005149350, 'Testy McTester II', 'enterprise')`,
resume: true,
resumeInsert: `insert into customer(cid, name, typ) values(1994234, 'Testy McTester III', 'enterprise')`,
resumeInsert: `insert into customer(cid, name, typ) values(2005149400, 'Testy McTester III', 'enterprise')`,
stop: true,
},
{
Expand All @@ -104,9 +110,9 @@ var testCases = []*testCase{
targetShards: "0",
tabletBaseID: 700,
autoRetryError: true,
retryInsert: `insert into customer(cid, name, typ) values(1995234, 'Testy McTester IV', 'enterprise')`,
retryInsert: `insert into customer(cid, name, typ) values(2005149500, 'Testy McTester IV', 'enterprise')`,
resume: true,
resumeInsert: `insert into customer(cid, name, typ) values(1996234, 'Testy McTester V', 'enterprise'), (1996235, 'Testy McTester VI', 'enterprise')`,
resumeInsert: `insert into customer(cid, name, typ) values(2005149600, 'Testy McTester V', 'enterprise'), (2005149650, 'Testy McTester VI', 'enterprise')`,
stop: true,
},
}
Expand All @@ -117,8 +123,13 @@ func TestVDiff2(t *testing.T) {
sourceShards := []string{"0"}
targetKs := "customer"
targetShards := []string{"-80", "80-"}
// This forces us to use multiple vstream packets even with small test tables.
extraVTTabletArgs = []string{"--vstream_packet_size=1"}
extraVTTabletArgs = []string{
// This forces us to use multiple vstream packets even with small test tables.
"--vstream_packet_size=1",
// Test VPlayer batching mode.
fmt.Sprintf("--vreplication_experimental_flags=%d",
vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage|vttablet.VReplicationExperimentalFlagOptimizeInserts|vttablet.VReplicationExperimentalFlagVPlayerBatching),
}

vc = NewVitessCluster(t, "TestVDiff2", strings.Split(allCellNames, ","), mainClusterConfig)
require.NotNil(t, vc)
Expand Down Expand Up @@ -194,12 +205,58 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, tks *Keyspace,
err := vc.VtctlClient.ExecuteCommand(args...)
require.NoError(t, err)

for _, shard := range arrTargetShards {
tab := vc.getPrimaryTablet(t, tc.targetKs, shard)
catchup(t, tab, tc.workflow, tc.typ)
waitForShardsToCatchup := func() {
for _, shard := range arrTargetShards {
tab := vc.getPrimaryTablet(t, tc.targetKs, shard)
catchup(t, tab, tc.workflow, tc.typ)
}
}

vdiff(t, tc.targetKs, tc.workflow, allCellNames, true, true, nil)
// Wait for the workflow to finish the copy phase and initially catch up.
waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())
waitForShardsToCatchup()

if diffDuration, ok := tc.extraVDiffFlags["--max-diff-duration"]; ok {
if !strings.Contains(tc.tables, "customer") {
require.Fail(t, "customer table must be included in the table list to test --max-diff-duration")
}
// Generate enough customer table data so that the table diff gets restarted.
dur, err := time.ParseDuration(diffDuration)
require.NoError(t, err, "could not parse --max-diff-duration %q: %v", diffDuration, err)
seconds := int64(dur.Seconds())
chunkSize := int64(100000)
perSecondCount := int64(1000000)
totalRowsToCreate := seconds * perSecondCount
for i := int64(0); i < totalRowsToCreate; i += chunkSize {
generateMoreCustomers(t, sourceKs, chunkSize)
}

// Wait for the workflow to catch up after all the inserts.
waitForShardsToCatchup()

// This flag is only implemented in vtctldclient.
doVtctldclientVDiff(t, tc.targetKs, tc.workflow, allCellNames, nil, "--max-diff-duration", diffDuration)

// Confirm that the customer table diff was restarted but not others.
tablet := vc.getPrimaryTablet(t, tc.targetKs, arrTargetShards[0])
stat, err := getDebugVar(t, tablet.Port, []string{"VDiffRestartedTableDiffsCount"})
require.NoError(t, err, "failed to get VDiffRestartedTableDiffsCount stat: %v", err)
customerRestarts := gjson.Parse(stat).Get("customer").Int()
require.Greater(t, customerRestarts, int64(0), "expected VDiffRestartedTableDiffsCount stat to be greater than 0 for the customer table, got %d", customerRestarts)
leadRestarts := gjson.Parse(stat).Get("lead").Int()
require.Equal(t, int64(0), leadRestarts, "expected VDiffRestartedTableDiffsCount stat to be 0 for the Lead table, got %d", leadRestarts)

// Cleanup the created customer records so as not to slow down the rest of the test.
delstmt := fmt.Sprintf("delete from %s.customer order by cid desc limit %d", sourceKs, chunkSize)
for i := int64(0); i < totalRowsToCreate; i += chunkSize {
_, err := vtgateConn.ExecuteFetch(delstmt, int(chunkSize), false)
require.NoError(t, err, "failed to cleanup added customer records: %v", err)
}
// Wait for the workflow to catch up again on the deletes.
waitForShardsToCatchup()
} else {
vdiff(t, tc.targetKs, tc.workflow, allCellNames, true, true, nil)
}

if tc.autoRetryError {
testAutoRetryError(t, tc, allCellNames)
Expand All @@ -209,7 +266,7 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, tks *Keyspace,
testResume(t, tc, allCellNames)
}

// These are done here so that we have a valid workflow to test the commands against
// These are done here so that we have a valid workflow to test the commands against.
if tc.stop {
testStop(t, ksWorkflow, allCellNames)
}
Expand All @@ -225,7 +282,7 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, tks *Keyspace,

testDelete(t, ksWorkflow, allCellNames)

// create another VDiff record to confirm it gets deleted when the workflow is completed
// Create another VDiff record to confirm it gets deleted when the workflow is completed.
ts := time.Now()
uuid, _ := performVDiff2Action(t, false, ksWorkflow, allCellNames, "create", "", false)
waitForVDiff2ToComplete(t, false, ksWorkflow, allCellNames, uuid, ts)
Expand All @@ -235,7 +292,7 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, tks *Keyspace,
err = vc.VtctlClient.ExecuteCommand(tc.typ, "--", "Complete", ksWorkflow)
require.NoError(t, err)

// confirm the VDiff data is deleted for the workflow
// Confirm the VDiff data is deleted for the workflow.
testNoOrphanedData(t, tc.targetKs, tc.workflow, arrTargetShards)
}

Expand Down Expand Up @@ -267,6 +324,7 @@ func testCLIFlagHandling(t *testing.T, targetKs, workflowName string, cell *Cell
AutoRetry: true,
UpdateTableStats: true,
TimeoutSeconds: 60,
MaxDiffSeconds: 333,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

333 seems like an oddly-specific duration to me ... is there a reason you went with this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only in that I needed to pick something specific and this is the random number I typed. 🙂

},
PickerOptions: &tabletmanagerdatapb.VDiffPickerOptions{
SourceCell: "zone1,zone2,zone3,zonefoosource",
Expand All @@ -286,6 +344,7 @@ func testCLIFlagHandling(t *testing.T, targetKs, workflowName string, cell *Cell
"--max-report-sample-rows", fmt.Sprintf("%d", expectedOptions.ReportOptions.MaxSampleRows),
"--max-extra-rows-to-compare", fmt.Sprintf("%d", expectedOptions.CoreOptions.MaxExtraRowsToCompare),
"--filtered-replication-wait-time", fmt.Sprintf("%v", time.Duration(expectedOptions.CoreOptions.TimeoutSeconds)*time.Second),
"--max-diff-duration", fmt.Sprintf("%v", time.Duration(expectedOptions.CoreOptions.MaxDiffSeconds)*time.Second),
"--source-cells", expectedOptions.PickerOptions.SourceCell,
"--target-cells", expectedOptions.PickerOptions.TargetCell,
"--tablet-types", expectedOptions.PickerOptions.TabletTypes,
Expand Down Expand Up @@ -381,13 +440,13 @@ func testResume(t *testing.T, tc *testCase, cells string) {
t.Run("Resume", func(t *testing.T) {
ksWorkflow := fmt.Sprintf("%s.%s", tc.targetKs, tc.workflow)

// confirm the last VDiff is in the expected completed state
// Confirm the last VDiff is in the expected completed state.
uuid, output := performVDiff2Action(t, false, ksWorkflow, cells, "show", "last", false)
jsonOutput := getVDiffInfo(output)
require.Equal(t, "completed", jsonOutput.State)
// save the number of rows compared in previous runs
// Save the number of rows compared in previous runs.
rowsCompared := jsonOutput.RowsCompared
ogTime := time.Now() // the completed_at should be later than this after resuming
ogTime := time.Now() // The completed_at should be later than this after resuming

expectedNewRows := int64(0)
if tc.resumeInsert != "" {
Expand Down Expand Up @@ -424,34 +483,34 @@ func testAutoRetryError(t *testing.T, tc *testCase, cells string) {
t.Run("Auto retry on error", func(t *testing.T) {
ksWorkflow := fmt.Sprintf("%s.%s", tc.targetKs, tc.workflow)

// confirm the last VDiff is in the expected completed state
// Confirm the last VDiff is in the expected completed state.
uuid, output := performVDiff2Action(t, false, ksWorkflow, cells, "show", "last", false)
jsonOutput := getVDiffInfo(output)
require.Equal(t, "completed", jsonOutput.State)
// save the number of rows compared in the first run
// Save the number of rows compared in the first run.
rowsCompared := jsonOutput.RowsCompared
ogTime := time.Now() // the completed_at should be later than this upon retry
ogTime := time.Now() // The completed_at should be later than this upon retry

// create new data since original VDiff run -- if requested -- to confirm that the rows
// compared is cumulative
// Create new data since original VDiff run -- if requested -- to confirm that the rows
// compared is cumulative.
expectedNewRows := int64(0)
if tc.retryInsert != "" {
res := execVtgateQuery(t, vtgateConn, tc.sourceKs, tc.retryInsert)
expectedNewRows = int64(res.RowsAffected)
}
expectedRows := rowsCompared + expectedNewRows

// update the VDiff to simulate an ephemeral error having occurred
// Update the VDiff to simulate an ephemeral error having occurred.
for _, shard := range strings.Split(tc.targetShards, ",") {
tab := vc.getPrimaryTablet(t, tc.targetKs, shard)
res, err := tab.QueryTabletWithDB(sqlparser.BuildParsedQuery(sqlSimulateError, sidecarDBIdentifier, sidecarDBIdentifier, encodeString(uuid)).Query, "vt_"+tc.targetKs)
require.NoError(t, err)
// should have updated the vdiff record and at least one vdiff_table record
// Should have updated the vdiff record and at least one vdiff_table record.
require.GreaterOrEqual(t, int(res.RowsAffected), 2)
}

// confirm that the VDiff was retried, able to complete, and we compared the expected
// number of rows in total (original run and retry)
// Confirm that the VDiff was retried, able to complete, and we compared the expected
// number of rows in total (original run and retry).
info := waitForVDiff2ToComplete(t, false, ksWorkflow, cells, uuid, ogTime)
require.NotNil(t, info)
require.False(t, info.HasMismatch)
Expand Down
12 changes: 8 additions & 4 deletions go/test/endtoend/vreplication/vdiff_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
)

const (
vdiffTimeout = 90 * time.Second // we can leverage auto retry on error with this longer-than-usual timeout
vdiffTimeout = 120 * time.Second // We can leverage auto retry on error with this longer-than-usual timeout
vdiffRetryTimeout = 30 * time.Second
vdiffStatusCheckInterval = 1 * time.Second
vdiffRetryInterval = 5 * time.Second
Expand Down Expand Up @@ -109,7 +109,7 @@ func waitForVDiff2ToComplete(t *testing.T, useVtctlclient bool, ksWorkflow, cell
}
ch <- true
return
} else if info.State == "started" { // test the progress report
} else if info.State == "started" { // Test the progress report
// The ETA should always be in the future -- when we're able to estimate
// it -- and the progress percentage should only increase.
// The timestamp format allows us to compare them lexicographically.
Expand Down Expand Up @@ -154,11 +154,15 @@ type expectedVDiff2Result struct {
hasMismatch bool
}

func doVtctldclientVDiff(t *testing.T, keyspace, workflow, cells string, want *expectedVDiff2Result) {
func doVtctldclientVDiff(t *testing.T, keyspace, workflow, cells string, want *expectedVDiff2Result, extraFlags ...string) {
ksWorkflow := fmt.Sprintf("%s.%s", keyspace, workflow)
t.Run(fmt.Sprintf("vtctldclient vdiff %s", ksWorkflow), func(t *testing.T) {
// update-table-stats is needed in order to test progress reports.
uuid, _ := performVDiff2Action(t, false, ksWorkflow, cells, "create", "", false, "--auto-retry", "--update-table-stats")
flags := []string{"--auto-retry", "--update-table-stats"}
if len(extraFlags) > 0 {
flags = append(flags, extraFlags...)
}
uuid, _ := performVDiff2Action(t, false, ksWorkflow, cells, "create", "", false, flags...)
info := waitForVDiff2ToComplete(t, false, ksWorkflow, cells, uuid, time.Time{})
require.NotNil(t, info)
require.Equal(t, workflow, info.Workflow)
Expand Down
Loading
Loading