Skip to content

Commit

Permalink
Merge PR: simulate tx async (#2812)
Browse files Browse the repository at this point in the history
* add enable config of hgu

* simulate tx in new goroutine

* add simulation debug info

* start more goroutine for simulation

* fix bug

* udpate

* fix bug of sig cache

* disable pendingPool

* enable hgu default

* add log info and query api of simulation gas

* add enable config of pgu and adjustment

* use atomic to prevent data race

* delete temp code

* enable async simulation only when pgb is greater than -1

* check error
  • Loading branch information
yann-sjtu authored Nov 29, 2022
1 parent ea57c6e commit 620f195
Show file tree
Hide file tree
Showing 14 changed files with 170 additions and 36 deletions.
37 changes: 37 additions & 0 deletions app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ type OecConfig struct {
maxTxNumPerBlock int64
// mempool.max_gas_used_per_block
maxGasUsedPerBlock int64
// mempool.enable-pgu
enablePGU bool
// mempool.pgu-adjustment
pguAdjustment float64
// mempool.node_key_whitelist
nodeKeyWhitelist []string
//mempool.check_tx_cost
Expand Down Expand Up @@ -123,6 +127,8 @@ const (
FlagMempoolFlush = "mempool.flush"
FlagMaxTxNumPerBlock = "mempool.max_tx_num_per_block"
FlagMaxGasUsedPerBlock = "mempool.max_gas_used_per_block"
FlagEnablePGU = "mempool.enable-pgu"
FlagPGUAdjustment = "mempool.pgu-adjustment"
FlagNodeKeyWhitelist = "mempool.node_key_whitelist"
FlagMempoolCheckTxCost = "mempool.check_tx_cost"
FlagGasLimitBuffer = "gas-limit-buffer"
Expand Down Expand Up @@ -260,6 +266,8 @@ func (c *OecConfig) loadFromConfig() {
c.SetMempoolCheckTxCost(viper.GetBool(FlagMempoolCheckTxCost))
c.SetMaxTxNumPerBlock(viper.GetInt64(FlagMaxTxNumPerBlock))
c.SetMaxGasUsedPerBlock(viper.GetInt64(FlagMaxGasUsedPerBlock))
c.SetEnablePGU(viper.GetBool(FlagEnablePGU))
c.SetPGUAdjustment(viper.GetFloat64(FlagPGUAdjustment))
c.SetGasLimitBuffer(viper.GetUint64(FlagGasLimitBuffer))

c.SetEnableDynamicGp(viper.GetBool(FlagEnableDynamicGp))
Expand Down Expand Up @@ -448,6 +456,18 @@ func (c *OecConfig) updateFromKVStr(k, v string) {
return
}
c.SetMaxGasUsedPerBlock(r)
case FlagEnablePGU:
r, err := strconv.ParseBool(v)
if err != nil {
return
}
c.SetEnablePGU(r)
case FlagPGUAdjustment:
r, err := strconv.ParseFloat(v, 64)
if err != nil {
return
}
c.SetPGUAdjustment(r)
case FlagGasLimitBuffer:
r, err := strconv.ParseUint(v, 10, 64)
if err != nil {
Expand Down Expand Up @@ -723,13 +743,30 @@ func (c *OecConfig) SetMaxTxNumPerBlock(value int64) {
func (c *OecConfig) GetMaxGasUsedPerBlock() int64 {
return c.maxGasUsedPerBlock
}

func (c *OecConfig) SetMaxGasUsedPerBlock(value int64) {
if value < -1 {
return
}
c.maxGasUsedPerBlock = value
}

func (c *OecConfig) GetEnablePGU() bool {
return c.enablePGU
}

func (c *OecConfig) SetEnablePGU(value bool) {
c.enablePGU = value
}

func (c *OecConfig) GetPGUAdjustment() float64 {
return c.pguAdjustment
}

func (c *OecConfig) SetPGUAdjustment(value float64) {
c.pguAdjustment = value
}

func (c *OecConfig) GetGasLimitBuffer() uint64 {
return c.gasLimitBuffer
}
Expand Down
2 changes: 2 additions & 0 deletions app/elapse_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,12 @@ var (
mandatorySchemas = []string{
trace.Height,
trace.Tx,
trace.SimTx,
trace.BlockSize,
trace.TimeoutInterval,
trace.LastBlockTime,
trace.GasUsed,
trace.SimGasUsed,
trace.InvalidTxs,
trace.LastRun,
trace.RunTx,
Expand Down
6 changes: 4 additions & 2 deletions libs/system/trace/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@ const (

const (
GasUsed = "GasUsed"
SimGasUsed = "SimGasUsed"
Produce = "Produce"
RunTx = "RunTx"
LastRun = "lastRun"
Height = "Height"
Tx = "Tx"
SimTx = "SimTx"
BlockSize = "BlockSize"
Elapsed = "Elapsed"
CommitRound = "CommitRound"
Expand All @@ -57,9 +59,9 @@ const (
Delta = "Delta"
InvalidTxs = "InvalidTxs"

Abci = "abci"
Abci = "abci"
//SaveResp = "saveResp"
Persist = "persist"
Persist = "persist"
//MempoolUpdate = "mpUpdate"
//SaveState = "saveState"
ApplyBlock = "ApplyBlock"
Expand Down
10 changes: 10 additions & 0 deletions libs/tendermint/cmd/tendermint/commands/run_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,16 @@ func AddNodeFlags(cmd *cobra.Command) {
config.Mempool.MaxGasUsedPerBlock,
"Maximum gas used of transactions in a block",
)
cmd.Flags().Bool(
"mempool.enable-pgu",
false,
"enable precise gas used",
)
cmd.Flags().Float64(
"mempool.pgu-adjustment",
1,
"adjustment for pgu, such as 0.9 or 1.1",
)
cmd.Flags().Bool(
"mempool.sort_tx_by_gp",
config.Mempool.SortTxByGp,
Expand Down
2 changes: 1 addition & 1 deletion libs/tendermint/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,7 @@ func DefaultMempoolConfig() *MempoolConfig {
SortTxByGp: true,
ForceRecheckGap: 2000,
TxPriceBump: 10,
EnablePendingPool: true,
EnablePendingPool: false,
PendingPoolSize: 50000,
PendingPoolPeriod: 3,
PendingPoolReserveBlocks: 100,
Expand Down
10 changes: 10 additions & 0 deletions libs/tendermint/config/dynamic_config_okchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ type IDynamicConfig interface {
GetMempoolCacheSize() int
GetMaxTxNumPerBlock() int64
GetMaxGasUsedPerBlock() int64
GetEnablePGU() bool
GetPGUAdjustment() float64
GetMempoolFlush() bool
GetNodeKeyWhitelist() []string
GetMempoolCheckTxCost() bool
Expand Down Expand Up @@ -58,6 +60,14 @@ func (d MockDynamicConfig) GetMaxGasUsedPerBlock() int64 {
return DefaultMempoolConfig().MaxGasUsedPerBlock
}

func (d MockDynamicConfig) GetEnablePGU() bool {
return false
}

func (d MockDynamicConfig) GetPGUAdjustment() float64 {
return 1
}

func (d MockDynamicConfig) GetMempoolFlush() bool {
return false
}
Expand Down
81 changes: 69 additions & 12 deletions libs/tendermint/mempool/clist_mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/sha256"
"encoding/hex"
"fmt"

"math/big"
"strconv"
"sync"
Expand All @@ -13,8 +14,7 @@ import (

"github.com/VictoriaMetrics/fastcache"

"github.com/tendermint/go-amino"

lru "github.com/hashicorp/golang-lru"
"github.com/okex/exchain/libs/system/trace"
abci "github.com/okex/exchain/libs/tendermint/abci/types"
cfg "github.com/okex/exchain/libs/tendermint/config"
Expand All @@ -23,6 +23,7 @@ import (
tmmath "github.com/okex/exchain/libs/tendermint/libs/math"
"github.com/okex/exchain/libs/tendermint/proxy"
"github.com/okex/exchain/libs/tendermint/types"
"github.com/tendermint/go-amino"
)

type TxInfoParser interface {
Expand Down Expand Up @@ -90,6 +91,10 @@ type CListMempool struct {
checkP2PTotalTime int64

txs ITransactionQueue

simQueue chan *mempoolTx

gasCache *lru.Cache
}

var _ Mempool = &CListMempool{}
Expand All @@ -110,6 +115,11 @@ func NewCListMempool(
} else {
txQueue = NewBaseTxQueue()
}

gasCache, err := lru.New(1000000)
if err != nil {
panic(err)
}
mempool := &CListMempool{
config: config,
proxyAppConn: proxyAppConn,
Expand All @@ -120,7 +130,10 @@ func NewCListMempool(
logger: log.NewNopLogger(),
metrics: NopMetrics(),
txs: txQueue,
simQueue: make(chan *mempoolTx, 100000),
gasCache: gasCache,
}
go mempool.simulationRoutine()

if cfg.DynamicConfig.GetMempoolCacheSize() > 0 {
mempool.cache = newMapTxCache(cfg.DynamicConfig.GetMempoolCacheSize())
Expand Down Expand Up @@ -402,15 +415,21 @@ func (mem *CListMempool) addTx(memTx *mempoolTx) error {
if err := mem.txs.Insert(memTx); err != nil {
return err
}
if cfg.DynamicConfig.GetMaxGasUsedPerBlock() > -1 && cfg.DynamicConfig.GetEnablePGU() {
select {
case mem.simQueue <- memTx:
default:
mem.logger.Error("tx simulation queue is full")
}
}

atomic.AddInt64(&mem.txsBytes, int64(len(memTx.tx)))
mem.metrics.TxSizeBytes.Observe(float64(len(memTx.tx)))
mem.eventBus.PublishEventPendingTx(types.EventDataTx{TxResult: types.TxResult{
Height: memTx.height,
Tx: memTx.tx,
}})

types.SignatureCache().Remove(memTx.realTx.TxHash())

return nil
}

Expand Down Expand Up @@ -703,6 +722,15 @@ func (mem *CListMempool) notifyTxsAvailable() {
}
}

func (mem *CListMempool) GetTxSimulateGas(txHash string) int64 {
hash := hex.EncodeToString([]byte(txHash))
v, ok := mem.gasCache.Get(hash)
if !ok {
return -1
}
return v.(int64)
}

func (mem *CListMempool) ReapEssentialTx(tx types.Tx) abci.TxEssentials {
if ele, ok := mem.txs.Load(txKey(tx)); ok {
return ele.Value.(*mempoolTx).realTx
Expand All @@ -724,9 +752,12 @@ func (mem *CListMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) []types.Tx {
// size per tx, and set the initial capacity based off of that.
// txs := make([]types.Tx, 0, tmmath.MinInt(mem.txs.Len(), max/mem.avgTxSize))
txs := make([]types.Tx, 0, tmmath.MinInt(mem.txs.Len(), int(cfg.DynamicConfig.GetMaxTxNumPerBlock())))
var simCount, simGas int64
defer func() {
mem.logger.Info("ReapMaxBytesMaxGas", "ProposingHeight", mem.Height()+1,
"MempoolTxs", mem.txs.Len(), "ReapTxs", len(txs))
trace.GetElapsedInfo().AddInfo(trace.SimTx, fmt.Sprintf("%d:%d", mem.Height()+1, simCount))
trace.GetElapsedInfo().AddInfo(trace.SimGasUsed, fmt.Sprintf("%d:%d", mem.Height()+1, simGas))
}()
for e := mem.txs.Front(); e != nil; e = e.Next() {
memTx := e.Value.(*mempoolTx)
Expand All @@ -740,15 +771,9 @@ func (mem *CListMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) []types.Tx {
// If maxGas is negative, skip this check.
// Since newTotalGas < masGas, which
// must be non-negative, it follows that this won't overflow.
newTotalGas := totalGas + memTx.gasWanted
gasWanted := atomic.LoadInt64(&memTx.gasWanted)
newTotalGas := totalGas + gasWanted
if maxGas > -1 && newTotalGas > maxGas {
if len(txs) <= 1 && mem.Size() > 0 {
mem.logger.Error("Unexpected gas", "txHash", hex.EncodeToString(memTx.tx.Hash(mem.Height())), "gasWanted", memTx.gasWanted, "totalGas", newTotalGas)
for ; e != nil && len(txs) < 15; e = e.Next() {
memTx = e.Value.(*mempoolTx)
txs = append(txs, memTx.tx)
}
}
return txs
}
if totalTxNum >= cfg.DynamicConfig.GetMaxTxNumPerBlock() {
Expand All @@ -758,6 +783,10 @@ func (mem *CListMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) []types.Tx {
totalTxNum++
totalGas = newTotalGas
txs = append(txs, memTx.tx)
simGas += gasWanted
if atomic.LoadUint32(&memTx.isSim) > 0 {
simCount++
}
}

return txs
Expand Down Expand Up @@ -866,11 +895,13 @@ func (mem *CListMempool) Update(
if mem.pendingPool != nil {
addressNonce = make(map[string]uint64)
}

for i, tx := range txs {
txCode := deliverTxResponses[i].Code
addr := ""
nonce := uint64(0)
if ele := mem.cleanTx(height, tx, txCode); ele != nil {
atomic.AddUint32(&(ele.Value.(*mempoolTx).isOutdated), 1)
addr = ele.Address
nonce = ele.Nonce
mem.logUpdate(ele.Address, ele.Nonce)
Expand Down Expand Up @@ -1069,6 +1100,9 @@ type mempoolTx struct {
from string
senderNonce uint64

isOutdated uint32
isSim uint32

// ids of peers who've sent us this tx (as a map for quick lookups).
// senders: PeerID -> bool
senders map[uint16]struct{}
Expand Down Expand Up @@ -1230,3 +1264,26 @@ func (mem *CListMempool) simulateTx(tx types.Tx) (*SimulationResponse, error) {
err = cdc.UnmarshalBinaryBare(res.Value, &simuRes)
return &simuRes, err
}

func (mem *CListMempool) simulationRoutine() {
for memTx := range mem.simQueue {
mem.simulationJob(memTx)
}
}

func (mem *CListMempool) simulationJob(memTx *mempoolTx) {
defer types.SignatureCache().Remove(memTx.realTx.TxHash())
if atomic.LoadUint32(&memTx.isOutdated) != 0 {
// memTx is outdated
return
}
simuRes, err := mem.simulateTx(memTx.tx)
if err != nil {
mem.logger.Error("simulateTx", "error", err, "txHash", memTx.tx.Hash(mem.Height()))
return
}
gas := int64(simuRes.GasUsed) * int64(cfg.DynamicConfig.GetPGUAdjustment()*100) / 100
atomic.StoreInt64(&memTx.gasWanted, gas)
atomic.AddUint32(&memTx.isSim, 1)
mem.gasCache.Add(hex.EncodeToString(memTx.realTx.TxHash()), gas)
}
2 changes: 2 additions & 0 deletions libs/tendermint/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ type Mempool interface {
SetAccountRetriever(retriever AccountRetriever)

SetTxInfoParser(parser TxInfoParser)

GetTxSimulateGas(txHash string) int64
}

//--------------------------------------------------------------------------------
Expand Down
2 changes: 2 additions & 0 deletions libs/tendermint/mock/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,5 @@ func (Mempool) SetAccountRetriever(_ mempl.AccountRetriever) {
func (Mempool) SetTxInfoParser(_ mempl.TxInfoParser) {

}

func (Mempool) GetTxSimulateGas(txHash string) int64 { return 0 }
6 changes: 6 additions & 0 deletions libs/tendermint/rpc/core/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,12 @@ func NumUnconfirmedTxs(ctx *rpctypes.Context) (*ctypes.ResultUnconfirmedTxs, err
TotalBytes: env.Mempool.TxsBytes()}, nil
}

func TxSimulateGasCost(ctx *rpctypes.Context, hash string) (*ctypes.ResponseTxSimulateGas, error) {
return &ctypes.ResponseTxSimulateGas{
GasCost: env.Mempool.GetTxSimulateGas(hash),
}, nil
}

func UserUnconfirmedTxs(address string, limit int) (*ctypes.ResultUserUnconfirmedTxs, error) {
txs := env.Mempool.ReapUserTxs(address, limit)
return &ctypes.ResultUserUnconfirmedTxs{
Expand Down
2 changes: 2 additions & 0 deletions libs/tendermint/rpc/core/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ var Routes = map[string]*rpc.RPCFunc{

// evidence API
"broadcast_evidence": rpc.NewRPCFunc(BroadcastEvidence, "evidence"),

"tx_simulate_gas": rpc.NewRPCFunc(TxSimulateGasCost, "hash"),
}

func AddUnsafeRoutes() {
Expand Down
Loading

0 comments on commit 620f195

Please sign in to comment.