Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DVT-604] add adaptive rate limiting feature #60

Merged
merged 7 commits into from
Apr 4, 2023
Merged
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 112 additions & 31 deletions cmd/loadtest/loadtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"os/signal"
"regexp"
"sort"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -226,33 +227,38 @@ type (
}
loadTestParams struct {
// inputs
Requests *int64
Concurrency *int64
BatchSize *uint64
TimeLimit *int64
Verbosity *int64
PrettyLogs *bool
ToRandom *bool
URL *url.URL
ChainID *uint64
PrivateKey *string
ToAddress *string
HexSendAmount *string
RateLimit *float64
Mode *string
Function *uint64
Iterations *uint64
ByteCount *uint64
Seed *int64
IsAvail *bool
AvailAppID *uint32
LtAddress *string
DelAddress *string
ForceContractDeploy *bool
ForceGasLimit *uint64
ForceGasPrice *uint64
ShouldProduceSummary *bool
SummaryOutputMode *string
Requests *int64
Concurrency *int64
BatchSize *uint64
TimeLimit *int64
Verbosity *int64
PrettyLogs *bool
ToRandom *bool
URL *url.URL
ChainID *uint64
PrivateKey *string
ToAddress *string
HexSendAmount *string
RateLimit *float64
AdaptiveRateLimit *bool
SteadyStateTxPoolSize *uint64
AdaptiveRateLimitStart *uint64
AdaptiveRateLimitIncrement *uint64
AdaptiveCycleDuration *uint64
Mode *string
Function *uint64
Iterations *uint64
ByteCount *uint64
Seed *int64
IsAvail *bool
AvailAppID *uint32
LtAddress *string
DelAddress *string
ForceContractDeploy *bool
ForceGasLimit *uint64
ForceGasPrice *uint64
ShouldProduceSummary *bool
SummaryOutputMode *string

// Computed
CurrentGas *big.Int
Expand Down Expand Up @@ -285,6 +291,11 @@ func init() {
ltp.ToRandom = LoadtestCmd.PersistentFlags().Bool("to-random", true, "When doing a transfer test, should we send to random addresses rather than DEADBEEFx5")
ltp.HexSendAmount = LoadtestCmd.PersistentFlags().String("send-amount", "0x38D7EA4C68000", "The amount of wei that we'll send every transaction")
ltp.RateLimit = LoadtestCmd.PersistentFlags().Float64("rate-limit", 4, "An overall limit to the number of requests per second. Give a number less than zero to remove this limit all together")
ltp.AdaptiveRateLimit = LoadtestCmd.PersistentFlags().Bool("adaptive-rate-limit", true, "Loadtest automatically adjusts request rate to maximize utilization but prevent congestion")
ltp.SteadyStateTxPoolSize = LoadtestCmd.PersistentFlags().Uint64("steady-state-tx-pool-size", 1000, "Transaction Pool queue size which we use to either increase/decrease requests per second")
ltp.AdaptiveRateLimitStart = LoadtestCmd.PersistentFlags().Uint64("adaptive-rate-limit-start", 2, "Initial rate of requests per second following the slow-start approach of adaptive rate limiting")
ltp.AdaptiveRateLimitIncrement = LoadtestCmd.PersistentFlags().Uint64("adaptive-rate-limit-increment", 10, "Additive increment to rate of requests if txpool below steady state size")
ltp.AdaptiveCycleDuration = LoadtestCmd.PersistentFlags().Uint64("adaptive-cycle-duration-seconds", 20, "Duration in seconeds that adaptive load test will review txpool and determine whether to increase/decrease rate limit")
ltp.Mode = LoadtestCmd.PersistentFlags().StringP("mode", "m", "t", `The testing mode to use. It can be multiple like: "tcdf"
t - sending transactions
d - deploy contract
Expand Down Expand Up @@ -510,6 +521,62 @@ func printResults(lts []loadTestSample) {
log.Info().Uint64("numErrors", numErrors).Msg("Num errors")
}

func cleanHex(hexStr string) string {
// remove 0x suffix if found in the input string
return strings.Replace(hexStr, "0x", "", -1)
rebelArtists marked this conversation as resolved.
Show resolved Hide resolved
}

func getTxPoolSize(rpc *ethrpc.Client) (uint64, error) {
var status map[string]interface{}
err := rpc.Call(&status, "txpool_status")
if err != nil {
return 0, err
}
pendingHex, ok := status["pending"].(string)
if !ok {
return 0, fmt.Errorf("unable to read pending txpool size")
}
queuedHex, ok := status["queued"].(string)
if !ok {
return 0, fmt.Errorf("unable to read queued txpool size")
}

pendingTxPoolSize, err := strconv.ParseUint(cleanHex(pendingHex), 16, 64)
if err != nil {
return 0, fmt.Errorf("unable to parse pending txpool size: %v", err)
}
queuedTxPoolSize, err := strconv.ParseUint(cleanHex(queuedHex), 16, 64)
if err != nil {
return 0, fmt.Errorf("unable to parse queued txpool size: %v", err)
}

return (pendingTxPoolSize + queuedTxPoolSize), nil
}

func updateRateLimit(rl *rate.Limiter, rpc *ethrpc.Client, steadyStateQueueSize uint64, rateLimitIncrement uint64, cycleDuration time.Duration) {
ticker := time.NewTicker(cycleDuration)
defer ticker.Stop()

for range ticker.C {
txPoolSize, err := getTxPoolSize(rpc)
if err != nil {
log.Error().Err(err).Msg("Error getting txpool size")
return
}

if txPoolSize < steadyStateQueueSize {
// additively increment requests per second if txpool less than queue steady state
newRateLimit := rate.Limit(float64(rl.Limit()) + float64(rateLimitIncrement))
rl.SetLimit(newRateLimit)
log.Trace().Interface("New Rate Limit (RPS)", rl.Limit()).Interface("Current Tx Pool Size", txPoolSize).Interface("Steady State Tx Pool Size", steadyStateQueueSize).Msg("Increased rate limit")
rebelArtists marked this conversation as resolved.
Show resolved Hide resolved
} else if txPoolSize > steadyStateQueueSize {
// halve rate limit requests per second if txpool greater than queue steady state
rl.SetLimit(rl.Limit() / 2)
log.Trace().Interface("New Rate Limit (RPS)", rl.Limit()).Interface("Current Tx Pool Size", txPoolSize).Interface("Steady State Tx Pool Size", steadyStateQueueSize).Msg("Backed off rate limit")
}
}
}

func mainLoop(ctx context.Context, c *ethclient.Client, rpc *ethrpc.Client) error {

ltp := inputLoadTestParams
Expand All @@ -521,12 +588,25 @@ func mainLoop(ctx context.Context, c *ethclient.Client, rpc *ethrpc.Client) erro
chainID := new(big.Int).SetUint64(*ltp.ChainID)
privateKey := ltp.ECDSAPrivateKey
mode := *ltp.Mode
steadyStateTxPoolSize := *ltp.SteadyStateTxPoolSize
adaptiveRateLimitIncrement := *ltp.AdaptiveRateLimitIncrement
var rl *rate.Limiter

rl := rate.NewLimiter(rate.Limit(*ltp.RateLimit), 1)
if *ltp.RateLimit <= 0.0 {
if *ltp.AdaptiveRateLimit {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔥

// start slow with adaptive rate limiting and we'll increase limit per feedback loop
rl = rate.NewLimiter(rate.Limit(*ltp.AdaptiveRateLimitStart), 1)
} else {
rl = rate.NewLimiter(rate.Limit(*ltp.RateLimit), 1)
}

if *ltp.RateLimit <= 0.0 || *ltp.AdaptiveRateLimitStart <= 0.0 {
rl = nil
}

if rl != nil && *ltp.AdaptiveRateLimit {
go updateRateLimit(rl, rpc, steadyStateTxPoolSize, adaptiveRateLimitIncrement, time.Duration(*ltp.AdaptiveCycleDuration)*time.Second)
}

tops, err := bind.NewKeyedTransactorWithChainID(privateKey, chainID)
tops = configureTransactOpts(tops)
if err != nil {
Expand Down Expand Up @@ -783,7 +863,7 @@ func mainLoop(ctx context.Context, c *ethclient.Client, rpc *ethrpc.Client) erro
log.Error().Err(err).Msg("there was an issue waiting for all transactions to be mined")
}

lightSummary(ctx, c, rpc, startBlockNumber, startNonce, finalBlockNumber, currentNonce)
lightSummary(ctx, c, rpc, startBlockNumber, startNonce, finalBlockNumber, currentNonce, rl)
if *ltp.ShouldProduceSummary {
err = summarizeTransactions(ctx, c, rpc, startBlockNumber, startNonce, finalBlockNumber, currentNonce)
if err != nil {
Expand All @@ -793,7 +873,7 @@ func mainLoop(ctx context.Context, c *ethclient.Client, rpc *ethrpc.Client) erro
return nil
}

func lightSummary(ctx context.Context, c *ethclient.Client, rpc *ethrpc.Client, startBlockNumber, startNonce, endBlockNumber, endNonce uint64) {
func lightSummary(ctx context.Context, c *ethclient.Client, rpc *ethrpc.Client, startBlockNumber, startNonce, endBlockNumber, endNonce uint64, rl *rate.Limiter) {
startBlock, err := c.BlockByNumber(ctx, new(big.Int).SetUint64(startBlockNumber))
if err != nil {
log.Error().Err(err).Msg("unable to get start block for light summary")
Expand All @@ -816,6 +896,7 @@ func lightSummary(ctx context.Context, c *ethclient.Client, rpc *ethrpc.Client,
Int("transactionCount", len(loadTestResults)).
Float64("testDuration", testDuration.Seconds()).
Float64("tps", tps).
Float64("final rate limit", float64(rl.Limit())).
rebelArtists marked this conversation as resolved.
Show resolved Hide resolved
Msg("rough test summary (ignores errors)")
}

Expand Down