From bdf759562217e8c98bfd8b906d6cf58b55d2d85e Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 27 Sep 2023 08:17:50 +0300 Subject: [PATCH 1/8] use ctx timeouts Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../foreignkey/stress/fk_stress_test.go | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go index 600961e6f0c..19562b722ff 100644 --- a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go +++ b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go @@ -25,7 +25,6 @@ import ( "path" "strings" "sync" - "sync/atomic" "testing" "time" @@ -449,8 +448,7 @@ func ExecuteFKTest(t *testing.T, tcase *testCase) { testName = fmt.Sprintf("%s/ddl=%s", testName, tcase.onlineDDLTable) } t.Run(testName, func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + ctx := context.Background() t.Run("create schema", func(t *testing.T) { createInitialSchema(t, tcase) @@ -460,6 +458,9 @@ func ExecuteFKTest(t *testing.T, tcase *testCase) { }) if tcase.workload { t.Run("workload", func(t *testing.T) { + ctx, cancel := context.WithTimeout(ctx, workloadDuration) + defer cancel() + var wg sync.WaitGroup for _, workloadTable := range []string{parentTableName, childTableName, child2TableName, grandchildTableName} { wg.Add(1) @@ -468,7 +469,6 @@ func ExecuteFKTest(t *testing.T, tcase *testCase) { runMultipleConnections(ctx, t, tbl) }(workloadTable) } - timer := time.NewTimer(workloadDuration) if tcase.onlineDDLTable != "" { t.Run("migrating", func(t *testing.T) { @@ -494,9 +494,6 @@ func ExecuteFKTest(t *testing.T, tcase *testCase) { }) }) } - - <-timer.C - cancel() // will cause runMultipleConnections() to terminate wg.Wait() }) } @@ -877,7 +874,7 @@ func generateDelete(t *testing.T, tableName string, 
conn *mysql.Conn) error { return err } -func runSingleConnection(ctx context.Context, t *testing.T, tableName string, done *int64) { +func runSingleConnection(ctx context.Context, t *testing.T, tableName string) { log.Infof("Running single connection on %s", tableName) conn, err := mysql.Connect(ctx, &vtParams) require.Nil(t, err) @@ -889,7 +886,7 @@ func runSingleConnection(ctx context.Context, t *testing.T, tableName string, do require.Nil(t, err) for { - if atomic.LoadInt64(done) == 1 { + if ctx.Err() != nil { log.Infof("Terminating single connection") return } @@ -907,17 +904,15 @@ func runSingleConnection(ctx context.Context, t *testing.T, tableName string, do func runMultipleConnections(ctx context.Context, t *testing.T, tableName string) { log.Infof("Running multiple connections") - var done int64 var wg sync.WaitGroup for i := 0; i < maxConcurrency; i++ { wg.Add(1) go func() { defer wg.Done() - runSingleConnection(ctx, t, tableName, &done) + runSingleConnection(ctx, t, tableName) }() } <-ctx.Done() - atomic.StoreInt64(&done, 1) log.Infof("Running multiple connections: done") wg.Wait() log.Infof("All connections cancelled") From ee9a45b2f5bdbb7f25a667aa89d423cb3327c674 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 27 Sep 2023 11:02:00 +0300 Subject: [PATCH 2/8] wait for position using direct queries rather than using vtctl Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../foreignkey/stress/fk_stress_test.go | 34 ++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go index 19562b722ff..f0db6f35f2b 100644 --- a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go +++ b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go @@ -33,6 +33,7 @@ import ( "golang.org/x/exp/slices" "vitess.io/vitess/go/mysql" + 
"vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/mysql/sqlerror" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/test/endtoend/cluster" @@ -375,8 +376,39 @@ func tabletTestName(t *testing.T, tablet *cluster.Vttablet) string { return "" } +func getTabletPosition(t *testing.T, tablet *cluster.Vttablet) replication.Position { + rs := queryTablet(t, tablet, "select @@gtid_executed as gtid_executed", "") + row := rs.Named().Row() + require.NotNil(t, row) + gtidExecuted := row.AsString("gtid_executed", "") + require.NotEmpty(t, gtidExecuted) + pos, err := replication.DecodePositionDefaultFlavor(gtidExecuted, replication.Mysql56FlavorID) + assert.NoError(t, err) + return pos +} + func waitForReplicaCatchup(t *testing.T) { - cluster.WaitForReplicationPos(t, primary, replica, true, time.Minute) + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + primaryPos := getTabletPosition(t, primary) + for { + replicaPos := getTabletPosition(t, replica) + if replicaPos.GTIDSet.Contains(primaryPos.GTIDSet) { + // success + return + } + if !cluster.ValidateReplicationIsHealthy(t, replica) { + assert.FailNow(t, "replication is broken; not waiting for catchup") + return + } + select { + case <-ctx.Done(): + assert.FailNow(t, "timeout waiting for replica to catch up") + return + case <-time.After(time.Second): + // + } + } } func validateMetrics(t *testing.T, tcase *testCase) { From ad1be672e7149a60858ff81f82109f748e0f7dcc Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 27 Sep 2023 11:13:38 +0300 Subject: [PATCH 3/8] proper context handling Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../vtgate/foreignkey/stress/fk_stress_test.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go index 
f0db6f35f2b..b45b1a42425 100644 --- a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go +++ b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go @@ -918,10 +918,6 @@ func runSingleConnection(ctx context.Context, t *testing.T, tableName string) { require.Nil(t, err) for { - if ctx.Err() != nil { - log.Infof("Terminating single connection") - return - } switch rand.Int31n(3) { case 0: _ = generateInsert(t, tableName, conn) @@ -930,7 +926,12 @@ func runSingleConnection(ctx context.Context, t *testing.T, tableName string) { case 2: _ = generateDelete(t, tableName, conn) } - time.Sleep(singleConnectionSleepInterval) + select { + case <-ctx.Done(): + log.Infof("Terminating single connection") + return + case <-time.After(singleConnectionSleepInterval): + } } } From ace13db43d52161d6f615d7e03f5313e89f215d3 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 27 Sep 2023 11:18:31 +0300 Subject: [PATCH 4/8] redundant wait for context Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go index b45b1a42425..d0f4f0a30e2 100644 --- a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go +++ b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go @@ -945,10 +945,8 @@ func runMultipleConnections(ctx context.Context, t *testing.T, tableName string) runSingleConnection(ctx, t, tableName) }() } - <-ctx.Done() - log.Infof("Running multiple connections: done") wg.Wait() - log.Infof("All connections cancelled") + log.Infof("Running multiple connections: done") } func wrapWithNoFKChecks(sql string) string { From 599f072802572c752e32a61b4b3f6b381819da4a Mon Sep 17 00:00:00 2001 From: Shlomi Noach 
<2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 27 Sep 2023 11:29:16 +0300 Subject: [PATCH 5/8] migration_check_interval Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go index d0f4f0a30e2..6e94140dd4c 100644 --- a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go +++ b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go @@ -308,6 +308,7 @@ func TestMain(m *testing.M) { "--heartbeat_enable", "--heartbeat_interval", "250ms", "--heartbeat_on_demand_duration", "5s", + "--migration_check_interval", "5s", "--watch_replication_stream", "--vreplication_tablet_type", "primary", } From 37781b8b373635f11b86a791fa745ecd65824b60 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 27 Sep 2023 12:03:45 +0300 Subject: [PATCH 6/8] per test log of test setup Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go index 6e94140dd4c..fc1ca9795ae 100644 --- a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go +++ b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go @@ -472,6 +472,7 @@ type testCase struct { // - Either one of ON UPDATE actions // - Potentially running an Online DDL on an indicated table (this will not work in Vanilla MySQL, see https://vitess.io/blog/2021-06-15-online-ddl-why-no-fk/) func ExecuteFKTest(t *testing.T, tcase *testCase) { + t.Logf("==== test setup: maxConcurrency=%v, singleConnectionSleepInterval=%v", maxConcurrency, singleConnectionSleepInterval) 
workloadName := "static data" if tcase.workload { workloadName = "workload" From 06a03e27f87e480e31af6670f66795a9eea279df Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 27 Sep 2023 12:18:23 +0300 Subject: [PATCH 7/8] adapt maxconcurrency and singleConnectionSleepInterval Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../endtoend/vtgate/foreignkey/stress/fk_stress_test.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go index fc1ca9795ae..342cfc4511b 100644 --- a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go +++ b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go @@ -453,8 +453,8 @@ func TestInitialSetup(t *testing.T) { if val, present := os.LookupEnv("GITHUB_ACTIONS"); present && val != "" { // This is the place to fine tune the stress parameters if GitHub actions are too slow - maxConcurrency = maxConcurrency * 1 - singleConnectionSleepInterval = singleConnectionSleepInterval * 1 + maxConcurrency = maxConcurrency / 2 + singleConnectionSleepInterval = singleConnectionSleepInterval * 2 } t.Logf("==== test setup: maxConcurrency=%v, singleConnectionSleepInterval=%v", maxConcurrency, singleConnectionSleepInterval) } @@ -573,6 +573,10 @@ func TestStressFK(t *testing.T) { } if runOnlineDDL { + // Foreign keys introduce some overhead. We reduce concurrency so that GitHub CI can accommodate. + maxConcurrency = maxConcurrency * 4 / 5 + singleConnectionSleepInterval = singleConnectionSleepInterval * 2 + // Running Online DDL on all test tables. We don't use all of the combinations // presented above; we will run with workload, and suffice with same ON DELETE - ON UPDATE actions. 
for _, action := range referenceActions { From c8447ad22aac3bf06ed2a50c397bfa50c192d55e Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 27 Sep 2023 12:44:00 +0300 Subject: [PATCH 8/8] conditional for running online ddl Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go index 342cfc4511b..c55ce9eef79 100644 --- a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go +++ b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go @@ -555,7 +555,9 @@ func TestStressFK(t *testing.T) { }) runOnlineDDL := false - + if val, present := os.LookupEnv("FK_STRESS_ONLINE_DDL"); present && val != "" { + runOnlineDDL = true + } // Without workload ; with workload for _, workload := range []bool{false, true} { // For any type of ON DELETE action