Skip to content

Commit

Permalink
pool: remove pool2 related code
Browse files Browse the repository at this point in the history
  • Loading branch information
emailtovamos committed Sep 26, 2024
1 parent 846e55b commit 6a6e09c
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 135 deletions.
8 changes: 1 addition & 7 deletions core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,7 @@ import (
)

// NewTxsEvent is posted when a batch of transactions enters the transaction pool.
type NewTxsEvent struct {
Txs []*types.Transaction
// Static bool is Whether to send to only Static peer or not.
// This is because at high traffic we still want to broadcast transactions to at least some peers so that we
// minimize the transaction lost.
Static bool
}
type NewTxsEvent struct{ Txs []*types.Transaction }

// ReannoTxsEvent is posted when a batch of local pending transactions exceed a specified duration.
type ReannoTxsEvent struct{ Txs []*types.Transaction }
Expand Down
4 changes: 0 additions & 4 deletions core/txpool/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,6 @@ var (
// configured for the transaction pool.
ErrUnderpriced = errors.New("transaction underpriced")

// ErrUnderpricedTransferredtoAnotherPool is returned if a transaction's gas price is below the minimum
// configured for the transaction pool.
ErrUnderpricedTransferredtoAnotherPool = errors.New("transaction underpriced, so it is either in pool2 or pool3")

// ErrReplaceUnderpriced is returned if a transaction is attempted to be replaced
// with a different one without the required price bump.
ErrReplaceUnderpriced = errors.New("replacement transaction underpriced")
Expand Down
74 changes: 25 additions & 49 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ type LegacyPool struct {

reqResetCh chan *txpoolResetRequest
reqPromoteCh chan *accountSet
queueTxEventCh chan QueueTxEventCh
queueTxEventCh chan *types.Transaction
reorgDoneCh chan chan struct{}
reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop
wg sync.WaitGroup // tracks loop, scheduleReorgLoop
Expand Down Expand Up @@ -280,7 +280,7 @@ func New(config Config, chain BlockChain) *LegacyPool {
all: newLookup(),
reqResetCh: make(chan *txpoolResetRequest),
reqPromoteCh: make(chan *accountSet),
queueTxEventCh: make(chan QueueTxEventCh),
queueTxEventCh: make(chan *types.Transaction),
reorgDoneCh: make(chan chan struct{}),
reorgShutdownCh: make(chan struct{}),
initDoneCh: make(chan struct{}),
Expand Down Expand Up @@ -405,7 +405,7 @@ func (pool *LegacyPool) loop() {
}
// Any non-locals old enough should be removed
if time.Since(pool.beats[addr]) > pool.config.Lifetime {
list, _ := pool.queue[addr].Flatten()
list := pool.queue[addr].Flatten()
for _, tx := range list {
pool.removeTx(tx.Hash(), true, true)
}
Expand All @@ -416,27 +416,25 @@ func (pool *LegacyPool) loop() {

case <-reannounce.C:
pool.mu.RLock()
reannoTxs, _ := func() ([]*types.Transaction, []bool) {
reannoTxs := func() []*types.Transaction {
txs := make([]*types.Transaction, 0)
statics := make([]bool, 0)
for addr, list := range pool.pending {
if !pool.locals.contains(addr) {
continue
}
transactions, static := list.Flatten()
transactions := list.Flatten()
for _, tx := range transactions {
// Default ReannounceTime is 10 years, won't announce by default.
if time.Since(tx.Time()) < pool.config.ReannounceTime {
break
}
txs = append(txs, tx)
statics = append(statics, static)
if len(txs) >= txReannoMaxNum {
return txs, statics
return txs
}
}
}
return txs, statics
return txs
}()
pool.mu.RUnlock()
if len(reannoTxs) > 0 {
Expand Down Expand Up @@ -567,11 +565,11 @@ func (pool *LegacyPool) Content() (map[common.Address][]*types.Transaction, map[

pending := make(map[common.Address][]*types.Transaction, len(pool.pending))
for addr, list := range pool.pending {
pending[addr], _ = list.Flatten()
pending[addr] = list.Flatten()
}
queued := make(map[common.Address][]*types.Transaction, len(pool.queue))
for addr, list := range pool.queue {
queued[addr], _ = list.Flatten()
queued[addr] = list.Flatten()
}
return pending, queued
}
Expand All @@ -584,11 +582,11 @@ func (pool *LegacyPool) ContentFrom(addr common.Address) ([]*types.Transaction,

var pending []*types.Transaction
if list, ok := pool.pending[addr]; ok {
pending, _ = list.Flatten()
pending = list.Flatten()
}
var queued []*types.Transaction
if list, ok := pool.queue[addr]; ok {
queued, _ = list.Flatten()
queued = list.Flatten()
}
return pending, queued
}
Expand Down Expand Up @@ -620,7 +618,7 @@ func (pool *LegacyPool) Pending(filter txpool.PendingFilter) map[common.Address]
}
pending := make(map[common.Address][]*txpool.LazyTransaction, len(pool.pending))
for addr, list := range pool.pending {
txs, static := list.Flatten()
txs := list.Flatten()

// If the miner requests tip enforcement, cap the lists now
if minTipBig != nil && !pool.locals.contains(addr) {
Expand All @@ -643,7 +641,6 @@ func (pool *LegacyPool) Pending(filter txpool.PendingFilter) map[common.Address]
GasTipCap: uint256.MustFromBig(txs[i].GasTipCap()),
Gas: txs[i].Gas(),
BlobGas: txs[i].BlobGas(),
Static: static,
}
}
pending[addr] = lazies
Expand All @@ -667,11 +664,11 @@ func (pool *LegacyPool) local() map[common.Address]types.Transactions {
txs := make(map[common.Address]types.Transactions)
for addr := range pool.locals.accounts {
if pending := pool.pending[addr]; pending != nil {
transactions, _ := pending.Flatten()
transactions := pending.Flatten()
txs[addr] = append(txs[addr], transactions...)
}
if queued := pool.queue[addr]; queued != nil {
transactions, _ := queued.Flatten()
transactions := queued.Flatten()
txs[addr] = append(txs[addr], transactions...)
}
}
Expand Down Expand Up @@ -1315,12 +1312,8 @@ func (pool *LegacyPool) requestPromoteExecutables(set *accountSet) chan struct{}

// queueTxEvent enqueues a transaction event to be sent in the next reorg run.
func (pool *LegacyPool) queueTxEvent(tx *types.Transaction, static bool) {
event := QueueTxEventCh{
tx: tx,
static: static,
}
select {
case pool.queueTxEventCh <- event:
case pool.queueTxEventCh <- tx:
case <-pool.reorgShutdownCh:
}
}
Expand Down Expand Up @@ -1374,14 +1367,14 @@ func (pool *LegacyPool) scheduleReorgLoop() {
launchNextRun = true
pool.reorgDoneCh <- nextDone

case queue := <-pool.queueTxEventCh:
case tx := <-pool.queueTxEventCh:
// Queue up the event, but don't schedule a reorg. It's up to the caller to
// request one later if they want the events sent.
addr, _ := types.Sender(pool.signer, queue.tx)
addr, _ := types.Sender(pool.signer, tx)
if _, ok := queuedEvents[addr]; !ok {
queuedEvents[addr] = newSortedMap()
}
queuedEvents[addr].Put(queue.tx, queue.static)
queuedEvents[addr].Put(tx)

case <-curDone:
curDone = nil
Expand Down Expand Up @@ -1449,7 +1442,7 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
// Update all accounts to the latest known pending nonce
nonces := make(map[common.Address]uint64, len(pool.pending))
for addr, list := range pool.pending {
highestPending, _ := list.LastElement()
highestPending := list.LastElement()
nonces[addr] = highestPending.Nonce() + 1
}
pool.pendingNonces.setAll(nonces)
Expand All @@ -1471,31 +1464,14 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
if _, ok := events[addr]; !ok {
events[addr] = newSortedMap()
}
events[addr].Put(tx, false) // todo putting false as placeholder for now
events[addr].Put(tx)
}
if len(events) > 0 {
staticTxs := make([]*types.Transaction, 0)
nonStaticTxs := make([]*types.Transaction, 0)
var txs []*types.Transaction
for _, set := range events {
flattenedTxs, _ := set.Flatten()
if set.staticOnly {
staticTxs = append(staticTxs, flattenedTxs...)
} else {
nonStaticTxs = append(nonStaticTxs, flattenedTxs...)
}
}
// Send static transactions
if len(staticTxs) > 0 {
fmt.Println("New txevent emitted for static ", staticTxs[0].Hash())
pool.txFeed.Send(core.NewTxsEvent{Txs: staticTxs, Static: true})
}

// Send dynamic transactions
if len(nonStaticTxs) > 0 {
from, _ := types.Sender(pool.signer, nonStaticTxs[0])
fmt.Println("New txevent emitted for non static ", nonStaticTxs[0].Hash(), len(nonStaticTxs), from.String())
pool.txFeed.Send(core.NewTxsEvent{Txs: nonStaticTxs, Static: false})
txs = append(txs, set.Flatten()...)
}
pool.txFeed.Send(core.NewTxsEvent{Txs: txs})
}
}

Expand Down Expand Up @@ -1787,7 +1763,7 @@ func (pool *LegacyPool) truncateQueue() {

// Drop all transactions if they are less than the overflow
if size := uint64(list.Len()); size <= drop {
transactions, _ := list.Flatten()
transactions := list.Flatten()
for _, tx := range transactions {
pool.removeTx(tx.Hash(), true, true)
}
Expand All @@ -1796,7 +1772,7 @@ func (pool *LegacyPool) truncateQueue() {
continue
}
// Otherwise drop only last few transactions
txs, _ := list.Flatten()
txs := list.Flatten()
for i := len(txs) - 1; i >= 0 && drop > 0; i-- {
pool.removeTx(txs[i].Hash(), true, true)
drop--
Expand Down
34 changes: 16 additions & 18 deletions core/txpool/legacypool/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,10 @@ func (h *nonceHeap) Pop() interface{} {
// sortedMap is a nonce->transaction hash map with a heap based index to allow
// iterating over the contents in a nonce-incrementing way.
type sortedMap struct {
items map[uint64]*types.Transaction // Hash map storing the transaction data
index *nonceHeap // Heap of nonces of all the stored transactions (non-strict mode)
cache types.Transactions // Cache of the transactions already sorted
cacheMu sync.Mutex // Mutex covering the cache
staticOnly bool // only send this transaction to static peers
items map[uint64]*types.Transaction // Hash map storing the transaction data
index *nonceHeap // Heap of nonces of all the stored transactions (non-strict mode)
cache types.Transactions // Cache of the transactions already sorted
cacheMu sync.Mutex // Mutex covering the cache
}

// newSortedMap creates a new nonce-sorted transaction map.
Expand All @@ -85,7 +84,7 @@ func (m *sortedMap) Get(nonce uint64) *types.Transaction {

// Put inserts a new transaction into the map, also updating the map's nonce
// index. If a transaction already exists with the same nonce, it's overwritten.
func (m *sortedMap) Put(tx *types.Transaction, static bool) {
func (m *sortedMap) Put(tx *types.Transaction) {
nonce := tx.Nonce()
if m.items[nonce] == nil {
heap.Push(m.index, nonce)
Expand All @@ -95,7 +94,6 @@ func (m *sortedMap) Put(tx *types.Transaction, static bool) {
txSortedMapPool.Put(m.cache)
}
m.items[nonce], m.cache = tx, nil
m.staticOnly = static
m.cacheMu.Unlock()
}

Expand Down Expand Up @@ -255,7 +253,7 @@ func (m *sortedMap) Len() int {
return len(m.items)
}

func (m *sortedMap) flatten() (types.Transactions, bool) {
func (m *sortedMap) flatten() types.Transactions {
m.cacheMu.Lock()
defer m.cacheMu.Unlock()
// If the sorting was not cached yet, create and cache it
Expand All @@ -272,25 +270,25 @@ func (m *sortedMap) flatten() (types.Transactions, bool) {
}
sort.Sort(types.TxByNonce(m.cache))
}
return m.cache, m.staticOnly
return m.cache
}

// Flatten creates a nonce-sorted slice of transactions based on the loosely
// sorted internal representation. The result of the sorting is cached in case
// it's requested again before any modifications are made to the contents.
func (m *sortedMap) Flatten() (types.Transactions, bool) {
cache, static := m.flatten()
func (m *sortedMap) Flatten() types.Transactions {
cache := m.flatten()
// Copy the cache to prevent accidental modification
txs := make(types.Transactions, len(cache))
copy(txs, cache)
return txs, static
return txs
}

// LastElement returns the last element of a flattened list, thus, the
// transaction with the highest nonce
func (m *sortedMap) LastElement() (*types.Transaction, bool) {
cache, static := m.flatten()
return cache[len(cache)-1], static
func (m *sortedMap) LastElement() *types.Transaction {
cache := m.flatten()
return cache[len(cache)-1]
}

// list is a "list" of transactions belonging to an account, sorted by account
Expand Down Expand Up @@ -362,7 +360,7 @@ func (l *list) Add(tx *types.Transaction, priceBump uint64, static bool) (bool,
l.totalcost.Add(l.totalcost, cost)

// Otherwise overwrite the old transaction with the current one
l.txs.Put(tx, static)
l.txs.Put(tx)
if l.costcap.Cmp(cost) < 0 {
l.costcap = cost
}
Expand Down Expand Up @@ -477,13 +475,13 @@ func (l *list) Empty() bool {
// Flatten creates a nonce-sorted slice of transactions based on the loosely
// sorted internal representation. The result of the sorting is cached in case
// it's requested again before any modifications are made to the contents.
func (l *list) Flatten() (types.Transactions, bool) {
func (l *list) Flatten() types.Transactions {
return l.txs.Flatten()
}

// LastElement returns the last element of a flattened list, thus, the
// transaction with the highest nonce
func (l *list) LastElement() (*types.Transaction, bool) {
func (l *list) LastElement() *types.Transaction {
return l.txs.LastElement()
}

Expand Down
2 changes: 0 additions & 2 deletions core/txpool/subpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ type LazyTransaction struct {

Gas uint64 // Amount of gas required by the transaction
BlobGas uint64 // Amount of blob gas required by the transaction

Static bool // To specify whether to broadcast it to static peers or not
}

// Resolve retrieves the full transaction belonging to a lazy handle if it is still
Expand Down
Loading

0 comments on commit 6a6e09c

Please sign in to comment.