From a4005fc899cfc0e851adda463a4d0bdecccab585 Mon Sep 17 00:00:00 2001 From: KamiD <44460798+KamiD@users.noreply.github.com> Date: Thu, 1 Dec 2022 17:37:34 +0800 Subject: [PATCH] Merge PR: merge v1.6.5 to dev (#2805) * Merge PR: fix the evm2cm tx in parallel in watchdb query status error (#2715) Co-authored-by: Evan Han * Merge PR: fix feesplit cache contaminate when parallel-tx rerun (#2720) * Merge PR: delete feesplit innertx (#2721) * delete feesplit innertx * Update keeper_innertx.go * Merge PR: optimize the feesplit api (#2725) * Merge PR: fix the watchdb parallel query error (#2730) 2. fix the --deliver-txs-mode=2 can not query status 3. update the version * Merge PR: init commit and merge confilict (#2731) when repair-state we judge the fast-storage flag by data add feesplit module reserve --iavl-enable-fast-storage flag for repair-state merge repair-state we judge... and deal with confict rm NeedLongTimeToUpgrade use the IsFastStorgeStrategy universal Co-authored-by: xiangjianmeng <805442788@qq.com> * Merge PR: query Feesplit params by rest api (#2732) * Merge PR: upadte version (#2735) * Merge PR: fix watchdb duplicate tx bugs (#2736) * Merge PR: rollback blocktime v156 (#2738) * roll back blocktime * del flag * Merge PR: fix the estimateGas crash (#2749) * Merge PR: Compatible start enable-repair-state flag with fss (#2754) * when start with enable-repair-state and the db is fast-storage, we repair with the iavl-enable-fast-storage * adjust the fast-storage node size * Merge PR: patch to fix the potential bug of hgu (#2755) Co-authored-by: KamiD <44460798+KamiD@users.noreply.github.com> * Merge PR: update config default value (#2751) * set chain-id default to exchain-66 * set EnablePrerunTx default to true * set pruning default to everything * enable ac default * disableABCIQueryMutex default * enable fast-query by default * enable BloomFilter by default * update mempool Size default value * update val mode config * add tx_indexer.indexer cmd flags * add enable-concurrency flag * set chainid to exchain65 if make testnet * Merge PR: dump lastrun in case of fastsync (#2760) * Merge PR: recommend gp bug fix (#2744) * gp bug fix * modify code * add flag DynamicGpAdaptUncongest * modify ut * adapt congest * bug fix * code optimize Co-authored-by: KamiD <44460798+KamiD@users.noreply.github.com> * Merge PR: optimize start params (#2762) * optimize start params * optimize start param * optimize start param * optimize start param * Merge PR: DDS lastrun & bump version to v1.6.5.5(#2763) * dds lastrun * Version=v1.6.5.5 * Merge PR: add dynamic dp flag (#2764) * gp bug fix * modify code * add flag DynamicGpAdaptUncongest * modify ut * adapt congest * bug fix * code optimize * add flag: gp maxGas maxTx * Update config.go * Update flags.go Co-authored-by: KamiD <44460798+KamiD@users.noreply.github.com> * Merge PR: add avc flag (#2768) * Merge PR: modify DynamicGpMaxGasUsed flag (#2765) * gp bug fix * modify code * add flag DynamicGpAdaptUncongest * modify ut * adapt congest * bug fix * code optimize * add flag: gp maxGas maxTx * bug fix * modify flag * typo fix * Merge PR: optimiz fss flag (#2769) * fast storage is default true and set by the data * add fast-storage tips when start the node * hidden flag discard-fast-storage * print iavl.IsFastStorage when start node * Update Makefile * restore and hide the old flag Co-authored-by: KamiD <44460798+KamiD@users.noreply.github.com> * Merge PR: evn set (#2782) * Merge PR: commitGapHeight for dynamic update (#2773) * 1. add commitGapHeight for dynamic update * 1. add comment * 1. add UpdateCommitGapHeight at before commitStores * 1. add UpdateCommitGapHeight at before commitStores * 1. modify the spell Co-authored-by: KamiD <44460798+KamiD@users.noreply.github.com> * Merge PR: support dynamic set fast node cache size (#2780) * support dynamic set fast node cache size * use atomic operate the size * do not use atomic * use one const and reduce dup flag * Merge PR: optimize gp flag and fix bug (#2775) * modify dynamic gp flag * fix bug * rewrite ut * magic number fix * replace flag in sh * modify code * fix typo * mark hidden enable-dynamic-gp * improve code * improve code * Merge PR: change avc premakeblock to single (#2778) * add debug test * preblock test * restore producer * adjust channel buffer Co-authored-by: xiangjianmeng <805442788@qq.com> * Merge PR: feesplit support paralleled-tx (#2776) * Update app_parallel.go first. commit Update baseapp_runtx.go Update abci.go 1 Update app_parallel.go Update app_parallel.go Update baseapp_mode_deliver.go 1 Update evm_hooks.go Update baseapp_mode_deliver.go Update base.go Update evm_hooks.go 1 a Update repair_state.go Revert "Update repair_state.go" This reverts commit 3051e30feeb365fa10ca039c92e602132aec6a7f. * fix test * typ * add check * add check Co-authored-by: xiangjianmeng <805442788@qq.com> * Merge PR: workload(lastRun+persist) statistic (#2785) * applybolck workload statistic * complete workload statistic * fix bug & change format * remove BlockExecutor field * fix atomic on lower go version * remove assert * make it more accurately if process started less than 1 hour * make code easier to understand * fix code style Co-authored-by: Zhong Qiu <36867992+zhongqiuwood@users.noreply.github.com> Co-authored-by: yangzhe * Merge PR: fix random panic parallel tx (#2794) * Merge PR: Bump version to v1.6.5.7 (#2796) * Merge PR: replay cmd default value (#2792) * change default cli params when replay * update Co-authored-by: xiangjianmeng <805442788@qq.com> * Merge PR: disable multi cache (#2802) * Merge PR: bump to v1.6.5.8 (#2803) * Merge PR: fix eth_getTransactionCount error when rpc node is behind the latest block (#2807) * Merge PR: local dynamic config (#2783) * local dynamic config * update go mod * support dynamic config for mempool.cache_size * if enable apollo, local dynamic config will not be applied * update Co-authored-by: ylsGit Co-authored-by: xiangjianmeng <805442788@qq.com> * Merge PR: fix concurrency execute bug (#2808) * fix bug: concurrency execute ApplyBlock in TestFastSyncBadBlockStopsPeer make WrokloadStatistic crash * remove out-of-date comment * add comment for concurrently invoking Co-authored-by: yangzhe * revert patch for hgu * Merge PR: Update elapse_info.go (#2810) * Merge PR: fix config of timeout_commit (#2806) * fix config of timeout_commit * fix config fmt * add db_backend * add pendingPool=false * del setEnv rocksdb * unify timeout_commit define in one place * testnet.sh * add log * add log Co-authored-by: xiangjianmeng <805442788@qq.com> * Merge PR: support pprof and multi-cache when repair state (#2801) * support multi-cache when repair state * add pprof * fix net pprof * Merge PR: simulate tx async (#2812) * add enable config of hgu * simulate tx in new goroutine * add simulation debug info * start more goroutine for simulation * fix bug * udpate * fix bug of sig cache * disable pendingPool * enable hgu default * add log info and query api of simulation gas * add enable config of pgu and adjustment * use atomic to prevent data race * delete temp code * enable async simulation only when pgb is greater than -1 * check error * Merge PR: fix websocket leak (#2820) * fix websocket leak * optimize code * Merge PR: Change GetFastNode's Lock to Rlock (#2815) * remove the lock when get fast node * replace mutex with rbmutex * bump version to v1.6.6 Co-authored-by: lyh169 <44960676+lyh169@users.noreply.github.com> Co-authored-by: Evan Han Co-authored-by: ylsGit Co-authored-by: zhangkai Co-authored-by: xiangjianmeng <805442788@qq.com> Co-authored-by: lcmmhcc <37269413+lcmmhcc@users.noreply.github.com> Co-authored-by: chengzhinei Co-authored-by: YuanXingqiang Co-authored-by: cwbhhjl Co-authored-by: Leo <52782564+LeoGuo621@users.noreply.github.com> Co-authored-by: Zhong Qiu <36867992+zhongqiuwood@users.noreply.github.com> Co-authored-by: lisuxiaoqi Co-authored-by: chunfengSun <516108736@qq.com> Co-authored-by: fatcat22 Co-authored-by: yangzhe --- Makefile | 6 +- app/app.go | 14 +- app/app_abci.go | 5 +- app/config/config.go | 301 ++++++++++++-- app/config/config_test.go | 21 + app/config/local.go | 106 +++++ app/elapse_info.go | 16 +- app/gasprice/gasprice.go | 53 +-- app/gasprice/gasprice_test.go | 377 ++++++++++++++++-- app/node_mode.go | 18 +- app/repair_state.go | 2 + app/rpc/backend/backend.go | 10 + app/rpc/namespaces/eth/api.go | 29 +- app/rpc/websockets/server.go | 72 +++- app/types/gasprice.go | 87 +++- app/types/node_mode.go | 2 +- app/utils/sanity/start.go | 5 +- cmd/client/flags.go | 22 +- cmd/exchaincli/main.go | 3 +- cmd/exchaind/main.go | 22 +- cmd/exchaind/repair_data.go | 17 +- cmd/exchaind/replay.go | 9 + dev/keplr-test.sh | 2 +- dev/local-perf.sh | 2 +- dev/start.sh | 2 +- dev/testnet/testnet.sh | 2 +- dev/wasm-test.sh | 2 +- go.mod | 4 +- go.sum | 7 +- libs/cosmos-sdk/baseapp/baseapp_parallel.go | 5 +- libs/cosmos-sdk/server/pruning.go | 6 +- libs/cosmos-sdk/server/start.go | 10 +- libs/cosmos-sdk/server/start_okchain.go | 32 +- libs/cosmos-sdk/server/util.go | 11 +- .../store/rootmulti/rootmulti_store.go | 6 + libs/cosmos-sdk/types/cache.go | 2 +- libs/iavl/config/dynamic_config_okchain.go | 22 +- libs/iavl/mutable_tree_oec.go | 36 +- libs/iavl/nodedb.go | 4 +- libs/system/trace/schema.go | 9 +- libs/system/trace/trace.go | 14 +- libs/system/trace/workload_statistic.go | 175 ++++++++ libs/system/trace/workload_statistic_test.go | 38 ++ .../cmd/tendermint/commands/run_node.go | 16 +- libs/tendermint/config/config.go | 7 +- .../config/dynamic_config_okchain.go | 15 + libs/tendermint/mempool/clist_mempool.go | 79 +++- libs/tendermint/mempool/mempool.go | 2 + libs/tendermint/mock/mempool.go | 2 + libs/tendermint/rpc/core/mempool.go | 6 + libs/tendermint/rpc/core/routes.go | 2 + libs/tendermint/rpc/core/types/responses.go | 4 + libs/tendermint/state/execution.go | 11 +- libs/tendermint/types/params.go | 3 + x/evm/types/msg_evm.go | 40 +- x/evm/types/msg_test.go | 2 +- x/feesplit/client/rest/rest.go | 2 +- 57 files changed, 1511 insertions(+), 268 deletions(-) create mode 100644 app/config/local.go create mode 100644 libs/system/trace/workload_statistic.go create mode 100644 libs/system/trace/workload_statistic_test.go diff --git a/Makefile b/Makefile index aa9e2fed7e..5910e239a1 100644 --- a/Makefile +++ b/Makefile @@ -13,7 +13,7 @@ IGNORE_CHECK_GO=false install_rocksdb_version:=$(ROCKSDB_VERSION) -Version=v1.6.5 +Version=v1.6.6 CosmosSDK=v0.39.2 Tendermint=v0.33.9 Iavl=v0.14.3 @@ -104,6 +104,10 @@ ifeq ($(WITH_ROCKSDB),true) ldflags += -X github.com/okex/exchain/libs/tendermint/types.DBBackend=rocksdb endif +ifeq ($(MAKECMDGOALS),testnet) + ldflags += -X github.com/okex/exchain/libs/cosmos-sdk/server.ChainID=exchain-65 +endif + ifeq ($(LINK_STATICALLY),true) ldflags += -linkmode=external -extldflags "-Wl,-z,muldefs -static" endif diff --git a/app/app.go b/app/app.go index 5acba28a1b..462821e6ab 100644 --- a/app/app.go +++ b/app/app.go @@ -14,11 +14,14 @@ import ( "google.golang.org/grpc/encoding" "google.golang.org/grpc/encoding/proto" + "github.com/okex/exchain/app/utils/appstatus" + "github.com/okex/exchain/app/ante" okexchaincodec "github.com/okex/exchain/app/codec" appconfig "github.com/okex/exchain/app/config" - gasprice "github.com/okex/exchain/app/gasprice" + "github.com/okex/exchain/app/gasprice" "github.com/okex/exchain/app/refund" + "github.com/okex/exchain/app/types" okexchain "github.com/okex/exchain/app/types" "github.com/okex/exchain/app/utils/sanity" bam "github.com/okex/exchain/libs/cosmos-sdk/baseapp" @@ -683,7 +686,7 @@ func NewOKExChainApp( enableAnalyzer := sm.DeliverTxsExecMode(viper.GetInt(sm.FlagDeliverTxsExecMode)) == sm.DeliverTxsExecModeSerial trace.EnableAnalyzer(enableAnalyzer) - if appconfig.GetOecConfig().GetEnableDynamicGp() { + if appconfig.GetOecConfig().GetDynamicGpMode() != types.CloseMode { gpoConfig := gasprice.NewGPOConfig(appconfig.GetOecConfig().GetDynamicGpWeight(), appconfig.GetOecConfig().GetDynamicGpCheckBlocks()) app.gpo = gasprice.NewOracle(gpoConfig) } @@ -719,8 +722,9 @@ func (app *OKExChainApp) BeginBlocker(ctx sdk.Context, req abci.RequestBeginBloc // EndBlocker updates every end block func (app *OKExChainApp) EndBlocker(ctx sdk.Context, req abci.RequestEndBlock) abci.ResponseEndBlock { - if appconfig.GetOecConfig().GetEnableDynamicGp() { - _ = app.gpo.BlockGPQueue.Push(app.gpo.CurrentBlockGPs) + if appconfig.GetOecConfig().GetDynamicGpMode() != types.CloseMode { + currentBlockGPsCopy := app.gpo.CurrentBlockGPs.Copy() + _ = app.gpo.BlockGPQueue.Push(currentBlockGPsCopy) GlobalGp = app.gpo.RecommendGP() app.gpo.CurrentBlockGPs.Clear() } @@ -875,9 +879,11 @@ func PreRun(ctx *server.Context, cmd *cobra.Command) error { // init tx signature cache tmtypes.InitSignatureCache() + iavl.SetEnableFastStorage(appstatus.IsFastStorageStrategy()) // set external package flags server.SetExternalPackageValue(cmd) + ctx.Logger.Info("The database storage strategy", "fast-storage", iavl.GetEnableFastStorage()) // set the dynamic config appconfig.RegisterDynamicConfig(ctx.Logger.With("module", "config")) diff --git a/app/app_abci.go b/app/app_abci.go index 7c1288043f..b7449b8798 100644 --- a/app/app_abci.go +++ b/app/app_abci.go @@ -5,6 +5,7 @@ import ( "time" appconfig "github.com/okex/exchain/app/config" + "github.com/okex/exchain/app/types" sdk "github.com/okex/exchain/libs/cosmos-sdk/types" "github.com/okex/exchain/libs/system/trace" abci "github.com/okex/exchain/libs/tendermint/abci/types" @@ -25,7 +26,7 @@ func (app *OKExChainApp) DeliverTx(req abci.RequestDeliverTx) (res abci.Response resp := app.BaseApp.DeliverTx(req) - if appconfig.GetOecConfig().GetEnableDynamicGp() { + if appconfig.GetOecConfig().GetDynamicGpMode() != types.CloseMode { tx, err := evm.TxDecoder(app.marshal)(req.Tx) if err == nil { //optimize get tx gas price can not get value from verifySign method @@ -46,7 +47,7 @@ func (app *OKExChainApp) DeliverRealTx(req abci.TxEssentials) (res abci.Response app.EvmKeeper.Watcher.RecordTxAndFailedReceipt(req, &resp, app.GetTxDecoder()) var err error - if appconfig.GetOecConfig().GetEnableDynamicGp() { + if appconfig.GetOecConfig().GetDynamicGpMode() != types.CloseMode { tx, _ := req.(sdk.Tx) if tx == nil { tx, err = evm.TxDecoder(app.Codec())(req.GetRaw()) diff --git a/app/config/config.go b/app/config/config.go index f632fa5eda..cf01a3c121 100644 --- a/app/config/config.go +++ b/app/config/config.go @@ -2,14 +2,19 @@ package config import ( "fmt" + "path" + "path/filepath" "runtime/debug" "strconv" "strings" "sync" + "sync/atomic" "time" "github.com/okex/exchain/libs/cosmos-sdk/server" "github.com/okex/exchain/libs/cosmos-sdk/store/iavl" + "github.com/okex/exchain/libs/cosmos-sdk/store/types" + tmiavl "github.com/okex/exchain/libs/iavl" iavlconfig "github.com/okex/exchain/libs/iavl/config" "github.com/okex/exchain/libs/system" "github.com/okex/exchain/libs/system/trace" @@ -32,12 +37,18 @@ type OecConfig struct { mempoolForceRecheckGap int64 // mempool.size mempoolSize int + // mempool.cache_size + mempoolCacheSize int // mempool.flush mempoolFlush bool // mempool.max_tx_num_per_block maxTxNumPerBlock int64 // mempool.max_gas_used_per_block maxGasUsedPerBlock int64 + // mempool.enable-pgu + enablePGU bool + // mempool.pgu-adjustment + pguAdjustment float64 // mempool.node_key_whitelist nodeKeyWhitelist []string //mempool.check_tx_cost @@ -47,12 +58,21 @@ type OecConfig struct { // gas-limit-buffer gasLimitBuffer uint64 + // enable-dynamic-gp enableDynamicGp bool // dynamic-gp-weight dynamicGpWeight int // dynamic-gp-check-blocks dynamicGpCheckBlocks int + // dynamic-gp-coefficient + dynamicGpCoefficient int + // dynamic-gp-max-gas-used + dynamicGpMaxGasUsed int64 + // dynamic-gp-max-tx-num + dynamicGpMaxTxNum int64 + // dynamic-gp-mode + dynamicGpMode int // consensus.timeout_propose csTimeoutPropose time.Duration @@ -71,6 +91,11 @@ type OecConfig struct { // iavl-cache-size iavlCacheSize int + // commit-gap-height + commitGapHeight int64 + + // iavl-fast-storage-cache-size + iavlFSCacheSize int64 // enable-wtx enableWtx bool @@ -95,21 +120,27 @@ type OecConfig struct { const ( FlagEnableDynamic = "config.enable-dynamic" - FlagMempoolRecheck = "mempool.recheck" - FlagMempoolForceRecheckGap = "mempool.force_recheck_gap" - FlagMempoolSize = "mempool.size" - FlagMempoolFlush = "mempool.flush" - FlagMaxTxNumPerBlock = "mempool.max_tx_num_per_block" - FlagMaxGasUsedPerBlock = "mempool.max_gas_used_per_block" - FlagNodeKeyWhitelist = "mempool.node_key_whitelist" - FlagMempoolCheckTxCost = "mempool.check_tx_cost" - FlagGasLimitBuffer = "gas-limit-buffer" - FlagEnableDynamicGp = "enable-dynamic-gp" - FlagDynamicGpWeight = "dynamic-gp-weight" - FlagDynamicGpCheckBlocks = "dynamic-gp-check-blocks" - FlagEnableWrappedTx = "enable-wtx" - FlagSentryAddrs = "p2p.sentry_addrs" - + FlagMempoolRecheck = "mempool.recheck" + FlagMempoolForceRecheckGap = "mempool.force_recheck_gap" + FlagMempoolSize = "mempool.size" + FlagMempoolCacheSize = "mempool.cache_size" + FlagMempoolFlush = "mempool.flush" + FlagMaxTxNumPerBlock = "mempool.max_tx_num_per_block" + FlagMaxGasUsedPerBlock = "mempool.max_gas_used_per_block" + FlagEnablePGU = "mempool.enable-pgu" + FlagPGUAdjustment = "mempool.pgu-adjustment" + FlagNodeKeyWhitelist = "mempool.node_key_whitelist" + FlagMempoolCheckTxCost = "mempool.check_tx_cost" + FlagGasLimitBuffer = "gas-limit-buffer" + FlagEnableDynamicGp = "enable-dynamic-gp" + FlagDynamicGpMode = "dynamic-gp-mode" + FlagDynamicGpWeight = "dynamic-gp-weight" + FlagDynamicGpCheckBlocks = "dynamic-gp-check-blocks" + FlagDynamicGpCoefficient = "dynamic-gp-coefficient" + FlagDynamicGpMaxGasUsed = "dynamic-gp-max-gas-used" + FlagDynamicGpMaxTxNum = "dynamic-gp-max-tx-num" + FlagEnableWrappedTx = "enable-wtx" + FlagSentryAddrs = "p2p.sentry_addrs" FlagCsTimeoutPropose = "consensus.timeout_propose" FlagCsTimeoutProposeDelta = "consensus.timeout_propose_delta" FlagCsTimeoutPrevote = "consensus.timeout_prevote" @@ -187,9 +218,21 @@ func NewOecConfig() *OecConfig { c.loadFromConfig() if viper.GetBool(FlagEnableDynamic) { - loaded := c.loadFromApollo() - if !loaded { - panic("failed to connect apollo or no config items in apollo") + if viper.IsSet(FlagApollo) { + loaded := c.loadFromApollo() + if !loaded { + panic("failed to connect apollo or no config items in apollo") + } + } else { + ok, err := c.loadFromLocal() + if err != nil { + confLogger.Error("failed to load config from local", "err", err) + } + if !ok { + confLogger.Error("failed to load config from local") + } else { + confLogger.Info("load config from local success") + } } } @@ -200,6 +243,8 @@ func defaultOecConfig() *OecConfig { return &OecConfig{ mempoolRecheck: false, mempoolForceRecheckGap: 2000, + commitGapHeight: iavlconfig.DefaultCommitGapHeight, + iavlFSCacheSize: tmiavl.DefaultIavlFastStorageCacheSize, } } @@ -216,14 +261,23 @@ func (c *OecConfig) loadFromConfig() { c.SetMempoolRecheck(viper.GetBool(FlagMempoolRecheck)) c.SetMempoolForceRecheckGap(viper.GetInt64(FlagMempoolForceRecheckGap)) c.SetMempoolSize(viper.GetInt(FlagMempoolSize)) + c.SetMempoolCacheSize(viper.GetInt(FlagMempoolCacheSize)) c.SetMempoolFlush(viper.GetBool(FlagMempoolFlush)) c.SetMempoolCheckTxCost(viper.GetBool(FlagMempoolCheckTxCost)) c.SetMaxTxNumPerBlock(viper.GetInt64(FlagMaxTxNumPerBlock)) c.SetMaxGasUsedPerBlock(viper.GetInt64(FlagMaxGasUsedPerBlock)) + c.SetEnablePGU(viper.GetBool(FlagEnablePGU)) + c.SetPGUAdjustment(viper.GetFloat64(FlagPGUAdjustment)) c.SetGasLimitBuffer(viper.GetUint64(FlagGasLimitBuffer)) + c.SetEnableDynamicGp(viper.GetBool(FlagEnableDynamicGp)) c.SetDynamicGpWeight(viper.GetInt(FlagDynamicGpWeight)) c.SetDynamicGpCheckBlocks(viper.GetInt(FlagDynamicGpCheckBlocks)) + c.SetDynamicGpCoefficient(viper.GetInt(FlagDynamicGpCoefficient)) + c.SetDynamicGpMaxGasUsed(viper.GetInt64(FlagDynamicGpMaxGasUsed)) + c.SetDynamicGpMaxTxNum(viper.GetInt64(FlagDynamicGpMaxTxNum)) + + c.SetDynamicGpMode(viper.GetInt(FlagDynamicGpMode)) c.SetCsTimeoutPropose(viper.GetDuration(FlagCsTimeoutPropose)) c.SetCsTimeoutProposeDelta(viper.GetDuration(FlagCsTimeoutProposeDelta)) c.SetCsTimeoutPrevote(viper.GetDuration(FlagCsTimeoutPrevote)) @@ -232,6 +286,8 @@ func (c *OecConfig) loadFromConfig() { c.SetCsTimeoutPrecommitDelta(viper.GetDuration(FlagCsTimeoutPrecommitDelta)) c.SetCsTimeoutCommit(viper.GetDuration(FlagCsTimeoutCommit)) c.SetIavlCacheSize(viper.GetInt(iavl.FlagIavlCacheSize)) + c.SetIavlFSCacheSize(viper.GetInt64(tmiavl.FlagIavlFastStorageCacheSize)) + c.SetCommitGapHeight(viper.GetInt64(server.FlagCommitGapHeight)) c.SetSentryAddrs(viper.GetString(FlagSentryAddrs)) c.SetNodeKeyWhitelist(viper.GetString(FlagNodeKeyWhitelist)) c.SetEnableWtx(viper.GetBool(FlagEnableWrappedTx)) @@ -261,19 +317,42 @@ func (c *OecConfig) loadFromApollo() bool { return client.LoadConfig() } +func (c *OecConfig) loadFromLocal() (bool, error) { + var err error + rootDir := viper.GetString("home") + configPath := path.Join(rootDir, "config", LocalDynamicConfigPath) + configPath, err = filepath.Abs(configPath) + if err != nil { + return false, err + } + client, err := NewLocalClient(configPath, c, confLogger) + if err != nil { + return false, err + } + ok := client.LoadConfig() + err = client.Enable() + return ok, err +} + func (c *OecConfig) format() string { return fmt.Sprintf(`%s config: mempool.recheck: %v mempool.force_recheck_gap: %d mempool.size: %d + mempool.cache_size: %d + mempool.flush: %v mempool.max_tx_num_per_block: %d mempool.max_gas_used_per_block: %d mempool.check_tx_cost: %v gas-limit-buffer: %d - enable-dynamic-gp: %v dynamic-gp-weight: %d + dynamic-gp-check-blocks: %d + dynamic-gp-coefficient: %d + dynamic-gp-max-gas-used: %d + dynamic-gp-max-tx-num: %d + dynamic-gp-mode: %d consensus.timeout_propose: %s consensus.timeout_propose_delta: %s @@ -284,18 +363,25 @@ func (c *OecConfig) format() string { consensus.timeout_commit: %s iavl-cache-size: %d + iavl-fast-storage-cache-size: %d + commit-gap-height: %d enable-analyzer: %v active-view-change: %v`, system.ChainName, c.GetMempoolRecheck(), c.GetMempoolForceRecheckGap(), c.GetMempoolSize(), + c.GetMempoolCacheSize(), c.GetMempoolFlush(), c.GetMaxTxNumPerBlock(), c.GetMaxGasUsedPerBlock(), c.GetMempoolCheckTxCost(), c.GetGasLimitBuffer(), - c.GetEnableDynamicGp(), c.GetDynamicGpWeight(), + c.GetDynamicGpCheckBlocks(), + c.GetDynamicGpCoefficient(), + c.GetDynamicGpMaxGasUsed(), + c.GetDynamicGpMaxTxNum(), + c.GetDynamicGpMode(), c.GetCsTimeoutPropose(), c.GetCsTimeoutProposeDelta(), c.GetCsTimeoutPrevote(), @@ -304,6 +390,8 @@ func (c *OecConfig) format() string { c.GetCsTimeoutPrecommitDelta(), c.GetCsTimeoutCommit(), c.GetIavlCacheSize(), + c.GetIavlFSCacheSize(), + c.GetCommitGapHeight(), c.GetEnableAnalyzer(), c.GetActiveVC(), ) @@ -311,6 +399,10 @@ func (c *OecConfig) format() string { func (c *OecConfig) update(key, value interface{}) { k, v := key.(string), value.(string) + c.updateFromKVStr(k, v) +} + +func (c *OecConfig) updateFromKVStr(k, v string) { switch k { case FlagMempoolRecheck: r, err := strconv.ParseBool(v) @@ -330,6 +422,12 @@ func (c *OecConfig) update(key, value interface{}) { return } c.SetMempoolSize(r) + case FlagMempoolCacheSize: + r, err := strconv.Atoi(v) + if err != nil { + return + } + c.SetMempoolCacheSize(r) case FlagMempoolFlush: r, err := strconv.ParseBool(v) if err != nil { @@ -343,11 +441,7 @@ func (c *OecConfig) update(key, value interface{}) { } c.SetMaxTxNumPerBlock(r) case FlagNodeKeyWhitelist: - r, ok := value.(string) - if !ok { - return - } - c.SetNodeKeyWhitelist(r) + c.SetNodeKeyWhitelist(v) case FlagMempoolCheckTxCost: r, err := strconv.ParseBool(v) if err != nil { @@ -355,17 +449,25 @@ func (c *OecConfig) update(key, value interface{}) { } c.SetMempoolCheckTxCost(r) case FlagSentryAddrs: - r, ok := value.(string) - if !ok { - return - } - c.SetSentryAddrs(r) + c.SetSentryAddrs(v) case FlagMaxGasUsedPerBlock: r, err := strconv.ParseInt(v, 10, 64) if err != nil { return } c.SetMaxGasUsedPerBlock(r) + case FlagEnablePGU: + r, err := strconv.ParseBool(v) + if err != nil { + return + } + c.SetEnablePGU(r) + case FlagPGUAdjustment: + r, err := strconv.ParseFloat(v, 64) + if err != nil { + return + } + c.SetPGUAdjustment(r) case FlagGasLimitBuffer: r, err := strconv.ParseUint(v, 10, 64) if err != nil { @@ -390,6 +492,30 @@ func (c *OecConfig) update(key, value interface{}) { return } c.SetDynamicGpCheckBlocks(r) + case FlagDynamicGpCoefficient: + r, err := strconv.Atoi(v) + if err != nil { + return + } + c.SetDynamicGpCoefficient(r) + case FlagDynamicGpMaxGasUsed: + r, err := strconv.ParseInt(v, 10, 64) + if err != nil { + return + } + c.SetDynamicGpMaxGasUsed(r) + case FlagDynamicGpMaxTxNum: + r, err := strconv.ParseInt(v, 10, 64) + if err != nil { + return + } + c.SetDynamicGpMaxTxNum(r) + case FlagDynamicGpMode: + r, err := strconv.Atoi(v) + if err != nil { + return + } + c.SetDynamicGpMode(r) case FlagCsTimeoutPropose: r, err := time.ParseDuration(v) if err != nil { @@ -438,6 +564,18 @@ func (c *OecConfig) update(key, value interface{}) { return } c.SetIavlCacheSize(r) + case tmiavl.FlagIavlFastStorageCacheSize: + r, err := strconv.Atoi(v) + if err != nil { + return + } + c.SetIavlFSCacheSize(int64(r)) + case server.FlagCommitGapHeight: + r, err := strconv.ParseInt(v, 10, 64) + if err != nil { + return + } + c.SetCommitGapHeight(r) case trace.FlagEnableAnalyzer: r, err := strconv.ParseBool(v) if err != nil { @@ -523,6 +661,16 @@ func (c *OecConfig) SetMempoolSize(value int) { c.mempoolSize = value } +func (c *OecConfig) GetMempoolCacheSize() int { + return c.mempoolCacheSize +} +func (c *OecConfig) SetMempoolCacheSize(value int) { + if value < 0 { + return + } + c.mempoolCacheSize = value +} + func (c *OecConfig) GetMempoolFlush() bool { return c.mempoolFlush } @@ -595,6 +743,7 @@ func (c *OecConfig) SetMaxTxNumPerBlock(value int64) { func (c *OecConfig) GetMaxGasUsedPerBlock() int64 { return c.maxGasUsedPerBlock } + func (c *OecConfig) SetMaxGasUsedPerBlock(value int64) { if value < -1 { return @@ -602,6 +751,22 @@ func (c *OecConfig) SetMaxGasUsedPerBlock(value int64) { c.maxGasUsedPerBlock = value } +func (c *OecConfig) GetEnablePGU() bool { + return c.enablePGU +} + +func (c *OecConfig) SetEnablePGU(value bool) { + c.enablePGU = value +} + +func (c *OecConfig) GetPGUAdjustment() float64 { + return c.pguAdjustment +} + +func (c *OecConfig) SetPGUAdjustment(value float64) { + c.pguAdjustment = value +} + func (c *OecConfig) GetGasLimitBuffer() uint64 { return c.gasLimitBuffer } @@ -612,6 +777,7 @@ func (c *OecConfig) SetGasLimitBuffer(value uint64) { func (c *OecConfig) GetEnableDynamicGp() bool { return c.enableDynamicGp } + func (c *OecConfig) SetEnableDynamicGp(value bool) { c.enableDynamicGp = value } @@ -619,6 +785,7 @@ func (c *OecConfig) SetEnableDynamicGp(value bool) { func (c *OecConfig) GetDynamicGpWeight() int { return c.dynamicGpWeight } + func (c *OecConfig) SetDynamicGpWeight(value int) { if value <= 0 { value = 1 @@ -628,6 +795,51 @@ func (c *OecConfig) SetDynamicGpWeight(value int) { c.dynamicGpWeight = value } +func (c *OecConfig) GetDynamicGpCoefficient() int { + return c.dynamicGpCoefficient +} +func (c *OecConfig) SetDynamicGpCoefficient(value int) { + if value <= 0 { + value = 1 + } else if value > 100 { + value = 100 + } + c.dynamicGpCoefficient = value +} + +func (c *OecConfig) GetDynamicGpMaxGasUsed() int64 { + return c.dynamicGpMaxGasUsed +} + +func (c *OecConfig) SetDynamicGpMaxGasUsed(value int64) { + if value < -1 { + return + } + c.dynamicGpMaxGasUsed = value +} + +func (c *OecConfig) GetDynamicGpMaxTxNum() int64 { + return c.dynamicGpMaxTxNum +} + +func (c *OecConfig) SetDynamicGpMaxTxNum(value int64) { + if value < 0 { + return + } + c.dynamicGpMaxTxNum = value +} + +func (c *OecConfig) GetDynamicGpMode() int { + return c.dynamicGpMode +} + +func (c *OecConfig) SetDynamicGpMode(value int) { + if value < 0 || value > 2 { + return + } + c.dynamicGpMode = value +} + func (c *OecConfig) GetDynamicGpCheckBlocks() int { return c.dynamicGpCheckBlocks } @@ -718,6 +930,35 @@ func (c *OecConfig) SetIavlCacheSize(value int) { c.iavlCacheSize = value } +func (c *OecConfig) GetIavlFSCacheSize() int64 { + return c.iavlFSCacheSize +} + +func (c *OecConfig) SetIavlFSCacheSize(value int64) { + c.iavlFSCacheSize = value +} + +func (c *OecConfig) GetCommitGapHeight() int64 { + return atomic.LoadInt64(&c.commitGapHeight) +} +func (c *OecConfig) SetCommitGapHeight(value int64) { + if IsPruningOptionNothing() { // pruning nothing the gap should 1 + value = 1 + } + if value <= 0 { + return + } + atomic.StoreInt64(&c.commitGapHeight, value) +} + +func IsPruningOptionNothing() bool { + strategy := strings.ToLower(viper.GetString(server.FlagPruning)) + if strategy == types.PruningOptionNothing { + return true + } + return false +} + func (c *OecConfig) GetActiveVC() bool { return c.activeVC } diff --git a/app/config/config_test.go b/app/config/config_test.go index ab7d4edd26..7a27cb9c4f 100644 --- a/app/config/config_test.go +++ b/app/config/config_test.go @@ -3,8 +3,13 @@ package config import ( "testing" + iavlconfig "github.com/okex/exchain/libs/iavl/config" + + "github.com/spf13/viper" + "github.com/stretchr/testify/require" + "github.com/okex/exchain/libs/cosmos-sdk/server" tm "github.com/okex/exchain/libs/tendermint/config" ) @@ -16,4 +21,20 @@ func TestConfig(t *testing.T) { c.SetMempoolSize(150) require.Equal(t, 150, tm.DynamicConfig.GetMempoolSize()) + + iavlconfig.SetDynamicConfig(c) + require.Equal(t, int64(100), iavlconfig.DynamicConfig.GetCommitGapHeight()) + + c.SetCommitGapHeight(0) + require.Equal(t, int64(100), iavlconfig.DynamicConfig.GetCommitGapHeight()) + + c.SetCommitGapHeight(-1) + require.Equal(t, int64(100), iavlconfig.DynamicConfig.GetCommitGapHeight()) + + c.SetCommitGapHeight(10) + require.Equal(t, int64(10), iavlconfig.DynamicConfig.GetCommitGapHeight()) + + viper.SetDefault(server.FlagPruning, "nothing") + c.SetCommitGapHeight(9) + require.Equal(t, int64(1), iavlconfig.DynamicConfig.GetCommitGapHeight()) } diff --git a/app/config/local.go b/app/config/local.go new file mode 100644 index 0000000000..2a3dc2c58f --- /dev/null +++ b/app/config/local.go @@ -0,0 +1,106 @@ +package config + +import ( + "encoding/json" + "os" + "path/filepath" + + "github.com/fsnotify/fsnotify" + "github.com/okex/exchain/libs/tendermint/libs/log" +) + +const ( + LocalDynamicConfigPath = "config.dynamic.json" +) + +type LocalClient struct { + path string + dir string + oecConf *OecConfig + logger log.Logger + watcher *fsnotify.Watcher + close chan struct{} +} + +func NewLocalClient(path string, oecConf *OecConfig, logger log.Logger) (*LocalClient, error) { + if logger == nil { + logger = log.NewNopLogger() + } + dir := filepath.Dir(path) + watcher, err := fsnotify.NewWatcher() + if err != nil { + return nil, err + } + client := &LocalClient{ + path: path, + dir: dir, + oecConf: oecConf, + logger: logger, + watcher: watcher, + close: make(chan struct{}), + } + go func() { + for { + select { + case event, ok := <-watcher.Events: + if !ok { + return + } + // logger.Debug("local config event", "event", event) + if event.Name == client.path && (event.Has(fsnotify.Write) || event.Has(fsnotify.Create)) { + logger.Debug("local config changed", "path", path) + ok = client.LoadConfig() + if !ok { + logger.Debug("local config changed but failed to load") + } else { + logger.Debug("local config changed and loaded") + } + } + case err, ok := <-watcher.Errors: + if !ok { + return + } + logger.Error("local config watcher error", "err", err) + case <-client.close: + logger.Debug("local client closed") + return + } + } + }() + + return client, nil +} + +func (a *LocalClient) Close() error { + close(a.close) + return a.watcher.Close() +} + +func (a *LocalClient) Enable() (err error) { + return a.watcher.Add(a.dir) +} + +func (a *LocalClient) configExists() bool { + _, err := os.Stat(a.path) + return !os.IsNotExist(err) +} + +func (a *LocalClient) LoadConfig() (loaded bool) { + var conf map[string]string + bz, err := os.ReadFile(a.path) + if err != nil { + a.logger.Error("failed to read local config", "path", a.path, "err", err) + return false + } + err = json.Unmarshal(bz, &conf) + if err != nil { + a.logger.Error("failed to unmarshal local config", "path", a.path, "err", err) + return false + } + loaded = true + for k, v := range conf { + a.oecConf.updateFromKVStr(k, v) + } + a.logger.Info(a.oecConf.format()) + return +} diff --git a/app/elapse_info.go b/app/elapse_info.go index 8febf5eec3..efb581a7ae 100644 --- a/app/elapse_info.go +++ b/app/elapse_info.go @@ -2,11 +2,12 @@ package app import ( "fmt" - "github.com/okex/exchain/libs/system/trace" - "github.com/okex/exchain/libs/tendermint/libs/log" "strings" "sync" + "github.com/okex/exchain/libs/system/trace" + "github.com/okex/exchain/libs/tendermint/libs/log" + "github.com/spf13/viper" ) @@ -18,7 +19,6 @@ type SchemaConfig struct { var ( optionalSchemas = []SchemaConfig{ {trace.MempoolCheckTxCnt, 0}, - {trace.MempoolTxsCnt, 0}, {trace.MempoolCheckTxTime, 0}, {trace.SigCacheRatio, 0}, {trace.Evm, 1}, @@ -29,14 +29,13 @@ var ( {trace.IavlRuntime, 0}, {trace.RunAnteDetail, 0}, - {trace.RunAnteDetail, 0}, {trace.AnteChainDetail, 0}, {trace.Round, 0}, {trace.CommitRound, 0}, //{trace.RecvBlock, 1}, - {trace.First2LastPart, 1}, - {trace.BlockParts, 1}, - {trace.BlockPartsP2P, 1}, + {trace.First2LastPart, 0}, + {trace.BlockParts, 0}, + {trace.BlockPartsP2P, 0}, {trace.Produce, 0}, {trace.CompressBlock, 0}, {trace.UncompressBlock, 0}, @@ -45,15 +44,18 @@ var ( mandatorySchemas = []string{ trace.Height, trace.Tx, + trace.SimTx, trace.BlockSize, trace.TimeoutInterval, trace.LastBlockTime, trace.GasUsed, + trace.SimGasUsed, trace.InvalidTxs, trace.LastRun, trace.RunTx, trace.Prerun, trace.MempoolTxsCnt, + trace.Workload, } DefaultElapsedSchemas string diff --git a/app/gasprice/gasprice.go b/app/gasprice/gasprice.go index e6c947156b..7035a7e118 100644 --- a/app/gasprice/gasprice.go +++ b/app/gasprice/gasprice.go @@ -5,14 +5,17 @@ import ( "sort" "github.com/ethereum/go-ethereum/params" + "github.com/spf13/viper" appconfig "github.com/okex/exchain/app/config" "github.com/okex/exchain/app/types" + "github.com/okex/exchain/libs/cosmos-sdk/server" + sdk "github.com/okex/exchain/libs/cosmos-sdk/types" ) var ( maxPrice = big.NewInt(500 * params.GWei) - defaultPrice = big.NewInt(params.GWei / 10) + defaultPrice = getDefaultGasPrice() ) type GPOConfig struct { @@ -29,10 +32,6 @@ func NewGPOConfig(weight int, checkBlocks int) GPOConfig { } } -func DefaultGPOConfig() GPOConfig { - return NewGPOConfig(80, 5) -} - // Oracle recommends gas prices based on the content of recent blocks. type Oracle struct { CurrentBlockGPs *types.SingleBlockGPs @@ -63,30 +62,38 @@ func NewOracle(params GPOConfig) *Oracle { } func (gpo *Oracle) RecommendGP() *big.Int { - maxGasUsed := appconfig.GetOecConfig().GetMaxGasUsedPerBlock() - // If maxGasUsed is not negative and the current block's total gas consumption is - // less than 80% of it, then we consider the chain to be uncongested and return defaultPrice. - if maxGasUsed > 0 && gpo.CurrentBlockGPs.GetGasUsed() < uint64(maxGasUsed*80/100) { - return defaultPrice - } - // If the number of tx in the current block is less than the MaxTxNumPerBlock in mempool config, - // the default gas price is returned. - allGPsLen := int64(len(gpo.CurrentBlockGPs.GetAll())) - maxTxNum := appconfig.GetOecConfig().GetMaxTxNumPerBlock() - if allGPsLen < maxTxNum { - return defaultPrice - } - txPrices := gpo.BlockGPQueue.ExecuteSamplingBy(gpo.lastPrice) + maxGasUsed := appconfig.GetOecConfig().GetDynamicGpMaxGasUsed() + maxTxNum := appconfig.GetOecConfig().GetDynamicGpMaxTxNum() + allTxsLen := int64(len(gpo.CurrentBlockGPs.GetAll())) + // If the current block's total gas consumption is more than maxGasUsed, + // or the number of tx in the current block is more than maxTxNum, + // then we consider the chain to be congested. + isCongested := (int64(gpo.CurrentBlockGPs.GetGasUsed()) >= maxGasUsed) || (allTxsLen >= maxTxNum) + + // When the network is congested, increase the recommended gas price. + adoptHigherGp := (appconfig.GetOecConfig().GetDynamicGpMode() == types.CongestionHigherGpMode) && isCongested - price := gpo.lastPrice + txPrices := gpo.BlockGPQueue.ExecuteSamplingBy(gpo.lastPrice, adoptHigherGp) + + price := new(big.Int).Set(gpo.lastPrice) if len(txPrices) > 0 { sort.Sort(types.BigIntArray(txPrices)) - price = txPrices[(len(txPrices)-1)*gpo.weight/100] + price.Set(txPrices[(len(txPrices)-1)*gpo.weight/100]) } + if price.Cmp(maxPrice) > 0 { - price = new(big.Int).Set(maxPrice) + price.Set(maxPrice) } - gpo.lastPrice = price + gpo.lastPrice.Set(price) return price } + +func getDefaultGasPrice() *big.Int { + gasPrices, err := sdk.ParseDecCoins(viper.GetString(server.FlagMinGasPrices)) + if err == nil && gasPrices != nil && len(gasPrices) > 0 { + return gasPrices[0].Amount.BigInt() + } + //return the default gas price : DefaultGasPrice + return sdk.NewDecFromBigIntWithPrec(big.NewInt(1), sdk.Precision/2+1).BigInt() +} diff --git a/app/gasprice/gasprice_test.go b/app/gasprice/gasprice_test.go index 39568340f1..8a28f04965 100644 --- a/app/gasprice/gasprice_test.go +++ b/app/gasprice/gasprice_test.go @@ -1,6 +1,7 @@ package gasprice import ( + "fmt" "math/big" "testing" @@ -12,36 +13,12 @@ import ( ) func TestOracle_RecommendGP(t *testing.T) { - t.Run("case 1", func(t *testing.T) { - appconfig.GetOecConfig().SetMaxTxNumPerBlock(300) - appconfig.GetOecConfig().SetMaxGasUsedPerBlock(1000000) - appconfig.GetOecConfig().SetDynamicGpCheckBlocks(5) - config := NewGPOConfig(80, appconfig.GetOecConfig().GetDynamicGpCheckBlocks()) - var testRecommendGP *big.Int - gpo := NewOracle(config) - coefficient := int64(200000) - gpNum := 20000 - - for blockNum := 1; blockNum <= 10; blockNum++ { - for i := 0; i < gpNum; i++ { - gp := big.NewInt(coefficient + params.GWei) - gpo.CurrentBlockGPs.Update(gp, 35) // chain is uncongested - //gpo.CurrentBlockGPs.Update(gp, 45) // chain is congested - coefficient-- - } - gpo.BlockGPQueue.Push(gpo.CurrentBlockGPs) - testRecommendGP = gpo.RecommendGP() - gpo.CurrentBlockGPs.Clear() - require.NotNil(t, testRecommendGP) - //fmt.Println(testRecommendGP) - } - }) - t.Run("case 2", func(t *testing.T) { - // Case 2 reproduces the problem of GP increase when the OKC's block height is 13527188 - appconfig.GetOecConfig().SetMaxTxNumPerBlock(300) - appconfig.GetOecConfig().SetMaxGasUsedPerBlock(1000000) - appconfig.GetOecConfig().SetDynamicGpCheckBlocks(5) - config := NewGPOConfig(80, appconfig.GetOecConfig().GetDynamicGpCheckBlocks()) + t.Run("case 1: mainnet case", func(t *testing.T) { + // Case 1 reproduces the problem of GP increase when the OKC's block height is 13527188 + appconfig.GetOecConfig().SetDynamicGpMaxTxNum(300) + appconfig.GetOecConfig().SetDynamicGpMaxGasUsed(1000000) + appconfig.GetOecConfig().SetDynamicGpMode(1) + config := NewGPOConfig(80, 5) var testRecommendGP *big.Int gpo := NewOracle(config) @@ -79,26 +56,350 @@ func TestOracle_RecommendGP(t *testing.T) { testRecommendGP = gpo.RecommendGP() require.NotNil(t, testRecommendGP) - //fmt.Println(testRecommendGP) + fmt.Println(testRecommendGP) //testRecommendGP == 0.1GWei }) + t.Run("case 2: not full tx, not full gasUsed, maxGasUsed configured, mode 1", func(t *testing.T) { + appconfig.GetOecConfig().SetDynamicGpMaxTxNum(300) + appconfig.GetOecConfig().SetDynamicGpMaxGasUsed(1000000) + appconfig.GetOecConfig().SetDynamicGpMode(1) + config := NewGPOConfig(80, 5) + var testRecommendGP *big.Int + gpo := NewOracle(config) + delta := int64(200000) + gpNum := 200 + + for blockNum := 1; blockNum <= 10; blockNum++ { + for i := 0; i < gpNum; i++ { + gp := big.NewInt(delta + params.GWei) + gpo.CurrentBlockGPs.Update(gp, 3500) + delta-- + } + cp := gpo.CurrentBlockGPs.Copy() + gpo.BlockGPQueue.Push(cp) + testRecommendGP = gpo.RecommendGP() + gpo.CurrentBlockGPs.Clear() + require.NotNil(t, testRecommendGP) + fmt.Println(testRecommendGP) + } + }) + t.Run("case 3: not full tx, not full gasUsed, maxGasUsed unconfigured, mode 0", func(t *testing.T) { + appconfig.GetOecConfig().SetDynamicGpMaxTxNum(300) + appconfig.GetOecConfig().SetDynamicGpMaxGasUsed(-1) + appconfig.GetOecConfig().SetDynamicGpMode(0) + config := NewGPOConfig(80, 5) + var testRecommendGP *big.Int + gpo := NewOracle(config) + coefficient := int64(200000) + gpNum := 200 + + for blockNum := 1; blockNum <= 10; blockNum++ { + for i := 0; i < gpNum; i++ { + gp := big.NewInt(coefficient + params.GWei) + gpo.CurrentBlockGPs.Update(gp, 3500) // chain is uncongested + coefficient-- + } + cp := gpo.CurrentBlockGPs.Copy() + gpo.BlockGPQueue.Push(cp) + testRecommendGP = gpo.RecommendGP() + gpo.CurrentBlockGPs.Clear() + require.NotNil(t, testRecommendGP) + fmt.Println(testRecommendGP) + } + }) + t.Run("case 4: not full tx, not full gasUsed, maxGasUsed unconfigured, mode 1", func(t *testing.T) { + appconfig.GetOecConfig().SetDynamicGpMaxTxNum(300) + appconfig.GetOecConfig().SetDynamicGpMaxGasUsed(-1) + appconfig.GetOecConfig().SetDynamicGpMode(1) + config := NewGPOConfig(80, 5) + var testRecommendGP *big.Int + gpo := NewOracle(config) + coefficient := int64(200000) + gpNum := 200 + + for blockNum := 1; blockNum <= 10; blockNum++ { + for i := 0; i < gpNum; i++ { + gp := big.NewInt(coefficient + params.GWei) + gpo.CurrentBlockGPs.Update(gp, 3500) + coefficient-- + } + cp := gpo.CurrentBlockGPs.Copy() + gpo.BlockGPQueue.Push(cp) + testRecommendGP = gpo.RecommendGP() + gpo.CurrentBlockGPs.Clear() + require.NotNil(t, testRecommendGP) + fmt.Println(testRecommendGP) + } + }) + + t.Run("case 5: not full tx, full gasUsed, gp surge, mode 0", func(t *testing.T) { + appconfig.GetOecConfig().SetDynamicGpMaxTxNum(200) + appconfig.GetOecConfig().SetDynamicGpMaxGasUsed(1000000) + appconfig.GetOecConfig().SetDynamicGpMode(0) + config := NewGPOConfig(80, 5) + var testRecommendGP *big.Int + gpo := NewOracle(config) + coefficient := int64(200000) + gpNum := 180 + + for blockNum := 1; blockNum <= 5; blockNum++ { + for i := 0; i < gpNum; i++ { + gp := big.NewInt(coefficient + params.GWei) + gpo.CurrentBlockGPs.Update(gp, 6000) // chain is congested + coefficient-- + } + cp := gpo.CurrentBlockGPs.Copy() + gpo.BlockGPQueue.Push(cp) + testRecommendGP = gpo.RecommendGP() + gpo.CurrentBlockGPs.Clear() + require.NotNil(t, testRecommendGP) + fmt.Println(testRecommendGP) + } + for blockNum := 5; blockNum <= 10; blockNum++ { + for i := 0; i < gpNum/2; i++ { + gp := big.NewInt(coefficient + params.GWei) + gpo.CurrentBlockGPs.Update(gp, 6000) // chain is congested + coefficient-- + } + for i := gpNum / 2; i < gpNum; i++ { + gp := big.NewInt((coefficient + params.GWei) * 100) + gpo.CurrentBlockGPs.Update(gp, 6000) + coefficient-- + } + cp := gpo.CurrentBlockGPs.Copy() + gpo.BlockGPQueue.Push(cp) + testRecommendGP = gpo.RecommendGP() + gpo.CurrentBlockGPs.Clear() + require.NotNil(t, testRecommendGP) + fmt.Println(testRecommendGP) + } + }) + t.Run("case 6: not full tx, full gasUsed, gp surge, mode 1", func(t *testing.T) { + appconfig.GetOecConfig().SetDynamicGpMaxTxNum(300) + appconfig.GetOecConfig().SetDynamicGpMaxGasUsed(1000000) + appconfig.GetOecConfig().SetDynamicGpMode(1) + config := NewGPOConfig(80, 5) + var testRecommendGP *big.Int + gpo := NewOracle(config) + coefficient := int64(200000) + gpNum := 200 + + for blockNum := 1; blockNum <= 5; blockNum++ { + for i := 0; i < gpNum; i++ { + gp := big.NewInt(coefficient + params.GWei) + gpo.CurrentBlockGPs.Update(gp, 4500) // chain is congested + coefficient-- + } + cp := gpo.CurrentBlockGPs.Copy() + gpo.BlockGPQueue.Push(cp) + testRecommendGP = gpo.RecommendGP() + gpo.CurrentBlockGPs.Clear() + require.NotNil(t, testRecommendGP) + fmt.Println(testRecommendGP) + } + for blockNum := 5; blockNum <= 10; blockNum++ { + for i := 0; i < gpNum/2; i++ { + gp := big.NewInt(coefficient + params.GWei) + gpo.CurrentBlockGPs.Update(gp, 4500) // chain is congested + coefficient-- + } + for i := gpNum / 2; i < gpNum; i++ { + gp := big.NewInt((coefficient + params.GWei) * 100) + gpo.CurrentBlockGPs.Update(gp, 4500) // chain is congested + coefficient-- + } + cp := gpo.CurrentBlockGPs.Copy() + gpo.BlockGPQueue.Push(cp) + testRecommendGP = gpo.RecommendGP() + gpo.CurrentBlockGPs.Clear() + require.NotNil(t, testRecommendGP) + fmt.Println(testRecommendGP) + } + }) + t.Run("case 7: full tx, not full gasUsed, gp surge, mode 0", func(t *testing.T) { + appconfig.GetOecConfig().SetDynamicGpMaxTxNum(300) + appconfig.GetOecConfig().SetDynamicGpMaxGasUsed(1000000) + appconfig.GetOecConfig().SetDynamicGpMode(0) + config := NewGPOConfig(80, 5) + var testRecommendGP *big.Int + gpo := NewOracle(config) + coefficient := int64(200000) + gpNum := 300 + for blockNum := 1; blockNum <= 5; blockNum++ { + for i := 0; i < gpNum; i++ { + gp := big.NewInt(coefficient + params.GWei) + gpo.CurrentBlockGPs.Update(gp, 450) // chain is congested + coefficient-- + } + cp := gpo.CurrentBlockGPs.Copy() + gpo.BlockGPQueue.Push(cp) + testRecommendGP = gpo.RecommendGP() + gpo.CurrentBlockGPs.Clear() + require.NotNil(t, testRecommendGP) + fmt.Println(testRecommendGP) + } + for blockNum := 5; blockNum <= 10; blockNum++ { + for i := 0; i < gpNum/2; i++ { + gp := big.NewInt(coefficient + params.GWei) + gpo.CurrentBlockGPs.Update(gp, 450) // chain is congested + coefficient-- + } + for i := gpNum / 2; i < gpNum; i++ { + gp := big.NewInt((coefficient + params.GWei) * 100) + gpo.CurrentBlockGPs.Update(gp, 450) // chain is congested + coefficient-- + } + cp := gpo.CurrentBlockGPs.Copy() + gpo.BlockGPQueue.Push(cp) + testRecommendGP = gpo.RecommendGP() + gpo.CurrentBlockGPs.Clear() + require.NotNil(t, testRecommendGP) + fmt.Println(testRecommendGP) + } + }) + t.Run("case 8: full tx, not full gasUsed, gp surge, mode 1", func(t *testing.T) { + appconfig.GetOecConfig().SetDynamicGpMaxTxNum(300) + appconfig.GetOecConfig().SetDynamicGpMaxGasUsed(1000000) + appconfig.GetOecConfig().SetDynamicGpMode(1) + config := NewGPOConfig(80, 5) + var testRecommendGP *big.Int + gpo := NewOracle(config) + coefficient := int64(200000) + gpNum := 300 + for blockNum := 1; blockNum <= 5; blockNum++ { + for i := 0; i < gpNum; i++ { + gp := big.NewInt(coefficient + params.GWei) + gpo.CurrentBlockGPs.Update(gp, 450) // chain is congested + coefficient-- + } + cp := gpo.CurrentBlockGPs.Copy() + gpo.BlockGPQueue.Push(cp) + testRecommendGP = gpo.RecommendGP() + gpo.CurrentBlockGPs.Clear() + require.NotNil(t, testRecommendGP) + fmt.Println(testRecommendGP) + } + for blockNum := 5; blockNum <= 10; blockNum++ { + for i := 0; i < gpNum/2; i++ { + gp := big.NewInt(coefficient + params.GWei) + gpo.CurrentBlockGPs.Update(gp, 450) + coefficient-- + } + for i := gpNum / 2; i < gpNum; i++ { + gp := big.NewInt((coefficient + params.GWei) * 100) + gpo.CurrentBlockGPs.Update(gp, 450) + coefficient-- + } + cp := gpo.CurrentBlockGPs.Copy() + gpo.BlockGPQueue.Push(cp) + testRecommendGP = gpo.RecommendGP() + gpo.CurrentBlockGPs.Clear() + require.NotNil(t, testRecommendGP) + fmt.Println(testRecommendGP) + } + }) + t.Run("case 9: not full tx, full gasUsed, gp decrease, mode 0", func(t *testing.T) { + appconfig.GetOecConfig().SetDynamicGpMaxTxNum(300) + appconfig.GetOecConfig().SetDynamicGpMaxGasUsed(1000000) + appconfig.GetOecConfig().SetDynamicGpMode(0) + config := NewGPOConfig(80, 5) + var testRecommendGP *big.Int + gpo := NewOracle(config) + coefficient := int64(200000) + gpNum := 200 + + for blockNum := 1; blockNum <= 5; blockNum++ { + for i := 0; i < gpNum/2; i++ { + gp := big.NewInt(coefficient + params.GWei) + gpo.CurrentBlockGPs.Update(gp, 6000) + coefficient-- + } + for i := gpNum / 2; i < gpNum; i++ { + gp := big.NewInt((coefficient + params.GWei) * 100) + gpo.CurrentBlockGPs.Update(gp, 6000) + coefficient-- + } + cp := gpo.CurrentBlockGPs.Copy() + gpo.BlockGPQueue.Push(cp) + testRecommendGP = gpo.RecommendGP() + gpo.CurrentBlockGPs.Clear() + require.NotNil(t, testRecommendGP) + fmt.Println(testRecommendGP) + } + for blockNum := 5; blockNum <= 10; blockNum++ { + for i := 0; i < gpNum; i++ { + gp := big.NewInt(coefficient + params.GWei) + gpo.CurrentBlockGPs.Update(gp, 6000) + coefficient-- + } + cp := gpo.CurrentBlockGPs.Copy() + gpo.BlockGPQueue.Push(cp) + testRecommendGP = gpo.RecommendGP() + gpo.CurrentBlockGPs.Clear() + require.NotNil(t, testRecommendGP) + fmt.Println(testRecommendGP) + } + }) + t.Run("case 10: not full tx, full gasUsed, gp decrease, mode 1", func(t *testing.T) { + appconfig.GetOecConfig().SetDynamicGpMaxTxNum(300) + appconfig.GetOecConfig().SetDynamicGpMaxGasUsed(1000000) + appconfig.GetOecConfig().SetDynamicGpMode(1) + config := NewGPOConfig(80, 5) + var testRecommendGP *big.Int + gpo := NewOracle(config) + coefficient := int64(200000) + gpNum := 200 + + for blockNum := 1; blockNum <= 5; blockNum++ { + for i := 0; i < gpNum/2; i++ { + gp := big.NewInt(coefficient + params.GWei) + gpo.CurrentBlockGPs.Update(gp, 6000) + coefficient-- + } + for i := gpNum / 2; i < gpNum; i++ { + gp := big.NewInt((coefficient + params.GWei) * 100) + gpo.CurrentBlockGPs.Update(gp, 6000) + coefficient-- + } + cp := gpo.CurrentBlockGPs.Copy() + gpo.BlockGPQueue.Push(cp) + testRecommendGP = gpo.RecommendGP() + gpo.CurrentBlockGPs.Clear() + require.NotNil(t, testRecommendGP) + fmt.Println(testRecommendGP) + } + for blockNum := 5; blockNum <= 10; blockNum++ { + for i := 0; i < gpNum; i++ { + gp := big.NewInt(coefficient + params.GWei) + gpo.CurrentBlockGPs.Update(gp, 6000) + coefficient-- + } + cp := gpo.CurrentBlockGPs.Copy() + gpo.BlockGPQueue.Push(cp) + testRecommendGP = gpo.RecommendGP() + gpo.CurrentBlockGPs.Clear() + require.NotNil(t, testRecommendGP) + fmt.Println(testRecommendGP) + } + }) } func BenchmarkRecommendGP(b *testing.B) { - appconfig.GetOecConfig().SetMaxTxNumPerBlock(300) - appconfig.GetOecConfig().SetMaxGasUsedPerBlock(1000000) - appconfig.GetOecConfig().SetDynamicGpCheckBlocks(6) - config := NewGPOConfig(80, appconfig.GetOecConfig().GetDynamicGpCheckBlocks()) + appconfig.GetOecConfig().SetDynamicGpMaxTxNum(300) + appconfig.GetOecConfig().SetDynamicGpMaxGasUsed(1000000) + appconfig.GetOecConfig().SetDynamicGpMode(0) + config := NewGPOConfig(80, 6) gpo := NewOracle(config) coefficient := int64(2000000) - gpNum := 20000 + gpNum := 300 for blockNum := 1; blockNum <= 20; blockNum++ { for i := 0; i < gpNum; i++ { gp := big.NewInt(coefficient + params.GWei) - gpo.CurrentBlockGPs.Update(gp, 35) + gpo.CurrentBlockGPs.Update(gp, 6000) coefficient-- } - gpo.BlockGPQueue.Push(gpo.CurrentBlockGPs) + cp := gpo.CurrentBlockGPs.Copy() + gpo.BlockGPQueue.Push(cp) gpo.CurrentBlockGPs.Clear() } diff --git a/app/node_mode.go b/app/node_mode.go index b9fe7c8078..5b6a07f0c8 100644 --- a/app/node_mode.go +++ b/app/node_mode.go @@ -6,6 +6,8 @@ import ( "sort" "strings" + "github.com/spf13/viper" + appconfig "github.com/okex/exchain/app/config" "github.com/okex/exchain/app/rpc/backend" "github.com/okex/exchain/app/types" @@ -19,7 +21,6 @@ import ( "github.com/okex/exchain/libs/tendermint/mempool" evmtypes "github.com/okex/exchain/x/evm/types" "github.com/okex/exchain/x/evm/watcher" - "github.com/spf13/viper" ) func setNodeConfig(ctx *server.Context) error { @@ -70,13 +71,20 @@ func setRpcConfig(ctx *server.Context) { func setValidatorConfig(ctx *server.Context) { viper.SetDefault(abcitypes.FlagDisableABCIQueryMutex, true) - viper.SetDefault(appconfig.FlagEnableDynamicGp, false) + viper.SetDefault(appconfig.FlagDynamicGpMode, types.CloseMode) viper.SetDefault(iavl.FlagIavlEnableAsyncCommit, true) viper.SetDefault(store.FlagIavlCacheSize, 10000000) viper.SetDefault(server.FlagPruning, "everything") - ctx.Logger.Info(fmt.Sprintf("Set --%s=%v\n--%s=%v\n--%s=%v\n--%s=%v\n--%s=%v by validator node mode", - abcitypes.FlagDisableABCIQueryMutex, true, appconfig.FlagEnableDynamicGp, false, iavl.FlagIavlEnableAsyncCommit, true, - store.FlagIavlCacheSize, 10000000, server.FlagPruning, "everything")) + viper.SetDefault(evmtypes.FlagEnableBloomFilter, false) + viper.SetDefault(watcher.FlagFastQuery, false) + viper.SetDefault(appconfig.FlagMaxGasUsedPerBlock, 120000000) + viper.SetDefault(mempool.FlagEnablePendingPool, false) + + ctx.Logger.Info(fmt.Sprintf("Set --%s=%v\n--%s=%v\n--%s=%v\n--%s=%v\n--%s=%v\n--%s=%v\n--%s=%v\n--%s=%v\n--%s=%v by validator node mode", + abcitypes.FlagDisableABCIQueryMutex, true, appconfig.FlagDynamicGpMode, types.CloseMode, iavl.FlagIavlEnableAsyncCommit, true, + store.FlagIavlCacheSize, 10000000, server.FlagPruning, "everything", + evmtypes.FlagEnableBloomFilter, false, watcher.FlagFastQuery, false, appconfig.FlagMaxGasUsedPerBlock, 120000000, + mempool.FlagEnablePendingPool, false)) } func setArchiveConfig(ctx *server.Context) { diff --git a/app/repair_state.go b/app/repair_state.go index d61d52719d..744f1007a1 100644 --- a/app/repair_state.go +++ b/app/repair_state.go @@ -9,6 +9,7 @@ import ( "time" "github.com/okex/exchain/app/config" + "github.com/okex/exchain/app/utils/appstatus" "github.com/okex/exchain/libs/cosmos-sdk/server" "github.com/okex/exchain/libs/cosmos-sdk/store/flatkv" @@ -60,6 +61,7 @@ func repairStateOnStart(ctx *server.Context) { orgEnableFlatKV := viper.GetBool(flatkv.FlagEnable) iavl.EnableAsyncCommit = false viper.Set(flatkv.FlagEnable, false) + iavl.SetEnableFastStorage(appstatus.IsFastStorageStrategy()) // repair state RepairState(ctx, true) diff --git a/app/rpc/backend/backend.go b/app/rpc/backend/backend.go index 5226a3d94c..6017c7336e 100644 --- a/app/rpc/backend/backend.go +++ b/app/rpc/backend/backend.go @@ -6,6 +6,9 @@ import ( "fmt" "time" + "github.com/okex/exchain/libs/cosmos-sdk/server" + "github.com/okex/exchain/libs/cosmos-sdk/store/types" + "github.com/okex/exchain/libs/tendermint/global" lru "github.com/hashicorp/golang-lru" @@ -73,6 +76,7 @@ type Backend interface { ConvertToBlockNumber(rpctypes.BlockNumberOrHash) (rpctypes.BlockNumber, error) // Block returns the block at the given block number, block data is readonly Block(height *int64) (*coretypes.ResultBlock, error) + PruneEverything() bool } var _ Backend = (*EthermintBackend)(nil) @@ -92,6 +96,7 @@ type EthermintBackend struct { logsLimit int logsTimeout int // timeout second blockCache *lru.Cache + pruneEverything bool } // New creates a new EthermintBackend instance @@ -109,11 +114,16 @@ func New(clientCtx clientcontext.CLIContext, log log.Logger, rateLimiters map[st backendCache: NewLruCache(), logsLimit: viper.GetInt(FlagLogsLimit), logsTimeout: viper.GetInt(FlagLogsTimeout), + pruneEverything: viper.GetString(server.FlagPruning) == types.PruningOptionEverything, } b.blockCache, _ = lru.New(blockCacheSize) return b } +func (b *EthermintBackend) PruneEverything() bool { + return b.pruneEverything +} + func (b *EthermintBackend) LogsLimit() int { return b.logsLimit } diff --git a/app/rpc/namespaces/eth/api.go b/app/rpc/namespaces/eth/api.go index 3a965faf0b..6a7dd57db7 100644 --- a/app/rpc/namespaces/eth/api.go +++ b/app/rpc/namespaces/eth/api.go @@ -32,6 +32,7 @@ import ( "github.com/okex/exchain/app/rpc/monitor" "github.com/okex/exchain/app/rpc/namespaces/eth/simulation" rpctypes "github.com/okex/exchain/app/rpc/types" + "github.com/okex/exchain/app/types" ethermint "github.com/okex/exchain/app/types" "github.com/okex/exchain/app/utils" clientcontext "github.com/okex/exchain/libs/cosmos-sdk/client/context" @@ -251,8 +252,18 @@ func (api *PublicEthereumAPI) GasPrice() *hexutil.Big { monitor := monitor.GetMonitor("eth_gasPrice", api.logger, api.Metrics).OnBegin() defer monitor.OnEnd() - if appconfig.GetOecConfig().GetEnableDynamicGp() { - return (*hexutil.Big)(app.GlobalGp) + if appconfig.GetOecConfig().GetDynamicGpMode() != types.CloseMode { + price := new(big.Int).Set(app.GlobalGp) + if price.Cmp((*big.Int)(api.gasPrice)) == -1 { + price.Set((*big.Int)(api.gasPrice)) + } + + if appconfig.GetOecConfig().GetDynamicGpCoefficient() > 0 { + coefficient := big.NewInt(int64(appconfig.GetOecConfig().GetDynamicGpCoefficient())) + gpRes := new(big.Int).Mul(price, coefficient) + return (*hexutil.Big)(gpRes) + } + return (*hexutil.Big)(price) } return api.gasPrice @@ -446,10 +457,16 @@ func (api *PublicEthereumAPI) GetTransactionCount(address common.Address, blockN monitor := monitor.GetMonitor("eth_getTransactionCount", api.logger, api.Metrics).OnBegin() defer monitor.OnEnd("address", address, "block number", blockNrOrHash) - blockNum, err := api.backend.ConvertToBlockNumber(blockNrOrHash) - if err != nil { - return nil, err + var err error + blockNum := rpctypes.LatestBlockNumber + // do not support block number param when node is pruning everything + if !api.backend.PruneEverything() { + blockNum, err = api.backend.ConvertToBlockNumber(blockNrOrHash) + if err != nil { + return nil, err + } } + clientCtx := api.clientCtx pending := blockNum == rpctypes.PendingBlockNumber // pass the given block height to the context if the height is not pending or latest @@ -884,7 +901,7 @@ func (api *PublicEthereumAPI) doCall( // evm tx to cm tx is no need watch db query useWatch := api.useWatchBackend(blockNum) if useWatch && args.To != nil && - api.JudgeEvm2CmTx(args.To.Bytes(), *args.Data) { + api.JudgeEvm2CmTx(args.To.Bytes(), data) { useWatch = false } diff --git a/app/rpc/websockets/server.go b/app/rpc/websockets/server.go index 84ea8e8aec..364f012b86 100644 --- a/app/rpc/websockets/server.go +++ b/app/rpc/websockets/server.go @@ -23,6 +23,8 @@ import ( "github.com/spf13/viper" ) +const FlagSubscribeLimit = "ws.max-subscriptions" + // Server defines a server that handles Ethereum websockets. type Server struct { rpcAddr string // listen address of rest-server @@ -34,6 +36,7 @@ type Server struct { connPoolLock *sync.Mutex currentConnNum metrics.Gauge maxConnNum metrics.Gauge + maxSubLimit int } // NewServer creates a new websocket server instance. @@ -69,6 +72,7 @@ func NewServer(clientCtx context.CLIContext, log log.Logger, wsAddr string) *Ser Name: "connection_capacity", Help: "the capacity number of websocket client connections", }, nil), + maxSubLimit: viper.GetInt(FlagSubscribeLimit), } } @@ -116,23 +120,36 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { } func (s *Server) sendErrResponse(conn *wsConn, msg string) { - res := &ErrorResponseJSON{ + res := makeErrResponse(msg) + err := conn.WriteJSON(res) + if err != nil { + s.logger.Error("websocket failed write message", "error", err) + } +} + +func makeErrResponse(errMsg string) *ErrorResponseJSON { + return &ErrorResponseJSON{ Jsonrpc: "2.0", Error: &ErrorMessageJSON{ Code: big.NewInt(-32600), - Message: msg, + Message: errMsg, }, ID: big.NewInt(1), } - err := conn.WriteJSON(res) - if err != nil { - s.logger.Error("websocket failed write message", "error", err) - } } type wsConn struct { - conn *websocket.Conn - mux *sync.Mutex + conn *websocket.Conn + mux *sync.Mutex + subCount int +} + +func (w *wsConn) GetSubCount() int { + return w.subCount +} + +func (w *wsConn) AddSubCount(delta int) { + w.subCount += delta } func (w *wsConn) WriteJSON(v interface{}) error { @@ -181,6 +198,11 @@ func (s *Server) readLoop(wsConn *wsConn) { s.sendErrResponse(wsConn, "invalid request") } if methodStr == "eth_subscribe" { + if wsConn.GetSubCount() >= s.maxSubLimit { + s.sendErrResponse(wsConn, + fmt.Sprintf("subscription has reached the upper limit(%d)", s.maxSubLimit)) + continue + } params, ok := msg["params"].([]interface{}) if !ok || len(params) == 0 { s.sendErrResponse(wsConn, "invalid parameters") @@ -212,6 +234,7 @@ func (s *Server) readLoop(wsConn *wsConn) { } s.logger.Debug("successfully subscribe", "ID", id) subIds[id] = struct{}{} + wsConn.AddSubCount(1) continue } else if methodStr == "eth_unsubscribe" { ids, ok := msg["params"].([]interface{}) @@ -245,43 +268,44 @@ func (s *Server) readLoop(wsConn *wsConn) { } s.logger.Debug("successfully unsubscribe", "ID", id) delete(subIds, rpc.ID(id)) + wsConn.AddSubCount(-1) continue } // otherwise, call the usual rpc server to respond - err = s.tcpGetAndSendResponse(wsConn, mb) + data, err := s.getRpcResponse(mb) if err != nil { s.sendErrResponse(wsConn, err.Error()) + } else { + wsConn.WriteJSON(data) } } } -// tcpGetAndSendResponse connects to the rest-server over tcp, posts a JSON-RPC request, and sends the response -// to the client over websockets -func (s *Server) tcpGetAndSendResponse(conn *wsConn, mb []byte) error { +// getRpcResponse connects to the rest-server over tcp, posts a JSON-RPC request, and return response +func (s *Server) getRpcResponse(mb []byte) (interface{}, error) { req, err := http.NewRequest(http.MethodPost, s.rpcAddr, bytes.NewReader(mb)) if err != nil { - return fmt.Errorf("failed to request; %s", err) + return nil, fmt.Errorf("failed to request; %s", err) } req.Header.Set("Content-Type", "application/json") resp, err := http.DefaultClient.Do(req) if err != nil { - return fmt.Errorf("failed to write to rest-server; %s", err) + return nil, fmt.Errorf("failed to write to rest-server; %s", err) } defer resp.Body.Close() body, err := ioutil.ReadAll(resp.Body) if err != nil { - return fmt.Errorf("could not read body from response; %s", err) + return nil, fmt.Errorf("could not read body from response; %s", err) } var wsSend interface{} err = json.Unmarshal(body, &wsSend) if err != nil { - return fmt.Errorf("failed to unmarshal rest-server response; %s", err) + return nil, fmt.Errorf("failed to unmarshal rest-server response; %s", err) } - - return conn.WriteJSON(wsSend) + return wsSend, nil } func (s *Server) closeWsConnection(subIds map[rpc.ID]struct{}) { @@ -304,13 +328,17 @@ func (s *Server) batchCall(mb []byte, wsConn *wsConn) error { for i := 0; i < len(msgs); i++ { b, err := json.Marshal(msgs[i]) if err != nil { - s.sendErrResponse(wsConn, err.Error()) - continue + s.sendErrResponse(wsConn, "invalid request") + s.logger.Error("web socket batchCall failed", "error", err) + break } - err = s.tcpGetAndSendResponse(wsConn, b) + data, err := s.getRpcResponse(b) if err != nil { - s.sendErrResponse(wsConn, err.Error()) + data = makeErrResponse(err.Error()) + } + if err := wsConn.WriteJSON(data); err != nil { + break // connection broken } } return nil diff --git a/app/types/gasprice.go b/app/types/gasprice.go index 2ad6b449e0..4b7450adba 100644 --- a/app/types/gasprice.go +++ b/app/types/gasprice.go @@ -4,13 +4,17 @@ import ( "errors" "math/big" "sort" - - "github.com/ethereum/go-ethereum/params" ) -const sampleNumber = 3 // Number of transactions sampled in a block +const ( + sampleNumber = 3 // Number of transactions sampled in a block + + CongestionHigherGpMode = 0 + NormalGpMode = 1 + CloseMode = 2 -var ignorePrice = big.NewInt(2 * params.Wei) + NoGasUsedCap = -1 +) // SingleBlockGPs holds the gas price of all transactions in a block // and will sample the lower few gas prices according to sampleNumber. @@ -44,23 +48,33 @@ func (bgp SingleBlockGPs) GetGasUsed() uint64 { } func (bgp *SingleBlockGPs) AddSampledGP(gp *big.Int) { - bgp.sampled = append(bgp.sampled, gp) + gpCopy := new(big.Int).Set(gp) + bgp.sampled = append(bgp.sampled, gpCopy) } func (bgp *SingleBlockGPs) Update(gp *big.Int, gas uint64) { - bgp.all = append(bgp.all, gp) + gpCopy := new(big.Int).Set(gp) + bgp.all = append(bgp.all, gpCopy) bgp.gasUsed += gas } func (bgp *SingleBlockGPs) Clear() { - bgp.all = bgp.all[:0] - bgp.sampled = bgp.sampled[:0] + bgp.all = make([]*big.Int, 0) + bgp.sampled = make([]*big.Int, 0) bgp.gasUsed = 0 } -func (bgp *SingleBlockGPs) SampleGP() { +func (bgp *SingleBlockGPs) Copy() *SingleBlockGPs { + return &SingleBlockGPs{ + all: bgp.all, + sampled: bgp.sampled, + gasUsed: bgp.gasUsed, + } +} + +func (bgp *SingleBlockGPs) SampleGP(adoptHigherGp bool) { // "len(bgp.sampled) != 0" means it has been sampled - if len(bgp.all) == 0 && len(bgp.sampled) != 0 { + if len(bgp.sampled) != 0 { return } @@ -68,14 +82,42 @@ func (bgp *SingleBlockGPs) SampleGP() { copy(txGPs, bgp.all) sort.Sort(BigIntArray(txGPs)) - for _, gp := range txGPs { - // If a GP is too cheap, discard it. - if gp.Cmp(ignorePrice) == -1 { - continue + if adoptHigherGp { + + rowSampledGPs := make([]*big.Int, 0) + + // Addition of sampleNumber lower-priced gp + for i := 0; i < len(txGPs); i++ { + if i >= sampleNumber { + break + } + rowSampledGPs = append(rowSampledGPs, new(big.Int).Set(txGPs[i])) + } + + // Addition of sampleNumber higher-priced gp + for i := len(txGPs) - 1; i >= 0; i-- { + if len(txGPs)-1-i >= sampleNumber { + break + } + rowSampledGPs = append(rowSampledGPs, new(big.Int).Set(txGPs[i])) } - bgp.AddSampledGP(gp) - if len(bgp.sampled) >= sampleNumber { - break + + if len(rowSampledGPs) != 0 { + sampledGPLen := big.NewInt(int64(len(rowSampledGPs))) + sum := big.NewInt(0) + for _, gp := range rowSampledGPs { + sum.Add(sum, gp) + } + + avgGP := new(big.Int).Quo(sum, sampledGPLen) + bgp.AddSampledGP(avgGP) + } + } else { + for _, gp := range txGPs { + bgp.AddSampledGP(gp) + if len(bgp.sampled) >= sampleNumber { + break + } } } } @@ -155,20 +197,23 @@ func (rs *BlockGPResults) Pop() (*SingleBlockGPs, error) { return element, nil } -func (rs *BlockGPResults) ExecuteSamplingBy(lastPrice *big.Int) []*big.Int { +func (rs *BlockGPResults) ExecuteSamplingBy(lastPrice *big.Int, adoptHigherGp bool) []*big.Int { var txPrices []*big.Int if !rs.IsEmpty() { // traverse the circular queue for i := rs.front; i != rs.rear; i = (i + 1) % rs.capacity { - rs.items[i].SampleGP() - + rs.items[i].SampleGP(adoptHigherGp) // If block is empty, use the latest gas price for sampling. if len(rs.items[i].sampled) == 0 { rs.items[i].AddSampledGP(lastPrice) } - txPrices = append(txPrices, rs.items[i].sampled...) } + rs.items[rs.rear].SampleGP(adoptHigherGp) + if len(rs.items[rs.rear].sampled) == 0 { + rs.items[rs.rear].AddSampledGP(lastPrice) + } + txPrices = append(txPrices, rs.items[rs.rear].sampled...) } return txPrices } diff --git a/app/types/node_mode.go b/app/types/node_mode.go index d1b642dcff..f35b484ff0 100644 --- a/app/types/node_mode.go +++ b/app/types/node_mode.go @@ -5,7 +5,7 @@ type NodeMode string const ( // node mode values RpcNode NodeMode = "rpc" - ValidatorNode NodeMode = "validator" + ValidatorNode NodeMode = "val" ArchiveNode NodeMode = "archive" InnertxNode NodeMode = "innertx" diff --git a/app/utils/sanity/start.go b/app/utils/sanity/start.go index 7c2a57ffe0..62b106b6a5 100644 --- a/app/utils/sanity/start.go +++ b/app/utils/sanity/start.go @@ -3,6 +3,8 @@ package sanity import ( "fmt" + "github.com/spf13/viper" + apptype "github.com/okex/exchain/app/types" "github.com/okex/exchain/app/utils/appstatus" "github.com/okex/exchain/libs/cosmos-sdk/server" @@ -14,7 +16,6 @@ import ( "github.com/okex/exchain/libs/tendermint/types" "github.com/okex/exchain/x/evm/watcher" "github.com/okex/exchain/x/infura" - "github.com/spf13/viper" ) // CheckStart check start command's flags. if user set conflict flags return error. @@ -37,7 +38,7 @@ import ( // --node-mode=validator manage the following flags: // --disable-checktx-mutex=true // --disable-query-mutex=true -// --enable-dynamic-gp=false +// --dynamic-gp-mode=2 // --iavl-enable-async-commit=true // --iavl-cache-size=10000000 // --pruning=everything diff --git a/cmd/client/flags.go b/cmd/client/flags.go index 29494ca650..7ff9ef6110 100644 --- a/cmd/client/flags.go +++ b/cmd/client/flags.go @@ -1,14 +1,13 @@ package client import ( - "github.com/spf13/cobra" - "github.com/okex/exchain/app" "github.com/okex/exchain/app/config" "github.com/okex/exchain/app/rpc" "github.com/okex/exchain/app/rpc/backend" "github.com/okex/exchain/app/rpc/namespaces/eth" "github.com/okex/exchain/app/rpc/namespaces/eth/filters" + "github.com/okex/exchain/app/rpc/websockets" "github.com/okex/exchain/app/types" "github.com/okex/exchain/app/utils/sanity" "github.com/okex/exchain/libs/system/trace" @@ -21,10 +20,11 @@ import ( "github.com/okex/exchain/x/infura" "github.com/okex/exchain/x/token" "github.com/okex/exchain/x/wasm" + "github.com/spf13/cobra" ) func RegisterAppFlag(cmd *cobra.Command) { - cmd.Flags().Bool(watcher.FlagFastQuery, false, "Enable the fast query mode for rpc queries") + cmd.Flags().Bool(watcher.FlagFastQuery, true, "Enable the fast query mode for rpc queries") cmd.Flags().Uint64(eth.FlagFastQueryThreshold, 10, "Set the threshold of fast query") cmd.Flags().Int(watcher.FlagFastQueryLru, 1000, "Set the size of LRU cache under fast-query mode") cmd.Flags().Int(backend.FlagApiBackendBlockLruCache, 30000, "Set the size of block LRU cache for backend mem cache") @@ -32,7 +32,7 @@ func RegisterAppFlag(cmd *cobra.Command) { cmd.Flags().Bool(watcher.FlagCheckWd, false, "Enable check watchDB in log") cmd.Flags().Bool(rpc.FlagPersonalAPI, true, "Enable the personal_ prefixed set of APIs in the Web3 JSON-RPC spec") cmd.Flags().Bool(rpc.FlagDebugAPI, false, "Enable the debug_ prefixed set of APIs in the Web3 JSON-RPC spec") - cmd.Flags().Bool(evmtypes.FlagEnableBloomFilter, false, "Enable bloom filter for event logs") + cmd.Flags().Bool(evmtypes.FlagEnableBloomFilter, true, "Enable bloom filter for event logs") cmd.Flags().Int64(filters.FlagGetLogsHeightSpan, 2000, "config the block height span for get logs") // register application rpc to nacos cmd.Flags().String(rpc.FlagRestApplicationName, "", "rest application name in nacos") @@ -56,9 +56,16 @@ func RegisterAppFlag(cmd *cobra.Command) { cmd.Flags().Int(rpc.FlagRateLimitBurst, 1, "Set the concurrent count of requests allowed of rpc rate limiter") cmd.Flags().Uint64(config.FlagGasLimitBuffer, 50, "Percentage to increase gas limit") cmd.Flags().String(rpc.FlagDisableAPI, "", "Set the RPC API to be disabled, such as \"eth_getLogs,eth_newFilter,eth_newBlockFilter,eth_newPendingTransactionFilter,eth_getFilterChanges\"") + + cmd.Flags().Bool(config.FlagEnableDynamicGp, false, "Enable node to dynamic support gas price suggest") + cmd.Flags().MarkHidden(config.FlagEnableDynamicGp) + cmd.Flags().Int64(config.FlagDynamicGpMaxTxNum, 300, "If tx number in the block is more than this, the network is congested.") + cmd.Flags().Int64(config.FlagDynamicGpMaxGasUsed, types.NoGasUsedCap, "If the block gas used is more than this, the network is congested.") cmd.Flags().Int(config.FlagDynamicGpWeight, 80, "The recommended weight of dynamic gas price [1,100])") cmd.Flags().Int(config.FlagDynamicGpCheckBlocks, 5, "The recommended number of blocks checked of dynamic gas price [1,100])") - cmd.Flags().Bool(config.FlagEnableDynamicGp, true, "Enable node to dynamic support gas price suggest") + cmd.Flags().Int(config.FlagDynamicGpCoefficient, 1, "Adjustment coefficient of dynamic gas price [1,100])") + cmd.Flags().Int(config.FlagDynamicGpMode, types.CongestionHigherGpMode, "Dynamic gas price mode (0: higher price|1: normal|2: close) is used to manage flags") + cmd.Flags().Bool(config.FlagEnableHasBlockPartMsg, false, "Enable peer to broadcast HasBlockPartMessage") cmd.Flags().Bool(eth.FlagEnableMultiCall, false, "Enable node to support the eth_multiCall RPC API") @@ -98,9 +105,9 @@ func RegisterAppFlag(cmd *cobra.Command) { cmd.Flags().String(tmdb.FlagGoLeveldbOpts, "", "Options of goleveldb. (cache_size=128MB,handlers_num=1024)") cmd.Flags().String(tmdb.FlagRocksdbOpts, "", "Options of rocksdb. (block_size=4KB,block_cache=1GB,statistics=true,allow_mmap_reads=true,max_open_files=-1)") - cmd.Flags().String(types.FlagNodeMode, "", "Node mode (rpc|validator|archive) is used to manage flags") + cmd.Flags().String(types.FlagNodeMode, "", "Node mode (rpc|val|archive) is used to manage flags") - cmd.Flags().Bool(consensus.EnablePrerunTx, false, "enable proactively runtx mode, default close") + cmd.Flags().Bool(consensus.EnablePrerunTx, true, "enable proactively runtx mode, default close") cmd.Flags().String(automation.ConsensusRole, "", "consensus role") cmd.Flags().String(automation.ConsensusTestcase, "", "consensus test case file") @@ -124,5 +131,6 @@ func RegisterAppFlag(cmd *cobra.Command) { cmd.Flags().String(rpc.FlagWebsocket, "8546", "websocket port to listen to") cmd.Flags().Int(backend.FlagLogsLimit, 0, "Maximum number of logs returned when calling eth_getLogs") cmd.Flags().Int(backend.FlagLogsTimeout, 60, "Maximum query duration when calling eth_getLogs") + cmd.Flags().Int(websockets.FlagSubscribeLimit, 15, "Maximum subscription on a websocket connection") wasm.AddModuleInitFlags(cmd) } diff --git a/cmd/exchaincli/main.go b/cmd/exchaincli/main.go index 5c05d6df7f..d6c3723b03 100644 --- a/cmd/exchaincli/main.go +++ b/cmd/exchaincli/main.go @@ -18,6 +18,7 @@ import ( clientrpc "github.com/okex/exchain/libs/cosmos-sdk/client/rpc" sdkcodec "github.com/okex/exchain/libs/cosmos-sdk/codec" "github.com/okex/exchain/libs/cosmos-sdk/crypto/keys" + "github.com/okex/exchain/libs/cosmos-sdk/server" sdk "github.com/okex/exchain/libs/cosmos-sdk/types" "github.com/okex/exchain/libs/cosmos-sdk/version" "github.com/okex/exchain/libs/cosmos-sdk/x/auth" @@ -62,7 +63,7 @@ func main() { } // Add --chain-id to persistent flags and mark it required - rootCmd.PersistentFlags().String(flags.FlagChainID, "", "Chain ID of tendermint node") + rootCmd.PersistentFlags().String(flags.FlagChainID, server.ChainID, "Chain ID of tendermint node") rootCmd.PersistentPreRunE = func(_ *cobra.Command, _ []string) error { utils.SetParseAppTx(wrapDecoder(parseMsgEthereumTx, parseProtobufTx)) return client.InitConfig(rootCmd) diff --git a/cmd/exchaind/main.go b/cmd/exchaind/main.go index e0ceb8b709..e75d73a529 100644 --- a/cmd/exchaind/main.go +++ b/cmd/exchaind/main.go @@ -4,6 +4,8 @@ import ( "encoding/json" "fmt" "io" + "os" + "strings" "github.com/okex/exchain/app/logevents" "github.com/okex/exchain/cmd/exchaind/fss" @@ -42,6 +44,7 @@ import ( ) const flagInvCheckPeriod = "inv-check-period" +const OkcEnvPrefix = "OKEXCHAIN" var invCheckPeriod uint @@ -106,17 +109,34 @@ func main() { registerRoutes, client.RegisterAppFlag, app.PreRun, subFunc) // prepare and add flags - executor := cli.PrepareBaseCmd(rootCmd, "OKEXCHAIN", app.DefaultNodeHome) + executor := cli.PrepareBaseCmd(rootCmd, OkcEnvPrefix, app.DefaultNodeHome) rootCmd.PersistentFlags().UintVar(&invCheckPeriod, flagInvCheckPeriod, 0, "Assert registered invariants every N blocks") rootCmd.PersistentFlags().Bool(server.FlagGops, false, "Enable gops metrics collection") + initEnv() err := executor.Execute() if err != nil { panic(err) } } +func initEnv() { + checkSetEnv("mempool_size", "200000") + checkSetEnv("mempool_cache_size", "300000") + checkSetEnv("mempool_force_recheck_gap", "2000") + checkSetEnv("mempool_recheck", "false") + checkSetEnv("consensus_timeout_commit", fmt.Sprintf("%dms", tmtypes.TimeoutCommit)) +} + +func checkSetEnv(envName string, value string) { + realEnvName := OkcEnvPrefix + "_" + strings.ToUpper(envName) + _, ok := os.LookupEnv(realEnvName) + if !ok { + _ = os.Setenv(realEnvName, value) + } +} + func closeApp(iApp abci.Application) { fmt.Println("Close App") app := iApp.(*app.OKExChainApp) diff --git a/cmd/exchaind/repair_data.go b/cmd/exchaind/repair_data.go index bde18883b3..59615a3a3b 100644 --- a/cmd/exchaind/repair_data.go +++ b/cmd/exchaind/repair_data.go @@ -1,7 +1,10 @@ package main import ( + "fmt" "log" + "net/http" + _ "net/http/pprof" "github.com/okex/exchain/app" "github.com/okex/exchain/app/utils/appstatus" @@ -27,6 +30,13 @@ func repairStateCmd(ctx *server.Context) *cobra.Command { Run: func(cmd *cobra.Command, args []string) { log.Println("--------- repair data start ---------") + go func() { + pprofAddress := viper.GetString(pprofAddrFlag) + err := http.ListenAndServe(pprofAddress, nil) + if err != nil { + fmt.Println(err) + } + }() app.RepairState(ctx, false) log.Println("--------- repair data success ---------") }, @@ -37,14 +47,13 @@ func repairStateCmd(ctx *server.Context) *cobra.Command { cmd.Flags().Bool(trace.FlagEnableAnalyzer, false, "Enable auto open log analyzer") cmd.Flags().BoolVar(&types2.TrieUseCompositeKey, types2.FlagTrieUseCompositeKey, true, "Use composite key to store contract state") cmd.Flags().Int(sm.FlagDeliverTxsExecMode, 0, "execution mode for deliver txs, (0:serial[default], 1:deprecated, 2:parallel)") - cmd.Flags().Bool(tmiavl.FlagIavlEnableFastStorage, false, "Enable fast storage") cmd.Flags().String(sdk.FlagDBBackend, tmtypes.DBBackend, "Database backend: goleveldb | rocksdb") + cmd.Flags().Bool(sdk.FlagMultiCache, true, "Enable multi cache") + cmd.Flags().StringP(pprofAddrFlag, "p", "0.0.0.0:6060", "Address and port of pprof HTTP server listening") return cmd } func setExternalPackageValue() { - enableFastStorage := viper.GetBool(tmiavl.FlagIavlEnableFastStorage) || - appstatus.IsFastStorageStrategy() - tmiavl.SetEnableFastStorage(enableFastStorage) + tmiavl.SetEnableFastStorage(appstatus.IsFastStorageStrategy()) } diff --git a/cmd/exchaind/replay.go b/cmd/exchaind/replay.go index 1c0c59a831..70fa536a74 100644 --- a/cmd/exchaind/replay.go +++ b/cmd/exchaind/replay.go @@ -11,15 +11,20 @@ import ( "runtime/pprof" "time" + evmtypes "github.com/okex/exchain/x/evm/types" + "github.com/okex/exchain/x/evm/watcher" + "github.com/gogo/protobuf/jsonpb" "github.com/okex/exchain/app/config" okexchain "github.com/okex/exchain/app/types" + "github.com/okex/exchain/app/utils/appstatus" "github.com/okex/exchain/app/utils/sanity" "github.com/okex/exchain/libs/cosmos-sdk/baseapp" "github.com/okex/exchain/libs/cosmos-sdk/client/lcd" "github.com/okex/exchain/libs/cosmos-sdk/codec" "github.com/okex/exchain/libs/cosmos-sdk/server" sdk "github.com/okex/exchain/libs/cosmos-sdk/types" + "github.com/okex/exchain/libs/iavl" "github.com/okex/exchain/libs/system/trace" abci "github.com/okex/exchain/libs/tendermint/abci/types" tcmd "github.com/okex/exchain/libs/tendermint/cmd/tendermint/commands" @@ -66,6 +71,7 @@ func replayCmd(ctx *server.Context, registerAppFlagFn func(cmd *cobra.Command), fmt.Println(err) return err } + iavl.SetEnableFastStorage(appstatus.IsFastStorageStrategy()) server.SetExternalPackageValue(cmd) types.InitSignatureCache() return nil @@ -181,6 +187,9 @@ func registerReplayFlags(cmd *cobra.Command) *cobra.Command { cmd.Flags().Bool(saveBlock, false, "save block when replay") cmd.Flags().Bool(FlagEnableRest, false, "start rest service when replay") + viper.SetDefault(watcher.FlagFastQuery, false) + viper.SetDefault(evmtypes.FlagEnableBloomFilter, false) + return cmd } diff --git a/dev/keplr-test.sh b/dev/keplr-test.sh index 09240607da..74278afdf4 100755 --- a/dev/keplr-test.sh +++ b/dev/keplr-test.sh @@ -27,7 +27,7 @@ run() { --local-rpc-port 26657 \ --log_level $LOG_LEVEL \ --log_file json \ - --enable-dynamic-gp=false \ + --dynamic-gp-mode=2 \ --consensus.timeout_commit 2000ms \ --enable-preruntx=false \ --iavl-enable-async-commit \ diff --git a/dev/local-perf.sh b/dev/local-perf.sh index 45a508df86..89a9d22b99 100755 --- a/dev/local-perf.sh +++ b/dev/local-perf.sh @@ -27,7 +27,7 @@ run() { --local-rpc-port 26657 \ --log_level $LOG_LEVEL \ --log_file json \ - --enable-dynamic-gp=false \ + --dynamic-gp-mode=2 \ --consensus.timeout_commit 100ms \ --disable-abci-query-mutex=true \ --mempool.max_tx_num_per_block=10000 \ diff --git a/dev/start.sh b/dev/start.sh index c7cd89fbb1..5a380846ce 100755 --- a/dev/start.sh +++ b/dev/start.sh @@ -28,7 +28,7 @@ run() { --local-rpc-port 26657 \ --log_level $LOG_LEVEL \ --log_file json \ - --enable-dynamic-gp=false \ + --dynamic-gp-mode=2 \ --consensus.timeout_commit 2000ms \ --enable-preruntx=1 \ --iavl-enable-async-commit \ diff --git a/dev/testnet/testnet.sh b/dev/testnet/testnet.sh index 5e847669cb..ac300d3dbd 100755 --- a/dev/testnet/testnet.sh +++ b/dev/testnet/testnet.sh @@ -146,7 +146,7 @@ run() { --home cache/node${index}/exchaind \ --p2p.seed_mode=$seed_mode \ --p2p.allow_duplicate_ip \ - --enable-dynamic-gp=false \ + --dynamic-gp-mode=2 \ --enable-wtx=${WRAPPEDTX} \ --mempool.node_key_whitelist ${WHITE_LIST} \ --p2p.pex=false \ diff --git a/dev/wasm-test.sh b/dev/wasm-test.sh index cf5d717dae..fbbdaf86e3 100755 --- a/dev/wasm-test.sh +++ b/dev/wasm-test.sh @@ -27,7 +27,7 @@ run() { --local-rpc-port 26657 \ --log_level $LOG_LEVEL \ --log_file json \ - --enable-dynamic-gp=false \ + --dynamic-gp-mode=2 \ --consensus.timeout_commit 500ms \ --enable-preruntx=false \ --fast-query=true \ diff --git a/go.mod b/go.mod index faeea651da..01bb243e62 100644 --- a/go.mod +++ b/go.mod @@ -23,6 +23,7 @@ require ( github.com/enigmampc/btcutil v1.0.3-0.20200723161021-e2fb6adb2a25 github.com/ethereum/go-ethereum v1.10.8 github.com/fortytw2/leaktest v1.3.0 + github.com/fsnotify/fsnotify v1.6.0 github.com/go-errors/errors v1.0.1 github.com/go-kit/kit v0.10.0 github.com/go-logfmt/logfmt v0.5.0 @@ -109,7 +110,6 @@ require ( github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/dustin/go-humanize v1.0.0 // indirect github.com/edsrzf/mmap-go v1.0.0 // indirect - github.com/fsnotify/fsnotify v1.4.9 // indirect github.com/gballet/go-libpcsclite v0.0.0-20190607065134-2772fd86a8ff // indirect github.com/go-ole/go-ole v1.2.6-0.20210915003542-8b1f7f90f6b1 // indirect github.com/go-sql-driver/mysql v1.6.0 // indirect @@ -157,7 +157,7 @@ require ( go.uber.org/multierr v1.5.0 // indirect go.uber.org/zap v1.15.0 // indirect golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect - golang.org/x/sys v0.0.0-20220319134239-a9b59b0215f8 // indirect + golang.org/x/sys v0.0.0-20220908164124-27713097b956 // indirect golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 // indirect golang.org/x/text v0.3.6 // indirect gopkg.in/ini.v1 v1.51.0 // indirect diff --git a/go.sum b/go.sum index f46bf510ad..7c9e9d4cde 100644 --- a/go.sum +++ b/go.sum @@ -258,8 +258,9 @@ github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHqu github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4= github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= -github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= +github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= +github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= github.com/garyburd/redigo v1.6.0/go.mod h1:NR3MbYisc3/PwhQ00EMzDiPmrwpPxAn5GI05/YaO1SY= github.com/gballet/go-libpcsclite v0.0.0-20190607065134-2772fd86a8ff h1:tY80oXqGNY4FhTFhk+o9oFHGINQ/+vhlm8HFzi6znCI= github.com/gballet/go-libpcsclite v0.0.0-20190607065134-2772fd86a8ff/go.mod h1:x7DCsMOv1taUwEWCzT4cmDeAkigA5/QCwUodaVOe8Ww= @@ -1028,8 +1029,8 @@ golang.org/x/sys v0.0.0-20210816074244-15123e1e1f71/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210816183151-1e6c022a8912/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210902050250-f475640dd07b/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210910150752-751e447fb3d0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220319134239-a9b59b0215f8 h1:OH54vjqzRWmbJ62fjuhxy7AxFFgoHN0/DPc/UrL8cAs= -golang.org/x/sys v0.0.0-20220319134239-a9b59b0215f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220908164124-27713097b956 h1:XeJjHH1KiLpKGb6lvMiksZ9l0fVUh+AmGcm0nOMEBOY= +golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 h1:v+OssWQX+hTHEmOBgwxdZxK4zHq3yOs8F9J7mk0PY8E= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= diff --git a/libs/cosmos-sdk/baseapp/baseapp_parallel.go b/libs/cosmos-sdk/baseapp/baseapp_parallel.go index c6476b37de..1f6408b877 100644 --- a/libs/cosmos-sdk/baseapp/baseapp_parallel.go +++ b/libs/cosmos-sdk/baseapp/baseapp_parallel.go @@ -279,6 +279,7 @@ func (app *BaseApp) runTxs() []*abci.ResponseDeliverTx { pm.blockGasMeterMu.Unlock() // merge tx pm.SetCurrentIndex(txIndex, res) + pm.finalResult[txIndex] = res currentGas += uint64(res.resp.GasUsed) txIndex++ @@ -335,7 +336,7 @@ func (app *BaseApp) endParallelTxs() [][]byte { txs := make([]sdk.Tx, app.parallelTxManage.txSize) app.FeeSplitCollector = make([]*sdk.FeeSplitInfo, 0) for index := 0; index < app.parallelTxManage.txSize; index++ { - txRes := app.parallelTxManage.txResultCollector.getTxResult(index) + txRes := app.parallelTxManage.finalResult[index] logIndex[index] = txRes.paraMsg.LogIndex errs[index] = txRes.paraMsg.AnteErr hasEnterEvmTx[index] = txRes.paraMsg.HasRunEvmTx @@ -617,6 +618,7 @@ type parallelTxManager struct { extraTxsInfo []*extraDataForTx txResultCollector *txResultCollector + finalResult []*executeResult groupList map[int][]int nextTxInGroup map[int]int @@ -745,6 +747,7 @@ func (f *parallelTxManager) init() { txSize := f.txSize f.txResultCollector.init(txSize) + f.finalResult = make([]*executeResult, txSize) txsInfoCap := cap(f.extraTxsInfo) if f.extraTxsInfo == nil || txsInfoCap < txSize { diff --git a/libs/cosmos-sdk/server/pruning.go b/libs/cosmos-sdk/server/pruning.go index 4e59ce47d1..6904ac5615 100644 --- a/libs/cosmos-sdk/server/pruning.go +++ b/libs/cosmos-sdk/server/pruning.go @@ -2,14 +2,16 @@ package server import ( "fmt" - "github.com/okex/exchain/libs/cosmos-sdk/store/mpt" "strings" + "github.com/okex/exchain/libs/cosmos-sdk/store/mpt" + "github.com/spf13/viper" "github.com/okex/exchain/libs/cosmos-sdk/store" "github.com/okex/exchain/libs/cosmos-sdk/store/types" tmiavl "github.com/okex/exchain/libs/iavl" + iavlcfg "github.com/okex/exchain/libs/iavl/config" ) // GetPruningOptionsFromFlags parses command flags and returns the correct @@ -23,7 +25,7 @@ func GetPruningOptionsFromFlags() (types.PruningOptions, error) { if strategy == types.PruningOptionNothing { tmiavl.EnablePruningHistoryState = false tmiavl.CommitIntervalHeight = 1 - tmiavl.CommitGapHeight = 1 + iavlcfg.DynamicConfig.SetCommitGapHeight(1) mpt.TrieDirtyDisabled = true } return types.NewPruningOptionsFromString(strategy), nil diff --git a/libs/cosmos-sdk/server/start.go b/libs/cosmos-sdk/server/start.go index 2e85b9a016..8851de1881 100644 --- a/libs/cosmos-sdk/server/start.go +++ b/libs/cosmos-sdk/server/start.go @@ -6,6 +6,8 @@ import ( "os" "runtime/pprof" + "github.com/okex/exchain/libs/tendermint/consensus" + "github.com/okex/exchain/libs/cosmos-sdk/store/mpt" "github.com/okex/exchain/libs/tendermint/rpc/client" @@ -319,8 +321,9 @@ func SetExternalPackageValue(cmd *cobra.Command) { tmiavl.HeightOrphansCacheSize = viper.GetInt(tmiavl.FlagIavlHeightOrphansCacheSize) tmiavl.MaxCommittedHeightNum = viper.GetInt(tmiavl.FlagIavlMaxCommittedHeightNum) tmiavl.EnableAsyncCommit = viper.GetBool(tmiavl.FlagIavlEnableAsyncCommit) - tmiavl.SetEnableFastStorage(viper.GetBool(tmiavl.FlagIavlEnableFastStorage)) - tmiavl.SetFastNodeCacheSize(viper.GetInt(tmiavl.FlagIavlFastStorageCacheSize)) + if viper.GetBool(tmiavl.FlagIavlDiscardFastStorage) { + tmiavl.SetEnableFastStorage(false) + } system.EnableGid = viper.GetBool(system.FlagEnableGid) state.ApplyBlockPprofTime = viper.GetInt(state.FlagApplyBlockPprofTime) @@ -337,8 +340,9 @@ func SetExternalPackageValue(cmd *cobra.Command) { tmtypes.BlockCompressFlag = viper.GetInt(tmtypes.FlagBlockCompressFlag) tmtypes.BlockCompressThreshold = viper.GetInt(tmtypes.FlagBlockCompressThreshold) - tmiavl.CommitGapHeight = viper.GetInt64(FlagCommitGapHeight) mpt.TrieCommitGap = viper.GetInt64(FlagCommitGapHeight) bcv0.MaxIntervalForFastSync = viper.GetInt64(FlagFastSyncGap) + + consensus.SetActiveVC(viper.GetBool(FlagActiveViewChange)) } diff --git a/libs/cosmos-sdk/server/start_okchain.go b/libs/cosmos-sdk/server/start_okchain.go index 18dd6353fb..b2669d6d98 100644 --- a/libs/cosmos-sdk/server/start_okchain.go +++ b/libs/cosmos-sdk/server/start_okchain.go @@ -9,6 +9,9 @@ import ( "reflect" "strconv" + "github.com/spf13/cobra" + "github.com/spf13/viper" + "github.com/okex/exchain/libs/cosmos-sdk/baseapp" "github.com/okex/exchain/libs/cosmos-sdk/client/flags" "github.com/okex/exchain/libs/cosmos-sdk/store/flatkv" @@ -25,8 +28,6 @@ import ( "github.com/okex/exchain/libs/tendermint/state" tmtypes "github.com/okex/exchain/libs/tendermint/types" evmtypes "github.com/okex/exchain/x/evm/types" - "github.com/spf13/cobra" - "github.com/spf13/viper" ) // exchain full-node start flags @@ -42,6 +43,10 @@ const ( FlagWsSubChannelLength = "ws.sub_channel_length" ) +var ( + ChainID = "exchain-66" +) + //module hook type fnHookstartInProcess func(ctx *Context) error @@ -56,7 +61,7 @@ func InstallHookEx(flag string, hooker fnHookstartInProcess) { gSrvHookTable.hookTable[flag] = hooker } -//call hooker function +// call hooker function func callHooker(flag string, args ...interface{}) error { params := make([]interface{}, 0) switch flag { @@ -166,7 +171,7 @@ func RegisterServerFlags(cmd *cobra.Command) *cobra.Command { cmd.Flags().Bool(FlagInterBlockCache, true, "Enable inter-block caching") cmd.Flags().String(flagCPUProfile, "", "Enable CPU profiling and write to the provided file") - cmd.Flags().String(FlagPruning, storetypes.PruningOptionDefault, "Pruning strategy (default|nothing|everything|custom)") + cmd.Flags().String(FlagPruning, storetypes.PruningOptionEverything, "Pruning strategy (default|nothing|everything|custom)") cmd.Flags().Uint64(FlagPruningKeepRecent, 0, "Number of recent heights to keep on disk (ignored if pruning is not 'custom')") cmd.Flags().Uint64(FlagPruningKeepEvery, 0, "Offset heights to keep on disk after 'keep-every' (ignored if pruning is not 'custom')") cmd.Flags().Uint64(FlagPruningInterval, 0, "Height interval at which pruned heights are removed from disk (ignored if pruning is not 'custom')") @@ -194,17 +199,20 @@ func RegisterServerFlags(cmd *cobra.Command) *cobra.Command { cmd.Flags().Int(tmtypes.FlagBlockCompressThreshold, 1024000, "Compress only if block size exceeds the threshold.") cmd.Flags().Bool(FlagActiveViewChange, false, "Enable active view change") - cmd.Flags().Int(iavl.FlagIavlCacheSize, 1000000, "Max size of iavl cache") + cmd.Flags().Int(iavl.FlagIavlCacheSize, 10000000, "Max size of iavl cache") cmd.Flags().Float64(tmiavl.FlagIavlCacheInitRatio, 1, "iavl cache init ratio, 0.0~1.0, default is 0, iavl cache map would be init with (cache size * init ratio)") cmd.Flags().StringToInt(tmiavl.FlagOutputModules, map[string]int{"evm": 1, "acc": 1}, "decide which module in iavl to be printed") cmd.Flags().Int64(tmiavl.FlagIavlCommitIntervalHeight, 100, "Max interval to commit node cache into leveldb") cmd.Flags().Int64(tmiavl.FlagIavlMinCommitItemCount, 1000000, "Min nodes num to triggle node cache commit") cmd.Flags().Int(tmiavl.FlagIavlHeightOrphansCacheSize, 8, "Max orphan version to cache in memory") cmd.Flags().Int(tmiavl.FlagIavlMaxCommittedHeightNum, 30, "Max committed version to cache in memory") - cmd.Flags().Bool(tmiavl.FlagIavlEnableAsyncCommit, false, "Enable async commit") + cmd.Flags().Bool(tmiavl.FlagIavlEnableAsyncCommit, true, "Enable async commit") + cmd.Flags().Bool(tmiavl.FlagIavlDiscardFastStorage, false, "Discard fast storage") + cmd.Flags().MarkHidden(tmiavl.FlagIavlDiscardFastStorage) cmd.Flags().Bool(tmiavl.FlagIavlEnableFastStorage, false, "Enable fast storage") - cmd.Flags().Int(tmiavl.FlagIavlFastStorageCacheSize, 100000, "Max size of iavl fast storage cache") - cmd.Flags().Bool(abci.FlagDisableABCIQueryMutex, false, "Disable local client query mutex for better concurrency") + cmd.Flags().MarkHidden(tmiavl.FlagIavlEnableFastStorage) + cmd.Flags().Int(tmiavl.FlagIavlFastStorageCacheSize, tmiavl.DefaultIavlFastStorageCacheSize, "Max size of iavl fast storage cache") + cmd.Flags().Bool(abci.FlagDisableABCIQueryMutex, true, "Disable local client query mutex for better concurrency") cmd.Flags().Bool(abci.FlagDisableCheckTx, false, "Disable checkTx for test") cmd.Flags().Bool(sdkstoretypes.FlagLoadVersionAsync, false, "Enable async for each kvstore to load version") cmd.Flags().MarkHidden(abci.FlagDisableCheckTx) @@ -217,7 +225,8 @@ func RegisterServerFlags(cmd *cobra.Command) *cobra.Command { cmd.Flags().Float64Var(&baseapp.GasUsedFactor, baseapp.FlagGasUsedFactor, 0.4, "factor to calculate history gas used") - cmd.Flags().Bool(sdk.FlagMultiCache, true, "Enable multi cache") + cmd.Flags().Bool(sdk.FlagMultiCache, false, "Enable multi cache") + cmd.Flags().MarkHidden(sdk.FlagMultiCache) cmd.Flags().Int(sdk.MaxAccInMultiCache, 0, "max acc in multi cache") cmd.Flags().Int(sdk.MaxStorageInMultiCache, 0, "max storage in multi cache") cmd.Flags().Bool(flatkv.FlagEnable, false, "Enable flat kv storage for read performance") @@ -237,6 +246,7 @@ func RegisterServerFlags(cmd *cobra.Command) *cobra.Command { viper.BindPFlag(FlagGoroutineNum, cmd.Flags().Lookup(FlagGoroutineNum)) cmd.Flags().Int(state.FlagDeliverTxsExecMode, 0, "Execution mode for deliver txs, (0:serial[default], 1:deprecated, 2:parallel)") + cmd.Flags().Bool(state.FlagEnableConcurrency, false, "Enable concurrency for deliver txs") cmd.Flags().String(FlagListenAddr, "tcp://0.0.0.0:26659", "EVM RPC and cosmos-sdk REST API listen address.") cmd.Flags().String(FlagUlockKey, "", "Select the keys to unlock on the RPC server") @@ -247,7 +257,7 @@ func RegisterServerFlags(cmd *cobra.Command) *cobra.Command { cmd.Flags().Int(FlagMaxOpenConnections, 1000, "The number of maximum open connections of rest-server") cmd.Flags().Int(FlagWsMaxConnections, 20000, "the max capacity number of websocket client connections") cmd.Flags().Int(FlagWsSubChannelLength, 100, "the length of subscription channel") - cmd.Flags().String(flags.FlagChainID, "", "Chain ID of tendermint node for web3") + cmd.Flags().String(flags.FlagChainID, ChainID, "Chain ID of tendermint node for web3") cmd.Flags().StringP(flags.FlagBroadcastMode, "b", flags.BroadcastSync, "Transaction broadcasting mode (sync|async|block) for web3") cmd.Flags().UintVar(&mpttypes.TrieRocksdbBatchSize, mpttypes.FlagTrieRocksdbBatchSize, 10, "Concurrent rocksdb batch size for mpt") @@ -284,7 +294,7 @@ set --node-mode=rpc to manage the following flags: set --node-mode=validator to manage the following flags: --disable-checktx-mutex=true --disable-query-mutex=true - --enable-dynamic-gp=false + --dynamic-gp-mode=2 --iavl-enable-async-commit=true --iavl-cache-size=10000000 --pruning=everything diff --git a/libs/cosmos-sdk/server/util.go b/libs/cosmos-sdk/server/util.go index c2cf9e0d02..08eb738d4a 100644 --- a/libs/cosmos-sdk/server/util.go +++ b/libs/cosmos-sdk/server/util.go @@ -11,9 +11,6 @@ import ( "time" "github.com/gogo/protobuf/jsonpb" - "github.com/spf13/cobra" - "github.com/spf13/viper" - "github.com/google/gops/agent" "github.com/okex/exchain/libs/cosmos-sdk/client/flags" "github.com/okex/exchain/libs/cosmos-sdk/client/lcd" @@ -25,6 +22,9 @@ import ( "github.com/okex/exchain/libs/tendermint/libs/cli" tmflags "github.com/okex/exchain/libs/tendermint/libs/cli/flags" "github.com/okex/exchain/libs/tendermint/libs/log" + "github.com/okex/exchain/libs/tendermint/state" + "github.com/spf13/cobra" + "github.com/spf13/viper" ) const FlagGops = "gops" @@ -60,6 +60,11 @@ func PersistentPreRunEFn(context *Context) func(*cobra.Command, []string) error if err != nil { return err } + if !viper.IsSet(state.FlagDeliverTxsExecMode) { + if viper.GetBool(state.FlagEnableConcurrency) { + viper.Set(state.FlagDeliverTxsExecMode, state.DeliverTxsExecModeParallel) + } + } // okchain output := os.Stdout if !config.LogStdout { diff --git a/libs/cosmos-sdk/store/rootmulti/rootmulti_store.go b/libs/cosmos-sdk/store/rootmulti/rootmulti_store.go index 51e39c23e0..14089a6e91 100644 --- a/libs/cosmos-sdk/store/rootmulti/rootmulti_store.go +++ b/libs/cosmos-sdk/store/rootmulti/rootmulti_store.go @@ -26,6 +26,7 @@ import ( sdk "github.com/okex/exchain/libs/cosmos-sdk/types" sdkerrors "github.com/okex/exchain/libs/cosmos-sdk/types/errors" iavltree "github.com/okex/exchain/libs/iavl" + "github.com/okex/exchain/libs/iavl/config" abci "github.com/okex/exchain/libs/tendermint/abci/types" //"github.com/okex/exchain/libs/tendermint/crypto/merkle" @@ -1180,6 +1181,11 @@ func commitStores(version int64, storeMap map[types.StoreKey]types.CommitKVStore inputDeltaMap iavltree.TreeDeltaMap, filters []types.StoreFilter) (commitInfo, iavltree.TreeDeltaMap) { var storeInfos []storeInfo outputDeltaMap := iavltree.TreeDeltaMap{} + + // updata commit gap height + if iavltree.EnableAsyncCommit { + iavltree.UpdateCommitGapHeight(config.DynamicConfig.GetCommitGapHeight()) + } for key, store := range storeMap { sName := key.Name() if evmAccStoreFilter(sName, version) { diff --git a/libs/cosmos-sdk/types/cache.go b/libs/cosmos-sdk/types/cache.go index c6960704fc..f6861079a5 100644 --- a/libs/cosmos-sdk/types/cache.go +++ b/libs/cosmos-sdk/types/cache.go @@ -69,7 +69,7 @@ type Cache struct { } func initCacheParam() { - UseCache = viper.GetBool(FlagMultiCache) + UseCache = false if data := viper.GetInt(MaxAccInMultiCache); data != 0 { maxAccInMap = data diff --git a/libs/iavl/config/dynamic_config_okchain.go b/libs/iavl/config/dynamic_config_okchain.go index 5525d7df25..26737a7136 100644 --- a/libs/iavl/config/dynamic_config_okchain.go +++ b/libs/iavl/config/dynamic_config_okchain.go @@ -1,18 +1,38 @@ package config +const ( + DefaultCommitGapHeight = 100 +) + type IDynamicConfig interface { GetIavlCacheSize() int + GetIavlFSCacheSize() int64 + GetCommitGapHeight() int64 + SetCommitGapHeight(gap int64) } -var DynamicConfig IDynamicConfig = MockDynamicConfig{} +var DynamicConfig IDynamicConfig = MockDynamicConfig{commitGapHeight: DefaultCommitGapHeight} func SetDynamicConfig(c IDynamicConfig) { DynamicConfig = c } type MockDynamicConfig struct { + commitGapHeight int64 } func (d MockDynamicConfig) GetIavlCacheSize() int { return 10000 } + +func (d MockDynamicConfig) GetIavlFSCacheSize() int64 { + return 10000 +} + +func (d MockDynamicConfig) GetCommitGapHeight() int64 { + return d.commitGapHeight +} + +func (d MockDynamicConfig) SetCommitGapHeight(gap int64) { + d.commitGapHeight = gap +} diff --git a/libs/iavl/mutable_tree_oec.go b/libs/iavl/mutable_tree_oec.go index 4607a64e76..0217c607d7 100644 --- a/libs/iavl/mutable_tree_oec.go +++ b/libs/iavl/mutable_tree_oec.go @@ -6,19 +6,23 @@ import ( "sort" "sync" + "github.com/okex/exchain/libs/iavl/config" + "github.com/okex/exchain/libs/system/trace" dbm "github.com/okex/exchain/libs/tm-db" ) const ( - minHistoryStateNum = 30 - FlagIavlCommitIntervalHeight = "iavl-commit-interval-height" - FlagIavlMinCommitItemCount = "iavl-min-commit-item-count" - FlagIavlHeightOrphansCacheSize = "iavl-height-orphans-cache-size" - FlagIavlMaxCommittedHeightNum = "iavl-max-committed-height-num" - FlagIavlEnableAsyncCommit = "iavl-enable-async-commit" - FlagIavlEnableFastStorage = "iavl-enable-fast-storage" - FlagIavlFastStorageCacheSize = "iavl-fast-storage-cache-size" + minHistoryStateNum = 30 + FlagIavlCommitIntervalHeight = "iavl-commit-interval-height" + FlagIavlMinCommitItemCount = "iavl-min-commit-item-count" + FlagIavlHeightOrphansCacheSize = "iavl-height-orphans-cache-size" + FlagIavlMaxCommittedHeightNum = "iavl-max-committed-height-num" + FlagIavlEnableAsyncCommit = "iavl-enable-async-commit" + FlagIavlFastStorageCacheSize = "iavl-fast-storage-cache-size" + FlagIavlEnableFastStorage = "iavl-enable-fast-storage" + FlagIavlDiscardFastStorage = "discard-fast-storage" + DefaultIavlFastStorageCacheSize = 10000000 ) var ( @@ -32,9 +36,8 @@ var ( MaxCommittedHeightNum = minHistoryStateNum EnableAsyncCommit = false EnablePruningHistoryState = true - CommitGapHeight int64 = 100 - enableFastStorage = false - fastNodeCacheSize = 100000 + CommitGapHeight int64 = config.DefaultCommitGapHeight + enableFastStorage = true ) type commitEvent struct { @@ -62,14 +65,13 @@ func GetEnableFastStorage() bool { return enableFastStorage } -// SetFastNodeCacheSize set fast node cache size -func SetFastNodeCacheSize(size int) { - fastNodeCacheSize = size -} - // GetFastNodeCacheSize get fast node cache size func GetFastNodeCacheSize() int { - return fastNodeCacheSize + return int(config.DynamicConfig.GetIavlFSCacheSize()) +} + +func UpdateCommitGapHeight(gap int64) { + CommitGapHeight = gap } func (tree *MutableTree) SaveVersionAsync(version int64, useDeltas bool) ([]byte, int64, error) { diff --git a/libs/iavl/nodedb.go b/libs/iavl/nodedb.go index 796059604a..c1f901152a 100644 --- a/libs/iavl/nodedb.go +++ b/libs/iavl/nodedb.go @@ -130,8 +130,8 @@ func (ndb *nodeDB) GetFastNode(key []byte) (*FastNode, error) { return nil, errors.New("storage version is not fast") } - ndb.mtx.Lock() - defer ndb.mtx.Unlock() + ndb.mtx.RLock() + defer ndb.mtx.RUnlock() if len(key) == 0 { return nil, fmt.Errorf("nodeDB.GetFastNode() requires key, len(key) equals 0") diff --git a/libs/system/trace/schema.go b/libs/system/trace/schema.go index add536aadb..9c3054abcd 100644 --- a/libs/system/trace/schema.go +++ b/libs/system/trace/schema.go @@ -30,14 +30,15 @@ const ( HandlerDefer = "handler_defer" ) - const ( GasUsed = "GasUsed" + SimGasUsed = "SimGasUsed" Produce = "Produce" RunTx = "RunTx" LastRun = "lastRun" Height = "Height" Tx = "Tx" + SimTx = "SimTx" BlockSize = "BlockSize" Elapsed = "Elapsed" CommitRound = "CommitRound" @@ -58,9 +59,9 @@ const ( Delta = "Delta" InvalidTxs = "InvalidTxs" - Abci = "abci" + Abci = "abci" //SaveResp = "saveResp" - Persist = "persist" + Persist = "persist" //MempoolUpdate = "mpUpdate" //SaveState = "saveState" ApplyBlock = "ApplyBlock" @@ -78,6 +79,8 @@ const ( IavlRuntime = "IavlRuntime" BlockPartsP2P = "BlockPartsP2P" + + Workload = "Workload" ) const ( diff --git a/libs/system/trace/trace.go b/libs/system/trace/trace.go index 060cc2ccaf..77eea8657c 100644 --- a/libs/system/trace/trace.go +++ b/libs/system/trace/trace.go @@ -33,8 +33,10 @@ type Tracer struct { intervals []time.Duration elapsedTime time.Duration - pinMap map[string]time.Duration - enableSummary bool + pinMap map[string]time.Duration + enableSummary bool + + wls *WorkloadStatistic } func NewTracer(name string) *Tracer { @@ -50,6 +52,10 @@ func (t *Tracer) EnableSummary() { t.enableSummary = true } +func (t *Tracer) SetWorkloadStatistic(wls *WorkloadStatistic) { + t.wls = wls +} + func (t *Tracer) Pin(format string, args ...interface{}) { t.pinByFormat(fmt.Sprintf(format, args...)) } @@ -74,6 +80,10 @@ func (t *Tracer) pinByFormat(tag string) { if t.enableSummary { insertElapse(t.lastPin, duration.Milliseconds()) } + + if t.wls != nil { + t.wls.Add(t.lastPin, now, duration) + } } t.lastPinStartTime = now t.lastPin = tag diff --git a/libs/system/trace/workload_statistic.go b/libs/system/trace/workload_statistic.go new file mode 100644 index 0000000000..239c8b5721 --- /dev/null +++ b/libs/system/trace/workload_statistic.go @@ -0,0 +1,175 @@ +package trace + +import ( + "fmt" + "strings" + "sync/atomic" + "time" +) + +var ( + startupTime = time.Now() + + applyBlockWorkloadStatistic = newWorkloadStatistic( + []time.Duration{time.Hour, 2 * time.Hour, 4 * time.Hour, 8 * time.Hour}, []string{LastRun, Persist}) +) + +// TODO: think about a very long work which longer than a statistic period. + +// WorkloadStatistic accumulate workload for specific trace tags during some specific period. +// Everytime `Add` or `end` method be called, it record workload on corresponding `summaries` fields, +// and send this workload info to `shrinkLoop`, which will subtract this workload from `summaries` +// when the workload out of statistic period. To do that, `shrinkLoop` will record the workload and it's +// out-of-date timestamp; `shrinkLoop` also has a ticker promote current time once a second. +// If current time is larger or equal than recorded timestamp, it remove that workload and subtract +// it's value from `summaries`. +type WorkloadStatistic struct { + concernedTags map[string]struct{} + summaries []workloadSummary + + workCh chan singleWorkInfo +} + +type workloadSummary struct { + period time.Duration + workload int64 +} + +type singleWorkInfo struct { + duration int64 + endTime time.Time +} + +// GetApplyBlockWorkloadSttistic return a global `WorkloadStatistic` object. +// WARNING: if you call `WorkloadStatistic.Add` concurrently, the summary result will be incorrect. +func GetApplyBlockWorkloadSttistic() *WorkloadStatistic { + return applyBlockWorkloadStatistic +} + +func newWorkloadStatistic(periods []time.Duration, tags []string) *WorkloadStatistic { + concernedTags := toTagsMap(tags) + + workloads := make([]workloadSummary, 0, len(periods)) + for _, period := range periods { + workloads = append(workloads, workloadSummary{period, 0}) + } + + wls := &WorkloadStatistic{concernedTags: concernedTags, summaries: workloads, workCh: make(chan singleWorkInfo, 1000)} + go wls.shrinkLoop() + + return wls +} + +// Add accumulate workload to summary. +// WARNING: if you call `Add` concurrently, the summary result will be incorrect. +func (ws *WorkloadStatistic) Add(tag string, endTime time.Time, duration time.Duration) { + if _, ok := ws.concernedTags[tag]; !ok { + return + } + + for i := range ws.summaries { + atomic.AddInt64(&ws.summaries[i].workload, int64(duration)) + } + + ws.workCh <- singleWorkInfo{int64(duration), endTime} +} + +func (ws *WorkloadStatistic) Format() string { + var sumItem []string + for _, summary := range ws.summary() { + sumItem = append(sumItem, fmt.Sprintf("%.2f", float64(summary.workload)/float64(summary.period))) + } + + return strings.Join(sumItem, "|") +} + +type summaryInfo struct { + period time.Duration + workload time.Duration +} + +func (ws *WorkloadStatistic) summary() []summaryInfo { + startupDuration := time.Now().Sub(startupTime) + result := make([]summaryInfo, 0, len(ws.summaries)) + + for _, summary := range ws.summaries { + period := minDuration(startupDuration, summary.period) + result = append(result, summaryInfo{period, time.Duration(atomic.LoadInt64(&summary.workload))}) + } + return result +} + +func (ws *WorkloadStatistic) shrinkLoop() { + shrinkInfos := make([]map[int64]int64, 0, len(ws.summaries)) + for i := 0; i < len(ws.summaries); i++ { + shrinkInfos = append(shrinkInfos, make(map[int64]int64)) + } + + var latest int64 + ticker := time.NewTicker(time.Second) + + for { + select { + case singleWork := <-ws.workCh: + // `earliest` record the expired timestamp which is minimum. + // It's just for initialize `latest`. + earliest := int64(^uint64(0) >> 1) + + for sumIndex, summary := range ws.summaries { + expiredTS := singleWork.endTime.Add(summary.period).Unix() + if expiredTS < earliest { + earliest = expiredTS + } + + info := shrinkInfos[sumIndex] + // TODO: it makes recoding workload larger than actual value + // if a work begin before this period and end during this period + if _, ok := info[expiredTS]; !ok { + info[expiredTS] = singleWork.duration + } else { + info[expiredTS] += singleWork.duration + } + } + + if latest == 0 { + latest = earliest + } + case t := <-ticker.C: + current := t.Unix() + if latest == 0 { + latest = current + } + + // try to remove workload of every expired work. + // `latest` make sure even if ticker is not accurately, + // we can also remove the expired correctly. + for index, info := range shrinkInfos { + for i := latest; i < current+1; i++ { + w, ok := info[i] + if ok { + atomic.AddInt64(&ws.summaries[index].workload, -w) + delete(info, i) + } + } + } + + latest = current + } + } + +} + +func toTagsMap(keys []string) map[string]struct{} { + tags := make(map[string]struct{}) + for _, tag := range keys { + tags[tag] = struct{}{} + } + return tags +} + +func minDuration(d1 time.Duration, d2 time.Duration) time.Duration { + if d1 < d2 { + return d1 + } + return d2 +} diff --git a/libs/system/trace/workload_statistic_test.go b/libs/system/trace/workload_statistic_test.go new file mode 100644 index 0000000000..e8df02d135 --- /dev/null +++ b/libs/system/trace/workload_statistic_test.go @@ -0,0 +1,38 @@ +package trace + +import ( + "testing" + "time" +) + +func TestWorkload(t *testing.T) { + abciWorkload := time.Second + lastRunWorkload := 2 * time.Minute + persistWorkload := time.Second + expectWorkload := int64((lastRunWorkload + persistWorkload).Seconds()) + + trc := NewTracer(ApplyBlock) + trc.EnableSummary() + trc.SetWorkloadStatistic(GetApplyBlockWorkloadSttistic()) + + defer func() { + GetElapsedInfo().AddInfo(RunTx, trc.Format()) + + time.Sleep(time.Second) + summary := GetApplyBlockWorkloadSttistic().summary() + for _, sum := range summary { + workload := int64(sum.workload.Seconds()) + if workload != expectWorkload { + t.Errorf("period %d: expect workload %v but got %v\n", sum.period, expectWorkload, workload) + } + } + }() + + trc.Pin(Abci) + time.Sleep(abciWorkload) + GetApplyBlockWorkloadSttistic().Add(LastRun, time.Now(), lastRunWorkload) + + trc.Pin(Persist) + time.Sleep(persistWorkload) + +} diff --git a/libs/tendermint/cmd/tendermint/commands/run_node.go b/libs/tendermint/cmd/tendermint/commands/run_node.go index 0bf24166ae..eaf988d164 100644 --- a/libs/tendermint/cmd/tendermint/commands/run_node.go +++ b/libs/tendermint/cmd/tendermint/commands/run_node.go @@ -121,6 +121,16 @@ func AddNodeFlags(cmd *cobra.Command) { config.Mempool.MaxGasUsedPerBlock, "Maximum gas used of transactions in a block", ) + cmd.Flags().Bool( + "mempool.enable-pgu", + false, + "enable precise gas used", + ) + cmd.Flags().Float64( + "mempool.pgu-adjustment", + 1, + "adjustment for pgu, such as 0.9 or 1.1", + ) cmd.Flags().Bool( "mempool.sort_tx_by_gp", config.Mempool.SortTxByGp, @@ -174,7 +184,11 @@ func AddNodeFlags(cmd *cobra.Command) { false, "Calculate tx type count and time in function checkTx per block", ) - + cmd.Flags().String( + "tx_index.indexer", + config.TxIndex.Indexer, + "indexer to use for transactions, options: null, kv", + ) cmd.Flags().String( "local_perf", "", diff --git a/libs/tendermint/config/config.go b/libs/tendermint/config/config.go index 0328b36abc..3622f3264d 100644 --- a/libs/tendermint/config/config.go +++ b/libs/tendermint/config/config.go @@ -694,15 +694,16 @@ func DefaultMempoolConfig() *MempoolConfig { Broadcast: true, // Each signature verification takes .5ms, Size reduced until we implement // ABCI Recheck - Size: 10000, // exchain memory pool size(max tx num) + Size: 200_000, // exchain memory pool size(max tx num) MaxTxsBytes: 1024 * 1024 * 1024, // 1GB - CacheSize: 10000, + CacheSize: 300_000, MaxTxBytes: 1024 * 1024, // 1MB MaxTxNumPerBlock: 300, MaxGasUsedPerBlock: -1, SortTxByGp: true, ForceRecheckGap: 2000, TxPriceBump: 10, + EnablePendingPool: false, PendingPoolSize: 50000, PendingPoolPeriod: 3, PendingPoolReserveBlocks: 100, @@ -829,7 +830,7 @@ func DefaultConsensusConfig() *ConsensusConfig { TimeoutPrevoteDelta: 500 * time.Millisecond, TimeoutPrecommit: 1000 * time.Millisecond, TimeoutPrecommitDelta: 500 * time.Millisecond, - TimeoutCommit: 3800 * time.Millisecond, + TimeoutCommit: types.TimeoutCommit * time.Millisecond, TimeoutConsensus: 1000 * time.Millisecond, SkipTimeoutCommit: false, CreateEmptyBlocks: true, diff --git a/libs/tendermint/config/dynamic_config_okchain.go b/libs/tendermint/config/dynamic_config_okchain.go index 101d2f37f1..7ce1ef5351 100644 --- a/libs/tendermint/config/dynamic_config_okchain.go +++ b/libs/tendermint/config/dynamic_config_okchain.go @@ -6,8 +6,11 @@ type IDynamicConfig interface { GetMempoolRecheck() bool GetMempoolForceRecheckGap() int64 GetMempoolSize() int + GetMempoolCacheSize() int GetMaxTxNumPerBlock() int64 GetMaxGasUsedPerBlock() int64 + GetEnablePGU() bool + GetPGUAdjustment() float64 GetMempoolFlush() bool GetNodeKeyWhitelist() []string GetMempoolCheckTxCost() bool @@ -45,6 +48,10 @@ func (d MockDynamicConfig) GetMempoolSize() int { return DefaultMempoolConfig().Size } +func (d MockDynamicConfig) GetMempoolCacheSize() int { + return DefaultMempoolConfig().CacheSize +} + func (d MockDynamicConfig) GetMaxTxNumPerBlock() int64 { return DefaultMempoolConfig().MaxTxNumPerBlock } @@ -53,6 +60,14 @@ func (d MockDynamicConfig) GetMaxGasUsedPerBlock() int64 { return DefaultMempoolConfig().MaxGasUsedPerBlock } +func (d MockDynamicConfig) GetEnablePGU() bool { + return false +} + +func (d MockDynamicConfig) GetPGUAdjustment() float64 { + return 1 +} + func (d MockDynamicConfig) GetMempoolFlush() bool { return false } diff --git a/libs/tendermint/mempool/clist_mempool.go b/libs/tendermint/mempool/clist_mempool.go index 7d214778a9..1e9b35ab1e 100644 --- a/libs/tendermint/mempool/clist_mempool.go +++ b/libs/tendermint/mempool/clist_mempool.go @@ -5,6 +5,7 @@ import ( "crypto/sha256" "encoding/hex" "fmt" + "math/big" "strconv" "sync" @@ -13,8 +14,7 @@ import ( "github.com/VictoriaMetrics/fastcache" - "github.com/tendermint/go-amino" - + lru "github.com/hashicorp/golang-lru" "github.com/okex/exchain/libs/system/trace" abci "github.com/okex/exchain/libs/tendermint/abci/types" cfg "github.com/okex/exchain/libs/tendermint/config" @@ -23,6 +23,7 @@ import ( tmmath "github.com/okex/exchain/libs/tendermint/libs/math" "github.com/okex/exchain/libs/tendermint/proxy" "github.com/okex/exchain/libs/tendermint/types" + "github.com/tendermint/go-amino" ) type TxInfoParser interface { @@ -90,6 +91,10 @@ type CListMempool struct { checkP2PTotalTime int64 txs ITransactionQueue + + simQueue chan *mempoolTx + + gasCache *lru.Cache } var _ Mempool = &CListMempool{} @@ -110,6 +115,11 @@ func NewCListMempool( } else { txQueue = NewBaseTxQueue() } + + gasCache, err := lru.New(1000000) + if err != nil { + panic(err) + } mempool := &CListMempool{ config: config, proxyAppConn: proxyAppConn, @@ -120,9 +130,13 @@ func NewCListMempool( logger: log.NewNopLogger(), metrics: NopMetrics(), txs: txQueue, + simQueue: make(chan *mempoolTx, 100000), + gasCache: gasCache, } - if config.CacheSize > 0 { - mempool.cache = newMapTxCache(config.CacheSize) + go mempool.simulationRoutine() + + if cfg.DynamicConfig.GetMempoolCacheSize() > 0 { + mempool.cache = newMapTxCache(cfg.DynamicConfig.GetMempoolCacheSize()) } else { mempool.cache = nopTxCache{} } @@ -401,6 +415,14 @@ func (mem *CListMempool) addTx(memTx *mempoolTx) error { if err := mem.txs.Insert(memTx); err != nil { return err } + if cfg.DynamicConfig.GetMaxGasUsedPerBlock() > -1 && cfg.DynamicConfig.GetEnablePGU() { + select { + case mem.simQueue <- memTx: + default: + mem.logger.Error("tx simulation queue is full") + } + } + atomic.AddInt64(&mem.txsBytes, int64(len(memTx.tx))) mem.metrics.TxSizeBytes.Observe(float64(len(memTx.tx))) mem.eventBus.PublishEventPendingTx(types.EventDataTx{TxResult: types.TxResult{ @@ -408,8 +430,6 @@ func (mem *CListMempool) addTx(memTx *mempoolTx) error { Tx: memTx.tx, }}) - types.SignatureCache().Remove(memTx.realTx.TxHash()) - return nil } @@ -702,6 +722,15 @@ func (mem *CListMempool) notifyTxsAvailable() { } } +func (mem *CListMempool) GetTxSimulateGas(txHash string) int64 { + hash := hex.EncodeToString([]byte(txHash)) + v, ok := mem.gasCache.Get(hash) + if !ok { + return -1 + } + return v.(int64) +} + func (mem *CListMempool) ReapEssentialTx(tx types.Tx) abci.TxEssentials { if ele, ok := mem.txs.Load(txKey(tx)); ok { return ele.Value.(*mempoolTx).realTx @@ -723,9 +752,12 @@ func (mem *CListMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) []types.Tx { // size per tx, and set the initial capacity based off of that. // txs := make([]types.Tx, 0, tmmath.MinInt(mem.txs.Len(), max/mem.avgTxSize)) txs := make([]types.Tx, 0, tmmath.MinInt(mem.txs.Len(), int(cfg.DynamicConfig.GetMaxTxNumPerBlock()))) + var simCount, simGas int64 defer func() { mem.logger.Info("ReapMaxBytesMaxGas", "ProposingHeight", mem.Height()+1, "MempoolTxs", mem.txs.Len(), "ReapTxs", len(txs)) + trace.GetElapsedInfo().AddInfo(trace.SimTx, fmt.Sprintf("%d:%d", mem.Height()+1, simCount)) + trace.GetElapsedInfo().AddInfo(trace.SimGasUsed, fmt.Sprintf("%d:%d", mem.Height()+1, simGas)) }() for e := mem.txs.Front(); e != nil; e = e.Next() { memTx := e.Value.(*mempoolTx) @@ -739,7 +771,8 @@ func (mem *CListMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) []types.Tx { // If maxGas is negative, skip this check. // Since newTotalGas < masGas, which // must be non-negative, it follows that this won't overflow. - newTotalGas := totalGas + memTx.gasWanted + gasWanted := atomic.LoadInt64(&memTx.gasWanted) + newTotalGas := totalGas + gasWanted if maxGas > -1 && newTotalGas > maxGas { return txs } @@ -750,6 +783,10 @@ func (mem *CListMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) []types.Tx { totalTxNum++ totalGas = newTotalGas txs = append(txs, memTx.tx) + simGas += gasWanted + if atomic.LoadUint32(&memTx.isSim) > 0 { + simCount++ + } } return txs @@ -858,11 +895,13 @@ func (mem *CListMempool) Update( if mem.pendingPool != nil { addressNonce = make(map[string]uint64) } + for i, tx := range txs { txCode := deliverTxResponses[i].Code addr := "" nonce := uint64(0) if ele := mem.cleanTx(height, tx, txCode); ele != nil { + atomic.AddUint32(&(ele.Value.(*mempoolTx).isOutdated), 1) addr = ele.Address nonce = ele.Nonce mem.logUpdate(ele.Address, ele.Nonce) @@ -1061,6 +1100,9 @@ type mempoolTx struct { from string senderNonce uint64 + isOutdated uint32 + isSim uint32 + // ids of peers who've sent us this tx (as a map for quick lookups). // senders: PeerID -> bool senders map[uint16]struct{} @@ -1222,3 +1264,26 @@ func (mem *CListMempool) simulateTx(tx types.Tx) (*SimulationResponse, error) { err = cdc.UnmarshalBinaryBare(res.Value, &simuRes) return &simuRes, err } + +func (mem *CListMempool) simulationRoutine() { + for memTx := range mem.simQueue { + mem.simulationJob(memTx) + } +} + +func (mem *CListMempool) simulationJob(memTx *mempoolTx) { + defer types.SignatureCache().Remove(memTx.realTx.TxHash()) + if atomic.LoadUint32(&memTx.isOutdated) != 0 { + // memTx is outdated + return + } + simuRes, err := mem.simulateTx(memTx.tx) + if err != nil { + mem.logger.Error("simulateTx", "error", err, "txHash", memTx.tx.Hash(mem.Height())) + return + } + gas := int64(simuRes.GasUsed) * int64(cfg.DynamicConfig.GetPGUAdjustment()*100) / 100 + atomic.StoreInt64(&memTx.gasWanted, gas) + atomic.AddUint32(&memTx.isSim, 1) + mem.gasCache.Add(hex.EncodeToString(memTx.realTx.TxHash()), gas) +} diff --git a/libs/tendermint/mempool/mempool.go b/libs/tendermint/mempool/mempool.go index 5afc8588db..9290b1553f 100644 --- a/libs/tendermint/mempool/mempool.go +++ b/libs/tendermint/mempool/mempool.go @@ -89,6 +89,8 @@ type Mempool interface { SetAccountRetriever(retriever AccountRetriever) SetTxInfoParser(parser TxInfoParser) + + GetTxSimulateGas(txHash string) int64 } //-------------------------------------------------------------------------------- diff --git a/libs/tendermint/mock/mempool.go b/libs/tendermint/mock/mempool.go index 7707a9a582..33a883dae6 100644 --- a/libs/tendermint/mock/mempool.go +++ b/libs/tendermint/mock/mempool.go @@ -75,3 +75,5 @@ func (Mempool) SetAccountRetriever(_ mempl.AccountRetriever) { func (Mempool) SetTxInfoParser(_ mempl.TxInfoParser) { } + +func (Mempool) GetTxSimulateGas(txHash string) int64 { return 0 } diff --git a/libs/tendermint/rpc/core/mempool.go b/libs/tendermint/rpc/core/mempool.go index 1d40028af6..fcc32101ca 100644 --- a/libs/tendermint/rpc/core/mempool.go +++ b/libs/tendermint/rpc/core/mempool.go @@ -153,6 +153,12 @@ func NumUnconfirmedTxs(ctx *rpctypes.Context) (*ctypes.ResultUnconfirmedTxs, err TotalBytes: env.Mempool.TxsBytes()}, nil } +func TxSimulateGasCost(ctx *rpctypes.Context, hash string) (*ctypes.ResponseTxSimulateGas, error) { + return &ctypes.ResponseTxSimulateGas{ + GasCost: env.Mempool.GetTxSimulateGas(hash), + }, nil +} + func UserUnconfirmedTxs(address string, limit int) (*ctypes.ResultUserUnconfirmedTxs, error) { txs := env.Mempool.ReapUserTxs(address, limit) return &ctypes.ResultUserUnconfirmedTxs{ diff --git a/libs/tendermint/rpc/core/routes.go b/libs/tendermint/rpc/core/routes.go index ebe3f27c8e..fa7e432069 100644 --- a/libs/tendermint/rpc/core/routes.go +++ b/libs/tendermint/rpc/core/routes.go @@ -48,6 +48,8 @@ var Routes = map[string]*rpc.RPCFunc{ // evidence API "broadcast_evidence": rpc.NewRPCFunc(BroadcastEvidence, "evidence"), + + "tx_simulate_gas": rpc.NewRPCFunc(TxSimulateGasCost, "hash"), } func AddUnsafeRoutes() { diff --git a/libs/tendermint/rpc/core/types/responses.go b/libs/tendermint/rpc/core/types/responses.go index 3edbb691b9..2dfbc3c957 100644 --- a/libs/tendermint/rpc/core/types/responses.go +++ b/libs/tendermint/rpc/core/types/responses.go @@ -204,6 +204,10 @@ type ResultUserUnconfirmedTxs struct { Txs []types.Tx `json:"txs"` } +type ResponseTxSimulateGas struct { + GasCost int64 `json:"gas_cost"` +} + // List of mempool addresses type ResultUnconfirmedAddresses struct { Addresses []string `json:"addresses"` diff --git a/libs/tendermint/state/execution.go b/libs/tendermint/state/execution.go index 8fa99e08b0..15b9f9807d 100644 --- a/libs/tendermint/state/execution.go +++ b/libs/tendermint/state/execution.go @@ -19,7 +19,7 @@ import ( "github.com/tendermint/go-amino" ) -//----------------------------------------------------------------------------- +// ----------------------------------------------------------------------------- type ( // Enum mode for executing [deliverTx, ...] DeliverTxsExecMode int @@ -34,6 +34,7 @@ const ( // 1: execute [deliverTx,...] deprecated // 2: execute [deliverTx,...] parallel FlagDeliverTxsExecMode = "deliver-txs-mode" + FlagEnableConcurrency = "enable-concurrency" ) // BlockExecutor handles block execution and state updates. @@ -207,6 +208,7 @@ func (blockExec *BlockExecutor) ApplyBlock( } trc := trace.NewTracer(trace.ApplyBlock) trc.EnableSummary() + trc.SetWorkloadStatistic(trace.GetApplyBlockWorkloadSttistic()) dc := blockExec.deltaContext defer func() { @@ -214,6 +216,7 @@ func (blockExec *BlockExecutor) ApplyBlock( trace.GetElapsedInfo().AddInfo(trace.Tx, strconv.Itoa(len(block.Data.Txs))) trace.GetElapsedInfo().AddInfo(trace.BlockSize, strconv.Itoa(block.FastSize())) trace.GetElapsedInfo().AddInfo(trace.RunTx, trc.Format()) + trace.GetElapsedInfo().AddInfo(trace.Workload, trace.GetApplyBlockWorkloadSttistic().Format()) trace.GetElapsedInfo().SetElapsedTime(trc.GetElapsedTime()) now := time.Now().UnixNano() @@ -237,6 +240,7 @@ func (blockExec *BlockExecutor) ApplyBlock( abciResponses, duration, err := blockExec.runAbci(block, deltaInfo) trace.GetElapsedInfo().AddInfo(trace.LastRun, fmt.Sprintf("%dms", duration.Milliseconds())) + trace.GetApplyBlockWorkloadSttistic().Add(trace.LastRun, time.Now(), duration) if err != nil { return state, 0, ErrProxyAppConn(err) @@ -244,7 +248,6 @@ func (blockExec *BlockExecutor) ApplyBlock( fail.Fail() // XXX - // Save the results before we commit. blockExec.trySaveABCIResponsesAsync(block.Height, abciResponses) @@ -290,7 +293,6 @@ func (blockExec *BlockExecutor) ApplyBlock( fail.Fail() // XXX - // Update the app hash and save the state. state.AppHash = commitResp.Data blockExec.trySaveStateAsync(state) @@ -327,9 +329,10 @@ func (blockExec *BlockExecutor) runAbci(block *types.Block, deltaInfo *DeltaInfo if deltaInfo != nil { blockExec.logger.Info("Apply delta", "height", block.Height, "deltas-length", deltaInfo.deltaLen) - + t0 := time.Now() execBlockOnProxyAppWithDeltas(blockExec.proxyApp, block, blockExec.db) abciResponses = deltaInfo.abciResponses + duration = time.Now().Sub(t0) } else { pc := blockExec.prerunCtx if pc.prerunTx { diff --git a/libs/tendermint/types/params.go b/libs/tendermint/types/params.go index a58aae115c..4f5d204741 100644 --- a/libs/tendermint/types/params.go +++ b/libs/tendermint/types/params.go @@ -13,6 +13,9 @@ import ( const ( // MaxBlockSizeBytes is the maximum permitted size of the blocks. MaxBlockSizeBytes = 104857600 // 100MB + + // TimeoutCommit is set for the stable of blockTime + TimeoutCommit = 3800 // 3.8s ) var ( diff --git a/x/evm/types/msg_evm.go b/x/evm/types/msg_evm.go index 2228bff144..4f228e1e79 100644 --- a/x/evm/types/msg_evm.go +++ b/x/evm/types/msg_evm.go @@ -59,18 +59,18 @@ func (tx *MsgEthereumTx) SetFrom(addr string) { // GetFrom returns sender address of MsgEthereumTx if signature is valid, or returns "". func (tx *MsgEthereumTx) GetFrom() string { from := tx.BaseTx.GetFrom() - if from == "" { - from, _ = tmtypes.SignatureCache().Get(tx.TxHash()) - if from == "" { - addr, err := tx.firstVerifySig(tx.ChainID()) - if err != nil { - return "" - } - return EthAddressToString(&addr) - } + if from != "" { + return from } - - return from + from, _ = tmtypes.SignatureCache().Get(tx.TxHash()) + if from != "" { + return from + } + err := tx.firstVerifySig(tx.ChainID()) + if err != nil { + return "" + } + return tx.BaseTx.GetFrom() } func (msg MsgEthereumTx) GetSender(ctx sdk.Context) string { @@ -325,13 +325,13 @@ var sigBigNumPool = &sync.Pool{ }, } -func (msg *MsgEthereumTx) firstVerifySig(chainID *big.Int) (ethcmn.Address, error) { +func (msg *MsgEthereumTx) firstVerifySig(chainID *big.Int) error { var V *big.Int var sigHash ethcmn.Hash if isProtectedV(msg.Data.V) { // do not allow recovery for transactions with an unprotected chainID if chainID.Sign() == 0 { - return emptyEthAddr, errors.New("chainID cannot be zero") + return errors.New("chainID cannot be zero") } bigNum := sigBigNumPool.Get().(*big.Int) @@ -352,9 +352,13 @@ func (msg *MsgEthereumTx) firstVerifySig(chainID *big.Int) (ethcmn.Address, erro sender, err := recoverEthSig(msg.Data.R, msg.Data.S, V, &sigHash) if err != nil { - return emptyEthAddr, err + return err } - return sender, nil + from := EthAddressToString(&sender) + tmtypes.SignatureCache().Add(msg.TxHash(), from) + msg.BaseTx.From = from + msg.addr = sender + return nil } // VerifySig attempts to verify a Transaction's signature for a given chainID. @@ -371,14 +375,10 @@ func (msg *MsgEthereumTx) VerifySig(chainID *big.Int, height int64) error { msg.SetFrom(from) return nil } - addr, err := msg.firstVerifySig(chainID) + err := msg.firstVerifySig(chainID) if err != nil { return err } - from = EthAddressToString(&addr) - tmtypes.SignatureCache().Add(msg.TxHash(), from) - msg.BaseTx.From = from - msg.addr = addr return nil } diff --git a/x/evm/types/msg_test.go b/x/evm/types/msg_test.go index 4838411aaa..b2fbfdbff5 100644 --- a/x/evm/types/msg_test.go +++ b/x/evm/types/msg_test.go @@ -510,7 +510,7 @@ func BenchmarkEvmTxVerifySig(b *testing.B) { b.Run("firstVerifySig", func(b *testing.B) { b.ReportAllocs() for i := 0; i < b.N; i++ { - _, err := msg.firstVerifySig(chainID) + err := msg.firstVerifySig(chainID) if err != nil { b.Fatal(err) } diff --git a/x/feesplit/client/rest/rest.go b/x/feesplit/client/rest/rest.go index 9295b641d2..fd3b4e45cb 100644 --- a/x/feesplit/client/rest/rest.go +++ b/x/feesplit/client/rest/rest.go @@ -41,7 +41,7 @@ func queryParamsHandlerFn(cliCtx context.CLIContext) http.HandlerFunc { if !ok { return } - + res, height, err := cliCtx.QueryWithData(fmt.Sprintf("custom/%s/%s", types.RouterKey, types.QueryParameters), nil) if err != nil {