pool: remove pool2 from legacypool
emailtovamos committed Sep 25, 2024
1 parent 0957562 commit 846e55b
Showing 6 changed files with 59 additions and 164 deletions.
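
As the hunks below show, the pool no longer budgets a separate pool2 region: the capacity checks in add() and transferTransactions() now count only pool1 (GlobalSlots + GlobalQueue), and truncateQueue() caps the queue at GlobalQueue alone. A minimal, self-contained sketch of the simplified capacity check follows; Config here is a trimmed stand-in with only the relevant fields, and poolFull is a hypothetical helper written for illustration, not code from this commit.

package main

import "fmt"

// Config is a trimmed stand-in carrying only the two fields relevant to the
// capacity check; the real legacypool.Config has many more.
type Config struct {
	GlobalSlots uint64 // executable (pending) slot budget
	GlobalQueue uint64 // non-executable (queued) slot budget
}

// poolFull is a hypothetical helper: it reports whether adding a transaction
// occupying txSlots slots would push the pool past its limit. Before this
// commit the limit also included a separate Pool2Slots budget.
func poolFull(cfg Config, usedSlots, txSlots uint64) bool {
	maxPool1Size := cfg.GlobalSlots + cfg.GlobalQueue
	return usedSlots+txSlots > maxPool1Size
}

func main() {
	cfg := Config{GlobalSlots: 4096 + 1024, GlobalQueue: 1024} // defaults from the diff
	fmt.Println(poolFull(cfg, 6141, 4))                        // 6145 > 6144 -> true: underpriced eviction kicks in
}
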
1 change: 0 additions & 1 deletion cmd/geth/main.go
@@ -91,7 +91,6 @@ var (
utils.TxPoolGlobalSlotsFlag,
utils.TxPoolAccountQueueFlag,
utils.TxPoolGlobalQueueFlag,
utils.TxPoolPool2SlotsFlag,
utils.TxPoolPool3SlotsFlag,
utils.TxPoolLifetimeFlag,
utils.TxPoolReannounceTimeFlag,
10 changes: 0 additions & 10 deletions cmd/utils/flags.go
@@ -450,12 +450,6 @@ var (
Value: ethconfig.Defaults.TxPool.GlobalQueue,
Category: flags.TxPoolCategory,
}
TxPoolPool2SlotsFlag = &cli.Uint64Flag{
Name: "txpool.pool2slots",
Usage: "Maximum number of transaction slots in pool 2",
Value: ethconfig.Defaults.TxPool.Pool2Slots,
Category: flags.TxPoolCategory,
}
TxPoolPool3SlotsFlag = &cli.Uint64Flag{
Name: "txpool.pool3slots",
Usage: "Maximum number of transaction slots in pool 3",
@@ -1774,9 +1768,6 @@ func setTxPool(ctx *cli.Context, cfg *legacypool.Config) {
if ctx.IsSet(TxPoolGlobalQueueFlag.Name) {
cfg.GlobalQueue = ctx.Uint64(TxPoolGlobalQueueFlag.Name)
}
if ctx.IsSet(TxPoolPool2SlotsFlag.Name) {
cfg.Pool2Slots = ctx.Uint64(TxPoolPool2SlotsFlag.Name)
}
if ctx.IsSet(TxPoolPool3SlotsFlag.Name) {
cfg.Pool3Slots = ctx.Uint64(TxPoolPool3SlotsFlag.Name)
}
@@ -2310,7 +2301,6 @@ func EnableNodeInfo(poolConfig *legacypool.Config, nodeInfo *p2p.NodeInfo) Setup
"GlobalSlots": poolConfig.GlobalSlots,
"AccountQueue": poolConfig.AccountQueue,
"GlobalQueue": poolConfig.GlobalQueue,
"Pool2Slots": poolConfig.Pool2Slots,
"Pool3Slots": poolConfig.Pool3Slots,
"Lifetime": poolConfig.Lifetime,
})
102 changes: 25 additions & 77 deletions core/txpool/legacypool/legacypool.go
@@ -104,7 +104,6 @@ var (
queuedGauge = metrics.NewRegisteredGauge("txpool/queued", nil)
localGauge = metrics.NewRegisteredGauge("txpool/local", nil)
slotsGauge = metrics.NewRegisteredGauge("txpool/slots", nil)
pool2Gauge = metrics.NewRegisteredGauge("txpool/pool2", nil)
pool3Gauge = metrics.NewRegisteredGauge("txpool/pool3", nil)

reheapTimer = metrics.NewRegisteredTimer("txpool/reheap", nil)
@@ -140,7 +139,6 @@ type Config struct {
GlobalSlots uint64 // Maximum number of executable transaction slots for all accounts
AccountQueue uint64 // Maximum number of non-executable transaction slots permitted per account
GlobalQueue uint64 // Maximum number of non-executable transaction slots for all accounts
Pool2Slots uint64 // Maximum number of transaction slots in pool 2
Pool3Slots uint64 // Maximum number of transaction slots in pool 3

Lifetime time.Duration // Maximum amount of time non-executable transaction are queued
@@ -159,7 +157,6 @@ var DefaultConfig = Config{
GlobalSlots: 4096 + 1024, // urgent + floating queue capacity with 4:1 ratio
AccountQueue: 64,
GlobalQueue: 1024,
Pool2Slots: 1024,
Pool3Slots: 1024,

Lifetime: 3 * time.Hour,
@@ -475,6 +472,7 @@ func (pool *LegacyPool) Close() error {
// Reset implements txpool.SubPool, allowing the legacy pool's internal state to be
// kept in sync with the main transaction pool's internal state.
func (pool *LegacyPool) Reset(oldHead, newHead *types.Header) {
fmt.Println("Reset request")
wait := pool.requestReset(oldHead, newHead)
<-wait
}
@@ -781,7 +779,6 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
}

maxPool1Size := pool.config.GlobalSlots + pool.config.GlobalQueue
maxPool2Size := pool.config.Pool2Slots
txPoolSizeAfterCurrentTx := uint64(pool.all.Slots() + numSlots(tx))

// Make the local flag. If it's from local source or it's from the network but
@@ -821,7 +818,7 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
}

// If the transaction pool is full, discard underpriced transactions
if txPoolSizeAfterCurrentTx > (maxPool1Size + maxPool2Size) {
if txPoolSizeAfterCurrentTx > maxPool1Size {
// If the new transaction is underpriced, don't accept it
if !isLocal && pool.priced.Underpriced(tx) {
log.Trace("Discarding underpriced transaction", "hash", hash, "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap())
@@ -841,7 +838,7 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
// New transaction is better than our worse ones, make room for it.
// If it's a local transaction, forcibly discard all available transactions.
// Otherwise if we can't make enough room for new one, abort the operation.
toBeDiscarded := pool.all.Slots() - int(pool.config.GlobalSlots+pool.config.GlobalQueue+pool.config.Pool2Slots) + numSlots(tx)
toBeDiscarded := pool.all.Slots() - int(pool.config.GlobalSlots+pool.config.GlobalQueue) + numSlots(tx)
drop, success := pool.priced.Discard(toBeDiscarded, isLocal)

// Special case, we still can't make the room for the new remote one.
@@ -903,7 +900,7 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
pool.all.Add(tx, isLocal)
pool.priced.Put(tx, isLocal)
pool.journalTx(from, tx)
pool.queueTxEvent(tx, false) // At this point pool1 can incorporate this. So no need for pool2 or pool3
pool.queueTxEvent(tx, false)
log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())

// Successful promotion, bump the heartbeat
@@ -912,7 +909,7 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
}

// New transaction isn't replacing a pending one, push into queue
replaced, err = pool.enqueueTx(hash, tx, isLocal, true, true) // At this point pool1 can incorporate this. So no need for pool2 or pool3
replaced, err = pool.enqueueTx(hash, tx, isLocal, true, true)
if err != nil {
return false, err
}
@@ -943,57 +940,15 @@ func (pool *LegacyPool) addToPool3(drop types.Transactions, isLocal bool) {
txSlots := numSlots(tx)
if currentSlotsUsed+txSlots <= availableSlotsPool3 {
from, _ := types.Sender(pool.signer, tx)
pool.addToPool12OrPool3(tx, from, isLocal, false, false, true)
//pool.addToPool12OrPool3(tx, from, isLocal, false, false, true)
pool.localBufferPool.Add(tx)
log.Debug("adding to pool3", "transaction", tx.Hash().String(), "from", from.String())
currentSlotsUsed += txSlots
}
}
}
}

// addToPool12OrPool3 adds a transaction to pool1 or pool2 or pool3 depending on which one is asked for
func (pool *LegacyPool) addToPool12OrPool3(tx *types.Transaction, from common.Address, isLocal bool, pool1, pool2, pool3 bool) (bool, error) {
if pool1 {
pool.journalTx(from, tx)
pool.queueTxEvent(tx, false)
_, err := pool.enqueueTx(tx.Hash(), tx, isLocal, true, false) // At this point pool1 can incorporate this. So no need for pool2 or pool3
if err != nil {
return false, err
}
dirty := newAccountSet(pool.signer)
dirty.addTx(tx)
go func() {
<-pool.requestPromoteExecutables(dirty)
}()
log.Trace("Pooled new executable transaction", "hash", tx.Hash(), "from", from, "to", tx.To())

// Successful promotion, bump the heartbeat
pool.beats[from] = time.Now()

return true, nil
} else if pool2 {
pool.journalTx(from, tx)
pool.queueTxEvent(tx, true)
_, err := pool.enqueueTx(tx.Hash(), tx, isLocal, true, true)
if err != nil {
return false, err
}
dirty := newAccountSet(pool.signer)
dirty.addTx(tx)
pool2Gauge.Inc(1)
go func() {
<-pool.requestPromoteExecutables(dirty)
}()
log.Trace("Pooled new executable transaction", "hash", tx.Hash(), "from", from, "to", tx.To())

// Successful promotion, bump the heartbeat
pool.beats[from] = time.Now()
return true, nil
} else if pool3 {
pool.localBufferPool.Add(tx)
log.Debug("adding to pool3", "transaction", tx.Hash().String())
return true, nil
} else {
return false, errors.New("could not add to any pool")
log.Debug("adding to pool3 unsuccessful", "availableSlotsPool3", availableSlotsPool3)
fmt.Println("adding to pool3 unsuccessful")
}
}

@@ -1448,7 +1403,7 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
reorgDurationTimer.Update(time.Since(t0))
}(time.Now())
defer close(done)

fmt.Println("runReorg called")
var promoteAddrs []common.Address
if dirtyAccounts != nil && reset == nil {
// Only dirty accounts need to be promoted, unless we're resetting.
@@ -1475,9 +1430,6 @@
}
}

//// Transfer transactions from pool3 to pool2 for new block import
//pool.transferTransactions()

// Check for pending transactions for every account that sent new ones
promoted := pool.promoteExecutables(promoteAddrs)

@@ -1510,7 +1462,7 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
pool.changesSinceReorg = 0 // Reset change counter
pool.mu.Unlock()

// Transfer transactions from pool3 to pool2 for new block import
// Transfer transactions from pool3 to pool1 for new block import
pool.transferTransactions()

// Notify subsystems for newly added transactions
@@ -1650,6 +1602,7 @@ func (pool *LegacyPool) reset(oldHead, newHead *types.Header) {
// future queue to the set of pending transactions. During this process, all
// invalidated transactions (low nonce, low balance) are deleted.
func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.Transaction {
fmt.Println("promoteExecutables called")
// Track the promoted transactions to broadcast them at once
var promoted []*types.Transaction

@@ -1685,10 +1638,8 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T
}
}
log.Trace("Promoted queued transactions", "count", len(promoted))
fmt.Println("promoting")
queuedGauge.Dec(int64(len(readies)))
if list.txs.staticOnly {
pool2Gauge.Dec(int64(len(readies)))
}

// Drop all transactions over the allowed limit
var caps types.Transactions
@@ -1704,9 +1655,7 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T
// Mark all the items dropped as removed
pool.priced.Removed(len(forwards) + len(drops) + len(caps))
queuedGauge.Dec(int64(len(forwards) + len(drops) + len(caps)))
if list.txs.staticOnly {
pool2Gauge.Dec(int64(len(forwards) + len(drops) + len(caps)))
}

if pool.locals.contains(addr) {
localGauge.Dec(int64(len(forwards) + len(drops) + len(caps)))
}
@@ -1815,7 +1764,7 @@ func (pool *LegacyPool) truncateQueue() {
for _, list := range pool.queue {
queued += uint64(list.Len())
}
queueMax := pool.config.GlobalQueue + pool.config.Pool2Slots
queueMax := pool.config.GlobalQueue
if queued <= queueMax {
return
}
@@ -2210,31 +2159,30 @@ func (pool *LegacyPool) startPeriodicTransfer(t time.Duration) {
// transferTransactions mainly moves from pool 3 to pool 1
func (pool *LegacyPool) transferTransactions() {
maxPool1Size := int(pool.config.GlobalSlots + pool.config.GlobalQueue)
maxPool2Size := int(pool.config.Pool2Slots)
maxPool1Pool2CombinedSize := maxPool1Size + maxPool2Size
extraSizePool2Pool1 := maxPool1Pool2CombinedSize - int(uint64(len(pool.pending))+uint64(len(pool.queue)))
if extraSizePool2Pool1 <= 0 {
extraSizePool1 := maxPool1Size - int(uint64(len(pool.pending))+uint64(len(pool.queue)))
if extraSizePool1 <= 0 {
return
}

currentPool1Pool2Size := pool.all.Slots()
canTransferPool3ToPool2 := maxPool1Pool2CombinedSize > currentPool1Pool2Size
if !canTransferPool3ToPool2 {
currentPool1Size := pool.all.Slots()
canTransferPool3ToPool1 := maxPool1Size > currentPool1Size
if !canTransferPool3ToPool1 {
return
}
extraSlots := maxPool1Pool2CombinedSize - currentPool1Pool2Size
extraSlots := maxPool1Size - currentPool1Size
extraTransactions := (extraSlots + 3) / 4 // Since maximum slots per transaction is 4
// So now we can take out extraTransactions number of transactions from pool3 and put in pool2
// So now we can take out extraTransactions number of transactions from pool3 and put in pool1
if extraTransactions < 1 {
return
}

log.Debug("Will attempt to transfer from pool3 to pool2", "transactions", extraTransactions)
log.Debug("Will attempt to transfer from pool3 to pool1", "transactions", extraTransactions)

tx := pool.localBufferPool.Flush(extraTransactions)
if len(tx) == 0 {
return
}
fmt.Println("transferring tranasction")

pool.Add(tx, true, false)
}
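
For reference, a worked example of the slot arithmetic transferTransactions uses after the change. The numbers are illustrative; the only facts taken from the diff are the ceiling division and the 4-slots-per-transaction maximum.

package main

import "fmt"

func main() {
	// Free room left in pool1, in slots (maxPool1Size - pool.all.Slots() in the diff).
	freeSlots := 10
	// Each transaction occupies at most 4 slots, so a ceiling division converts
	// free slots into the number of transactions that can be pulled from pool3.
	extraTransactions := (freeSlots + 3) / 4
	fmt.Println(extraTransactions) // ceil(10/4) = 3
}
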