Skip to content

Commit

Permalink
Merge PR: optimize tx pool feature (#966)
Browse files Browse the repository at this point in the history
Co-authored-by: MengXiangJian <[email protected]>
  • Loading branch information
ilovers and xiangjianmeng authored Aug 16, 2021
1 parent f6d331f commit 9e670c0
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 33 deletions.
27 changes: 15 additions & 12 deletions app/rpc/namespaces/eth/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ import (
)

const (
FlagGasLimitBuffer = "gas-limit-buffer"
CacheOfEthCallLru = 40960
FlagGasLimitBuffer = "gas-limit-buffer"
CacheOfEthCallLru = 40960
)

// PublicEthereumAPI is the eth_ prefixed set of APIs in the Web3 JSON-RPC spec.
Expand Down Expand Up @@ -609,6 +609,11 @@ func (api *PublicEthereumAPI) SendTransaction(args rpctypes.SendTxArgs) (common.
return common.Hash{}, err
}

// send chanData to txPool
if api.txPool != nil {
return broadcastTxByTxPool(api, tx, txBytes)
}

// Broadcast transaction in sync mode (default)
// NOTE: If error is encountered on the node, the broadcast will not return an error
res, err := api.clientCtx.BroadcastTx(txBytes)
Expand Down Expand Up @@ -644,7 +649,7 @@ func (api *PublicEthereumAPI) SendRawTransaction(data hexutil.Bytes) (common.Has
}

// send chanData to txPool
if viper.GetBool(FlagEnableTxPool) {
if api.txPool != nil {
return broadcastTxByTxPool(api, tx, txBytes)
}

Expand Down Expand Up @@ -1239,15 +1244,13 @@ func (api *PublicEthereumAPI) generateFromArgs(args rpctypes.SendTxArgs) (*evmty
gasPrice = ParseGasPrice().ToInt()
}

// get the nonce from the account retriever and the pending transactions
nonce, err = api.accountNonce(api.clientCtx, *args.From, true)
if err != nil {
return nil, err
}

if args.Nonce != nil {
if nonce != (uint64)(*args.Nonce) {
return nil, fmt.Errorf(fmt.Sprintf("invalid nonce; got %d, expected %d", (uint64)(*args.Nonce), nonce))
if args.Nonce != nil && (uint64)(*args.Nonce) > 0 {
nonce = (uint64)(*args.Nonce)
} else {
// get the nonce from the account retriever and the pending transactions
nonce, err = api.accountNonce(api.clientCtx, *args.From, true)
if err != nil {
return nil, err
}
}

Expand Down
39 changes: 19 additions & 20 deletions app/rpc/namespaces/eth/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (

const (
FlagEnableTxPool = "enable-tx-pool"
TxPoolSliceMaxLen = "tx-pool-cap"
TxPoolCap = "tx-pool-cap"
BroadcastPeriodSecond = "broadcast-period-second"
txPoolDb = "tx_pool"
)
Expand All @@ -36,28 +36,27 @@ var broadcastErrors = map[uint32]*sdkerrors.Error{
sdkerrors.ErrTxTooLarge.ABCICode(): sdkerrors.ErrTxTooLarge,
}

var txPoolSliceMaxLen uint64


type TxPool struct {
addressTxsPool map[common.Address][]*evmtypes.MsgEthereumTx // All currently processable transactions
clientCtx clientcontext.CLIContext
db tmdb.DB
mu sync.Mutex
addressTxsPool map[common.Address][]*evmtypes.MsgEthereumTx // All currently processable transactions
clientCtx clientcontext.CLIContext
db tmdb.DB
mu sync.Mutex
cap uint64
broadcastInterval time.Duration
}

func NewTxPool(clientCtx clientcontext.CLIContext, api *PublicEthereumAPI) *TxPool {
db, err := openDB()
if err != nil {
panic(err)
}

txPoolSliceMaxLen = viper.GetUint64(TxPoolSliceMaxLen)

interval := time.Second * time.Duration(viper.GetInt(BroadcastPeriodSecond))
pool := &TxPool{
addressTxsPool: make(map[common.Address][]*evmtypes.MsgEthereumTx),
clientCtx: clientCtx,
db: db,
addressTxsPool: make(map[common.Address][]*evmtypes.MsgEthereumTx),
clientCtx: clientCtx,
db: db,
cap: viper.GetUint64(TxPoolCap),
broadcastInterval: interval,
}

if err = pool.initDB(api); err != nil {
Expand Down Expand Up @@ -151,7 +150,7 @@ func (pool *TxPool) CacheAndBroadcastTx(api *PublicEthereumAPI, address common.A
return fmt.Errorf("AccountNonce of tx is less than currentNonce in memPool: AccountNonce[%d], currentNonce[%d]", tx.Data.AccountNonce, currentNonce)
}

if tx.Data.AccountNonce > currentNonce + txPoolSliceMaxLen {
if tx.Data.AccountNonce > currentNonce+pool.cap {
return fmt.Errorf("AccountNonce of tx is bigger than txPool capacity, please try later: AccountNonce[%d]", tx.Data.AccountNonce)
}

Expand Down Expand Up @@ -187,7 +186,7 @@ func (pool *TxPool) update(index int, address common.Address, tx *evmtypes.MsgEt
func (pool *TxPool) insertTx(address common.Address, tx *evmtypes.MsgEthereumTx) error {
// if this is the first time to insertTx, make the cap of txPool be TxPoolSliceMaxLen
if _, ok := pool.addressTxsPool[address]; !ok {
pool.addressTxsPool[address] = make([]*evmtypes.MsgEthereumTx, 0, txPoolSliceMaxLen)
pool.addressTxsPool[address] = make([]*evmtypes.MsgEthereumTx, 0, pool.cap)
}
index := 0
for index < len(pool.addressTxsPool[address]) {
Expand Down Expand Up @@ -232,7 +231,7 @@ func (pool *TxPool) continueBroadcast(api *PublicEthereumAPI, currentNonce uint6
// tx has err, and err is not mempoolfull, the tx should be dropped
err = fmt.Errorf("%s, nonce %d of tx has been dropped, please send again",
err.Error(), pool.addressTxsPool[address][i].Data.AccountNonce)
pool.dropTxs(i + 1, address)
pool.dropTxs(i+1, address)
} else {
err = fmt.Errorf("%s, nonce %d :", err.Error(), pool.addressTxsPool[address][i].Data.AccountNonce)
pool.dropTxs(i, address)
Expand All @@ -244,8 +243,8 @@ func (pool *TxPool) continueBroadcast(api *PublicEthereumAPI, currentNonce uint6
}

// drop [0:index) txs in txpool
func (pool *TxPool)dropTxs(index int, address common.Address) {
tmp := make([]*evmtypes.MsgEthereumTx, len(pool.addressTxsPool[address][index:]), txPoolSliceMaxLen)
func (pool *TxPool) dropTxs(index int, address common.Address) {
tmp := make([]*evmtypes.MsgEthereumTx, len(pool.addressTxsPool[address][index:]), pool.cap)
copy(tmp, pool.addressTxsPool[address][index:])
pool.addressTxsPool[address] = tmp
}
Expand Down Expand Up @@ -301,7 +300,7 @@ func (pool *TxPool) delTxInDB(address common.Address, txNonce uint64) error {

func (pool *TxPool) broadcastPeriod(api *PublicEthereumAPI) {
for {
time.Sleep(time.Second * time.Duration(viper.GetInt(BroadcastPeriodSecond)))
time.Sleep(pool.broadcastInterval)
pool.broadcastPeriodCore(api)
}
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/client/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func RegisterAppFlag(cmd *cobra.Command) {
cmd.Flags().String(token.FlagOSSObjectPath, "", "The OSS object path")

cmd.Flags().Bool(eth.FlagEnableTxPool, false, "Enable the function of txPool to support concurrency call eth_sendRawTransaction")
cmd.Flags().Uint64(eth.TxPoolSliceMaxLen, 10000, "Set the txPool slice max length")
cmd.Flags().Uint64(eth.TxPoolCap, 10000, "Set the txPool slice max length")
cmd.Flags().Int(eth.BroadcastPeriodSecond, 10, "every BroadcastPeriodSecond second check the txPool, and broadcast when it's eligible")

cmd.Flags().Bool(rpc.FlagEnableMonitor, false, "Enable the rpc monitor and register rpc metrics to prometheus")
Expand Down

0 comments on commit 9e670c0

Please sign in to comment.