From af9a55a9a648797688ca60ee9b264a3ed2b1caa1 Mon Sep 17 00:00:00 2001 From: MinZhu Date: Thu, 28 Apr 2022 22:55:30 +0800 Subject: [PATCH] Merge PR: partial concurrent for DeliverTxs (#1801) * remove setUpdateCount * remove useless code * remove Mutex in DTTManager * remove unused import * check whether getTxFee is nil before call it * add log * add log for fee-collector * add break logic for replay * add log * add log * remove SetFeeCollector log * print info of commistStore * print commitstore info * add log * print SetCoins * remove unused import * remove 01 for addr * remove log * print SetCoins * print GasConsumed * check GlobalHeight before print log * print GasConsumed * close multi cache * remove log * add log * add log * add log * add log * change deliverTxsMode when do mock replay * remove log * deal with empty block * rerun no evm tx * add log * remove log * add log * print deliverState.ms * add log info * print account info * create module account for feeCollector before setcoins * add log * set nextTaskRoutine whether ready or not * optimize keepAliveTicker * updateFeeCollectorAccHandler in EndBlock * Stop keepAliveTicker * rerun all concurrent txs after cosmos tx finished * remove GetModuleAccount in updateFeeCollectorHandler * add log * add dttRoutineStepWaitRerun * sleep some time to wait account update finished * set prevTaskIndex on accountUpdated * stop keepAliveTicker before running serial * use rerunRoutines list * add log * add condition for ExtractNextSerialFromTicker * change keepAliveIntervalMS * reset nextTaskRoutine * remove reset * add condition for stop dttRoutine * add log * add log * disable FlagMultiCache * fix conflicts * print commitids * print commitIDs * print string * stop replay * print string * print setCoins * print setCoins * remove dirty code * add RunMsgFailed log * add RunMsgFailed info * add log * add log * add log * add log * add log * print gas info * add log * add log * add ConsumeGas * modify log * add consumeGas * remove log * set context without withXXX * add log * use GetAccountGas instead of TryAddGetAccountGas * remove log * remove log * remove log * add log * add log * add log * remove log * use defer to catch panic and recover * optimize judgement for !isEvm * use AddConsumeGasForSendCoins to consume gas * remove useless code * add log * remove log * revert defer-s in runtxWithInfo * add log * modify consumed gas for SendCoins to FeeCollector * add log * add log * add log * modify AddConsumeGasForSendCoins * add log * modify AddConsumeGasForSendCoins * use InfiniteGasMeter when GetModuleAccount * remove log * check writeGas * merge from pc-no-fee-collector * use FlagDeliverTxsExecMode * remove DeliverTxsExecMode * modify ut * remove log * optimize conditions for not rerun ante * tidy code * remove log * add intItem for conflictPair * remove log * add log * add log * add log * add log * set DeliverTxsMode in replay * optimize hasConflict * remove log * remove all log * remove log * remove comments * rewrite deductFees for ethGasConsume * update FeeCollector account before executing cosmos tx * optimize code * remove GetAccountBinarySize * remove parameter in ethGasConsume * rerun cosmos tx before running serial content * calculate replay duration and print it * modify ut for anteHandler * remove suite.app.SupplyKeeper.GetModuleAccount * add condition for not check rerun * optimize serialRoutine * add pin for RunMsg and Refund * add log * optimize logic for rerunRoutines * add FlagDeliverTxsConcurrentNum * add log * add log * do not calculate gas during updateFeeCollectorHandler * updateFeeCollectorAcc only when fee changed * remove log * remove unused import * remove parameter from SetAccount * fix ut for SetAccount * add condition for calling UpdateAccountInStateDB * use optional parameter in SetAccount (#1900) * use optional parameter in SetAccount * optimize accountUpdated func in DTTManager * first commit * Mz/pc option (#1901) * use optional parameter in SetAccount * optimize accountUpdated func in DTTManager * optimize OnAccountUpdated * use normal parameter in OnAccountUpdated * split functions in DTTManager * encapsulate conditions * set gasUsed 0 when ante failed * add log * use startRunMsg to judge whether ante failed or not * add log * delete unless code * fix build * remove log * remove log * fix ut bug * merge defer funcs * use recover when run ante * delete preWriteMs * split defer because of recover * take care of error from VerifySig * add comment * remove a condition for calling updateFeeCollectorAccHandler * Update store.go * upd * upd * use const * use cache in runTxModeDeliverPartConcurrent * splite defer func-s in serialExecution * fix conflict * remove check config for FlagDeliverTxsExecMode and FlagParalleledTx * Update baseapp_runtx.go * typ * address to string * delete modeHandlerDeliverPartConcurrent * support prerun * optimize code * revert changes for ParalleledTx * remove useless code * small modify * fix random panic * remove FlagParalleledTx * write ms cache after handleGasConsumed * modify stopChan * register dynamic config in repair * delete evmTxFeeHandler * fix ut Co-authored-by: KamiD <44460798+KamiD@users.noreply.github.com> Co-authored-by: Zhong Qiu <36867992+zhongqiuwood@users.noreply.github.com> Co-authored-by: chunfengSun <516108736@qq.com> Co-authored-by: zhongqiuwood Co-authored-by: xiangjianmeng <805442788@qq.com> --- app/ante/ante.go | 2 +- app/app.go | 5 +- app/app_parallel.go | 24 - app/app_partial_concurrent.go | 38 + app/app_upgrade_test.go | 2 +- app/config/config.go | 18 +- app/repair_state.go | 3 + app/rpc/namespaces/eth/simulation/impl.go | 2 +- app/utils/sanity/start.go | 27 +- app/utils/sanity/start_test.go | 61 +- app/utils/sanity/type.go | 17 + cmd/exchaind/repair_data.go | 3 +- cmd/exchaind/replay.go | 4 +- go.mod | 6 +- go.sum | 39 ++ libs/cosmos-sdk/baseapp/baseapp.go | 27 +- libs/cosmos-sdk/baseapp/baseapp_mode_base.go | 2 + libs/cosmos-sdk/baseapp/baseapp_parallel.go | 4 +- .../baseapp/baseapp_partial_concurrent.go | 659 ++++++++++++++++++ libs/cosmos-sdk/baseapp/baseapp_runtx.go | 6 +- libs/cosmos-sdk/baseapp/options.go | 10 +- libs/cosmos-sdk/server/start_okchain.go | 3 +- libs/cosmos-sdk/types/context.go | 32 +- libs/cosmos-sdk/types/handler.go | 2 +- libs/cosmos-sdk/x/auth/keeper/account.go | 5 +- libs/cosmos-sdk/x/auth/keeper/ante_okchain.go | 2 +- .../x/bank/internal/types/expected_keepers.go | 2 +- .../x/genutil/types/expected_keepers.go | 2 +- .../supply/internal/types/expected_keepers.go | 2 +- libs/ibc-go/testing/simapp/app.go | 1 - libs/tendermint/abci/client/client.go | 1 + libs/tendermint/abci/client/local_client.go | 6 + .../example/kvstore/persistent_kvstore.go | 4 + libs/tendermint/abci/types/application.go | 5 + .../config/dynamic_config_okchain.go | 6 +- libs/tendermint/proxy/app_conn.go | 5 + libs/tendermint/state/execution.go | 29 +- libs/tendermint/state/execution_concurrent.go | 45 ++ libs/tendermint/state/execution_parallel.go | 4 - libs/tendermint/state/execution_task.go | 10 +- x/common/analyzer/analyzer.go | 2 +- x/evm/keeper/keeper.go | 2 +- x/evm/types/expected_keepers.go | 2 +- x/evm/types/statedb.go | 9 +- 44 files changed, 973 insertions(+), 167 deletions(-) create mode 100644 app/app_partial_concurrent.go create mode 100644 libs/cosmos-sdk/baseapp/baseapp_partial_concurrent.go create mode 100644 libs/tendermint/state/execution_concurrent.go diff --git a/app/ante/ante.go b/app/ante/ante.go index b42fb31983..812aed68c1 100644 --- a/app/ante/ante.go +++ b/app/ante/ante.go @@ -40,7 +40,7 @@ func NewAnteHandler(ak auth.AccountKeeper, evmKeeper EVMKeeper, sk types.SupplyK authante.NewValidateBasicDecorator(), authante.NewValidateMemoDecorator(ak), authante.NewConsumeGasForTxSizeDecorator(ak), - authante.NewSetPubKeyDecorator(ak), // SetPubKeyDecorator must be called before all signature verification decorators + authante.NewSetPubKeyDecorator(ak), // SetPubKeyDecorator must be called before all signature verification decorators authante.NewValidateSigCountDecorator(ak), authante.NewDeductFeeDecorator(ak, sk), authante.NewSigGasConsumeDecorator(ak, sigGasConsumer), diff --git a/app/app.go b/app/app.go index 0a0fc26096..2a5c0dbd3d 100644 --- a/app/app.go +++ b/app/app.go @@ -293,6 +293,7 @@ func NewOKExChainApp( app.AccountKeeper = auth.NewAccountKeeper( codecProxy.GetCdc(), keys[auth.StoreKey], app.subspaces[auth.ModuleName], okexchain.ProtoAccount, ) + app.AccountKeeper.SetObserverKeeper(app) bankKeeper := bank.NewBaseKeeperWithMarshal( &app.AccountKeeper, codecProxy, app.subspaces[bank.ModuleName], app.ModuleAccountAddrs(), @@ -528,8 +529,10 @@ func NewOKExChainApp( app.SetEndBlocker(app.EndBlocker) app.SetGasRefundHandler(refund.NewGasRefundHandler(app.AccountKeeper, app.SupplyKeeper)) app.SetAccHandler(NewAccHandler(app.AccountKeeper)) - app.SetParallelTxHandlers(updateFeeCollectorHandler(app.BankKeeper, app.SupplyKeeper), evmTxFeeHandler(), fixLogForParallelTxHandler(app.EvmKeeper)) + app.SetParallelTxHandlers(updateFeeCollectorHandler(app.BankKeeper, app.SupplyKeeper), fixLogForParallelTxHandler(app.EvmKeeper)) app.SetPreDeliverTxHandler(preDeliverTxHandler(app.AccountKeeper)) + app.SetPartialConcurrentHandlers(getTxFeeAndFromHandler(app.AccountKeeper)) + if loadLatest { err := app.LoadLatestVersion(app.keys[bam.MainStoreKey]) if err != nil { diff --git a/app/app_parallel.go b/app/app_parallel.go index 1afa2937db..bb5113aada 100644 --- a/app/app_parallel.go +++ b/app/app_parallel.go @@ -4,7 +4,6 @@ import ( ethermint "github.com/okex/exchain/app/types" sdk "github.com/okex/exchain/libs/cosmos-sdk/types" "github.com/okex/exchain/libs/cosmos-sdk/x/auth" - authante "github.com/okex/exchain/libs/cosmos-sdk/x/auth/ante" "github.com/okex/exchain/libs/cosmos-sdk/x/bank" "github.com/okex/exchain/libs/cosmos-sdk/x/supply" "github.com/okex/exchain/x/evm" @@ -18,29 +17,6 @@ func updateFeeCollectorHandler(bk bank.Keeper, sk supply.Keeper) sdk.UpdateFeeCo } } -// evmTxFeeHandler get tx fee for evm tx -func evmTxFeeHandler() sdk.GetTxFeeHandler { - - return func(ctx sdk.Context, tx sdk.Tx, verifySig bool) (fee sdk.Coins, isEvm bool, from string, to string) { - if verifySig { - if evmTx, ok := tx.(*evmtypes.MsgEthereumTx); ok { - isEvm = true - _ = evmTx.VerifySig(evmTx.ChainID(), ctx.BlockHeight()) - from = evmTx.BaseTx.From - to = "" - if evmTx.To() != nil { - to = evmTx.To().String() - } - } - } - if feeTx, ok := tx.(authante.FeeTx); ok { - fee = feeTx.GetFee() - } - - return - } -} - // fixLogForParallelTxHandler fix log for parallel tx func fixLogForParallelTxHandler(ek *evm.Keeper) sdk.LogFix { return func(logIndex []int, anteErrs []error) (logs [][]byte) { diff --git a/app/app_partial_concurrent.go b/app/app_partial_concurrent.go new file mode 100644 index 0000000000..51567dc0b6 --- /dev/null +++ b/app/app_partial_concurrent.go @@ -0,0 +1,38 @@ +package app + +import ( + "encoding/hex" + sdk "github.com/okex/exchain/libs/cosmos-sdk/types" + "github.com/okex/exchain/libs/cosmos-sdk/x/auth" + authante "github.com/okex/exchain/libs/cosmos-sdk/x/auth/ante" + evmtypes "github.com/okex/exchain/x/evm/types" + "strings" +) + +// getTxFeeAndFromHandler get tx fee and from +func getTxFeeAndFromHandler(ak auth.AccountKeeper) sdk.GetTxFeeAndFromHandler { + return func(ctx sdk.Context, tx sdk.Tx) (fee sdk.Coins, isEvm bool, from string, to string, err error) { + if evmTx, ok := tx.(*evmtypes.MsgEthereumTx); ok { + isEvm = true + err = evmTx.VerifySig(evmTx.ChainID(), ctx.BlockHeight()) + if err != nil { + return + } + fee = evmTx.GetFee() + from = evmTx.BaseTx.From + if len(from) > 2 { + from = strings.ToLower(from[2:]) + } + if evmTx.To() != nil { + to = strings.ToLower(evmTx.To().String()[2:]) + } + } else if feeTx, ok := tx.(authante.FeeTx); ok { + fee = feeTx.GetFee() + feePayer := feeTx.FeePayer(ctx) + feePayerAcc := ak.GetAccount(ctx, feePayer) + from = hex.EncodeToString(feePayerAcc.GetAddress()) + } + + return + } +} diff --git a/app/app_upgrade_test.go b/app/app_upgrade_test.go index 5466c88e0d..d2052a1048 100644 --- a/app/app_upgrade_test.go +++ b/app/app_upgrade_test.go @@ -514,7 +514,7 @@ func newTestOkcChainApp( app.SetEndBlocker(app.EndBlocker) app.SetGasRefundHandler(refund.NewGasRefundHandler(app.AccountKeeper, app.SupplyKeeper)) app.SetAccHandler(NewAccHandler(app.AccountKeeper)) - app.SetParallelTxHandlers(updateFeeCollectorHandler(app.BankKeeper, app.SupplyKeeper), evmTxFeeHandler(), fixLogForParallelTxHandler(app.EvmKeeper)) + app.SetParallelTxHandlers(updateFeeCollectorHandler(app.BankKeeper, app.SupplyKeeper), fixLogForParallelTxHandler(app.EvmKeeper)) if loadLatest { err := app.LoadLatestVersion(app.keys[bam.MainStoreKey]) diff --git a/app/config/config.go b/app/config/config.go index f0a9188863..99f2fe0e50 100644 --- a/app/config/config.go +++ b/app/config/config.go @@ -66,7 +66,7 @@ type OecConfig struct { // enable-analyzer enableAnalyzer bool - enableParalleledTx bool + deliverTxsMode int } const ( @@ -196,7 +196,7 @@ func (c *OecConfig) loadFromConfig() { c.SetNodeKeyWhitelist(viper.GetString(FlagNodeKeyWhitelist)) c.SetEnableWtx(viper.GetBool(FlagEnableWrappedTx)) c.SetEnableAnalyzer(viper.GetBool(analyzer.FlagEnableAnalyzer)) - c.SetParalleledTxStatus(viper.GetBool(state.FlagParalleledTx)) + c.SetDeliverTxsExecuteMode(viper.GetInt(state.FlagDeliverTxsExecMode)) } func resolveNodeKeyWhitelist(plain string) []string { @@ -364,12 +364,12 @@ func (c *OecConfig) update(key, value interface{}) { return } c.SetEnableAnalyzer(r) - case state.FlagParalleledTx: - r, err := strconv.ParseBool(v) + case state.FlagDeliverTxsExecMode: + r, err := strconv.Atoi(v) if err != nil { return } - c.SetParalleledTxStatus(r) + c.SetDeliverTxsExecuteMode(r) } } @@ -418,12 +418,12 @@ func (c *OecConfig) GetEnableWtx() bool { return c.enableWtx } -func (c *OecConfig) SetParalleledTxStatus(enable bool) { - c.enableParalleledTx = enable +func (c *OecConfig) SetDeliverTxsExecuteMode(mode int) { + c.deliverTxsMode = mode } -func (c *OecConfig) GetParalleledTxEnable() bool { - return c.enableParalleledTx +func (c *OecConfig) GetDeliverTxsExecuteMode() int { + return c.deliverTxsMode } func (c *OecConfig) SetEnableWtx(value bool) { diff --git a/app/repair_state.go b/app/repair_state.go index a97c9ff9f5..71132f6f7c 100644 --- a/app/repair_state.go +++ b/app/repair_state.go @@ -2,6 +2,7 @@ package app import ( "fmt" + "github.com/okex/exchain/app/config" "io" "io/ioutil" "log" @@ -151,6 +152,8 @@ func newRepairApp(logger tmlog.Logger, db dbm.DB, traceStore io.Writer) *repairA func doRepair(ctx *server.Context, state sm.State, stateStoreDB dbm.DB, proxyApp proxy.AppConns, startHeight, latestHeight int64, dataDir string) { + config.RegisterDynamicConfig(ctx.Logger.With("module", "config")) + stateCopy := state.Copy() ctx.Logger.Debug("stateCopy", "state", fmt.Sprintf("%+v", stateCopy)) // construct state for repair diff --git a/app/rpc/namespaces/eth/simulation/impl.go b/app/rpc/namespaces/eth/simulation/impl.go index 5308e51135..aa1269566c 100644 --- a/app/rpc/namespaces/eth/simulation/impl.go +++ b/app/rpc/namespaces/eth/simulation/impl.go @@ -84,7 +84,7 @@ func (a AccountKeeperProxy) GetAccount(ctx sdk.Context, addr sdk.AccAddress) aut return account } -func (a AccountKeeperProxy) SetAccount(ctx sdk.Context, account authexported.Account) { +func (a AccountKeeperProxy) SetAccount(ctx sdk.Context, account authexported.Account, updateState ...bool) { acc, ok := account.(types.EthAccount) if !ok { return diff --git a/app/utils/sanity/start.go b/app/utils/sanity/start.go index e0cfe4cef6..29faa95801 100644 --- a/app/utils/sanity/start.go +++ b/app/utils/sanity/start.go @@ -54,10 +54,15 @@ import ( var ( // conflicts flags startConflictElems = []conflictPair{ - // --fast-query conflict with --paralleled-tx=true + // --fast-query conflict with --deliver-txs-mode=1 { configA: boolItem{name: watcher.FlagFastQuery, value: true}, - configB: boolItem{name: state.FlagParalleledTx, value: true}, + configB: intItem{name: state.FlagDeliverTxsExecMode, value: 1}, + }, + // --fast-query conflict with --deliver-txs-mode=2 + { + configA: boolItem{name: watcher.FlagFastQuery, value: true}, + configB: intItem{name: state.FlagDeliverTxsExecMode, value: 2}, }, // --fast-query conflict with --pruning=nothing { @@ -69,15 +74,25 @@ var ( configA: boolItem{name: consensus.EnablePrerunTx, value: true}, configB: boolItem{name: types.FlagDownloadDDS, value: true}, }, - // --upload-delta conflict with --paralleled-tx=true + // --upload-delta conflict with --deliver-txs-mode=1 { configA: boolItem{name: types.FlagUploadDDS, value: true}, - configB: boolItem{name: state.FlagParalleledTx, value: true}, + configB: intItem{name: state.FlagDeliverTxsExecMode, value: 1}, + }, + // --upload-delta conflict with --deliver-txs-mode=2 + { + configA: boolItem{name: types.FlagUploadDDS, value: true}, + configB: intItem{name: state.FlagDeliverTxsExecMode, value: 2}, + }, + // --node-mode=rpc(--fast-query) conflicts with --deliver-txs-mode=1 + { + configA: stringItem{name: apptype.FlagNodeMode, value: string(apptype.RpcNode)}, + configB: intItem{name: state.FlagDeliverTxsExecMode, value: 1}, }, - // --node-mode=rpc(--fast-query) conflicts with --paralleled-tx=true and --pruning=nothing + // --node-mode=rpc(--fast-query) conflicts with --deliver-txs-mode=2 and --pruning=nothing { configA: stringItem{name: apptype.FlagNodeMode, value: string(apptype.RpcNode)}, - configB: boolItem{name: state.FlagParalleledTx, value: true}, + configB: intItem{name: state.FlagDeliverTxsExecMode, value: 2}, }, { configA: stringItem{name: apptype.FlagNodeMode, value: string(apptype.RpcNode)}, diff --git a/app/utils/sanity/start_test.go b/app/utils/sanity/start_test.go index 14825bdab4..f889a439a1 100644 --- a/app/utils/sanity/start_test.go +++ b/app/utils/sanity/start_test.go @@ -5,30 +5,12 @@ import ( "github.com/okex/exchain/libs/cosmos-sdk/server" "github.com/okex/exchain/libs/cosmos-sdk/store/types" "github.com/okex/exchain/libs/tendermint/consensus" - "github.com/okex/exchain/libs/tendermint/state" ttypes "github.com/okex/exchain/libs/tendermint/types" "github.com/okex/exchain/x/evm/watcher" "github.com/spf13/cobra" "testing" ) -func getCommandNodeModeRpcParalleledTx() *cobra.Command { - return getCommand([]universeFlag{ - &stringFlag{ - Name: apptype.FlagNodeMode, - Default: "", - Changed: true, - Value: string(apptype.RpcNode), - }, - &boolFlag{ - Name: state.FlagParalleledTx, - Default: false, - Changed: true, - Value: true, - }, - }) -} - func getCommandNodeModeRpcPruningNothing() *cobra.Command { return getCommand([]universeFlag{ &stringFlag{ @@ -46,23 +28,6 @@ func getCommandNodeModeRpcPruningNothing() *cobra.Command { }) } -func getCommandFastQueryParalleledTx() *cobra.Command { - return getCommand([]universeFlag{ - &boolFlag{ - Name: watcher.FlagFastQuery, - Default: false, - Changed: true, - Value: true, - }, - &boolFlag{ - Name: state.FlagParalleledTx, - Default: false, - Changed: true, - Value: true, - }, - }) -} - func getCommandFastQueryPruningNothing() *cobra.Command { return getCommand([]universeFlag{ &boolFlag{ @@ -97,23 +62,6 @@ func getCommandEnablePreruntxDownloadDelta() *cobra.Command { }) } -func getCommandUploadDeltaParalleledTx() *cobra.Command { - return getCommand([]universeFlag{ - &boolFlag{ - Name: ttypes.FlagUploadDDS, - Default: false, - Changed: true, - Value: true, - }, - &boolFlag{ - Name: state.FlagParalleledTx, - Default: false, - Changed: true, - Value: true, - }, - }) -} - func TestCheckStart(t *testing.T) { type args struct { cmd *cobra.Command @@ -123,12 +71,9 @@ func TestCheckStart(t *testing.T) { args args wantErr bool }{ - {name: "1. conflicts --fast-query and --paralleled-tx", args: args{cmd: getCommandFastQueryParalleledTx()}, wantErr: true}, - {name: "2. conflicts --fast-query and --pruning=nothing", args: args{cmd: getCommandFastQueryPruningNothing()}, wantErr: true}, - {name: "3. conflicts --enable-preruntx and --download-delta", args: args{cmd: getCommandEnablePreruntxDownloadDelta()}, wantErr: true}, - {name: "4. conflicts --upload-delta and --paralleled-tx=true", args: args{cmd: getCommandUploadDeltaParalleledTx()}, wantErr: true}, - {name: "5. conflicts --node-mod=rpc and --paralleled-tx=true", args: args{cmd: getCommandNodeModeRpcParalleledTx()}, wantErr: true}, - {name: "6. conflicts --node-mod=rpc and --pruning=nothing", args: args{cmd: getCommandNodeModeRpcPruningNothing()}, wantErr: true}, + {name: "1. conflicts --fast-query and --pruning=nothing", args: args{cmd: getCommandFastQueryPruningNothing()}, wantErr: true}, + {name: "2. conflicts --enable-preruntx and --download-delta", args: args{cmd: getCommandEnablePreruntxDownloadDelta()}, wantErr: true}, + {name: "3. conflicts --node-mod=rpc and --pruning=nothing", args: args{cmd: getCommandNodeModeRpcPruningNothing()}, wantErr: true}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/app/utils/sanity/type.go b/app/utils/sanity/type.go index df12c9019a..a39f96decf 100644 --- a/app/utils/sanity/type.go +++ b/app/utils/sanity/type.go @@ -37,6 +37,23 @@ func (b boolItem) verbose() string { return fmt.Sprintf("--%v=%v", b.name, b.value) } +type intItem struct { + name string + value int +} + +func (i intItem) label() string { + return i.name +} + +func (i intItem) check() bool { + return viper.GetInt(i.label()) == i.value +} + +func (i intItem) verbose() string { + return fmt.Sprintf("--%v=%v", i.name, i.value) +} + type stringItem struct { name string value string diff --git a/cmd/exchaind/repair_data.go b/cmd/exchaind/repair_data.go index 8ff4b6894a..91e3e2b06a 100644 --- a/cmd/exchaind/repair_data.go +++ b/cmd/exchaind/repair_data.go @@ -23,10 +23,11 @@ func repairStateCmd(ctx *server.Context) *cobra.Command { log.Println("--------- repair data success ---------") }, } - cmd.Flags().Bool(sm.FlagParalleledTx, false, "parallel execution for evm txs") cmd.Flags().Int64(app.FlagStartHeight, 0, "Set the start block height for repair") cmd.Flags().Bool(flatkv.FlagEnable, false, "Enable flat kv storage for read performance") cmd.Flags().String(app.Elapsed, app.DefaultElapsedSchemas, "schemaName=1|0,,,") cmd.Flags().Bool(analyzer.FlagEnableAnalyzer, false, "Enable auto open log analyzer") + cmd.Flags().Int(sm.FlagDeliverTxsExecMode, 0, "execution mode for deliver txs") + cmd.Flags().Int(sm.FlagDeliverTxsConcurrentNum, 0, "concurrent number for deliver txs when using partial-concurrent mode") return cmd } diff --git a/cmd/exchaind/replay.go b/cmd/exchaind/replay.go index 7a69833e93..c142623cc6 100644 --- a/cmd/exchaind/replay.go +++ b/cmd/exchaind/replay.go @@ -265,9 +265,9 @@ func doReplay(ctx *server.Context, state sm.State, stateStoreDB dbm.DB, block := originBlockStore.LoadBlock(lastBlockHeight) meta := originBlockStore.LoadBlockMeta(lastBlockHeight) blockExec := sm.NewBlockExecutor(stateStoreDB, ctx.Logger, mockApp, mock.Mempool{}, sm.MockEvidencePool{}) - config.GetOecConfig().SetParalleledTxStatus(false) // mockApp not support parallel tx + config.GetOecConfig().SetDeliverTxsExecuteMode(0) // mockApp not support parallel tx state, _, err = blockExec.ApplyBlock(state, meta.BlockID, block) - config.GetOecConfig().SetParalleledTxStatus(viper.GetBool(sm.FlagParalleledTx)) + config.GetOecConfig().SetDeliverTxsExecuteMode(viper.GetInt(sm.FlagDeliverTxsExecMode)) panicError(err) } diff --git a/go.mod b/go.mod index 7bc3b8b690..b4edffc01c 100644 --- a/go.mod +++ b/go.mod @@ -87,6 +87,7 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect + github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e // indirect github.com/cosmos/ledger-go v0.9.2 // indirect github.com/danieljoos/wincred v1.0.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect @@ -103,16 +104,19 @@ require ( github.com/go-stack/stack v1.8.0 // indirect github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect github.com/golang/snappy v0.0.4 // indirect + github.com/google/pprof v0.0.0-20220318212150-b2ab0324ddda // indirect github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect github.com/gtank/ristretto255 v0.1.2 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/holiman/bloomfilter/v2 v2.0.3 // indirect github.com/holiman/uint256 v1.2.0 // indirect github.com/huin/goupnp v1.0.2 // indirect + github.com/ianlancetaylor/demangle v0.0.0-20220319035150-800ac71e25c2 // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect github.com/jackpal/go-nat-pmp v1.0.2-0.20160603034137-1fa385a6f458 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/keybase/go-keychain v0.0.0-20190712205309-48d3d31d256d // indirect + github.com/kisielk/godepgraph v0.0.0-20190626013829-57a7e4a651a9 // indirect github.com/lestrrat/go-file-rotatelogs v0.0.0-20180223000712-d3151e2a480f // indirect github.com/lestrrat/go-strftime v0.0.0-20180220042222-ba3bf9c1d042 // indirect github.com/mattn/go-runewidth v0.0.9 // indirect @@ -143,7 +147,7 @@ require ( go.uber.org/multierr v1.5.0 // indirect go.uber.org/zap v1.15.0 // indirect golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect - golang.org/x/sys v0.0.0-20210910150752-751e447fb3d0 // indirect + golang.org/x/sys v0.0.0-20220319134239-a9b59b0215f8 // indirect golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 // indirect golang.org/x/text v0.3.6 // indirect gopkg.in/ini.v1 v1.51.0 // indirect diff --git a/go.sum b/go.sum index fb990ed2e5..6c8f56fb27 100644 --- a/go.sum +++ b/go.sum @@ -337,6 +337,21 @@ github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXi github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= github.com/google/pprof v0.0.0-20191218002539-d4f498aebedc/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM= +github.com/google/pprof v0.0.0-20200212024743-f11f1df84d12/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM= +github.com/google/pprof v0.0.0-20200229191704-1ebb73c60ed3/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM= +github.com/google/pprof v0.0.0-20200430221834-fc25d7d30c6d/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM= +github.com/google/pprof v0.0.0-20200507031123-427632fa3b1c/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM= +github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM= +github.com/google/pprof v0.0.0-20201023163331-3e6fc7fc9c4c/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= +github.com/google/pprof v0.0.0-20201203190320-1bf35d6f28c2/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= +github.com/google/pprof v0.0.0-20201218002935-b9804c9f04c2/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= +github.com/google/pprof v0.0.0-20210122040257-d980be63207e/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= +github.com/google/pprof v0.0.0-20210226084205-cbba55b83ad5/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= +github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= +github.com/google/pprof v0.0.0-20210601050228-01bbb1931b22/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= +github.com/google/pprof v0.0.0-20210609004039-a478d1d731e9/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= +github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= +github.com/google/pprof v0.0.0-20220318212150-b2ab0324ddda/go.mod h1:KgnwoLYCZ8IQu3XUZ8Nc/bM9CCZFOyjUNOSygVozoDg= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -408,6 +423,12 @@ github.com/huin/goupnp v1.0.2 h1:RfGLP+h3mvisuWEyybxNq5Eft3NWhHLPeUN72kpKZoI= github.com/huin/goupnp v1.0.2/go.mod h1:0dxJBVBHqTMjIUMkESDTNgOOx/Mw5wYIfyFmdzSamkM= github.com/huin/goutil v0.0.0-20170803182201-1ca381bf3150/go.mod h1:PpLOETDnJ0o3iZrZfqZzyLl6l7F3c6L1oWn7OICBi6o= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= +github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= +github.com/ianlancetaylor/demangle v0.0.0-20210905161508-09a460cdf81d/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w= +github.com/ianlancetaylor/demangle v0.0.0-20220319035150-800ac71e25c2/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w= +github.com/imdario/mergo v0.3.4/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= +github.com/imdario/mergo v0.3.8/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= +github.com/improbable-eng/grpc-web v0.14.1/go.mod h1:zEjGHa8DAlkoOXmswrNvhUGEYQA9UI7DhrGeHR1DMGU= github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/influxdata/flux v0.65.1/go.mod h1:J754/zds0vvpfwuq7Gc2wRdVwEodfpCFM7mYlOw2LqY= @@ -458,6 +479,8 @@ github.com/jwilder/encoding v0.0.0-20170811194829-b4e1701a28ef/go.mod h1:Ct9fl0F github.com/karalabe/usb v0.0.0-20190919080040-51dc0efba356 h1:I/yrLt2WilKxlQKCM52clh5rGzTKpVctGT1lH4Dc8Jw= github.com/karalabe/usb v0.0.0-20190919080040-51dc0efba356/go.mod h1:Od972xHfMJowv7NGVDiWVxk2zxnWgjLlJzE+F4F7AGU= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/errcheck v1.6.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/godepgraph v0.0.0-20190626013829-57a7e4a651a9/go.mod h1:Gb5YEgxqiSSVrXKWQxDcKoCM94NO5QAwOwTaVmIUAMI= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4= github.com/kkdai/bstream v1.0.0/go.mod h1:FDnDOHt5Yx4p3FaHcioFT0QjDOtgUpvjeZqAs+NVZZA= @@ -971,6 +994,22 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210816183151-1e6c022a8912/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210910150752-751e447fb3d0 h1:xrCZDmdtoloIiooiA9q0OQb9r8HejIHYoHGhGCe1pGg= golang.org/x/sys v0.0.0-20210910150752-751e447fb3d0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210915083310-ed5796bab164/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210917161153-d61c044b1678/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211105183446-c75c47738b0c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211124211545-fe61309f8881/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211205182925-97ca703d548d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211210111614-af8b64212486/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211213223007-03aa0b5f6827/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220111092808-5a964db01320/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 h1:XfKQ4OlFl8okEOr5UvAqFRVj8pY/4yfcXrddB8qAbU0= +golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220319134239-a9b59b0215f8 h1:OH54vjqzRWmbJ62fjuhxy7AxFFgoHN0/DPc/UrL8cAs= +golang.org/x/sys v0.0.0-20220319134239-a9b59b0215f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 h1:v+OssWQX+hTHEmOBgwxdZxK4zHq3yOs8F9J7mk0PY8E= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= diff --git a/libs/cosmos-sdk/baseapp/baseapp.go b/libs/cosmos-sdk/baseapp/baseapp.go index 5b1e81b1ec..a9898265a2 100644 --- a/libs/cosmos-sdk/baseapp/baseapp.go +++ b/libs/cosmos-sdk/baseapp/baseapp.go @@ -10,11 +10,10 @@ import ( "reflect" "strings" - "github.com/okex/exchain/libs/cosmos-sdk/codec/types" - "github.com/okex/exchain/libs/tendermint/trace" "github.com/gogo/protobuf/proto" + "github.com/okex/exchain/libs/cosmos-sdk/codec/types" "github.com/okex/exchain/libs/cosmos-sdk/store" "github.com/okex/exchain/libs/cosmos-sdk/store/rootmulti" storetypes "github.com/okex/exchain/libs/cosmos-sdk/store/types" @@ -32,12 +31,13 @@ import ( ) const ( - runTxModeCheck runTxMode = iota // Check a transaction - runTxModeReCheck // Recheck a (pending) transaction after a commit - runTxModeSimulate // Simulate a transaction - runTxModeDeliver // Deliver a transaction - runTxModeDeliverInAsync //Deliver a transaction in Aysnc - runTxModeTrace // Trace a transaction + runTxModeCheck runTxMode = iota // Check a transaction + runTxModeReCheck // Recheck a (pending) transaction after a commit + runTxModeSimulate // Simulate a transaction + runTxModeDeliver // Deliver a transaction + runTxModeDeliverInAsync // Deliver a transaction in Aysnc + runTxModeDeliverPartConcurrent // Deliver a transaction partial concurrent + runTxModeTrace // Trace a transaction runTxModeWrappedCheck // MainStoreKey is the string representation of the main store @@ -104,6 +104,8 @@ func (m runTxMode) String() (res string) { res = "ModeSimulate" case runTxModeDeliver: res = "ModeDeliver" + case runTxModeDeliverPartConcurrent: + res = "ModeDeliverPartConcurrent" case runTxModeDeliverInAsync: res = "ModeDeliverInAsync" case runTxModeWrappedCheck: @@ -143,10 +145,11 @@ type BaseApp struct { // nolint: maligned idPeerFilter sdk.PeerFilter // filter peers by node ID fauxMerkleMode bool // if true, IAVL MountStores uses MountStoresDB for simulation speed. - getTxFee sdk.GetTxFeeHandler updateFeeCollectorAccHandler sdk.UpdateFeeCollectorAccHandler logFix sdk.LogFix + getTxFeeAndFromHandler sdk.GetTxFeeAndFromHandler + // volatile states: // // checkState is set on InitChain and reset on Commit @@ -190,7 +193,7 @@ type BaseApp struct { // nolint: maligned endLog recordHandle parallelTxManage *parallelTxManager - + deliverTxsMgr *DTTManager feeForCollector sdk.Coins feeChanged bool // used to judge whether should update the fee-collector account @@ -620,7 +623,7 @@ func validateBasicTxMsgs(msgs []sdk.Msg) error { // Returns the applications's deliverState if app is in runTxModeDeliver, // otherwise it returns the application's checkstate. func (app *BaseApp) getState(mode runTxMode) *state { - if mode == runTxModeDeliver || mode == runTxModeDeliverInAsync { + if mode == runTxModeDeliver || mode == runTxModeDeliverInAsync || mode == runTxModeDeliverPartConcurrent { return app.deliverState } @@ -652,7 +655,7 @@ func (app *BaseApp) getContextForTx(mode runTxMode, txBytes []byte) sdk.Context ctx.SetTxBytes(txBytes) } - if mode == runTxModeDeliver { + if mode == runTxModeDeliver || mode == runTxModeDeliverPartConcurrent { ctx.SetDeliver() } diff --git a/libs/cosmos-sdk/baseapp/baseapp_mode_base.go b/libs/cosmos-sdk/baseapp/baseapp_mode_base.go index 4abab5a74e..1baeb0b621 100644 --- a/libs/cosmos-sdk/baseapp/baseapp_mode_base.go +++ b/libs/cosmos-sdk/baseapp/baseapp_mode_base.go @@ -31,6 +31,8 @@ func (app *BaseApp) getModeHandler(mode runTxMode) modeHandler { case runTxModeTrace: h = &modeHandlerTrace{&modeHandlerDeliver{&modeHandlerBase{mode: mode, app: app}}} case runTxModeDeliver: + fallthrough + case runTxModeDeliverPartConcurrent: h = &modeHandlerDeliver{&modeHandlerBase{mode: mode, app: app}} case runTxModeSimulate: h = &modeHandlerSimulate{&modeHandlerBase{mode: mode, app: app}} diff --git a/libs/cosmos-sdk/baseapp/baseapp_parallel.go b/libs/cosmos-sdk/baseapp/baseapp_parallel.go index 5c0facc4f2..1d38d44a07 100644 --- a/libs/cosmos-sdk/baseapp/baseapp_parallel.go +++ b/libs/cosmos-sdk/baseapp/baseapp_parallel.go @@ -47,7 +47,7 @@ func (app *BaseApp) getExtraDataByTxs(txs [][]byte) { } return } - coin, isEvm, s, toAddr := app.getTxFee(app.getContextForTx(runTxModeDeliver, txBytes), tx, true) + coin, isEvm, s, toAddr, _ := app.getTxFeeAndFromHandler(app.getContextForTx(runTxModeDeliver, txBytes), tx) para.extraTxsInfo[index] = &extraDataForTx{ fee: coin, isEvm: isEvm, @@ -535,7 +535,7 @@ type parallelTxManager struct { } func newParallelTxManager() *parallelTxManager { - isAsync := viper.GetBool(sm.FlagParalleledTx) + isAsync := sm.DeliverTxsExecMode(viper.GetInt(sm.FlagDeliverTxsExecMode)) == sm.DeliverTxsExecModeParallel return ¶llelTxManager{ isAsyncDeliverTx: isAsync, workgroup: newAsyncWorkGroup(), diff --git a/libs/cosmos-sdk/baseapp/baseapp_partial_concurrent.go b/libs/cosmos-sdk/baseapp/baseapp_partial_concurrent.go new file mode 100644 index 0000000000..46728cf669 --- /dev/null +++ b/libs/cosmos-sdk/baseapp/baseapp_partial_concurrent.go @@ -0,0 +1,659 @@ +package baseapp + +import ( + "encoding/hex" + "fmt" + sdk "github.com/okex/exchain/libs/cosmos-sdk/types" + sdkerrors "github.com/okex/exchain/libs/cosmos-sdk/types/errors" + "github.com/okex/exchain/libs/cosmos-sdk/x/auth/exported" + abci "github.com/okex/exchain/libs/tendermint/abci/types" + "github.com/okex/exchain/libs/tendermint/libs/log" + sm "github.com/okex/exchain/libs/tendermint/state" + "github.com/okex/exchain/libs/tendermint/trace" + "github.com/spf13/viper" + "time" +) + +type ( + dttRoutineStep uint8 +) + +const ( + dttRoutineStepNone dttRoutineStep = iota + dttRoutineStepStart + dttRoutineStepWaitRerun + dttRoutineStepAnteStart + dttRoutineStepAnteFinished + dttRoutineStepReadyForSerial + dttRoutineStepSerial + dttRoutineStepFinished + + keepAliveIntervalMS = 5 + maxConcurrentCount = 4 +) + +type DeliverTxTask struct { + index int + canRerun int8 + routineIndex int8 + + info *runTxInfo + fee sdk.Coins + isEvm bool + from string + to string + err error + prevTaskIndex int // the index of a running tx with a smaller index while its from or to equals to this tx's from +} + +func newDeliverTxTask(tx sdk.Tx, index int) *DeliverTxTask { + t := &DeliverTxTask{ + index: index, + info: &runTxInfo{tx: tx}, + prevTaskIndex: -1, + } + + return t +} + +//------------------------------------- +type BasicProcessFn func(txByte []byte, index int) *DeliverTxTask +type RunAnteFn func(task *DeliverTxTask) error + +type dttRoutine struct { + done chan int8 + task *DeliverTxTask + txByteCh chan []byte + txIndex int + rerunCh chan int8 + index int8 + step dttRoutineStep + needToRerun bool + + logger log.Logger + + basicProFn BasicProcessFn + runAnteFn RunAnteFn +} + +func newDttRoutine(index int8, basicProcess BasicProcessFn, runAnte RunAnteFn) *dttRoutine { + dttr := &dttRoutine{ + index: index, + basicProFn: basicProcess, + runAnteFn: runAnte, + txIndex: -1, + } + + return dttr +} + +func (dttr *dttRoutine) setLogger(logger log.Logger) { + dttr.logger = logger +} + +func (dttr *dttRoutine) makeNewTask(txByte []byte, index int) { + dttr.step = dttRoutineStepStart + dttr.txIndex = index + dttr.needToRerun = false + dttr.txByteCh <- txByte +} + +func (dttr *dttRoutine) start() { + dttr.done = make(chan int8) + dttr.txByteCh = make(chan []byte, maxConcurrentCount) + dttr.rerunCh = make(chan int8, maxConcurrentCount) + dttr.step = dttRoutineStepNone + go dttr.executeTaskRoutine() +} + +func (dttr *dttRoutine) stop() { + dttr.step = dttRoutineStepNone + dttr.txIndex = -1 + dttr.done <- 0 +} + +func (dttr *dttRoutine) executeTaskRoutine() { + for { + select { + case <-dttr.done: + close(dttr.txByteCh) + close(dttr.rerunCh) + return + case tx := <-dttr.txByteCh: + dttr.task = dttr.basicProFn(tx, dttr.txIndex) + dttr.task.routineIndex = dttr.index + if dttr.task.err == nil { + dttr.runAnteFn(dttr.task) + } else { + dttr.step = dttRoutineStepReadyForSerial + } + case <-dttr.rerunCh: + //dttr.logger.Error("readRerunCh", "index", dttr.task.index, "step", dttr.step) + dttr.task.prevTaskIndex = -1 + if dttr.step == dttRoutineStepAnteFinished { + dttr.needToRerun = false + dttr.step = dttRoutineStepWaitRerun + dttr.runAnteFn(dttr.task) + } else if dttr.step == dttRoutineStepReadyForSerial || + dttr.step == dttRoutineStepSerial || + dttr.step == dttRoutineStepFinished { + dttr.needToRerun = false + } else { + // maybe the task is in other condition, running concurrent execution or running to make new task. + dttr.task.canRerun++ + dttr.needToRerun = false + } + } + } +} + +func (dttr *dttRoutine) shouldRerun(fromIndex int, fromAccountUpdate int) { + if dttr.step == dttRoutineStepReadyForSerial || dttr.needToRerun == true || (dttr.task.prevTaskIndex >= 0 && dttr.task.prevTaskIndex > fromIndex) { + return + } + if dttr.step == dttRoutineStepAnteStart || dttr.step == dttRoutineStepAnteFinished { + if fromAccountUpdate >= 0 && dttr.task.prevTaskIndex < fromAccountUpdate { + dttr.task.prevTaskIndex = fromAccountUpdate + } else { + dttr.needToRerun = true + dttr.rerunCh <- 0 + } + } +} + +func (dttr *dttRoutine) needToRerunWhenContextChanged() bool { + switch dttr.step { + case dttRoutineStepNone: + fallthrough + case dttRoutineStepStart: + fallthrough + case dttRoutineStepReadyForSerial: + fallthrough + case dttRoutineStepSerial: + fallthrough + case dttRoutineStepFinished: + return false + } + return true +} + +func (dttr *dttRoutine) readyForSerialExecution() bool { + if dttr.task == nil || dttr.needToRerun || dttr.task.canRerun > 0 || dttr.task.prevTaskIndex >= 0 { + return false + } + switch dttr.step { + case dttRoutineStepNone: + fallthrough + case dttRoutineStepStart: + fallthrough + case dttRoutineStepWaitRerun: + fallthrough + case dttRoutineStepAnteStart: + fallthrough + case dttRoutineStepFinished: + fallthrough + case dttRoutineStepSerial: + return false + } + return true +} + +func (dttr *dttRoutine) notReadyForCheckRerun(serialIndex int) bool { + if dttr.task == nil || + dttr.txIndex != dttr.task.index || + dttr.task.index <= serialIndex || + dttr.step < dttRoutineStepAnteStart { + return true + } + return false +} + +//------------------------------------- + +type DTTManager struct { + done chan int8 + totalCount int + txs [][]byte + dttRoutineList []*dttRoutine // key: txIndex, value: dttRoutine + serialIndex int + serialTask *DeliverTxTask + serialCh chan int8 + + txResponses []*abci.ResponseDeliverTx + invalidTxs int + app *BaseApp + checkStateCtx sdk.Context + + maxConcurrentNum int +} + +func NewDTTManager(app *BaseApp) *DTTManager { + dttm := &DTTManager{ + app: app, + maxConcurrentNum: maxConcurrentCount, + } + conNum := viper.GetInt(sm.FlagDeliverTxsConcurrentNum) + if conNum > 0 { + dttm.maxConcurrentNum = conNum + dttm.app.logger.Info("maxConcurrentNum", "value", dttm.maxConcurrentNum) + } + dttm.dttRoutineList = make([]*dttRoutine, 0, dttm.maxConcurrentNum) //sync.Map{} + for i := 0; i < dttm.maxConcurrentNum; i++ { + dttr := newDttRoutine(int8(i), dttm.concurrentBasic, dttm.runConcurrentAnte) + dttr.setLogger(dttm.app.logger) + dttm.dttRoutineList = append(dttm.dttRoutineList, dttr) + } + + return dttm +} + +func (dttm *DTTManager) deliverTxs(txs [][]byte) { + dttm.totalCount = len(txs) + dttm.app.logger.Info("TotalTxs", "count", dttm.totalCount) + dttm.txResponses = make([]*abci.ResponseDeliverTx, dttm.totalCount) + dttm.invalidTxs = 0 + if dttm.totalCount == 0 { + return + } + + dttm.done = make(chan int8, 1) + dttm.txs = txs + dttm.serialTask = nil + dttm.serialIndex = -1 + dttm.serialCh = make(chan int8, 4) + + dttm.checkStateCtx = dttm.app.checkState.ctx + dttm.checkStateCtx.SetBlockHeight(dttm.app.checkState.ctx.BlockHeight() + 1) + + go dttm.serialRoutine() + + for i := 0; i < dttm.maxConcurrentNum && i < dttm.totalCount; i++ { + dttr := dttm.dttRoutineList[i] + dttr.start() + dttr.makeNewTask(txs[i], i) + } +} + +func (dttm *DTTManager) concurrentBasic(txByte []byte, index int) *DeliverTxTask { + // create a new task + var realTx sdk.Tx + var err error + if mem := GetGlobalMempool(); mem != nil { + realTx, _ = mem.ReapEssentialTx(txByte).(sdk.Tx) + } + if realTx == nil { + realTx, err = dttm.app.txDecoder(txByte) + } + task := newDeliverTxTask(realTx, index) + task.info.txBytes = txByte + if err != nil { + task.err = err + return task + } + + task.info.handler = dttm.app.getModeHandler(runTxModeDeliverPartConcurrent) //dm.handler + task.fee, task.isEvm, task.from, task.to, err = dttm.app.getTxFeeAndFromHandler(dttm.checkStateCtx, task.info.tx) + if err != nil { + task.err = err + return task + } + + if err = validateBasicTxMsgs(task.info.tx.GetMsgs()); err != nil { + task.err = err + } + return task +} + +func (dttm *DTTManager) hasConflict(taskA *DeliverTxTask, taskB *DeliverTxTask) bool { + if len(taskA.from) > 0 && taskA.from == taskB.from { + return true + } + if len(taskA.to) == 0 && len(taskB.to) == 0 { + return false + } + if taskA.index < taskB.index && taskA.to == taskB.from { + return true + } else if taskA.index > taskB.index && taskA.from == taskB.to { + return true + } + return false +} + +func (dttm *DTTManager) runConcurrentAnte(task *DeliverTxTask) error { + if dttm.app.anteHandler == nil { + return fmt.Errorf("anteHandler cannot be nil") + } + + curDttr := dttm.dttRoutineList[task.routineIndex] + curDttr.step = dttRoutineStepAnteStart + defer func() { + curDttr.step = dttRoutineStepAnteFinished + }() + + dttm.setPrevTaskIndex(task) + if task.prevTaskIndex < dttm.serialIndex || (task.prevTaskIndex == dttm.serialIndex && dttm.serialTask == nil) { + task.prevTaskIndex = -1 + } else if task.index <= dttm.serialIndex || task.prevTaskIndex >= 0 { + return nil + } + + defer func() { + if r := recover(); r != nil { + err := dttm.app.runTx_defer_recover(r, task.info) + task.info.msCache = nil //TODO msCache not write + task.info.result = nil + task.err = err + } + }() + + task.info.ctx = dttm.app.getContextForTx(runTxModeDeliverPartConcurrent, task.info.txBytes) // same context for all txs in a block + task.canRerun = 0 + + task.info.ctx.SetCache(sdk.NewCache(dttm.app.blockCache, useCache(runTxModeDeliverPartConcurrent))) // one cache for a tx + + err := dttm.runAnte(task) + task.err = err + + if task.canRerun > 0 { + curDttr.shouldRerun(-1, -1) + } else if dttm.serialIndex+1 == task.index && !curDttr.needToRerun && task.prevTaskIndex < 0 && dttm.serialTask == nil { + dttm.serialCh <- task.routineIndex + } + + return nil +} + +func (dttm *DTTManager) setPrevTaskIndex(task *DeliverTxTask) { + count := len(dttm.dttRoutineList) + for i := 0; i < count; i++ { + dttr := dttm.dttRoutineList[i] + if dttr.task == nil || + dttr.txIndex != dttr.task.index || + dttr.task.index == task.index || + dttr.step == dttRoutineStepNone || + dttr.step == dttRoutineStepFinished || + dttr.step == dttRoutineStepReadyForSerial || + (dttr.task.index > task.index && dttr.task.prevTaskIndex >= task.index) || + (dttr.task.index < task.index && task.prevTaskIndex >= dttr.task.index) { + continue + } + conflict := dttm.hasConflict(dttr.task, task) + if dttr.task.isEvm && task.isEvm && !conflict { + continue + } + + if !dttr.task.isEvm || conflict { + if dttr.task.index < task.index && dttr.task.index > task.prevTaskIndex { + task.prevTaskIndex = dttr.task.index + } + } else if !task.isEvm || conflict { + if dttr.task.index > task.index && dttr.task.prevTaskIndex < task.index { + dttr.task.prevTaskIndex = task.index + } + } + } +} + +func (dttm *DTTManager) runAnte(task *DeliverTxTask) error { + info := task.info + var anteCtx sdk.Context + anteCtx, info.msCacheAnte = dttm.app.cacheTxContext(info.ctx, info.txBytes) + anteCtx.SetEventManager(sdk.NewEventManager()) + + newCtx, err := dttm.app.anteHandler(anteCtx, info.tx, false) // NewAnteHandler + + ms := info.ctx.MultiStore() + + if !newCtx.IsZero() { + info.ctx = newCtx + info.ctx.SetMultiStore(ms) + } + // GasMeter expected to be set in AnteHandler + info.gasWanted = info.ctx.GasMeter().Limit() + if err != nil { + return err + } + + return nil +} + +func (dttm *DTTManager) serialRoutine() { + keepAliveTicker := time.NewTicker(keepAliveIntervalMS * time.Millisecond) + nextTaskRoutine := int8(-1) + for { + select { + case routineIndex := <-dttm.serialCh: + rt := dttm.dttRoutineList[routineIndex] + task := rt.task + if task.index != dttm.serialIndex+1 { + break + } + keepAliveTicker.Stop() + nextTaskRoutine = -1 + + dttm.serialIndex = task.index + dttm.serialTask = task + rt.step = dttRoutineStepSerial + dttm.serialExecution() + rt.step = dttRoutineStepFinished + dttm.serialTask = nil + + if dttm.isTxsAllExecuted(rt) { + return + } + + // check whether there are ante-finished task + nextTaskRoutine = dttm.setRerunAndNextSerial(task) + if nextTaskRoutine >= 0 { + keepAliveTicker.Reset(keepAliveIntervalMS * time.Microsecond) + } + + case <-keepAliveTicker.C: + if dttm.serialTask == nil && nextTaskRoutine >= 0 && len(dttm.serialCh) == 0 { + dttr := dttm.dttRoutineList[nextTaskRoutine] + if dttr.task.index == dttm.serialIndex+1 && dttr.readyForSerialExecution() { + keepAliveTicker.Stop() + dttm.serialCh <- nextTaskRoutine + } + } + } + } +} + +func (dttm *DTTManager) isTxsAllExecuted(rt *dttRoutine) bool { + if dttm.serialIndex == dttm.totalCount-1 { + count := len(dttm.dttRoutineList) + for i := 0; i < count && i < dttm.totalCount; i++ { + dttr := dttm.dttRoutineList[i] + dttr.stop() + } + + dttm.done <- 0 + close(dttm.serialCh) + return true + } + + // make new task for this routine + nextIndex := dttm.maxConcurrentNum + rt.task.index + if nextIndex < dttm.totalCount { + rt.makeNewTask(dttm.txs[nextIndex], nextIndex) + } + return false +} + +func (dttm *DTTManager) setRerunAndNextSerial(task *DeliverTxTask) int8 { + count := len(dttm.dttRoutineList) + rerunRoutines := make([]*dttRoutine, 0) + updateFeeAcc := false + nextTaskRoutine := int8(-1) + for i := 0; i < count; i++ { + dttr := dttm.dttRoutineList[i] + notCare, needRerun := compareTasks(dttr, task) + if notCare { + continue + } + + if needRerun { + if !dttr.task.isEvm && dttr.task.index == task.index+1 { + updateFeeAcc = true + } + dttr.task.prevTaskIndex = task.index + rerunRoutines = append(rerunRoutines, dttr) + } else if dttr.task.index == dttm.serialIndex+1 { + nextTaskRoutine = dttr.index + if dttr.readyForSerialExecution() { + dttm.serialCh <- nextTaskRoutine + nextTaskRoutine = -1 + } + } + } + + if updateFeeAcc && dttm.app.updateFeeCollectorAccHandler != nil { + // should update the balance of FeeCollector's account when run non-evm tx + // which uses non-infiniteGasMeter during AnteHandleChain + ctx, cache := dttm.app.cacheTxContext(dttm.app.getContextForTx(runTxModeDeliver, []byte{}), []byte{}) + if err := dttm.app.updateFeeCollectorAccHandler(ctx, dttm.app.feeForCollector); err != nil { + panic(err) + } + cache.Write() + } + for _, rerunRoutine := range rerunRoutines { + rerunRoutine.shouldRerun(task.index, -1) + } + return nextTaskRoutine +} + +func compareTasks(target *dttRoutine, base *DeliverTxTask) (notCare bool, needRerun bool) { + notReady := target.notReadyForCheckRerun(base.index) + if notReady || + target.task.prevTaskIndex > base.index || + (!target.task.isEvm && target.task.index > base.index+1) { + notCare = true + return + } + + if target.task.prevTaskIndex == base.index || + !base.isEvm || + (!target.task.isEvm && target.task.index == base.index+1) || + target.task.from == base.from || + target.task.from == base.to { + needRerun = true + } + return +} + +func (dttm *DTTManager) serialExecution() { + info := dttm.serialTask.info + handler := info.handler + + err := dttm.serialHandleBeforeRunMsg(info) + if err != nil { + return + } + + defer func() { + if r := recover(); r != nil { + err = dttm.app.runTx_defer_recover(r, info) + info.msCache = nil + info.result = nil + } + info.gInfo = sdk.GasInfo{GasWanted: info.gasWanted, GasUsed: info.ctx.GasMeter().GasConsumed()} + + var resp abci.ResponseDeliverTx + if err != nil { + resp = sdkerrors.ResponseDeliverTx(err, info.gInfo.GasWanted, info.gInfo.GasUsed, dttm.app.trace) + } else { + resp = abci.ResponseDeliverTx{ + GasWanted: int64(info.gInfo.GasWanted), + GasUsed: int64(info.gInfo.GasUsed), + Log: info.result.Log, + Data: info.result.Data, + Events: info.result.Events.ToABCIEvents(), + } + } + dttm.dealWithResponse(resp) + }() + + defer handler.handleDeferGasConsumed(info) + + mode := runTxModeDeliver + defer func() { + dttm.app.pin(Refund, true, mode) + defer dttm.app.pin(Refund, false, mode) + handler.handleDeferRefund(info) + }() + + // execute runMsgs + dttm.app.pin(RunMsg, true, mode) + err = handler.handleRunMsg(info) + dttm.app.pin(RunMsg, false, mode) +} + +func (dttm *DTTManager) serialHandleBeforeRunMsg(info *runTxInfo) error { + // execute anteHandler failed + if dttm.serialTask.err != nil { + txRs := sdkerrors.ResponseDeliverTx(dttm.serialTask.err, 0, 0, dttm.app.trace) + dttm.dealWithResponse(txRs) + return dttm.serialTask.err + } + + err := info.handler.handleGasConsumed(info) + if err != nil { + txRs := sdkerrors.ResponseDeliverTx(err, 0, 0, dttm.app.trace) + dttm.dealWithResponse(txRs) + return err + } + + info.msCacheAnte.Write() + info.ctx.Cache().Write(true) + + dttm.app.UpdateFeeForCollector(dttm.serialTask.fee, true) + return nil +} + +func (dttm *DTTManager) dealWithResponse(txRs abci.ResponseDeliverTx) { + dttm.txResponses[dttm.serialTask.index] = &txRs + if txRs.Code != abci.CodeTypeOK { + dttm.invalidTxs++ + } +} + +func (dttm *DTTManager) accountUpdated(acc exported.Account) { + address := hex.EncodeToString(acc.GetAddress()) + + num := len(dttm.dttRoutineList) + serialIndex := dttm.serialIndex + for i := 0; i < num; i++ { + dttr := dttm.dttRoutineList[i] + if dttr.task == nil || dttr.txIndex != dttr.task.index || !dttr.needToRerunWhenContextChanged() || dttr.task.from != address { + continue + } + dttr.shouldRerun(-1, serialIndex) + } +} + +//------------------------------------------------------------- + +func (app *BaseApp) DeliverTxsConcurrent(txs [][]byte) []*abci.ResponseDeliverTx { + if app.deliverTxsMgr == nil { + app.deliverTxsMgr = NewDTTManager(app) + } + + app.deliverTxsMgr.deliverTxs(txs) + + if len(txs) > 0 { + //waiting for call back + <-app.deliverTxsMgr.done + close(app.deliverTxsMgr.done) + } + app.logger.Info("InvalidTxs", "count", app.deliverTxsMgr.invalidTxs) + trace.GetElapsedInfo().AddInfo(trace.InvalidTxs, fmt.Sprintf("%d", app.deliverTxsMgr.invalidTxs)) + + return app.deliverTxsMgr.txResponses +} + +func (app *BaseApp) OnAccountUpdated(acc exported.Account, updateState bool) { + if app.deliverTxsMgr != nil && updateState { + app.deliverTxsMgr.accountUpdated(acc) + } +} diff --git a/libs/cosmos-sdk/baseapp/baseapp_runtx.go b/libs/cosmos-sdk/baseapp/baseapp_runtx.go index 6b1ac162d0..5119a6a55b 100644 --- a/libs/cosmos-sdk/baseapp/baseapp_runtx.go +++ b/libs/cosmos-sdk/baseapp/baseapp_runtx.go @@ -125,8 +125,8 @@ func (app *BaseApp) runtxWithInfo(info *runTxInfo, mode runTxMode, txBytes []byt } app.pin(RunAnte, false, mode) - if app.getTxFee != nil && mode == runTxModeDeliver { - fee, _, _, _ := app.getTxFee(info.ctx, tx, true) + if app.getTxFeeAndFromHandler != nil && mode == runTxModeDeliver { + fee, _, _, _, _ := app.getTxFeeAndFromHandler(info.ctx, tx) app.UpdateFeeForCollector(fee, true) } @@ -361,7 +361,7 @@ func useCache(mode runTxMode) bool { if !sdk.UseCache { return false } - if mode == runTxModeDeliver { + if mode == runTxModeDeliver || mode == runTxModeDeliverPartConcurrent { return true } return false diff --git a/libs/cosmos-sdk/baseapp/options.go b/libs/cosmos-sdk/baseapp/options.go index 68fcf46192..2ed0db2adc 100644 --- a/libs/cosmos-sdk/baseapp/options.go +++ b/libs/cosmos-sdk/baseapp/options.go @@ -163,12 +163,11 @@ func (app *BaseApp) SetRouter(router sdk.Router) { } // SetParallelTxHandler some resources for parallel txs -func (app *BaseApp) SetParallelTxHandlers(feeCollectt sdk.UpdateFeeCollectorAccHandler, txFee sdk.GetTxFeeHandler, fixLog sdk.LogFix) { +func (app *BaseApp) SetParallelTxHandlers(feeCollectt sdk.UpdateFeeCollectorAccHandler, fixLog sdk.LogFix) { if app.sealed { panic("SetPallTxHandler() on sealed BaseApp") } app.updateFeeCollectorAccHandler = feeCollectt - app.getTxFee = txFee app.logFix = fixLog } @@ -178,3 +177,10 @@ func (app *BaseApp) SetPreDeliverTxHandler(handler sdk.PreDeliverTxHandler) { } app.preDeliverTxHandler = handler } + +func (app *BaseApp) SetPartialConcurrentHandlers(etf sdk.GetTxFeeAndFromHandler){ + if app.sealed { + panic("SetPartialConcurrentHandlers() on sealed BaseApp") + } + app.getTxFeeAndFromHandler = etf +} diff --git a/libs/cosmos-sdk/server/start_okchain.go b/libs/cosmos-sdk/server/start_okchain.go index fb52d216c6..3b7e20e5d9 100644 --- a/libs/cosmos-sdk/server/start_okchain.go +++ b/libs/cosmos-sdk/server/start_okchain.go @@ -227,7 +227,8 @@ func RegisterServerFlags(cmd *cobra.Command) *cobra.Command { viper.BindPFlag(FlagEvmImportPath, cmd.Flags().Lookup(FlagEvmImportPath)) viper.BindPFlag(FlagGoroutineNum, cmd.Flags().Lookup(FlagGoroutineNum)) - cmd.Flags().Bool(state.FlagParalleledTx, false, "Enable Parallel Tx") + cmd.Flags().Int(state.FlagDeliverTxsExecMode, 0, "Execution mode for deliver txs") + cmd.Flags().Int(state.FlagDeliverTxsConcurrentNum, 0, "concurrent number for deliver txs when using partial-concurrent mode") cmd.Flags().String(FlagListenAddr, "tcp://0.0.0.0:26659", "EVM RPC and cosmos-sdk REST API listen address.") cmd.Flags().String(FlagUlockKey, "", "Select the keys to unlock on the RPC server") diff --git a/libs/cosmos-sdk/types/context.go b/libs/cosmos-sdk/types/context.go index e222f5797e..3360b42023 100644 --- a/libs/cosmos-sdk/types/context.go +++ b/libs/cosmos-sdk/types/context.go @@ -23,22 +23,22 @@ but please do not over-use it. We try to keep all data structured and standard additions here would be better just to add to the Context struct */ type Context struct { - ctx context.Context - ms MultiStore - header *abci.Header - chainID string - from string - txBytes []byte - logger log.Logger - voteInfo []abci.VoteInfo - gasMeter GasMeter - blockGasMeter GasMeter - isDeliver bool - checkTx bool - recheckTx bool // if recheckTx == true, then checkTx must also be true - wrappedCheckTx bool // if wrappedCheckTx == true, then checkTx must also be true - traceTx bool // traceTx is set true for trace tx and its predesessors , traceTx was set in app.beginBlockForTrace() - traceTxLog bool // traceTxLog is used to create trace logger for evm , traceTxLog is set to true when only tracing target tx (its predesessors will set false), traceTxLog is set before runtx + ctx context.Context + ms MultiStore + header *abci.Header + chainID string + from string + txBytes []byte + logger log.Logger + voteInfo []abci.VoteInfo + gasMeter GasMeter + blockGasMeter GasMeter + isDeliver bool + checkTx bool + recheckTx bool // if recheckTx == true, then checkTx must also be true + wrappedCheckTx bool // if wrappedCheckTx == true, then checkTx must also be true + traceTx bool // traceTx is set true for trace tx and its predesessors , traceTx was set in app.beginBlockForTrace() + traceTxLog bool // traceTxLog is used to create trace logger for evm , traceTxLog is set to true when only tracing target tx (its predesessors will set false), traceTxLog is set before runtx traceTxConfigBytes []byte // traceTxConfigBytes is used to save traceTxConfig, passed from api to x/evm minGasPrice DecCoins consParams *abci.ConsensusParams diff --git a/libs/cosmos-sdk/types/handler.go b/libs/cosmos-sdk/types/handler.go index ca7857b068..505876e2c7 100644 --- a/libs/cosmos-sdk/types/handler.go +++ b/libs/cosmos-sdk/types/handler.go @@ -17,7 +17,7 @@ type UpdateFeeCollectorAccHandler func(ctx Context, balance Coins) error type LogFix func(logIndex []int, errs []error) (logs [][]byte) -type GetTxFeeHandler func(ctx Context, tx Tx, verifySig bool) (Coins, bool, string, string) +type GetTxFeeAndFromHandler func(ctx Context, tx Tx) (Coins, bool, string, string, error) // AnteDecorator wraps the next AnteHandler to perform custom pre- and post-processing. type AnteDecorator interface { diff --git a/libs/cosmos-sdk/x/auth/keeper/account.go b/libs/cosmos-sdk/x/auth/keeper/account.go index 78ab04fd02..a36ee4a7ed 100644 --- a/libs/cosmos-sdk/x/auth/keeper/account.go +++ b/libs/cosmos-sdk/x/auth/keeper/account.go @@ -78,7 +78,7 @@ func (ak AccountKeeper) GetAllAccounts(ctx sdk.Context) (accounts []exported.Acc } // SetAccount implements sdk.AccountKeeper. -func (ak AccountKeeper) SetAccount(ctx sdk.Context, acc exported.Account) { +func (ak AccountKeeper) SetAccount(ctx sdk.Context, acc exported.Account, updateState ...bool) { addr := acc.GetAddress() store := ctx.KVStore(ak.key) bz, err := ak.cdc.MarshalBinaryBareWithRegisteredMarshaller(acc) @@ -95,7 +95,8 @@ func (ak AccountKeeper) SetAccount(ctx sdk.Context, acc exported.Account) { if ak.observers != nil { for _, observer := range ak.observers { if observer != nil { - observer.OnAccountUpdated(acc) + updated := len(updateState) > 0 && updateState[0] + observer.OnAccountUpdated(acc, updated) } } } diff --git a/libs/cosmos-sdk/x/auth/keeper/ante_okchain.go b/libs/cosmos-sdk/x/auth/keeper/ante_okchain.go index 432e0c9e99..7bea2da5ef 100644 --- a/libs/cosmos-sdk/x/auth/keeper/ante_okchain.go +++ b/libs/cosmos-sdk/x/auth/keeper/ante_okchain.go @@ -10,7 +10,7 @@ type ValidateMsgHandler func(ctx sdk.Context, msgs []sdk.Msg) sdk.Result type IsSystemFreeHandler func(ctx sdk.Context, msgs []sdk.Msg) bool type ObserverI interface { - OnAccountUpdated(acc exported.Account) + OnAccountUpdated(acc exported.Account, updateState bool) } func (k *AccountKeeper) SetObserverKeeper(observer ObserverI) { diff --git a/libs/cosmos-sdk/x/bank/internal/types/expected_keepers.go b/libs/cosmos-sdk/x/bank/internal/types/expected_keepers.go index a3c54cab5b..19b35cc3dd 100644 --- a/libs/cosmos-sdk/x/bank/internal/types/expected_keepers.go +++ b/libs/cosmos-sdk/x/bank/internal/types/expected_keepers.go @@ -12,7 +12,7 @@ type AccountKeeper interface { GetAccount(ctx sdk.Context, addr sdk.AccAddress) exported.Account GetAllAccounts(ctx sdk.Context) []exported.Account - SetAccount(ctx sdk.Context, acc exported.Account) + SetAccount(ctx sdk.Context, acc exported.Account, updateState ...bool) IterateAccounts(ctx sdk.Context, process func(exported.Account) bool) } diff --git a/libs/cosmos-sdk/x/genutil/types/expected_keepers.go b/libs/cosmos-sdk/x/genutil/types/expected_keepers.go index 0c93d2040b..abaa361efa 100644 --- a/libs/cosmos-sdk/x/genutil/types/expected_keepers.go +++ b/libs/cosmos-sdk/x/genutil/types/expected_keepers.go @@ -18,7 +18,7 @@ type StakingKeeper interface { // AccountKeeper defines the expected account keeper (noalias) type AccountKeeper interface { NewAccount(sdk.Context, authexported.Account) authexported.Account - SetAccount(sdk.Context, authexported.Account) + SetAccount(sdk.Context, authexported.Account, ...bool) IterateAccounts(ctx sdk.Context, process func(authexported.Account) (stop bool)) } diff --git a/libs/cosmos-sdk/x/supply/internal/types/expected_keepers.go b/libs/cosmos-sdk/x/supply/internal/types/expected_keepers.go index 11dca7d708..728ec46e6e 100644 --- a/libs/cosmos-sdk/x/supply/internal/types/expected_keepers.go +++ b/libs/cosmos-sdk/x/supply/internal/types/expected_keepers.go @@ -9,7 +9,7 @@ import ( type AccountKeeper interface { IterateAccounts(ctx sdk.Context, process func(exported.Account) (stop bool)) GetAccount(sdk.Context, sdk.AccAddress) exported.Account - SetAccount(sdk.Context, exported.Account) + SetAccount(sdk.Context, exported.Account, ...bool) NewAccount(sdk.Context, exported.Account) exported.Account } diff --git a/libs/ibc-go/testing/simapp/app.go b/libs/ibc-go/testing/simapp/app.go index 5d36e4c679..a8b3a684e7 100644 --- a/libs/ibc-go/testing/simapp/app.go +++ b/libs/ibc-go/testing/simapp/app.go @@ -556,7 +556,6 @@ func NewSimApp( app.SetEndBlocker(app.EndBlocker) app.SetGasRefundHandler(refund.NewGasRefundHandler(app.AccountKeeper, app.SupplyKeeper)) app.SetAccHandler(NewAccHandler(app.AccountKeeper)) - //app.SetParallelTxHandlers(updateFeeCollectorHandler(app.BankKeeper, app.SupplyKeeper), evmTxFeeHandler(), fixLogForParallelTxHandler(app.EvmKeeper)) if loadLatest { err := app.LoadLatestVersion(app.keys[bam.MainStoreKey]) diff --git a/libs/tendermint/abci/client/client.go b/libs/tendermint/abci/client/client.go index 5108c27e82..0d5aeb0331 100644 --- a/libs/tendermint/abci/client/client.go +++ b/libs/tendermint/abci/client/client.go @@ -49,6 +49,7 @@ type Client interface { BeginBlockSync(types.RequestBeginBlock) (*types.ResponseBeginBlock, error) EndBlockSync(types.RequestEndBlock) (*types.ResponseEndBlock, error) ParallelTxs([][]byte, bool) []*types.ResponseDeliverTx + DeliverTxsConcurrent([][]byte) []*types.ResponseDeliverTx } //---------------------------------------- diff --git a/libs/tendermint/abci/client/local_client.go b/libs/tendermint/abci/client/local_client.go index c13499d47e..33458a326f 100644 --- a/libs/tendermint/abci/client/local_client.go +++ b/libs/tendermint/abci/client/local_client.go @@ -183,6 +183,12 @@ func (app *localClient) ParallelTxs(txs [][]byte, onlyCalSender bool) []*types.R return app.Application.ParallelTxs(txs, onlyCalSender) } +func (app *localClient) DeliverTxsConcurrent(txs [][]byte) []*types.ResponseDeliverTx { + app.mtx.Lock() + defer app.mtx.Unlock() + return app.Application.DeliverTxsConcurrent(txs) +} + //------------------------------------------------------- func (app *localClient) FlushSync() error { diff --git a/libs/tendermint/abci/example/kvstore/persistent_kvstore.go b/libs/tendermint/abci/example/kvstore/persistent_kvstore.go index 5ab8fb6782..5eac81d2c7 100644 --- a/libs/tendermint/abci/example/kvstore/persistent_kvstore.go +++ b/libs/tendermint/abci/example/kvstore/persistent_kvstore.go @@ -156,6 +156,10 @@ func (app *PersistentKVStoreApplication) ParallelTxs(_ [][]byte, _ bool) []*type return nil } +func (app *PersistentKVStoreApplication) DeliverTxsConcurrent(_ [][]byte) []*types.ResponseDeliverTx { + return nil +} + //--------------------------------------------- // update validators diff --git a/libs/tendermint/abci/types/application.go b/libs/tendermint/abci/types/application.go index cb797f682b..25bd9f222f 100644 --- a/libs/tendermint/abci/types/application.go +++ b/libs/tendermint/abci/types/application.go @@ -31,6 +31,7 @@ type Application interface { PreDeliverRealTx([]byte) TxEssentials // DeliverRealTx deliver tx returned by PreDeliverRealTx, if PreDeliverRealTx returns nil, DeliverRealTx SHOULD NOT be called DeliverRealTx(TxEssentials) ResponseDeliverTx + DeliverTxsConcurrent(txs [][]byte) []*ResponseDeliverTx } //------------------------------------------------------- @@ -93,6 +94,10 @@ func (a BaseApplication) ParallelTxs(_ [][]byte, onlyCalSender bool) []*Response return nil } +func (a BaseApplication) DeliverTxsConcurrent(_ [][]byte) []*ResponseDeliverTx { + return nil +} + //------------------------------------------------------- // GRPCApplication is a GRPC wrapper for Application diff --git a/libs/tendermint/config/dynamic_config_okchain.go b/libs/tendermint/config/dynamic_config_okchain.go index 2d8ae890be..685924183c 100644 --- a/libs/tendermint/config/dynamic_config_okchain.go +++ b/libs/tendermint/config/dynamic_config_okchain.go @@ -17,7 +17,7 @@ type IDynamicConfig interface { GetCsTimeoutPrecommit() time.Duration GetCsTimeoutPrecommitDelta() time.Duration GetEnableWtx() bool - GetParalleledTxEnable() bool + GetDeliverTxsExecuteMode() int } var DynamicConfig IDynamicConfig = MockDynamicConfig{} @@ -79,6 +79,6 @@ func (d MockDynamicConfig) GetCsTimeoutPrecommitDelta() time.Duration { func (d MockDynamicConfig) GetEnableWtx() bool { return false } -func (d MockDynamicConfig) GetParalleledTxEnable() bool { - return false +func (d MockDynamicConfig) GetDeliverTxsExecuteMode() int { + return 0 } diff --git a/libs/tendermint/proxy/app_conn.go b/libs/tendermint/proxy/app_conn.go index e50cd37d83..77771f5f9f 100644 --- a/libs/tendermint/proxy/app_conn.go +++ b/libs/tendermint/proxy/app_conn.go @@ -22,6 +22,7 @@ type AppConnConsensus interface { CommitSync(types.RequestCommit) (*types.ResponseCommit, error) SetOptionAsync(req types.RequestSetOption) *abcicli.ReqRes ParallelTxs([][]byte, bool) []*types.ResponseDeliverTx + DeliverTxsConcurrent([][]byte) []*types.ResponseDeliverTx SetOptionSync(req types.RequestSetOption) (*types.ResponseSetOption, error) } @@ -151,6 +152,10 @@ func (app *appConnConsensus) ParallelTxs(txs [][]byte, onlyCalSender bool) []*ty return app.appConn.ParallelTxs(txs, onlyCalSender) } +func (app *appConnConsensus) DeliverTxsConcurrent(txs [][]byte) []*types.ResponseDeliverTx { + return app.appConn.DeliverTxsConcurrent(txs) +} + //------------------------------------------------ // Implements AppConnQuery (subset of abcicli.Client) diff --git a/libs/tendermint/state/execution.go b/libs/tendermint/state/execution.go index 79c9849367..3843851858 100644 --- a/libs/tendermint/state/execution.go +++ b/libs/tendermint/state/execution.go @@ -20,6 +20,25 @@ import ( ) //----------------------------------------------------------------------------- +type ( + // Enum mode for executing [deliverTx, ...] + DeliverTxsExecMode int +) + +const ( + DeliverTxsExecModeSerial DeliverTxsExecMode = iota // execute [deliverTx,...] sequentially + DeliverTxsExecModePartConcurrent // execute [deliverTx,...] partially-concurrent + DeliverTxsExecModeParallel // execute [deliverTx,...] parallel + + // There are three modes. + // 0: execute [deliverTx,...] sequentially (default) + // 1: execute [deliverTx,...] partially-concurrent + // 2: execute [deliverTx,...] parallel + FlagDeliverTxsExecMode = "deliver-txs-mode" + + FlagDeliverTxsConcurrentNum = "deliver-txs-concurrent-num" +) + // BlockExecutor handles block execution and state updates. // It exposes ApplyBlock(), which validates & executes the block, updates state w/ ABCI responses, // then commits and updates the mempool atomically, then saves state. @@ -302,9 +321,15 @@ func (blockExec *BlockExecutor) runAbci(block *types.Block, deltaInfo *DeltaInfo db: blockExec.db, proxyApp: blockExec.proxyApp, } - if cfg.DynamicConfig.GetParalleledTxEnable() { + mode := DeliverTxsExecMode(cfg.DynamicConfig.GetDeliverTxsExecuteMode()) + switch mode { + case DeliverTxsExecModeSerial: + abciResponses, err = execBlockOnProxyApp(ctx) + case DeliverTxsExecModePartConcurrent: + abciResponses, err = execBlockOnProxyAppPartConcurrent(blockExec.logger, blockExec.proxyApp, block, blockExec.db) + case DeliverTxsExecModeParallel: abciResponses, err = execBlockOnProxyAppAsync(blockExec.logger, blockExec.proxyApp, block, blockExec.db) - } else { + default: abciResponses, err = execBlockOnProxyApp(ctx) } } diff --git a/libs/tendermint/state/execution_concurrent.go b/libs/tendermint/state/execution_concurrent.go new file mode 100644 index 0000000000..6833f490cd --- /dev/null +++ b/libs/tendermint/state/execution_concurrent.go @@ -0,0 +1,45 @@ +package state + +import ( + abci "github.com/okex/exchain/libs/tendermint/abci/types" + "github.com/okex/exchain/libs/tendermint/libs/log" + "github.com/okex/exchain/libs/tendermint/proxy" + "github.com/okex/exchain/libs/tendermint/types" + dbm "github.com/okex/exchain/libs/tm-db" +) + +// Executes block's transactions on proxyAppConn. +// Returns a list of transaction results and updates to the validator set +func execBlockOnProxyAppPartConcurrent(logger log.Logger, + proxyAppConn proxy.AppConnConsensus, + block *types.Block, + stateDB dbm.DB, + ) (*ABCIResponses, error) { + + abciResponses := NewABCIResponses(block) + commitInfo, byzVals := getBeginBlockValidatorInfo(block, stateDB) + + // Begin block + var err error + abciResponses.BeginBlock, err = proxyAppConn.BeginBlockSync(abci.RequestBeginBlock{ + Hash: block.Hash(), + Header: types.TM2PB.Header(&block.Header), + LastCommitInfo: commitInfo, + ByzantineValidators: byzVals, + }) + if err != nil { + logger.Error("Error in proxyAppConn.BeginBlock", "err", err) + return nil, err + } + + // Run txs of block. + abciResponses.DeliverTxs = proxyAppConn.DeliverTxsConcurrent(transTxsToBytes(block.Txs)) + + abciResponses.EndBlock, err = proxyAppConn.EndBlockSync(abci.RequestEndBlock{Height: block.Height}) + if err != nil { + logger.Error("Error in proxyAppConn.EndBlock", "err", err) + return nil, err + } + + return abciResponses, nil +} diff --git a/libs/tendermint/state/execution_parallel.go b/libs/tendermint/state/execution_parallel.go index 304afbc550..5cbb69ffae 100644 --- a/libs/tendermint/state/execution_parallel.go +++ b/libs/tendermint/state/execution_parallel.go @@ -10,10 +10,6 @@ import ( dbm "github.com/okex/exchain/libs/tm-db" ) -var ( - FlagParalleledTx = "paralleled-tx" -) - func execBlockOnProxyAppAsync( logger log.Logger, proxyAppConn proxy.AppConnConsensus, diff --git a/libs/tendermint/state/execution_task.go b/libs/tendermint/state/execution_task.go index 84bf287e9b..fc3e62cdf0 100644 --- a/libs/tendermint/state/execution_task.go +++ b/libs/tendermint/state/execution_task.go @@ -72,9 +72,15 @@ func (t *executionTask) run() { var abciResponses *ABCIResponses var err error - if cfg.DynamicConfig.GetParalleledTxEnable() { + mode := DeliverTxsExecMode(cfg.DynamicConfig.GetDeliverTxsExecuteMode()) + switch mode { + case DeliverTxsExecModeSerial: + abciResponses, err = execBlockOnProxyApp(t) + case DeliverTxsExecModePartConcurrent: + abciResponses, err = execBlockOnProxyAppPartConcurrent(t.logger, t.proxyApp, t.block, t.db) + case DeliverTxsExecModeParallel: abciResponses, err = execBlockOnProxyAppAsync(t.logger, t.proxyApp, t.block, t.db) - } else { + default: abciResponses, err = execBlockOnProxyApp(t) } diff --git a/x/common/analyzer/analyzer.go b/x/common/analyzer/analyzer.go index d817fe7d6a..dc70433aed 100644 --- a/x/common/analyzer/analyzer.go +++ b/x/common/analyzer/analyzer.go @@ -77,7 +77,7 @@ func init() { func newAnalys(height int64) { if isParalleledTxOn == nil { isParalleledTxOn = new(bool) - *isParalleledTxOn = !viper.GetBool(sm.FlagParalleledTx) + *isParalleledTxOn = sm.DeliverTxsExecMode(viper.GetInt(sm.FlagDeliverTxsExecMode)) != sm.DeliverTxsExecModeParallel } if singleAnalys == nil { singleAnalys = &analyer{} diff --git a/x/evm/keeper/keeper.go b/x/evm/keeper/keeper.go index 91671d697e..a1f9533753 100644 --- a/x/evm/keeper/keeper.go +++ b/x/evm/keeper/keeper.go @@ -133,7 +133,7 @@ func NewSimulateKeeper( } } -func (k Keeper) OnAccountUpdated(acc auth.Account) { +func (k Keeper) OnAccountUpdated(acc auth.Account, updateState bool) { account := acc.GetAddress() k.Watcher.DeleteAccount(account) } diff --git a/x/evm/types/expected_keepers.go b/x/evm/types/expected_keepers.go index a303ac6c5b..92d3fe6849 100644 --- a/x/evm/types/expected_keepers.go +++ b/x/evm/types/expected_keepers.go @@ -13,7 +13,7 @@ type AccountKeeper interface { GetAllAccounts(ctx sdk.Context) (accounts []authexported.Account) IterateAccounts(ctx sdk.Context, cb func(account authexported.Account) bool) GetAccount(ctx sdk.Context, addr sdk.AccAddress) authexported.Account - SetAccount(ctx sdk.Context, account authexported.Account) + SetAccount(ctx sdk.Context, account authexported.Account, updateState ...bool) RemoveAccount(ctx sdk.Context, account authexported.Account) SetObserverKeeper(observer auth.ObserverI) } diff --git a/x/evm/types/statedb.go b/x/evm/types/statedb.go index aac2dd1b5f..3e73d1b987 100644 --- a/x/evm/types/statedb.go +++ b/x/evm/types/statedb.go @@ -782,7 +782,7 @@ func (csdb *CommitStateDB) Commit(deleteEmptyObjects bool) (ethcmn.Hash, error) } // update the object in the KVStore - if err := csdb.updateStateObject(stateEntry.stateObject); err != nil { + if err := csdb.updateStateObject(stateEntry.stateObject, true); err != nil { return ethcmn.Hash{}, err } } @@ -821,7 +821,7 @@ func (csdb *CommitStateDB) Finalise(deleteEmptyObjects bool) error { // Set all the dirty state storage items for the state object in the // KVStore and finally set the account in the account mapper. stateEntry.stateObject.commitState() - if err := csdb.updateStateObject(stateEntry.stateObject); err != nil { + if err := csdb.updateStateObject(stateEntry.stateObject, false); err != nil { return err } } @@ -851,7 +851,7 @@ func (csdb *CommitStateDB) IntermediateRoot(deleteEmptyObjects bool) (ethcmn.Has } // updateStateObject writes the given state object to the store. -func (csdb *CommitStateDB) updateStateObject(so *stateObject) error { +func (csdb *CommitStateDB) updateStateObject(so *stateObject, fromCommit bool) error { // NOTE: we don't use sdk.NewCoin here to avoid panic on test importer's genesis newBalance := sdk.Coin{Denom: sdk.DefaultBondDenom, Amount: sdk.NewDecFromBigIntWithPrec(so.Balance(), sdk.Precision)} // int2dec if !newBalance.IsValid() { @@ -873,7 +873,8 @@ func (csdb *CommitStateDB) updateStateObject(so *stateObject) error { return err } - csdb.accountKeeper.SetAccount(csdb.ctx, so.account) + updateState := fromCommit && csdb.ctx.IsDeliver() + csdb.accountKeeper.SetAccount(csdb.ctx, so.account, updateState) if !csdb.ctx.IsCheckTx() { if csdb.Watcher.Enabled() { csdb.Watcher.SaveAccount(so.account, false)