Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VDiff: Support a max diff time for tables #14786

Merged
merged 22 commits into from
Dec 27, 2023
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/common/scripts/vttablet-up.sh
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ vttablet \
--heartbeat_enable \
--heartbeat_interval=250ms \
--heartbeat_on_demand_duration=5s \
--vreplication_experimental_flags=7 \
mattlord marked this conversation as resolved.
Show resolved Hide resolved
> $VTDATAROOT/$tablet_dir/vttablet.out 2>&1 &

# Block waiting for the tablet to be listening
Expand Down
3 changes: 3 additions & 0 deletions go/cmd/vtctldclient/command/vreplication/vdiff/vdiff.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ var (
Wait bool
WaitUpdateInterval time.Duration
AutoRetry bool
MaxDiffDuration time.Duration
}{}

deleteOptions = struct {
Expand Down Expand Up @@ -291,6 +292,7 @@ func commandCreate(cmd *cobra.Command, args []string) error {
WaitUpdateInterval: protoutil.DurationToProto(createOptions.WaitUpdateInterval),
AutoRetry: createOptions.AutoRetry,
MaxReportSampleRows: createOptions.MaxReportSampleRows,
MaxDiffDuration: protoutil.DurationToProto(createOptions.MaxDiffDuration),
})

if err != nil {
Expand Down Expand Up @@ -883,6 +885,7 @@ func registerCommands(root *cobra.Command) {
create.Flags().DurationVar(&createOptions.WaitUpdateInterval, "wait-update-interval", time.Duration(1*time.Minute), "When waiting on a vdiff to finish, check and display the current status this often.")
create.Flags().BoolVar(&createOptions.AutoRetry, "auto-retry", true, "Should this vdiff automatically retry and continue in case of recoverable errors.")
create.Flags().BoolVar(&createOptions.UpdateTableStats, "update-table-stats", false, "Update the table statistics, using ANALYZE TABLE, on each table involved in the VDiff during initialization. This will ensure that progress estimates are as accurate as possible -- but it does involve locks and can potentially impact query processing on the target keyspace.")
create.Flags().DurationVar(&createOptions.MaxDiffDuration, "max-diff-duration", time.Duration(0), "How long should an individual table diff run before being stopped and restarted in order to lessen the impact on tablets due to holding open database snapshots for long periods of time (0 is the default and means no time limit).")
mattlord marked this conversation as resolved.
Show resolved Hide resolved
base.AddCommand(create)

base.AddCommand(delete)
Expand Down
73 changes: 63 additions & 10 deletions go/test/endtoend/vreplication/vdiff2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ import (

"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vttablet"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
)

Expand All @@ -51,6 +53,7 @@ type testCase struct {
testCLIErrors bool // test CLI errors against this workflow (only needs to be done once)
testCLICreateWait bool // test CLI create and wait until done against this workflow (only needs to be done once)
testCLIFlagHandling bool // test vtctldclient flag handling from end-to-end
extraVDiffFlags map[string]string
}

const (
Expand Down Expand Up @@ -78,6 +81,10 @@ var testCases = []*testCase{
testCLIErrors: true, // test for errors in the simplest workflow
testCLICreateWait: true, // test wait on create feature against simplest workflow
testCLIFlagHandling: true, // test flag handling end-to-end against simplest workflow
// Uncomment this to test the max-diff-duration flag.
extraVDiffFlags: map[string]string{
"--max-diff-duration": "1s",
},
},
{
name: "Reshard Merge/split 2 to 3",
Expand Down Expand Up @@ -117,8 +124,13 @@ func TestVDiff2(t *testing.T) {
sourceShards := []string{"0"}
targetKs := "customer"
targetShards := []string{"-80", "80-"}
// This forces us to use multiple vstream packets even with small test tables.
extraVTTabletArgs = []string{"--vstream_packet_size=1"}
extraVTTabletArgs = []string{
// This forces us to use multiple vstream packets even with small test tables.
"--vstream_packet_size=1",
// Test VPlayer batching mode.
fmt.Sprintf("--vreplication_experimental_flags=%d",
vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage|vttablet.VReplicationExperimentalFlagOptimizeInserts|vttablet.VReplicationExperimentalFlagVPlayerBatching),
}

vc = NewVitessCluster(t, "TestVDiff2", strings.Split(allCellNames, ","), mainClusterConfig)
require.NotNil(t, vc)
Expand Down Expand Up @@ -194,12 +206,51 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, tks *Keyspace,
err := vc.VtctlClient.ExecuteCommand(args...)
require.NoError(t, err)

for _, shard := range arrTargetShards {
tab := vc.getPrimaryTablet(t, tc.targetKs, shard)
catchup(t, tab, tc.workflow, tc.typ)
}
if diffDuration, ok := tc.extraVDiffFlags["--max-diff-duration"]; ok {
if !strings.Contains(tc.tables, "customer") {
require.Fail(t, "customer table must be included in the table list to test --max-diff-duration")
}
// Generate enough customer table data so that the table diff gets restarted.
dur, err := time.ParseDuration(diffDuration)
require.NoError(t, err, "could not parse --max-diff-duration %q: %v", diffDuration, err)
seconds := int64(dur.Seconds())
chunkSize := int64(100000)
perSecondCount := int64(500000)
totalRowsToCreate := seconds * perSecondCount
for i := int64(0); i < totalRowsToCreate; i += chunkSize {
generateMoreCustomers(t, sourceKs, chunkSize)
}

vdiff(t, tc.targetKs, tc.workflow, allCellNames, true, true, nil)
// Wait for the workflow to finish the copy phase and catch up.
waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())
for _, shard := range arrTargetShards {
tab := vc.getPrimaryTablet(t, tc.targetKs, shard)
catchup(t, tab, tc.workflow, tc.typ)
}

// This flag is only implemented in vtctldclient.
doVtctldclientVDiff(t, tc.targetKs, tc.workflow, allCellNames, nil, "--max-diff-duration", diffDuration)

// Confirm that the customer table diff was restarted but not others.
tablet := vc.getPrimaryTablet(t, tc.targetKs, arrTargetShards[0])
stat, err := getDebugVar(t, tablet.Port, []string{"VDiffRestartedTableDiffsCount"})
require.NoError(t, err, "failed to get VDiffRestartedTableDiffsCount stat: %v", err)
customerRestarts := gjson.Parse(stat).Get("customer").Int()
require.Greater(t, customerRestarts, int64(0), "expected VDiffRestartedTableDiffsCount stat to be greater than 0 for the customer table, got %d", customerRestarts)
leadRestarts := gjson.Parse(stat).Get("lead").Int()
require.Equal(t, int64(0), leadRestarts, "expected VDiffRestartedTableDiffsCount stat to be 0 for the Lead table, got %d", leadRestarts)

// Cleanup the created customer records so as not to slow down the rest of the test.
_, err = vtgateConn.ExecuteFetch(fmt.Sprintf("delete from %s.customer order by cid desc limit %d", sourceKs, totalRowsToCreate), -1, false)
require.NoError(t, err, "failed to cleanup added customer records: %v", err)
} else {
waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())
for _, shard := range arrTargetShards {
tab := vc.getPrimaryTablet(t, tc.targetKs, shard)
catchup(t, tab, tc.workflow, tc.typ)
}
vdiff(t, tc.targetKs, tc.workflow, allCellNames, true, true, nil)
}

if tc.autoRetryError {
testAutoRetryError(t, tc, allCellNames)
Expand All @@ -209,7 +260,7 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, tks *Keyspace,
testResume(t, tc, allCellNames)
}

// These are done here so that we have a valid workflow to test the commands against
// These are done here so that we have a valid workflow to test the commands against.
if tc.stop {
testStop(t, ksWorkflow, allCellNames)
}
Expand All @@ -225,7 +276,7 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, tks *Keyspace,

testDelete(t, ksWorkflow, allCellNames)

// create another VDiff record to confirm it gets deleted when the workflow is completed
// Create another VDiff record to confirm it gets deleted when the workflow is completed.
ts := time.Now()
uuid, _ := performVDiff2Action(t, false, ksWorkflow, allCellNames, "create", "", false)
waitForVDiff2ToComplete(t, false, ksWorkflow, allCellNames, uuid, ts)
Expand All @@ -235,7 +286,7 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, tks *Keyspace,
err = vc.VtctlClient.ExecuteCommand(tc.typ, "--", "Complete", ksWorkflow)
require.NoError(t, err)

// confirm the VDiff data is deleted for the workflow
// Confirm the VDiff data is deleted for the workflow.
testNoOrphanedData(t, tc.targetKs, tc.workflow, arrTargetShards)
}

Expand Down Expand Up @@ -267,6 +318,7 @@ func testCLIFlagHandling(t *testing.T, targetKs, workflowName string, cell *Cell
AutoRetry: true,
UpdateTableStats: true,
TimeoutSeconds: 60,
MaxDiffSeconds: 333,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

333 seems like an oddly-specific duration to me ... is there a reason you went with this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only in that I needed to pick something specific and this is the random number I typed. 🙂

},
PickerOptions: &tabletmanagerdatapb.VDiffPickerOptions{
SourceCell: "zone1,zone2,zone3,zonefoosource",
Expand All @@ -286,6 +338,7 @@ func testCLIFlagHandling(t *testing.T, targetKs, workflowName string, cell *Cell
"--max-report-sample-rows", fmt.Sprintf("%d", expectedOptions.ReportOptions.MaxSampleRows),
"--max-extra-rows-to-compare", fmt.Sprintf("%d", expectedOptions.CoreOptions.MaxExtraRowsToCompare),
"--filtered-replication-wait-time", fmt.Sprintf("%v", time.Duration(expectedOptions.CoreOptions.TimeoutSeconds)*time.Second),
"--max-diff-duration", fmt.Sprintf("%v", time.Duration(expectedOptions.CoreOptions.MaxDiffSeconds)*time.Second),
"--source-cells", expectedOptions.PickerOptions.SourceCell,
"--target-cells", expectedOptions.PickerOptions.TargetCell,
"--tablet-types", expectedOptions.PickerOptions.TabletTypes,
Expand Down
12 changes: 8 additions & 4 deletions go/test/endtoend/vreplication/vdiff_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
)

const (
vdiffTimeout = 90 * time.Second // we can leverage auto retry on error with this longer-than-usual timeout
vdiffTimeout = 120 * time.Second // We can leverage auto retry on error with this longer-than-usual timeout
vdiffRetryTimeout = 30 * time.Second
vdiffStatusCheckInterval = 1 * time.Second
vdiffRetryInterval = 5 * time.Second
Expand Down Expand Up @@ -109,7 +109,7 @@ func waitForVDiff2ToComplete(t *testing.T, useVtctlclient bool, ksWorkflow, cell
}
ch <- true
return
} else if info.State == "started" { // test the progress report
} else if info.State == "started" { // Test the progress report
// The ETA should always be in the future -- when we're able to estimate
// it -- and the progress percentage should only increase.
// The timestamp format allows us to compare them lexicographically.
Expand Down Expand Up @@ -154,11 +154,15 @@ type expectedVDiff2Result struct {
hasMismatch bool
}

func doVtctldclientVDiff(t *testing.T, keyspace, workflow, cells string, want *expectedVDiff2Result) {
func doVtctldclientVDiff(t *testing.T, keyspace, workflow, cells string, want *expectedVDiff2Result, extraFlags ...string) {
ksWorkflow := fmt.Sprintf("%s.%s", keyspace, workflow)
t.Run(fmt.Sprintf("vtctldclient vdiff %s", ksWorkflow), func(t *testing.T) {
// update-table-stats is needed in order to test progress reports.
uuid, _ := performVDiff2Action(t, false, ksWorkflow, cells, "create", "", false, "--auto-retry", "--update-table-stats")
flags := []string{"--auto-retry", "--update-table-stats"}
if len(extraFlags) > 0 {
flags = append(flags, extraFlags...)
}
uuid, _ := performVDiff2Action(t, false, ksWorkflow, cells, "create", "", false, flags...)
info := waitForVDiff2ToComplete(t, false, ksWorkflow, cells, uuid, time.Time{})
require.NotNil(t, info)
require.Equal(t, workflow, info.Workflow)
Expand Down
Loading
Loading