Skip to content

Commit

Permalink
Merge pull request #60 from maticnetwork/dan/aimd_rate_limiting
Browse files Browse the repository at this point in the history
[DVT-604] add adaptive rate limiting feature
  • Loading branch information
rebelArtists authored Apr 4, 2023
2 parents f1cd341 + c38964b commit 903cdf5
Showing 1 changed file with 112 additions and 31 deletions.
143 changes: 112 additions & 31 deletions cmd/loadtest/loadtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"os/signal"
"regexp"
"sort"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -226,33 +227,38 @@ type (
}
loadTestParams struct {
// inputs
Requests *int64
Concurrency *int64
BatchSize *uint64
TimeLimit *int64
Verbosity *int64
PrettyLogs *bool
ToRandom *bool
URL *url.URL
ChainID *uint64
PrivateKey *string
ToAddress *string
HexSendAmount *string
RateLimit *float64
Mode *string
Function *uint64
Iterations *uint64
ByteCount *uint64
Seed *int64
IsAvail *bool
AvailAppID *uint32
LtAddress *string
DelAddress *string
ForceContractDeploy *bool
ForceGasLimit *uint64
ForceGasPrice *uint64
ShouldProduceSummary *bool
SummaryOutputMode *string
Requests *int64
Concurrency *int64
BatchSize *uint64
TimeLimit *int64
Verbosity *int64
PrettyLogs *bool
ToRandom *bool
URL *url.URL
ChainID *uint64
PrivateKey *string
ToAddress *string
HexSendAmount *string
RateLimit *float64
AdaptiveRateLimit *bool
SteadyStateTxPoolSize *uint64
AdaptiveRateLimitStart *uint64
AdaptiveRateLimitIncrement *uint64
AdaptiveCycleDuration *uint64
Mode *string
Function *uint64
Iterations *uint64
ByteCount *uint64
Seed *int64
IsAvail *bool
AvailAppID *uint32
LtAddress *string
DelAddress *string
ForceContractDeploy *bool
ForceGasLimit *uint64
ForceGasPrice *uint64
ShouldProduceSummary *bool
SummaryOutputMode *string

// Computed
CurrentGas *big.Int
Expand Down Expand Up @@ -285,6 +291,11 @@ func init() {
ltp.ToRandom = LoadtestCmd.PersistentFlags().Bool("to-random", true, "When doing a transfer test, should we send to random addresses rather than DEADBEEFx5")
ltp.HexSendAmount = LoadtestCmd.PersistentFlags().String("send-amount", "0x38D7EA4C68000", "The amount of wei that we'll send every transaction")
ltp.RateLimit = LoadtestCmd.PersistentFlags().Float64("rate-limit", 4, "An overall limit to the number of requests per second. Give a number less than zero to remove this limit all together")
ltp.AdaptiveRateLimit = LoadtestCmd.PersistentFlags().Bool("adaptive-rate-limit", true, "Loadtest automatically adjusts request rate to maximize utilization but prevent congestion")
ltp.SteadyStateTxPoolSize = LoadtestCmd.PersistentFlags().Uint64("steady-state-tx-pool-size", 1000, "Transaction Pool queue size which we use to either increase/decrease requests per second")
ltp.AdaptiveRateLimitStart = LoadtestCmd.PersistentFlags().Uint64("adaptive-rate-limit-start", 2, "Initial rate of requests per second following the slow-start approach of adaptive rate limiting")
ltp.AdaptiveRateLimitIncrement = LoadtestCmd.PersistentFlags().Uint64("adaptive-rate-limit-increment", 10, "Additive increment to rate of requests if txpool below steady state size")
ltp.AdaptiveCycleDuration = LoadtestCmd.PersistentFlags().Uint64("adaptive-cycle-duration-seconds", 20, "Duration in seconeds that adaptive load test will review txpool and determine whether to increase/decrease rate limit")
ltp.Mode = LoadtestCmd.PersistentFlags().StringP("mode", "m", "t", `The testing mode to use. It can be multiple like: "tcdf"
t - sending transactions
d - deploy contract
Expand Down Expand Up @@ -510,6 +521,62 @@ func printResults(lts []loadTestSample) {
log.Info().Uint64("numErrors", numErrors).Msg("Num errors")
}

func cleanHex(hexStr string) string {
// remove 0x prefix if found in the input string
return strings.TrimPrefix(hexStr, "0x")
}

func getTxPoolSize(rpc *ethrpc.Client) (uint64, error) {
var status map[string]interface{}
err := rpc.Call(&status, "txpool_status")
if err != nil {
return 0, err
}
pendingHex, ok := status["pending"].(string)
if !ok {
return 0, fmt.Errorf("unable to read pending txpool size")
}
queuedHex, ok := status["queued"].(string)
if !ok {
return 0, fmt.Errorf("unable to read queued txpool size")
}

pendingTxPoolSize, err := strconv.ParseUint(cleanHex(pendingHex), 16, 64)
if err != nil {
return 0, fmt.Errorf("unable to parse pending txpool size: %v", err)
}
queuedTxPoolSize, err := strconv.ParseUint(cleanHex(queuedHex), 16, 64)
if err != nil {
return 0, fmt.Errorf("unable to parse queued txpool size: %v", err)
}

return (pendingTxPoolSize + queuedTxPoolSize), nil
}

func updateRateLimit(rl *rate.Limiter, rpc *ethrpc.Client, steadyStateQueueSize uint64, rateLimitIncrement uint64, cycleDuration time.Duration) {
ticker := time.NewTicker(cycleDuration)
defer ticker.Stop()

for range ticker.C {
txPoolSize, err := getTxPoolSize(rpc)
if err != nil {
log.Error().Err(err).Msg("Error getting txpool size")
return
}

if txPoolSize < steadyStateQueueSize {
// additively increment requests per second if txpool less than queue steady state
newRateLimit := rate.Limit(float64(rl.Limit()) + float64(rateLimitIncrement))
rl.SetLimit(newRateLimit)
log.Trace().Float64("New Rate Limit (RPS)", float64(rl.Limit())).Uint64("Current Tx Pool Size", txPoolSize).Uint64("Steady State Tx Pool Size", steadyStateQueueSize).Msg("Increased rate limit")
} else if txPoolSize > steadyStateQueueSize {
// halve rate limit requests per second if txpool greater than queue steady state
rl.SetLimit(rl.Limit() / 2)
log.Trace().Float64("New Rate Limit (RPS)", float64(rl.Limit())).Uint64("Current Tx Pool Size", txPoolSize).Uint64("Steady State Tx Pool Size", steadyStateQueueSize).Msg("Backed off rate limit")
}
}
}

func mainLoop(ctx context.Context, c *ethclient.Client, rpc *ethrpc.Client) error {

ltp := inputLoadTestParams
Expand All @@ -521,12 +588,25 @@ func mainLoop(ctx context.Context, c *ethclient.Client, rpc *ethrpc.Client) erro
chainID := new(big.Int).SetUint64(*ltp.ChainID)
privateKey := ltp.ECDSAPrivateKey
mode := *ltp.Mode
steadyStateTxPoolSize := *ltp.SteadyStateTxPoolSize
adaptiveRateLimitIncrement := *ltp.AdaptiveRateLimitIncrement
var rl *rate.Limiter

rl := rate.NewLimiter(rate.Limit(*ltp.RateLimit), 1)
if *ltp.RateLimit <= 0.0 {
if *ltp.AdaptiveRateLimit {
// start slow with adaptive rate limiting and we'll increase limit per feedback loop
rl = rate.NewLimiter(rate.Limit(*ltp.AdaptiveRateLimitStart), 1)
} else {
rl = rate.NewLimiter(rate.Limit(*ltp.RateLimit), 1)
}

if *ltp.RateLimit <= 0.0 || *ltp.AdaptiveRateLimitStart <= 0.0 {
rl = nil
}

if rl != nil && *ltp.AdaptiveRateLimit {
go updateRateLimit(rl, rpc, steadyStateTxPoolSize, adaptiveRateLimitIncrement, time.Duration(*ltp.AdaptiveCycleDuration)*time.Second)
}

tops, err := bind.NewKeyedTransactorWithChainID(privateKey, chainID)
tops = configureTransactOpts(tops)
if err != nil {
Expand Down Expand Up @@ -783,7 +863,7 @@ func mainLoop(ctx context.Context, c *ethclient.Client, rpc *ethrpc.Client) erro
log.Error().Err(err).Msg("there was an issue waiting for all transactions to be mined")
}

lightSummary(ctx, c, rpc, startBlockNumber, startNonce, finalBlockNumber, currentNonce)
lightSummary(ctx, c, rpc, startBlockNumber, startNonce, finalBlockNumber, currentNonce, rl)
if *ltp.ShouldProduceSummary {
err = summarizeTransactions(ctx, c, rpc, startBlockNumber, startNonce, finalBlockNumber, currentNonce)
if err != nil {
Expand All @@ -793,7 +873,7 @@ func mainLoop(ctx context.Context, c *ethclient.Client, rpc *ethrpc.Client) erro
return nil
}

func lightSummary(ctx context.Context, c *ethclient.Client, rpc *ethrpc.Client, startBlockNumber, startNonce, endBlockNumber, endNonce uint64) {
func lightSummary(ctx context.Context, c *ethclient.Client, rpc *ethrpc.Client, startBlockNumber, startNonce, endBlockNumber, endNonce uint64, rl *rate.Limiter) {
startBlock, err := c.BlockByNumber(ctx, new(big.Int).SetUint64(startBlockNumber))
if err != nil {
log.Error().Err(err).Msg("unable to get start block for light summary")
Expand All @@ -816,6 +896,7 @@ func lightSummary(ctx context.Context, c *ethclient.Client, rpc *ethrpc.Client,
Int("transactionCount", len(loadTestResults)).
Float64("testDuration", testDuration.Seconds()).
Float64("tps", tps).
Float64("final rate limit", float64(rl.Limit())).
Msg("rough test summary (ignores errors)")
}

Expand Down

0 comments on commit 903cdf5

Please sign in to comment.