diff --git a/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go b/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go index 983739a976d..7f560a24f9e 100644 --- a/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go +++ b/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go @@ -23,6 +23,7 @@ import ( "math/rand" "os" "path" + "runtime" "strings" "sync" "testing" @@ -135,13 +136,14 @@ var ( writeMetrics WriteMetrics ) +var ( + countIterations = 5 +) + const ( - maxTableRows = 4096 - workloadDuration = 5 * time.Second - maxConcurrency = 20 - singleConnectionSleepInterval = 2 * time.Millisecond - countIterations = 5 - migrationWaitTimeout = 60 * time.Second + maxTableRows = 4096 + workloadDuration = 5 * time.Second + migrationWaitTimeout = 60 * time.Second ) func resetOpOrder() { @@ -377,6 +379,9 @@ func checkTablesCount(t *testing.T, tablet *cluster.Vttablet, showTableName stri query := fmt.Sprintf(`show tables like '%%%s%%';`, showTableName) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + rowcount := 0 for { @@ -388,7 +393,7 @@ func checkTablesCount(t *testing.T, tablet *cluster.Vttablet, showTableName stri } select { - case <-time.After(time.Second): + case <-ticker.C: continue // Keep looping case <-ctx.Done(): // Break below to the assertion @@ -491,7 +496,7 @@ func generateDelete(t *testing.T, conn *mysql.Conn) error { return err } -func runSingleConnection(ctx context.Context, t *testing.T) { +func runSingleConnection(ctx context.Context, t *testing.T, sleepInterval time.Duration) { log.Infof("Running single connection") conn, err := mysql.Connect(ctx, &vtParams) require.Nil(t, err) @@ -502,6 +507,9 @@ func runSingleConnection(ctx context.Context, t *testing.T) { _, err = conn.ExecuteFetch("set transaction isolation level read committed", 1000, true) require.Nil(t, err) + ticker := time.NewTicker(sleepInterval) + defer ticker.Stop() + for { switch rand.Int31n(3) { case 0: @@ -515,20 +523,31 @@ func runSingleConnection(ctx context.Context, t *testing.T) { case <-ctx.Done(): log.Infof("Terminating single connection") return - case <-time.After(singleConnectionSleepInterval): + case <-ticker.C: } assert.Nil(t, err) } } func runMultipleConnections(ctx context.Context, t *testing.T) { - log.Infof("Running multiple connections") + // The workload for a 16 vCPU machine is: + // - Concurrency of 16 + // - 2ms interval between queries for each connection + // As the number of vCPUs decreases, so do we decrease concurrency, and increase intervals. For example, on a 8 vCPU machine + // we run concurrency of 8 and interval of 4ms. On a 4 vCPU machine we run concurrency of 4 and interval of 8ms. + maxConcurrency := runtime.NumCPU() + sleepModifier := 16.0 / float64(maxConcurrency) + baseSleepInterval := 2 * time.Millisecond + singleConnectionSleepIntervalNanoseconds := float64(baseSleepInterval.Nanoseconds()) * sleepModifier + sleepInterval := time.Duration(int64(singleConnectionSleepIntervalNanoseconds)) + + log.Infof("Running multiple connections: maxConcurrency=%v, sleep interval=%v", maxConcurrency, sleepInterval) var wg sync.WaitGroup for i := 0; i < maxConcurrency; i++ { wg.Add(1) go func() { defer wg.Done() - runSingleConnection(ctx, t) + runSingleConnection(ctx, t, sleepInterval) }() } wg.Wait() diff --git a/go/test/endtoend/onlineddl/vtgate_util.go b/go/test/endtoend/onlineddl/vtgate_util.go index 3d99a2cef92..693523cec48 100644 --- a/go/test/endtoend/onlineddl/vtgate_util.go +++ b/go/test/endtoend/onlineddl/vtgate_util.go @@ -247,9 +247,13 @@ func WaitForMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []c for _, status := range expectStatuses { statusesMap[string(status)] = true } - startTime := time.Now() + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + lastKnownStatus := "" - for time.Since(startTime) < timeout { + for { countMatchedShards := 0 r := VtgateExecQuery(t, vtParams, query, "") for _, row := range r.Named().Rows { @@ -266,9 +270,12 @@ func WaitForMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []c if countMatchedShards == len(shards) { return schema.OnlineDDLStatus(lastKnownStatus) } - time.Sleep(1 * time.Second) + select { + case <-ctx.Done(): + return schema.OnlineDDLStatus(lastKnownStatus) + case <-ticker.C: + } } - return schema.OnlineDDLStatus(lastKnownStatus) } // CheckMigrationArtifacts verifies given migration exists, and checks if it has artifacts