Skip to content

Commit

Permalink
Merge PR: partial concurrent for DeliverTxs (#1801)
Browse files Browse the repository at this point in the history
* remove setUpdateCount

* remove useless code

* remove Mutex in DTTManager

* remove unused import

* check whether getTxFee is nil before call it

* add log

* add log for fee-collector

* add break logic for replay

* add log

* add log

* remove SetFeeCollector log

* print info of commistStore

* print commitstore info

* add log

* print SetCoins

* remove unused import

* remove 01 for addr

* remove log

* print SetCoins

* print GasConsumed

* check GlobalHeight before print log

* print GasConsumed

* close multi cache

* remove log

* add log

* add log

* add log

* add log

* change deliverTxsMode when do mock replay

* remove log

* deal with empty block

* rerun no evm tx

* add log

* remove log

* add log

* print deliverState.ms

* add log info

* print account info

* create module account for feeCollector before setcoins

* add log

* set nextTaskRoutine whether ready or not

* optimize keepAliveTicker

* updateFeeCollectorAccHandler in EndBlock

* Stop keepAliveTicker

* rerun all concurrent txs after cosmos tx finished

* remove GetModuleAccount in updateFeeCollectorHandler

* add log

* add dttRoutineStepWaitRerun

* sleep some time to wait account update finished

* set prevTaskIndex on accountUpdated

* stop keepAliveTicker before running serial

* use rerunRoutines list

* add log

* add condition for ExtractNextSerialFromTicker

* change keepAliveIntervalMS

* reset nextTaskRoutine

* remove reset

* add condition for stop dttRoutine

* add log

* add log

* disable FlagMultiCache

* fix conflicts

* print commitids

* print commitIDs

* print string

* stop replay

* print string

* print setCoins

* print setCoins

* remove dirty code

* add RunMsgFailed log

* add RunMsgFailed info

* add log

* add log

* add log

* add log

* add log

* print gas info

* add log

* add log

* add ConsumeGas

* modify log

* add consumeGas

* remove log

* set context without withXXX

* add log

* use GetAccountGas instead of TryAddGetAccountGas

* remove log

* remove log

* remove log

* add log

* add log

* add log

* remove log

* use defer to catch panic and recover

* optimize judgement for !isEvm

* use AddConsumeGasForSendCoins to consume gas

* remove useless code

* add log

* remove log

* revert defer-s in runtxWithInfo

* add log

* modify consumed gas for SendCoins to FeeCollector

* add log

* add log

* add log

* modify AddConsumeGasForSendCoins

* add log

* modify AddConsumeGasForSendCoins

* use InfiniteGasMeter when GetModuleAccount

* remove log

* check writeGas

* merge from pc-no-fee-collector

* use FlagDeliverTxsExecMode

* remove DeliverTxsExecMode

* modify ut

* remove log

* optimize conditions for not rerun ante

* tidy code

* remove log

* add intItem for conflictPair

* remove log

* add log

* add log

* add log

* add log

* set DeliverTxsMode in replay

* optimize hasConflict

* remove log

* remove all log

* remove log

* remove comments

* rewrite deductFees for ethGasConsume

* update FeeCollector account before executing cosmos tx

* optimize code

* remove GetAccountBinarySize

* remove parameter in ethGasConsume

* rerun cosmos tx before running serial content

* calculate replay duration and print it

* modify ut for anteHandler

* remove suite.app.SupplyKeeper.GetModuleAccount

* add condition for not check rerun

* optimize serialRoutine

* add pin for RunMsg and Refund

* add log

* optimize logic for rerunRoutines

* add FlagDeliverTxsConcurrentNum

* add log

* add log

* do not calculate gas during updateFeeCollectorHandler

* updateFeeCollectorAcc only when fee changed

* remove log

* remove unused import

* remove parameter from SetAccount

* fix ut for SetAccount

* add condition for calling UpdateAccountInStateDB

* use optional parameter in SetAccount (#1900)

* use optional parameter in SetAccount

* optimize accountUpdated func in DTTManager

* first commit

* Mz/pc option (#1901)

* use optional parameter in SetAccount

* optimize accountUpdated func in DTTManager

* optimize OnAccountUpdated

* use normal parameter in OnAccountUpdated

* split functions in DTTManager

* encapsulate conditions

* set gasUsed 0 when ante failed

* add log

* use startRunMsg to judge whether ante failed or not

* add log

* delete unless code

* fix build

* remove log

* remove log

* fix ut bug

* merge defer funcs

* use recover when run ante

* delete preWriteMs

* split defer because of recover

* take care of error from VerifySig

* add comment

* remove a condition for calling updateFeeCollectorAccHandler

* Update store.go

* upd

* upd

* use const

* use cache in runTxModeDeliverPartConcurrent

* splite defer func-s in serialExecution

* fix conflict

* remove check config for FlagDeliverTxsExecMode and FlagParalleledTx

* Update baseapp_runtx.go

* typ

* address to string

* delete modeHandlerDeliverPartConcurrent

* support prerun

* optimize code

* revert changes for ParalleledTx

* remove useless code

* small modify

* fix random panic

* remove FlagParalleledTx

* write ms cache after handleGasConsumed

* modify stopChan

* register dynamic config in repair

* delete evmTxFeeHandler

* fix ut

Co-authored-by: KamiD <[email protected]>
Co-authored-by: Zhong Qiu <[email protected]>
Co-authored-by: chunfengSun <[email protected]>
Co-authored-by: zhongqiuwood <[email protected]>
Co-authored-by: xiangjianmeng <[email protected]>
  • Loading branch information
6 people authored Apr 28, 2022
1 parent 4926092 commit af9a55a
Show file tree
Hide file tree
Showing 44 changed files with 973 additions and 167 deletions.
2 changes: 1 addition & 1 deletion app/ante/ante.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func NewAnteHandler(ak auth.AccountKeeper, evmKeeper EVMKeeper, sk types.SupplyK
authante.NewValidateBasicDecorator(),
authante.NewValidateMemoDecorator(ak),
authante.NewConsumeGasForTxSizeDecorator(ak),
authante.NewSetPubKeyDecorator(ak), // SetPubKeyDecorator must be called before all signature verification decorators
authante.NewSetPubKeyDecorator(ak), // SetPubKeyDecorator must be called before all signature verification decorators
authante.NewValidateSigCountDecorator(ak),
authante.NewDeductFeeDecorator(ak, sk),
authante.NewSigGasConsumeDecorator(ak, sigGasConsumer),
Expand Down
5 changes: 4 additions & 1 deletion app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ func NewOKExChainApp(
app.AccountKeeper = auth.NewAccountKeeper(
codecProxy.GetCdc(), keys[auth.StoreKey], app.subspaces[auth.ModuleName], okexchain.ProtoAccount,
)
app.AccountKeeper.SetObserverKeeper(app)

bankKeeper := bank.NewBaseKeeperWithMarshal(
&app.AccountKeeper, codecProxy, app.subspaces[bank.ModuleName], app.ModuleAccountAddrs(),
Expand Down Expand Up @@ -528,8 +529,10 @@ func NewOKExChainApp(
app.SetEndBlocker(app.EndBlocker)
app.SetGasRefundHandler(refund.NewGasRefundHandler(app.AccountKeeper, app.SupplyKeeper))
app.SetAccHandler(NewAccHandler(app.AccountKeeper))
app.SetParallelTxHandlers(updateFeeCollectorHandler(app.BankKeeper, app.SupplyKeeper), evmTxFeeHandler(), fixLogForParallelTxHandler(app.EvmKeeper))
app.SetParallelTxHandlers(updateFeeCollectorHandler(app.BankKeeper, app.SupplyKeeper), fixLogForParallelTxHandler(app.EvmKeeper))
app.SetPreDeliverTxHandler(preDeliverTxHandler(app.AccountKeeper))
app.SetPartialConcurrentHandlers(getTxFeeAndFromHandler(app.AccountKeeper))

if loadLatest {
err := app.LoadLatestVersion(app.keys[bam.MainStoreKey])
if err != nil {
Expand Down
24 changes: 0 additions & 24 deletions app/app_parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
ethermint "github.com/okex/exchain/app/types"
sdk "github.com/okex/exchain/libs/cosmos-sdk/types"
"github.com/okex/exchain/libs/cosmos-sdk/x/auth"
authante "github.com/okex/exchain/libs/cosmos-sdk/x/auth/ante"
"github.com/okex/exchain/libs/cosmos-sdk/x/bank"
"github.com/okex/exchain/libs/cosmos-sdk/x/supply"
"github.com/okex/exchain/x/evm"
Expand All @@ -18,29 +17,6 @@ func updateFeeCollectorHandler(bk bank.Keeper, sk supply.Keeper) sdk.UpdateFeeCo
}
}

// evmTxFeeHandler get tx fee for evm tx
func evmTxFeeHandler() sdk.GetTxFeeHandler {

return func(ctx sdk.Context, tx sdk.Tx, verifySig bool) (fee sdk.Coins, isEvm bool, from string, to string) {
if verifySig {
if evmTx, ok := tx.(*evmtypes.MsgEthereumTx); ok {
isEvm = true
_ = evmTx.VerifySig(evmTx.ChainID(), ctx.BlockHeight())
from = evmTx.BaseTx.From
to = ""
if evmTx.To() != nil {
to = evmTx.To().String()
}
}
}
if feeTx, ok := tx.(authante.FeeTx); ok {
fee = feeTx.GetFee()
}

return
}
}

// fixLogForParallelTxHandler fix log for parallel tx
func fixLogForParallelTxHandler(ek *evm.Keeper) sdk.LogFix {
return func(logIndex []int, anteErrs []error) (logs [][]byte) {
Expand Down
38 changes: 38 additions & 0 deletions app/app_partial_concurrent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package app

import (
"encoding/hex"
sdk "github.com/okex/exchain/libs/cosmos-sdk/types"
"github.com/okex/exchain/libs/cosmos-sdk/x/auth"
authante "github.com/okex/exchain/libs/cosmos-sdk/x/auth/ante"
evmtypes "github.com/okex/exchain/x/evm/types"
"strings"
)

// getTxFeeAndFromHandler get tx fee and from
func getTxFeeAndFromHandler(ak auth.AccountKeeper) sdk.GetTxFeeAndFromHandler {
return func(ctx sdk.Context, tx sdk.Tx) (fee sdk.Coins, isEvm bool, from string, to string, err error) {
if evmTx, ok := tx.(*evmtypes.MsgEthereumTx); ok {
isEvm = true
err = evmTx.VerifySig(evmTx.ChainID(), ctx.BlockHeight())
if err != nil {
return
}
fee = evmTx.GetFee()
from = evmTx.BaseTx.From
if len(from) > 2 {
from = strings.ToLower(from[2:])
}
if evmTx.To() != nil {
to = strings.ToLower(evmTx.To().String()[2:])
}
} else if feeTx, ok := tx.(authante.FeeTx); ok {
fee = feeTx.GetFee()
feePayer := feeTx.FeePayer(ctx)
feePayerAcc := ak.GetAccount(ctx, feePayer)
from = hex.EncodeToString(feePayerAcc.GetAddress())
}

return
}
}
2 changes: 1 addition & 1 deletion app/app_upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ func newTestOkcChainApp(
app.SetEndBlocker(app.EndBlocker)
app.SetGasRefundHandler(refund.NewGasRefundHandler(app.AccountKeeper, app.SupplyKeeper))
app.SetAccHandler(NewAccHandler(app.AccountKeeper))
app.SetParallelTxHandlers(updateFeeCollectorHandler(app.BankKeeper, app.SupplyKeeper), evmTxFeeHandler(), fixLogForParallelTxHandler(app.EvmKeeper))
app.SetParallelTxHandlers(updateFeeCollectorHandler(app.BankKeeper, app.SupplyKeeper), fixLogForParallelTxHandler(app.EvmKeeper))

if loadLatest {
err := app.LoadLatestVersion(app.keys[bam.MainStoreKey])
Expand Down
18 changes: 9 additions & 9 deletions app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ type OecConfig struct {
// enable-analyzer
enableAnalyzer bool

enableParalleledTx bool
deliverTxsMode int
}

const (
Expand Down Expand Up @@ -196,7 +196,7 @@ func (c *OecConfig) loadFromConfig() {
c.SetNodeKeyWhitelist(viper.GetString(FlagNodeKeyWhitelist))
c.SetEnableWtx(viper.GetBool(FlagEnableWrappedTx))
c.SetEnableAnalyzer(viper.GetBool(analyzer.FlagEnableAnalyzer))
c.SetParalleledTxStatus(viper.GetBool(state.FlagParalleledTx))
c.SetDeliverTxsExecuteMode(viper.GetInt(state.FlagDeliverTxsExecMode))
}

func resolveNodeKeyWhitelist(plain string) []string {
Expand Down Expand Up @@ -364,12 +364,12 @@ func (c *OecConfig) update(key, value interface{}) {
return
}
c.SetEnableAnalyzer(r)
case state.FlagParalleledTx:
r, err := strconv.ParseBool(v)
case state.FlagDeliverTxsExecMode:
r, err := strconv.Atoi(v)
if err != nil {
return
}
c.SetParalleledTxStatus(r)
c.SetDeliverTxsExecuteMode(r)
}
}

Expand Down Expand Up @@ -418,12 +418,12 @@ func (c *OecConfig) GetEnableWtx() bool {
return c.enableWtx
}

func (c *OecConfig) SetParalleledTxStatus(enable bool) {
c.enableParalleledTx = enable
func (c *OecConfig) SetDeliverTxsExecuteMode(mode int) {
c.deliverTxsMode = mode
}

func (c *OecConfig) GetParalleledTxEnable() bool {
return c.enableParalleledTx
func (c *OecConfig) GetDeliverTxsExecuteMode() int {
return c.deliverTxsMode
}

func (c *OecConfig) SetEnableWtx(value bool) {
Expand Down
3 changes: 3 additions & 0 deletions app/repair_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package app

import (
"fmt"
"github.com/okex/exchain/app/config"
"io"
"io/ioutil"
"log"
Expand Down Expand Up @@ -151,6 +152,8 @@ func newRepairApp(logger tmlog.Logger, db dbm.DB, traceStore io.Writer) *repairA

func doRepair(ctx *server.Context, state sm.State, stateStoreDB dbm.DB,
proxyApp proxy.AppConns, startHeight, latestHeight int64, dataDir string) {
config.RegisterDynamicConfig(ctx.Logger.With("module", "config"))

stateCopy := state.Copy()
ctx.Logger.Debug("stateCopy", "state", fmt.Sprintf("%+v", stateCopy))
// construct state for repair
Expand Down
2 changes: 1 addition & 1 deletion app/rpc/namespaces/eth/simulation/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (a AccountKeeperProxy) GetAccount(ctx sdk.Context, addr sdk.AccAddress) aut
return account
}

func (a AccountKeeperProxy) SetAccount(ctx sdk.Context, account authexported.Account) {
func (a AccountKeeperProxy) SetAccount(ctx sdk.Context, account authexported.Account, updateState ...bool) {
acc, ok := account.(types.EthAccount)
if !ok {
return
Expand Down
27 changes: 21 additions & 6 deletions app/utils/sanity/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,15 @@ import (
var (
// conflicts flags
startConflictElems = []conflictPair{
// --fast-query conflict with --paralleled-tx=true
// --fast-query conflict with --deliver-txs-mode=1
{
configA: boolItem{name: watcher.FlagFastQuery, value: true},
configB: boolItem{name: state.FlagParalleledTx, value: true},
configB: intItem{name: state.FlagDeliverTxsExecMode, value: 1},
},
// --fast-query conflict with --deliver-txs-mode=2
{
configA: boolItem{name: watcher.FlagFastQuery, value: true},
configB: intItem{name: state.FlagDeliverTxsExecMode, value: 2},
},
// --fast-query conflict with --pruning=nothing
{
Expand All @@ -69,15 +74,25 @@ var (
configA: boolItem{name: consensus.EnablePrerunTx, value: true},
configB: boolItem{name: types.FlagDownloadDDS, value: true},
},
// --upload-delta conflict with --paralleled-tx=true
// --upload-delta conflict with --deliver-txs-mode=1
{
configA: boolItem{name: types.FlagUploadDDS, value: true},
configB: boolItem{name: state.FlagParalleledTx, value: true},
configB: intItem{name: state.FlagDeliverTxsExecMode, value: 1},
},
// --upload-delta conflict with --deliver-txs-mode=2
{
configA: boolItem{name: types.FlagUploadDDS, value: true},
configB: intItem{name: state.FlagDeliverTxsExecMode, value: 2},
},
// --node-mode=rpc(--fast-query) conflicts with --deliver-txs-mode=1
{
configA: stringItem{name: apptype.FlagNodeMode, value: string(apptype.RpcNode)},
configB: intItem{name: state.FlagDeliverTxsExecMode, value: 1},
},
// --node-mode=rpc(--fast-query) conflicts with --paralleled-tx=true and --pruning=nothing
// --node-mode=rpc(--fast-query) conflicts with --deliver-txs-mode=2 and --pruning=nothing
{
configA: stringItem{name: apptype.FlagNodeMode, value: string(apptype.RpcNode)},
configB: boolItem{name: state.FlagParalleledTx, value: true},
configB: intItem{name: state.FlagDeliverTxsExecMode, value: 2},
},
{
configA: stringItem{name: apptype.FlagNodeMode, value: string(apptype.RpcNode)},
Expand Down
61 changes: 3 additions & 58 deletions app/utils/sanity/start_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,30 +5,12 @@ import (
"github.com/okex/exchain/libs/cosmos-sdk/server"
"github.com/okex/exchain/libs/cosmos-sdk/store/types"
"github.com/okex/exchain/libs/tendermint/consensus"
"github.com/okex/exchain/libs/tendermint/state"
ttypes "github.com/okex/exchain/libs/tendermint/types"
"github.com/okex/exchain/x/evm/watcher"
"github.com/spf13/cobra"
"testing"
)

func getCommandNodeModeRpcParalleledTx() *cobra.Command {
return getCommand([]universeFlag{
&stringFlag{
Name: apptype.FlagNodeMode,
Default: "",
Changed: true,
Value: string(apptype.RpcNode),
},
&boolFlag{
Name: state.FlagParalleledTx,
Default: false,
Changed: true,
Value: true,
},
})
}

func getCommandNodeModeRpcPruningNothing() *cobra.Command {
return getCommand([]universeFlag{
&stringFlag{
Expand All @@ -46,23 +28,6 @@ func getCommandNodeModeRpcPruningNothing() *cobra.Command {
})
}

func getCommandFastQueryParalleledTx() *cobra.Command {
return getCommand([]universeFlag{
&boolFlag{
Name: watcher.FlagFastQuery,
Default: false,
Changed: true,
Value: true,
},
&boolFlag{
Name: state.FlagParalleledTx,
Default: false,
Changed: true,
Value: true,
},
})
}

func getCommandFastQueryPruningNothing() *cobra.Command {
return getCommand([]universeFlag{
&boolFlag{
Expand Down Expand Up @@ -97,23 +62,6 @@ func getCommandEnablePreruntxDownloadDelta() *cobra.Command {
})
}

func getCommandUploadDeltaParalleledTx() *cobra.Command {
return getCommand([]universeFlag{
&boolFlag{
Name: ttypes.FlagUploadDDS,
Default: false,
Changed: true,
Value: true,
},
&boolFlag{
Name: state.FlagParalleledTx,
Default: false,
Changed: true,
Value: true,
},
})
}

func TestCheckStart(t *testing.T) {
type args struct {
cmd *cobra.Command
Expand All @@ -123,12 +71,9 @@ func TestCheckStart(t *testing.T) {
args args
wantErr bool
}{
{name: "1. conflicts --fast-query and --paralleled-tx", args: args{cmd: getCommandFastQueryParalleledTx()}, wantErr: true},
{name: "2. conflicts --fast-query and --pruning=nothing", args: args{cmd: getCommandFastQueryPruningNothing()}, wantErr: true},
{name: "3. conflicts --enable-preruntx and --download-delta", args: args{cmd: getCommandEnablePreruntxDownloadDelta()}, wantErr: true},
{name: "4. conflicts --upload-delta and --paralleled-tx=true", args: args{cmd: getCommandUploadDeltaParalleledTx()}, wantErr: true},
{name: "5. conflicts --node-mod=rpc and --paralleled-tx=true", args: args{cmd: getCommandNodeModeRpcParalleledTx()}, wantErr: true},
{name: "6. conflicts --node-mod=rpc and --pruning=nothing", args: args{cmd: getCommandNodeModeRpcPruningNothing()}, wantErr: true},
{name: "1. conflicts --fast-query and --pruning=nothing", args: args{cmd: getCommandFastQueryPruningNothing()}, wantErr: true},
{name: "2. conflicts --enable-preruntx and --download-delta", args: args{cmd: getCommandEnablePreruntxDownloadDelta()}, wantErr: true},
{name: "3. conflicts --node-mod=rpc and --pruning=nothing", args: args{cmd: getCommandNodeModeRpcPruningNothing()}, wantErr: true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
17 changes: 17 additions & 0 deletions app/utils/sanity/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,23 @@ func (b boolItem) verbose() string {
return fmt.Sprintf("--%v=%v", b.name, b.value)
}

type intItem struct {
name string
value int
}

func (i intItem) label() string {
return i.name
}

func (i intItem) check() bool {
return viper.GetInt(i.label()) == i.value
}

func (i intItem) verbose() string {
return fmt.Sprintf("--%v=%v", i.name, i.value)
}

type stringItem struct {
name string
value string
Expand Down
3 changes: 2 additions & 1 deletion cmd/exchaind/repair_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ func repairStateCmd(ctx *server.Context) *cobra.Command {
log.Println("--------- repair data success ---------")
},
}
cmd.Flags().Bool(sm.FlagParalleledTx, false, "parallel execution for evm txs")
cmd.Flags().Int64(app.FlagStartHeight, 0, "Set the start block height for repair")
cmd.Flags().Bool(flatkv.FlagEnable, false, "Enable flat kv storage for read performance")
cmd.Flags().String(app.Elapsed, app.DefaultElapsedSchemas, "schemaName=1|0,,,")
cmd.Flags().Bool(analyzer.FlagEnableAnalyzer, false, "Enable auto open log analyzer")
cmd.Flags().Int(sm.FlagDeliverTxsExecMode, 0, "execution mode for deliver txs")
cmd.Flags().Int(sm.FlagDeliverTxsConcurrentNum, 0, "concurrent number for deliver txs when using partial-concurrent mode")
return cmd
}
4 changes: 2 additions & 2 deletions cmd/exchaind/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,9 @@ func doReplay(ctx *server.Context, state sm.State, stateStoreDB dbm.DB,
block := originBlockStore.LoadBlock(lastBlockHeight)
meta := originBlockStore.LoadBlockMeta(lastBlockHeight)
blockExec := sm.NewBlockExecutor(stateStoreDB, ctx.Logger, mockApp, mock.Mempool{}, sm.MockEvidencePool{})
config.GetOecConfig().SetParalleledTxStatus(false) // mockApp not support parallel tx
config.GetOecConfig().SetDeliverTxsExecuteMode(0) // mockApp not support parallel tx
state, _, err = blockExec.ApplyBlock(state, meta.BlockID, block)
config.GetOecConfig().SetParalleledTxStatus(viper.GetBool(sm.FlagParalleledTx))
config.GetOecConfig().SetDeliverTxsExecuteMode(viper.GetInt(sm.FlagDeliverTxsExecMode))
panicError(err)
}

Expand Down
Loading

0 comments on commit af9a55a

Please sign in to comment.