From abae17bd4cd745214b0c6a281b8460be7e38ec1a Mon Sep 17 00:00:00 2001
From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
Date: Tue, 17 Oct 2023 13:37:11 +0300
Subject: [PATCH 1/2] onlineddl_vrepl_stress: fix flakiness caused by timeouts

Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
---
 .../onlineddl_vrepl_mini_stress_test.go       | 44 +++++++++++--------
 1 file changed, 25 insertions(+), 19 deletions(-)

diff --git a/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go b/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go
index 107050c2708..3d88a01b1ce 100644
--- a/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go
+++ b/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go
@@ -18,6 +18,7 @@ package vreplstress
 
 import (
 	"context"
+	"errors"
 	"flag"
 	"fmt"
 	"math/rand"
@@ -25,7 +26,6 @@ import (
 	"path"
 	"strings"
 	"sync"
-	"sync/atomic"
 	"testing"
 	"time"
 
@@ -138,6 +138,7 @@ var (
 
 const (
 	maxTableRows                  = 4096
+	workloadDuration              = 5 * time.Second
 	maxConcurrency                = 20
 	singleConnectionSleepInterval = 2 * time.Millisecond
 	countIterations               = 5
@@ -227,6 +228,8 @@ func TestMain(m *testing.M) {
 func TestSchemaChange(t *testing.T) {
 	defer cluster.PanicHandler(t)
 
+	ctx := context.Background()
+
 	shards = clusterInstance.Keyspaces[0].Shards
 	require.Equal(t, 1, len(shards))
 
@@ -251,16 +254,17 @@ func TestSchemaChange(t *testing.T) {
 		// that our testing/metrics logic is sound in the first place.
 		testName := fmt.Sprintf("workload without ALTER TABLE %d/%d", (i + 1), countIterations)
 		t.Run(testName, func(t *testing.T) {
-			ctx, cancel := context.WithCancel(context.Background())
 			initTable(t)
+
+			ctx, cancel := context.WithTimeout(ctx, workloadDuration)
+			defer cancel()
+
 			var wg sync.WaitGroup
 			wg.Add(1)
 			go func() {
 				defer wg.Done()
 				runMultipleConnections(ctx, t)
 			}()
-			time.Sleep(5 * time.Second)
-			cancel() // will cause runMultipleConnections() to terminate
 			wg.Wait()
 			testSelectTableMetrics(t)
 		})
@@ -285,7 +289,7 @@ func TestSchemaChange(t *testing.T) {
 		// the vreplication/ALTER TABLE did not corrupt our data and we are happy.
 		testName := fmt.Sprintf("ALTER TABLE with workload %d/%d", (i + 1), countIterations)
 		t.Run(testName, func(t *testing.T) {
-			ctx, cancel := context.WithCancel(context.Background())
+			ctx := context.Background()
 			t.Run("create schema", func(t *testing.T) {
 				testWithInitialSchema(t)
 			})
@@ -293,6 +297,9 @@ func TestSchemaChange(t *testing.T) {
 				initTable(t)
 			})
 			t.Run("migrate", func(t *testing.T) {
+				ctx, cancel := context.WithCancel(ctx)
+				defer cancel()
+
 				var wg sync.WaitGroup
 				wg.Add(1)
 				go func() {
@@ -302,7 +309,7 @@ func TestSchemaChange(t *testing.T) {
 				hint := fmt.Sprintf("hint-alter-with-workload-%d", i)
 				uuid := testOnlineDDLStatement(t, fmt.Sprintf(alterHintStatement, hint), onlineDDLStrategy, "vtgate", hint)
 				onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete)
-				cancel() // will cause runMultipleConnections() to terminate
+				cancel() // Now that the migration is complete, we can stop the workload.
 				wg.Wait()
 			})
 			t.Run("validate metrics", func(t *testing.T) {
@@ -485,7 +492,7 @@ func generateDelete(t *testing.T, conn *mysql.Conn) error {
 	return err
 }
 
-func runSingleConnection(ctx context.Context, t *testing.T, done *int64) {
+func runSingleConnection(ctx context.Context, t *testing.T) {
 	log.Infof("Running single connection")
 	conn, err := mysql.Connect(ctx, &vtParams)
 	require.Nil(t, err)
@@ -497,10 +504,6 @@ func runSingleConnection(ctx context.Context, t *testing.T, done *int64) {
 	require.Nil(t, err)
 
 	for {
-		if atomic.LoadInt64(done) == 1 {
-			log.Infof("Terminating single connection")
-			return
-		}
 		switch rand.Int31n(3) {
 		case 0:
 			err = generateInsert(t, conn)
@@ -509,27 +512,30 @@ func runSingleConnection(ctx context.Context, t *testing.T, done *int64) {
 		case 2:
 			err = generateDelete(t, conn)
 		}
-		assert.Nil(t, err)
-		time.Sleep(singleConnectionSleepInterval)
+		select {
+		case <-ctx.Done():
+			log.Infof("Terminating single connection")
+			return
+		case <-time.After(singleConnectionSleepInterval):
+		}
+		if !errors.Is(err, context.DeadlineExceeded) { // this is an acceptable error
+			assert.Nil(t, err)
+		}
 	}
 }
 
 func runMultipleConnections(ctx context.Context, t *testing.T) {
 	log.Infof("Running multiple connections")
-	var done int64
 	var wg sync.WaitGroup
 	for i := 0; i < maxConcurrency; i++ {
 		wg.Add(1)
 		go func() {
 			defer wg.Done()
-			runSingleConnection(ctx, t, &done)
+			runSingleConnection(ctx, t)
 		}()
 	}
-	<-ctx.Done()
-	atomic.StoreInt64(&done, 1)
-	log.Infof("Running multiple connections: done")
 	wg.Wait()
-	log.Infof("All connections cancelled")
+	log.Infof("Running multiple connections: done")
 }
 
 func initTable(t *testing.T) {

From 2bb752b088e5e74209fd9b58b7004e9710b7eb81 Mon Sep 17 00:00:00 2001
From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
Date: Tue, 17 Oct 2023 17:24:13 +0300
Subject: [PATCH 2/2] no need to check for context.DeadlineExceeded because we
 exit in case of deadline before looking into the error

Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
---
 .../vrepl_stress/onlineddl_vrepl_mini_stress_test.go | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go b/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go
index 3d88a01b1ce..983739a976d 100644
--- a/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go
+++ b/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go
@@ -18,7 +18,6 @@ package vreplstress
 
 import (
 	"context"
-	"errors"
 	"flag"
 	"fmt"
 	"math/rand"
@@ -518,9 +517,7 @@ func runSingleConnection(ctx context.Context, t *testing.T) {
 			return
 		case <-time.After(singleConnectionSleepInterval):
 		}
-		if !errors.Is(err, context.DeadlineExceeded) { // this is an acceptable error
-			assert.Nil(t, err)
-		}
+		assert.Nil(t, err)
 	}
 }
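
Taken together, the two patches settle on idiomatic Go cancellation: each worker loops until the shared context expires, selecting on ctx.Done() against a time.After sleep, and the coordinator simply waits on a sync.WaitGroup; there is no atomic "done" flag and no fixed time.Sleep. The following is a minimal, self-contained sketch of that pattern, not the test itself: the constants mirror the test's workloadDuration, maxConcurrency, and singleConnectionSleepInterval, while runSingleWorker and the main scaffolding are illustrative stand-ins.

package main

import (
	"context"
	"log"
	"sync"
	"time"
)

const (
	workloadDuration = 5 * time.Second      // mirrors the test's constant
	maxConcurrency   = 20                   // mirrors the test's constant
	sleepInterval    = 2 * time.Millisecond // mirrors singleConnectionSleepInterval
)

// runSingleWorker is a hypothetical stand-in for runSingleConnection: it does
// one unit of work per iteration, then either terminates because the shared
// context expired or sleeps briefly before the next iteration.
func runSingleWorker(ctx context.Context, id int) {
	for {
		// ... issue one INSERT/UPDATE/DELETE here ...
		select {
		case <-ctx.Done():
			log.Printf("worker %d terminating", id)
			return
		case <-time.After(sleepInterval):
		}
	}
}

func main() {
	// WithTimeout replaces the old time.Sleep+cancel dance: the context
	// itself carries the workload duration.
	ctx, cancel := context.WithTimeout(context.Background(), workloadDuration)
	defer cancel()

	var wg sync.WaitGroup
	for i := 0; i < maxConcurrency; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			runSingleWorker(ctx, id)
		}(i)
	}
	wg.Wait() // returns once the timeout fires and every worker has exited
	log.Println("workload done")
}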