Skip to content

Commit

Permalink
Merge PR: enable pgu concurrency (#2941)
Browse files Browse the repository at this point in the history
* enable pgu concurrency

* enable dynamic config of pgu concurrency

* enable dynamic config of pgu concurrency
  • Loading branch information
yann-sjtu authored Feb 2, 2023
1 parent a89c367 commit bcfcd4f
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 4 deletions.
18 changes: 18 additions & 0 deletions app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ type OecConfig struct {
enablePGU bool
// mempool.pgu-adjustment
pguAdjustment float64
//mempool.pgu-concurrency
pguConcurrency int
// mempool.node_key_whitelist
nodeKeyWhitelist []string
//mempool.check_tx_cost
Expand Down Expand Up @@ -136,6 +138,7 @@ const (
FlagMaxGasUsedPerBlock = "mempool.max_gas_used_per_block"
FlagEnablePGU = "mempool.enable-pgu"
FlagPGUAdjustment = "mempool.pgu-adjustment"
FlagPGUConcurrency = "mempool.pgu-concurrency"
FlagNodeKeyWhitelist = "mempool.node_key_whitelist"
FlagMempoolCheckTxCost = "mempool.check_tx_cost"
FlagMempoolEnableDeleteMinGPTx = "mempool.enable_delete_min_gp_tx"
Expand Down Expand Up @@ -278,6 +281,7 @@ func (c *OecConfig) loadFromConfig() {
c.SetMaxGasUsedPerBlock(viper.GetInt64(FlagMaxGasUsedPerBlock))
c.SetEnablePGU(viper.GetBool(FlagEnablePGU))
c.SetPGUAdjustment(viper.GetFloat64(FlagPGUAdjustment))
c.SetPGUConcurrency(viper.GetInt(FlagPGUConcurrency))
c.SetGasLimitBuffer(viper.GetUint64(FlagGasLimitBuffer))

c.SetEnableDynamicGp(viper.GetBool(FlagEnableDynamicGp))
Expand Down Expand Up @@ -490,6 +494,12 @@ func (c *OecConfig) updateFromKVStr(k, v string) {
return
}
c.SetPGUAdjustment(r)
case FlagPGUConcurrency:
r, err := strconv.Atoi(v)
if err != nil {
return
}
c.SetPGUConcurrency(r)
case FlagGasLimitBuffer:
r, err := strconv.ParseUint(v, 10, 64)
if err != nil {
Expand Down Expand Up @@ -810,6 +820,14 @@ func (c *OecConfig) SetPGUAdjustment(value float64) {
c.pguAdjustment = value
}

func (c *OecConfig) GetPGUConcurrency() int {
return c.pguConcurrency
}

func (c *OecConfig) SetPGUConcurrency(value int) {
c.pguConcurrency = value
}

func (c *OecConfig) GetGasLimitBuffer() uint64 {
return c.gasLimitBuffer
}
Expand Down
5 changes: 5 additions & 0 deletions libs/tendermint/cmd/tendermint/commands/run_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,11 @@ func AddNodeFlags(cmd *cobra.Command) {
1,
"adjustment for pgu, such as 0.9 or 1.1",
)
cmd.Flags().Int(
"mempool.pgu-concurrency",
1,
"concurrency of pgu",
)
cmd.Flags().Bool(
"mempool.sort_tx_by_gp",
config.Mempool.SortTxByGp,
Expand Down
5 changes: 5 additions & 0 deletions libs/tendermint/config/dynamic_config_okchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ type IDynamicConfig interface {
GetMaxGasUsedPerBlock() int64
GetEnablePGU() bool
GetPGUAdjustment() float64
GetPGUConcurrency() int
GetMempoolFlush() bool
GetNodeKeyWhitelist() []string
GetMempoolCheckTxCost() bool
Expand Down Expand Up @@ -72,6 +73,10 @@ func (d MockDynamicConfig) GetPGUAdjustment() float64 {
return 1
}

func (d MockDynamicConfig) GetPGUConcurrency() int {
return 1
}

func (d MockDynamicConfig) GetMempoolFlush() bool {
return false
}
Expand Down
39 changes: 35 additions & 4 deletions libs/tendermint/mempool/clist_mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,9 @@ type CListMempool struct {

txs ITransactionQueue

simQueue chan *mempoolTx
simQueue chan *mempoolTx
totalPGU int32
pguCloseSignals chan struct{}

gasCache *lru.Cache

Expand Down Expand Up @@ -142,7 +144,8 @@ func NewCListMempool(
mempool.rmPendingTxChan = make(chan types.EventDataRmPendingTx, 1000)
go mempool.fireRmPendingTxEvents()
}
go mempool.simulationRoutine()

go mempool.managePGU()

if cfg.DynamicConfig.GetMempoolCacheSize() > 0 {
mempool.cache = newMapTxCache(cfg.DynamicConfig.GetMempoolCacheSize())
Expand Down Expand Up @@ -1365,9 +1368,37 @@ func (mem *CListMempool) simulateTx(tx types.Tx) (*SimulationResponse, error) {
return &simuRes, err
}

func (mem *CListMempool) managePGU() {
mem.pguCloseSignals = make(chan struct{}, 10)
for {
concurrency := int32(cfg.DynamicConfig.GetPGUConcurrency())
// concurrency can not be less than 1
// disable pgu instead of setting concurrency less than 1
if concurrency < 1 {
concurrency = 1
}
for i := atomic.LoadInt32(&mem.totalPGU); i < concurrency; i++ {
// start new routine if concurrency is greater than totalPGU
go mem.simulationRoutine()
}
for i := concurrency; i < atomic.LoadInt32(&mem.totalPGU); i++ {
// stop routine if concurrency is less than totalPGU
mem.pguCloseSignals <- struct{}{}
}
time.Sleep(time.Minute)
}
}

func (mem *CListMempool) simulationRoutine() {
for memTx := range mem.simQueue {
mem.simulationJob(memTx)
atomic.AddInt32(&mem.totalPGU, 1)
defer atomic.AddInt32(&mem.totalPGU, -1)
for {
select {
case <-mem.pguCloseSignals:
return
case memTx := <-mem.simQueue:
mem.simulationJob(memTx)
}
}
}

Expand Down

0 comments on commit bcfcd4f

Please sign in to comment.