diff --git a/cmd/loadtest/loadtest.go b/cmd/loadtest/loadtest.go index 96015bd8..0b680b20 100644 --- a/cmd/loadtest/loadtest.go +++ b/cmd/loadtest/loadtest.go @@ -31,6 +31,7 @@ import ( "os/signal" "regexp" "sort" + "strconv" "strings" "sync" "time" @@ -226,33 +227,38 @@ type ( } loadTestParams struct { // inputs - Requests *int64 - Concurrency *int64 - BatchSize *uint64 - TimeLimit *int64 - Verbosity *int64 - PrettyLogs *bool - ToRandom *bool - URL *url.URL - ChainID *uint64 - PrivateKey *string - ToAddress *string - HexSendAmount *string - RateLimit *float64 - Mode *string - Function *uint64 - Iterations *uint64 - ByteCount *uint64 - Seed *int64 - IsAvail *bool - AvailAppID *uint32 - LtAddress *string - DelAddress *string - ForceContractDeploy *bool - ForceGasLimit *uint64 - ForceGasPrice *uint64 - ShouldProduceSummary *bool - SummaryOutputMode *string + Requests *int64 + Concurrency *int64 + BatchSize *uint64 + TimeLimit *int64 + Verbosity *int64 + PrettyLogs *bool + ToRandom *bool + URL *url.URL + ChainID *uint64 + PrivateKey *string + ToAddress *string + HexSendAmount *string + RateLimit *float64 + AdaptiveRateLimit *bool + SteadyStateTxPoolSize *uint64 + AdaptiveRateLimitStart *uint64 + AdaptiveRateLimitIncrement *uint64 + AdaptiveCycleDuration *uint64 + Mode *string + Function *uint64 + Iterations *uint64 + ByteCount *uint64 + Seed *int64 + IsAvail *bool + AvailAppID *uint32 + LtAddress *string + DelAddress *string + ForceContractDeploy *bool + ForceGasLimit *uint64 + ForceGasPrice *uint64 + ShouldProduceSummary *bool + SummaryOutputMode *string // Computed CurrentGas *big.Int @@ -285,6 +291,11 @@ func init() { ltp.ToRandom = LoadtestCmd.PersistentFlags().Bool("to-random", true, "When doing a transfer test, should we send to random addresses rather than DEADBEEFx5") ltp.HexSendAmount = LoadtestCmd.PersistentFlags().String("send-amount", "0x38D7EA4C68000", "The amount of wei that we'll send every transaction") ltp.RateLimit = LoadtestCmd.PersistentFlags().Float64("rate-limit", 4, "An overall limit to the number of requests per second. Give a number less than zero to remove this limit all together") + ltp.AdaptiveRateLimit = LoadtestCmd.PersistentFlags().Bool("adaptive-rate-limit", true, "Loadtest automatically adjusts request rate to maximize utilization but prevent congestion") + ltp.SteadyStateTxPoolSize = LoadtestCmd.PersistentFlags().Uint64("steady-state-tx-pool-size", 1000, "Transaction Pool queue size which we use to either increase/decrease requests per second") + ltp.AdaptiveRateLimitStart = LoadtestCmd.PersistentFlags().Uint64("adaptive-rate-limit-start", 2, "Initial rate of requests per second following the slow-start approach of adaptive rate limiting") + ltp.AdaptiveRateLimitIncrement = LoadtestCmd.PersistentFlags().Uint64("adaptive-rate-limit-increment", 10, "Additive increment to rate of requests if txpool below steady state size") + ltp.AdaptiveCycleDuration = LoadtestCmd.PersistentFlags().Uint64("adaptive-cycle-duration-seconds", 20, "Duration in seconeds that adaptive load test will review txpool and determine whether to increase/decrease rate limit") ltp.Mode = LoadtestCmd.PersistentFlags().StringP("mode", "m", "t", `The testing mode to use. It can be multiple like: "tcdf" t - sending transactions d - deploy contract @@ -510,6 +521,62 @@ func printResults(lts []loadTestSample) { log.Info().Uint64("numErrors", numErrors).Msg("Num errors") } +func cleanHex(hexStr string) string { + // remove 0x prefix if found in the input string + return strings.TrimPrefix(hexStr, "0x") +} + +func getTxPoolSize(rpc *ethrpc.Client) (uint64, error) { + var status map[string]interface{} + err := rpc.Call(&status, "txpool_status") + if err != nil { + return 0, err + } + pendingHex, ok := status["pending"].(string) + if !ok { + return 0, fmt.Errorf("unable to read pending txpool size") + } + queuedHex, ok := status["queued"].(string) + if !ok { + return 0, fmt.Errorf("unable to read queued txpool size") + } + + pendingTxPoolSize, err := strconv.ParseUint(cleanHex(pendingHex), 16, 64) + if err != nil { + return 0, fmt.Errorf("unable to parse pending txpool size: %v", err) + } + queuedTxPoolSize, err := strconv.ParseUint(cleanHex(queuedHex), 16, 64) + if err != nil { + return 0, fmt.Errorf("unable to parse queued txpool size: %v", err) + } + + return (pendingTxPoolSize + queuedTxPoolSize), nil +} + +func updateRateLimit(rl *rate.Limiter, rpc *ethrpc.Client, steadyStateQueueSize uint64, rateLimitIncrement uint64, cycleDuration time.Duration) { + ticker := time.NewTicker(cycleDuration) + defer ticker.Stop() + + for range ticker.C { + txPoolSize, err := getTxPoolSize(rpc) + if err != nil { + log.Error().Err(err).Msg("Error getting txpool size") + return + } + + if txPoolSize < steadyStateQueueSize { + // additively increment requests per second if txpool less than queue steady state + newRateLimit := rate.Limit(float64(rl.Limit()) + float64(rateLimitIncrement)) + rl.SetLimit(newRateLimit) + log.Trace().Float64("New Rate Limit (RPS)", float64(rl.Limit())).Uint64("Current Tx Pool Size", txPoolSize).Uint64("Steady State Tx Pool Size", steadyStateQueueSize).Msg("Increased rate limit") + } else if txPoolSize > steadyStateQueueSize { + // halve rate limit requests per second if txpool greater than queue steady state + rl.SetLimit(rl.Limit() / 2) + log.Trace().Float64("New Rate Limit (RPS)", float64(rl.Limit())).Uint64("Current Tx Pool Size", txPoolSize).Uint64("Steady State Tx Pool Size", steadyStateQueueSize).Msg("Backed off rate limit") + } + } +} + func mainLoop(ctx context.Context, c *ethclient.Client, rpc *ethrpc.Client) error { ltp := inputLoadTestParams @@ -521,12 +588,25 @@ func mainLoop(ctx context.Context, c *ethclient.Client, rpc *ethrpc.Client) erro chainID := new(big.Int).SetUint64(*ltp.ChainID) privateKey := ltp.ECDSAPrivateKey mode := *ltp.Mode + steadyStateTxPoolSize := *ltp.SteadyStateTxPoolSize + adaptiveRateLimitIncrement := *ltp.AdaptiveRateLimitIncrement + var rl *rate.Limiter - rl := rate.NewLimiter(rate.Limit(*ltp.RateLimit), 1) - if *ltp.RateLimit <= 0.0 { + if *ltp.AdaptiveRateLimit { + // start slow with adaptive rate limiting and we'll increase limit per feedback loop + rl = rate.NewLimiter(rate.Limit(*ltp.AdaptiveRateLimitStart), 1) + } else { + rl = rate.NewLimiter(rate.Limit(*ltp.RateLimit), 1) + } + + if *ltp.RateLimit <= 0.0 || *ltp.AdaptiveRateLimitStart <= 0.0 { rl = nil } + if rl != nil && *ltp.AdaptiveRateLimit { + go updateRateLimit(rl, rpc, steadyStateTxPoolSize, adaptiveRateLimitIncrement, time.Duration(*ltp.AdaptiveCycleDuration)*time.Second) + } + tops, err := bind.NewKeyedTransactorWithChainID(privateKey, chainID) tops = configureTransactOpts(tops) if err != nil { @@ -783,7 +863,7 @@ func mainLoop(ctx context.Context, c *ethclient.Client, rpc *ethrpc.Client) erro log.Error().Err(err).Msg("there was an issue waiting for all transactions to be mined") } - lightSummary(ctx, c, rpc, startBlockNumber, startNonce, finalBlockNumber, currentNonce) + lightSummary(ctx, c, rpc, startBlockNumber, startNonce, finalBlockNumber, currentNonce, rl) if *ltp.ShouldProduceSummary { err = summarizeTransactions(ctx, c, rpc, startBlockNumber, startNonce, finalBlockNumber, currentNonce) if err != nil { @@ -793,7 +873,7 @@ func mainLoop(ctx context.Context, c *ethclient.Client, rpc *ethrpc.Client) erro return nil } -func lightSummary(ctx context.Context, c *ethclient.Client, rpc *ethrpc.Client, startBlockNumber, startNonce, endBlockNumber, endNonce uint64) { +func lightSummary(ctx context.Context, c *ethclient.Client, rpc *ethrpc.Client, startBlockNumber, startNonce, endBlockNumber, endNonce uint64, rl *rate.Limiter) { startBlock, err := c.BlockByNumber(ctx, new(big.Int).SetUint64(startBlockNumber)) if err != nil { log.Error().Err(err).Msg("unable to get start block for light summary") @@ -816,6 +896,7 @@ func lightSummary(ctx context.Context, c *ethclient.Client, rpc *ethrpc.Client, Int("transactionCount", len(loadTestResults)). Float64("testDuration", testDuration.Seconds()). Float64("tps", tps). + Float64("final rate limit", float64(rl.Limit())). Msg("rough test summary (ignores errors)") }