From 620f19569d8a16315ed35af91bec8da99c401163 Mon Sep 17 00:00:00 2001 From: YuanXingqiang Date: Tue, 29 Nov 2022 11:07:19 +0800 Subject: [PATCH] Merge PR: simulate tx async (#2812) * add enable config of hgu * simulate tx in new goroutine * add simulation debug info * start more goroutine for simulation * fix bug * udpate * fix bug of sig cache * disable pendingPool * enable hgu default * add log info and query api of simulation gas * add enable config of pgu and adjustment * use atomic to prevent data race * delete temp code * enable async simulation only when pgb is greater than -1 * check error --- app/config/config.go | 37 +++++++++ app/elapse_info.go | 2 + libs/system/trace/schema.go | 6 +- .../cmd/tendermint/commands/run_node.go | 10 +++ libs/tendermint/config/config.go | 2 +- .../config/dynamic_config_okchain.go | 10 +++ libs/tendermint/mempool/clist_mempool.go | 81 ++++++++++++++++--- libs/tendermint/mempool/mempool.go | 2 + libs/tendermint/mock/mempool.go | 2 + libs/tendermint/rpc/core/mempool.go | 6 ++ libs/tendermint/rpc/core/routes.go | 2 + libs/tendermint/rpc/core/types/responses.go | 4 + x/evm/types/msg_evm.go | 40 ++++----- x/evm/types/msg_test.go | 2 +- 14 files changed, 170 insertions(+), 36 deletions(-) diff --git a/app/config/config.go b/app/config/config.go index 6f73b49b6c..cf01a3c121 100644 --- a/app/config/config.go +++ b/app/config/config.go @@ -45,6 +45,10 @@ type OecConfig struct { maxTxNumPerBlock int64 // mempool.max_gas_used_per_block maxGasUsedPerBlock int64 + // mempool.enable-pgu + enablePGU bool + // mempool.pgu-adjustment + pguAdjustment float64 // mempool.node_key_whitelist nodeKeyWhitelist []string //mempool.check_tx_cost @@ -123,6 +127,8 @@ const ( FlagMempoolFlush = "mempool.flush" FlagMaxTxNumPerBlock = "mempool.max_tx_num_per_block" FlagMaxGasUsedPerBlock = "mempool.max_gas_used_per_block" + FlagEnablePGU = "mempool.enable-pgu" + FlagPGUAdjustment = "mempool.pgu-adjustment" FlagNodeKeyWhitelist = "mempool.node_key_whitelist" FlagMempoolCheckTxCost = "mempool.check_tx_cost" FlagGasLimitBuffer = "gas-limit-buffer" @@ -260,6 +266,8 @@ func (c *OecConfig) loadFromConfig() { c.SetMempoolCheckTxCost(viper.GetBool(FlagMempoolCheckTxCost)) c.SetMaxTxNumPerBlock(viper.GetInt64(FlagMaxTxNumPerBlock)) c.SetMaxGasUsedPerBlock(viper.GetInt64(FlagMaxGasUsedPerBlock)) + c.SetEnablePGU(viper.GetBool(FlagEnablePGU)) + c.SetPGUAdjustment(viper.GetFloat64(FlagPGUAdjustment)) c.SetGasLimitBuffer(viper.GetUint64(FlagGasLimitBuffer)) c.SetEnableDynamicGp(viper.GetBool(FlagEnableDynamicGp)) @@ -448,6 +456,18 @@ func (c *OecConfig) updateFromKVStr(k, v string) { return } c.SetMaxGasUsedPerBlock(r) + case FlagEnablePGU: + r, err := strconv.ParseBool(v) + if err != nil { + return + } + c.SetEnablePGU(r) + case FlagPGUAdjustment: + r, err := strconv.ParseFloat(v, 64) + if err != nil { + return + } + c.SetPGUAdjustment(r) case FlagGasLimitBuffer: r, err := strconv.ParseUint(v, 10, 64) if err != nil { @@ -723,6 +743,7 @@ func (c *OecConfig) SetMaxTxNumPerBlock(value int64) { func (c *OecConfig) GetMaxGasUsedPerBlock() int64 { return c.maxGasUsedPerBlock } + func (c *OecConfig) SetMaxGasUsedPerBlock(value int64) { if value < -1 { return @@ -730,6 +751,22 @@ func (c *OecConfig) SetMaxGasUsedPerBlock(value int64) { c.maxGasUsedPerBlock = value } +func (c *OecConfig) GetEnablePGU() bool { + return c.enablePGU +} + +func (c *OecConfig) SetEnablePGU(value bool) { + c.enablePGU = value +} + +func (c *OecConfig) GetPGUAdjustment() float64 { + return c.pguAdjustment +} + +func (c *OecConfig) SetPGUAdjustment(value float64) { + c.pguAdjustment = value +} + func (c *OecConfig) GetGasLimitBuffer() uint64 { return c.gasLimitBuffer } diff --git a/app/elapse_info.go b/app/elapse_info.go index 061485beb8..efb581a7ae 100644 --- a/app/elapse_info.go +++ b/app/elapse_info.go @@ -44,10 +44,12 @@ var ( mandatorySchemas = []string{ trace.Height, trace.Tx, + trace.SimTx, trace.BlockSize, trace.TimeoutInterval, trace.LastBlockTime, trace.GasUsed, + trace.SimGasUsed, trace.InvalidTxs, trace.LastRun, trace.RunTx, diff --git a/libs/system/trace/schema.go b/libs/system/trace/schema.go index 345cabca22..9c3054abcd 100644 --- a/libs/system/trace/schema.go +++ b/libs/system/trace/schema.go @@ -32,11 +32,13 @@ const ( const ( GasUsed = "GasUsed" + SimGasUsed = "SimGasUsed" Produce = "Produce" RunTx = "RunTx" LastRun = "lastRun" Height = "Height" Tx = "Tx" + SimTx = "SimTx" BlockSize = "BlockSize" Elapsed = "Elapsed" CommitRound = "CommitRound" @@ -57,9 +59,9 @@ const ( Delta = "Delta" InvalidTxs = "InvalidTxs" - Abci = "abci" + Abci = "abci" //SaveResp = "saveResp" - Persist = "persist" + Persist = "persist" //MempoolUpdate = "mpUpdate" //SaveState = "saveState" ApplyBlock = "ApplyBlock" diff --git a/libs/tendermint/cmd/tendermint/commands/run_node.go b/libs/tendermint/cmd/tendermint/commands/run_node.go index abc6c1a640..eaf988d164 100644 --- a/libs/tendermint/cmd/tendermint/commands/run_node.go +++ b/libs/tendermint/cmd/tendermint/commands/run_node.go @@ -121,6 +121,16 @@ func AddNodeFlags(cmd *cobra.Command) { config.Mempool.MaxGasUsedPerBlock, "Maximum gas used of transactions in a block", ) + cmd.Flags().Bool( + "mempool.enable-pgu", + false, + "enable precise gas used", + ) + cmd.Flags().Float64( + "mempool.pgu-adjustment", + 1, + "adjustment for pgu, such as 0.9 or 1.1", + ) cmd.Flags().Bool( "mempool.sort_tx_by_gp", config.Mempool.SortTxByGp, diff --git a/libs/tendermint/config/config.go b/libs/tendermint/config/config.go index b86666d770..3622f3264d 100644 --- a/libs/tendermint/config/config.go +++ b/libs/tendermint/config/config.go @@ -703,7 +703,7 @@ func DefaultMempoolConfig() *MempoolConfig { SortTxByGp: true, ForceRecheckGap: 2000, TxPriceBump: 10, - EnablePendingPool: true, + EnablePendingPool: false, PendingPoolSize: 50000, PendingPoolPeriod: 3, PendingPoolReserveBlocks: 100, diff --git a/libs/tendermint/config/dynamic_config_okchain.go b/libs/tendermint/config/dynamic_config_okchain.go index c45294a9e4..7ce1ef5351 100644 --- a/libs/tendermint/config/dynamic_config_okchain.go +++ b/libs/tendermint/config/dynamic_config_okchain.go @@ -9,6 +9,8 @@ type IDynamicConfig interface { GetMempoolCacheSize() int GetMaxTxNumPerBlock() int64 GetMaxGasUsedPerBlock() int64 + GetEnablePGU() bool + GetPGUAdjustment() float64 GetMempoolFlush() bool GetNodeKeyWhitelist() []string GetMempoolCheckTxCost() bool @@ -58,6 +60,14 @@ func (d MockDynamicConfig) GetMaxGasUsedPerBlock() int64 { return DefaultMempoolConfig().MaxGasUsedPerBlock } +func (d MockDynamicConfig) GetEnablePGU() bool { + return false +} + +func (d MockDynamicConfig) GetPGUAdjustment() float64 { + return 1 +} + func (d MockDynamicConfig) GetMempoolFlush() bool { return false } diff --git a/libs/tendermint/mempool/clist_mempool.go b/libs/tendermint/mempool/clist_mempool.go index 0fab05b23f..1e9b35ab1e 100644 --- a/libs/tendermint/mempool/clist_mempool.go +++ b/libs/tendermint/mempool/clist_mempool.go @@ -5,6 +5,7 @@ import ( "crypto/sha256" "encoding/hex" "fmt" + "math/big" "strconv" "sync" @@ -13,8 +14,7 @@ import ( "github.com/VictoriaMetrics/fastcache" - "github.com/tendermint/go-amino" - + lru "github.com/hashicorp/golang-lru" "github.com/okex/exchain/libs/system/trace" abci "github.com/okex/exchain/libs/tendermint/abci/types" cfg "github.com/okex/exchain/libs/tendermint/config" @@ -23,6 +23,7 @@ import ( tmmath "github.com/okex/exchain/libs/tendermint/libs/math" "github.com/okex/exchain/libs/tendermint/proxy" "github.com/okex/exchain/libs/tendermint/types" + "github.com/tendermint/go-amino" ) type TxInfoParser interface { @@ -90,6 +91,10 @@ type CListMempool struct { checkP2PTotalTime int64 txs ITransactionQueue + + simQueue chan *mempoolTx + + gasCache *lru.Cache } var _ Mempool = &CListMempool{} @@ -110,6 +115,11 @@ func NewCListMempool( } else { txQueue = NewBaseTxQueue() } + + gasCache, err := lru.New(1000000) + if err != nil { + panic(err) + } mempool := &CListMempool{ config: config, proxyAppConn: proxyAppConn, @@ -120,7 +130,10 @@ func NewCListMempool( logger: log.NewNopLogger(), metrics: NopMetrics(), txs: txQueue, + simQueue: make(chan *mempoolTx, 100000), + gasCache: gasCache, } + go mempool.simulationRoutine() if cfg.DynamicConfig.GetMempoolCacheSize() > 0 { mempool.cache = newMapTxCache(cfg.DynamicConfig.GetMempoolCacheSize()) @@ -402,6 +415,14 @@ func (mem *CListMempool) addTx(memTx *mempoolTx) error { if err := mem.txs.Insert(memTx); err != nil { return err } + if cfg.DynamicConfig.GetMaxGasUsedPerBlock() > -1 && cfg.DynamicConfig.GetEnablePGU() { + select { + case mem.simQueue <- memTx: + default: + mem.logger.Error("tx simulation queue is full") + } + } + atomic.AddInt64(&mem.txsBytes, int64(len(memTx.tx))) mem.metrics.TxSizeBytes.Observe(float64(len(memTx.tx))) mem.eventBus.PublishEventPendingTx(types.EventDataTx{TxResult: types.TxResult{ @@ -409,8 +430,6 @@ func (mem *CListMempool) addTx(memTx *mempoolTx) error { Tx: memTx.tx, }}) - types.SignatureCache().Remove(memTx.realTx.TxHash()) - return nil } @@ -703,6 +722,15 @@ func (mem *CListMempool) notifyTxsAvailable() { } } +func (mem *CListMempool) GetTxSimulateGas(txHash string) int64 { + hash := hex.EncodeToString([]byte(txHash)) + v, ok := mem.gasCache.Get(hash) + if !ok { + return -1 + } + return v.(int64) +} + func (mem *CListMempool) ReapEssentialTx(tx types.Tx) abci.TxEssentials { if ele, ok := mem.txs.Load(txKey(tx)); ok { return ele.Value.(*mempoolTx).realTx @@ -724,9 +752,12 @@ func (mem *CListMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) []types.Tx { // size per tx, and set the initial capacity based off of that. // txs := make([]types.Tx, 0, tmmath.MinInt(mem.txs.Len(), max/mem.avgTxSize)) txs := make([]types.Tx, 0, tmmath.MinInt(mem.txs.Len(), int(cfg.DynamicConfig.GetMaxTxNumPerBlock()))) + var simCount, simGas int64 defer func() { mem.logger.Info("ReapMaxBytesMaxGas", "ProposingHeight", mem.Height()+1, "MempoolTxs", mem.txs.Len(), "ReapTxs", len(txs)) + trace.GetElapsedInfo().AddInfo(trace.SimTx, fmt.Sprintf("%d:%d", mem.Height()+1, simCount)) + trace.GetElapsedInfo().AddInfo(trace.SimGasUsed, fmt.Sprintf("%d:%d", mem.Height()+1, simGas)) }() for e := mem.txs.Front(); e != nil; e = e.Next() { memTx := e.Value.(*mempoolTx) @@ -740,15 +771,9 @@ func (mem *CListMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) []types.Tx { // If maxGas is negative, skip this check. // Since newTotalGas < masGas, which // must be non-negative, it follows that this won't overflow. - newTotalGas := totalGas + memTx.gasWanted + gasWanted := atomic.LoadInt64(&memTx.gasWanted) + newTotalGas := totalGas + gasWanted if maxGas > -1 && newTotalGas > maxGas { - if len(txs) <= 1 && mem.Size() > 0 { - mem.logger.Error("Unexpected gas", "txHash", hex.EncodeToString(memTx.tx.Hash(mem.Height())), "gasWanted", memTx.gasWanted, "totalGas", newTotalGas) - for ; e != nil && len(txs) < 15; e = e.Next() { - memTx = e.Value.(*mempoolTx) - txs = append(txs, memTx.tx) - } - } return txs } if totalTxNum >= cfg.DynamicConfig.GetMaxTxNumPerBlock() { @@ -758,6 +783,10 @@ func (mem *CListMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) []types.Tx { totalTxNum++ totalGas = newTotalGas txs = append(txs, memTx.tx) + simGas += gasWanted + if atomic.LoadUint32(&memTx.isSim) > 0 { + simCount++ + } } return txs @@ -866,11 +895,13 @@ func (mem *CListMempool) Update( if mem.pendingPool != nil { addressNonce = make(map[string]uint64) } + for i, tx := range txs { txCode := deliverTxResponses[i].Code addr := "" nonce := uint64(0) if ele := mem.cleanTx(height, tx, txCode); ele != nil { + atomic.AddUint32(&(ele.Value.(*mempoolTx).isOutdated), 1) addr = ele.Address nonce = ele.Nonce mem.logUpdate(ele.Address, ele.Nonce) @@ -1069,6 +1100,9 @@ type mempoolTx struct { from string senderNonce uint64 + isOutdated uint32 + isSim uint32 + // ids of peers who've sent us this tx (as a map for quick lookups). // senders: PeerID -> bool senders map[uint16]struct{} @@ -1230,3 +1264,26 @@ func (mem *CListMempool) simulateTx(tx types.Tx) (*SimulationResponse, error) { err = cdc.UnmarshalBinaryBare(res.Value, &simuRes) return &simuRes, err } + +func (mem *CListMempool) simulationRoutine() { + for memTx := range mem.simQueue { + mem.simulationJob(memTx) + } +} + +func (mem *CListMempool) simulationJob(memTx *mempoolTx) { + defer types.SignatureCache().Remove(memTx.realTx.TxHash()) + if atomic.LoadUint32(&memTx.isOutdated) != 0 { + // memTx is outdated + return + } + simuRes, err := mem.simulateTx(memTx.tx) + if err != nil { + mem.logger.Error("simulateTx", "error", err, "txHash", memTx.tx.Hash(mem.Height())) + return + } + gas := int64(simuRes.GasUsed) * int64(cfg.DynamicConfig.GetPGUAdjustment()*100) / 100 + atomic.StoreInt64(&memTx.gasWanted, gas) + atomic.AddUint32(&memTx.isSim, 1) + mem.gasCache.Add(hex.EncodeToString(memTx.realTx.TxHash()), gas) +} diff --git a/libs/tendermint/mempool/mempool.go b/libs/tendermint/mempool/mempool.go index 5afc8588db..9290b1553f 100644 --- a/libs/tendermint/mempool/mempool.go +++ b/libs/tendermint/mempool/mempool.go @@ -89,6 +89,8 @@ type Mempool interface { SetAccountRetriever(retriever AccountRetriever) SetTxInfoParser(parser TxInfoParser) + + GetTxSimulateGas(txHash string) int64 } //-------------------------------------------------------------------------------- diff --git a/libs/tendermint/mock/mempool.go b/libs/tendermint/mock/mempool.go index 7707a9a582..33a883dae6 100644 --- a/libs/tendermint/mock/mempool.go +++ b/libs/tendermint/mock/mempool.go @@ -75,3 +75,5 @@ func (Mempool) SetAccountRetriever(_ mempl.AccountRetriever) { func (Mempool) SetTxInfoParser(_ mempl.TxInfoParser) { } + +func (Mempool) GetTxSimulateGas(txHash string) int64 { return 0 } diff --git a/libs/tendermint/rpc/core/mempool.go b/libs/tendermint/rpc/core/mempool.go index 1d40028af6..fcc32101ca 100644 --- a/libs/tendermint/rpc/core/mempool.go +++ b/libs/tendermint/rpc/core/mempool.go @@ -153,6 +153,12 @@ func NumUnconfirmedTxs(ctx *rpctypes.Context) (*ctypes.ResultUnconfirmedTxs, err TotalBytes: env.Mempool.TxsBytes()}, nil } +func TxSimulateGasCost(ctx *rpctypes.Context, hash string) (*ctypes.ResponseTxSimulateGas, error) { + return &ctypes.ResponseTxSimulateGas{ + GasCost: env.Mempool.GetTxSimulateGas(hash), + }, nil +} + func UserUnconfirmedTxs(address string, limit int) (*ctypes.ResultUserUnconfirmedTxs, error) { txs := env.Mempool.ReapUserTxs(address, limit) return &ctypes.ResultUserUnconfirmedTxs{ diff --git a/libs/tendermint/rpc/core/routes.go b/libs/tendermint/rpc/core/routes.go index ebe3f27c8e..fa7e432069 100644 --- a/libs/tendermint/rpc/core/routes.go +++ b/libs/tendermint/rpc/core/routes.go @@ -48,6 +48,8 @@ var Routes = map[string]*rpc.RPCFunc{ // evidence API "broadcast_evidence": rpc.NewRPCFunc(BroadcastEvidence, "evidence"), + + "tx_simulate_gas": rpc.NewRPCFunc(TxSimulateGasCost, "hash"), } func AddUnsafeRoutes() { diff --git a/libs/tendermint/rpc/core/types/responses.go b/libs/tendermint/rpc/core/types/responses.go index 3edbb691b9..2dfbc3c957 100644 --- a/libs/tendermint/rpc/core/types/responses.go +++ b/libs/tendermint/rpc/core/types/responses.go @@ -204,6 +204,10 @@ type ResultUserUnconfirmedTxs struct { Txs []types.Tx `json:"txs"` } +type ResponseTxSimulateGas struct { + GasCost int64 `json:"gas_cost"` +} + // List of mempool addresses type ResultUnconfirmedAddresses struct { Addresses []string `json:"addresses"` diff --git a/x/evm/types/msg_evm.go b/x/evm/types/msg_evm.go index 2228bff144..4f228e1e79 100644 --- a/x/evm/types/msg_evm.go +++ b/x/evm/types/msg_evm.go @@ -59,18 +59,18 @@ func (tx *MsgEthereumTx) SetFrom(addr string) { // GetFrom returns sender address of MsgEthereumTx if signature is valid, or returns "". func (tx *MsgEthereumTx) GetFrom() string { from := tx.BaseTx.GetFrom() - if from == "" { - from, _ = tmtypes.SignatureCache().Get(tx.TxHash()) - if from == "" { - addr, err := tx.firstVerifySig(tx.ChainID()) - if err != nil { - return "" - } - return EthAddressToString(&addr) - } + if from != "" { + return from } - - return from + from, _ = tmtypes.SignatureCache().Get(tx.TxHash()) + if from != "" { + return from + } + err := tx.firstVerifySig(tx.ChainID()) + if err != nil { + return "" + } + return tx.BaseTx.GetFrom() } func (msg MsgEthereumTx) GetSender(ctx sdk.Context) string { @@ -325,13 +325,13 @@ var sigBigNumPool = &sync.Pool{ }, } -func (msg *MsgEthereumTx) firstVerifySig(chainID *big.Int) (ethcmn.Address, error) { +func (msg *MsgEthereumTx) firstVerifySig(chainID *big.Int) error { var V *big.Int var sigHash ethcmn.Hash if isProtectedV(msg.Data.V) { // do not allow recovery for transactions with an unprotected chainID if chainID.Sign() == 0 { - return emptyEthAddr, errors.New("chainID cannot be zero") + return errors.New("chainID cannot be zero") } bigNum := sigBigNumPool.Get().(*big.Int) @@ -352,9 +352,13 @@ func (msg *MsgEthereumTx) firstVerifySig(chainID *big.Int) (ethcmn.Address, erro sender, err := recoverEthSig(msg.Data.R, msg.Data.S, V, &sigHash) if err != nil { - return emptyEthAddr, err + return err } - return sender, nil + from := EthAddressToString(&sender) + tmtypes.SignatureCache().Add(msg.TxHash(), from) + msg.BaseTx.From = from + msg.addr = sender + return nil } // VerifySig attempts to verify a Transaction's signature for a given chainID. @@ -371,14 +375,10 @@ func (msg *MsgEthereumTx) VerifySig(chainID *big.Int, height int64) error { msg.SetFrom(from) return nil } - addr, err := msg.firstVerifySig(chainID) + err := msg.firstVerifySig(chainID) if err != nil { return err } - from = EthAddressToString(&addr) - tmtypes.SignatureCache().Add(msg.TxHash(), from) - msg.BaseTx.From = from - msg.addr = addr return nil } diff --git a/x/evm/types/msg_test.go b/x/evm/types/msg_test.go index de3c70c7d8..f7d4af3cce 100644 --- a/x/evm/types/msg_test.go +++ b/x/evm/types/msg_test.go @@ -445,7 +445,7 @@ func BenchmarkEvmTxVerifySig(b *testing.B) { b.Run("firstVerifySig", func(b *testing.B) { b.ReportAllocs() for i := 0; i < b.N; i++ { - _, err := msg.firstVerifySig(chainID) + err := msg.firstVerifySig(chainID) if err != nil { b.Fatal(err) }