From 6d78f52839c983be73d9311c07a64344f4787ffb Mon Sep 17 00:00:00 2001 From: Tyler Smith Date: Mon, 10 Jun 2024 00:13:33 -0700 Subject: [PATCH] fix: Send drop events from blobpool. --- core/txpool/blobpool/blobpool.go | 66 ++++++++++++++++++++++++++++ core/txpool/drop_events.go | 14 ++++++ core/txpool/legacypool/legacypool.go | 44 +++++++------------ 3 files changed, 95 insertions(+), 29 deletions(-) create mode 100644 core/txpool/drop_events.go diff --git a/core/txpool/blobpool/blobpool.go b/core/txpool/blobpool/blobpool.go index 6afa4dbe38b2..b73d384c2dab 100644 --- a/core/txpool/blobpool/blobpool.go +++ b/core/txpool/blobpool/blobpool.go @@ -403,6 +403,11 @@ func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserve txpool.Addres p.Close() return err } + + p.dropTxFeed.Send(core.DropTxsEvent{ + Txs: []*types.Transaction{blobIDToTransaction(id)}, + Reason: txpool.DropUnexecutable, + }) } } // Sort the indexed transactions by nonce and delete anything gapped, create @@ -566,6 +571,11 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6 if err := p.store.Delete(id); err != nil { log.Error("Failed to delete blob transaction", "from", addr, "id", id, "err", err) } + + p.dropTxFeed.Send(core.DropTxsEvent{ + Txs: []*types.Transaction{blobIDToTransaction(id)}, + Reason: txpool.DropUnexecutable, + }) } return } @@ -597,6 +607,11 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6 if err := p.store.Delete(id); err != nil { log.Error("Failed to delete blob transaction", "from", addr, "id", id, "err", err) } + + p.dropTxFeed.Send(core.DropTxsEvent{ + Txs: []*types.Transaction{blobIDToTransaction(id)}, + Reason: txpool.DropLowNonce, + }) } p.index[addr] = txs } @@ -646,6 +661,11 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6 txs = append(txs[:i], txs[i+1:]...) p.index[addr] = txs + p.dropTxFeed.Send(core.DropTxsEvent{ + Txs: []*types.Transaction{blobIDToTransaction(id)}, + Reason: txpool.DropReplaced, + }) + i-- continue } @@ -671,6 +691,11 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6 if err := p.store.Delete(id); err != nil { log.Error("Failed to delete blob transaction", "from", addr, "id", id, "err", err) } + + p.dropTxFeed.Send(core.DropTxsEvent{ + Txs: []*types.Transaction{blobIDToTransaction(id)}, + Reason: txpool.DropUnexecutable, + }) } p.index[addr] = txs break @@ -717,6 +742,11 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6 if err := p.store.Delete(id); err != nil { log.Error("Failed to delete blob transaction", "from", addr, "id", id, "err", err) } + + p.dropTxFeed.Send(core.DropTxsEvent{ + Txs: []*types.Transaction{blobIDToTransaction(id)}, + Reason: txpool.DropAccountCap, + }) } } // Sanity check that no account can have more queued transactions than the @@ -749,6 +779,11 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6 if err := p.store.Delete(id); err != nil { log.Error("Failed to delete blob transaction", "from", addr, "id", id, "err", err) } + + p.dropTxFeed.Send(core.DropTxsEvent{ + Txs: []*types.Transaction{blobIDToTransaction(id)}, + Reason: txpool.DropAccountCap, + }) } } // Included cheap transactions might have left the remaining ones better from @@ -1075,6 +1110,11 @@ func (p *BlobPool) SetGasTip(tip *big.Int) { if err := p.store.Delete(id); err != nil { log.Error("Failed to delete dropped transaction", "id", id, "err", err) } + + p.dropTxFeed.Send(core.DropTxsEvent{ + Txs: []*types.Transaction{blobIDToTransaction(id)}, + Reason: txpool.DropGasPriceUpdated, + }) } break } @@ -1343,6 +1383,12 @@ func (p *BlobPool) add(tx *types.Transaction) (err error) { delete(p.lookup, prev.hash) p.lookup[meta.hash] = meta.id p.stored += uint64(meta.size) - uint64(prev.size) + + p.dropTxFeed.Send(core.DropTxsEvent{ + Txs: []*types.Transaction{blobTxMetaToTransaction(prev)}, + Reason: txpool.DropReplaced, + Replacement: tx, + }) } else { // Transaction extends previously scheduled ones p.index[from] = append(p.index[from], meta) @@ -1464,6 +1510,11 @@ func (p *BlobPool) drop() { if err := p.store.Delete(drop.id); err != nil { log.Error("Failed to drop evicted transaction", "id", drop.id, "err", err) } + + p.dropTxFeed.Send(core.DropTxsEvent{ + Txs: []*types.Transaction{blobTxMetaToTransaction(&blobTxMeta{id: drop.id})}, + Reason: txpool.DropTruncating, + }) } // Pending retrieves all currently processable transactions, grouped by origin @@ -1712,3 +1763,18 @@ func (pool *BlobPool) SubscribeDropTxsEvent(ch chan<- core.DropTxsEvent) event.S func (pool *BlobPool) SubscribeRejectedTxEvent(ch chan<- core.RejectedTxEvent) event.Subscription { return pool.rejectTxFeed.Subscribe(ch) } + +func blobIDToTransaction(id uint64) *types.Transaction { + return blobTxMetaToTransaction(&blobTxMeta{id: id}) +} + +func blobTxMetaToTransaction(meta *blobTxMeta) *types.Transaction { + return types.NewTx(&types.BlobTx{ + Gas: meta.execGas, + BlobFeeCap: meta.blobFeeCap, + Nonce: meta.nonce, + GasFeeCap: meta.execFeeCap, + GasTipCap: meta.execTipCap, + BlobHashes: meta.blobHashes, + }) +} diff --git a/core/txpool/drop_events.go b/core/txpool/drop_events.go new file mode 100644 index 000000000000..75a679e0af5a --- /dev/null +++ b/core/txpool/drop_events.go @@ -0,0 +1,14 @@ +package txpool + +const ( + DropUnderpriced = "underpriced-txs" + DropLowNonce = "low-nonce-txs" + DropUnpayable = "unpayable-txs" + + DropAccountCap = "account-cap-txs" // Accounts exceeding txpool.accountslots transactions + DropReplaced = "replaced-txs" + DropUnexecutable = "unexecutable-txs" + DropTruncating = "truncating-txs" + DropOld = "old-txs" + DropGasPriceUpdated = "updated-gas-price" +) diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index fae8c8ea72e3..b44ba9ec3936 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -384,7 +384,7 @@ func (pool *LegacyPool) loop() { } pool.dropTxFeed.Send(core.DropTxsEvent{ Txs: list, - Reason: dropOld, + Reason: txpool.DropOld, }) queuedEvictionMeter.Mark(int64(len(list))) } @@ -468,7 +468,7 @@ func (pool *LegacyPool) SetGasTip(tip *big.Int) { pool.priced.Removed(len(drop)) pool.dropTxFeed.Send(core.DropTxsEvent{ Txs: drop, - Reason: dropGasPriceUpdated, + Reason: txpool.DropGasPriceUpdated, }) } log.Info("Legacy pool tip threshold updated", "tip", newTip) @@ -794,7 +794,7 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e pool.changesSinceReorg += dropped pool.dropTxFeed.Send(core.DropTxsEvent{ Txs: drop, - Reason: dropUnderpriced, + Reason: txpool.DropUnderpriced, }) } } @@ -814,7 +814,7 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e pendingReplaceMeter.Mark(1) pool.dropTxFeed.Send(core.DropTxsEvent{ Txs: []*types.Transaction{old}, - Reason: dropReplaced, + Reason: txpool.DropReplaced, Replacement: tx, }) } @@ -894,7 +894,7 @@ func (pool *LegacyPool) enqueueTx(hash common.Hash, tx *types.Transaction, local queuedReplaceMeter.Mark(1) pool.dropTxFeed.Send(core.DropTxsEvent{ Txs: []*types.Transaction{old}, - Reason: dropReplaced, + Reason: txpool.DropReplaced, }) } else { // Nothing was replaced, bump the queued counter @@ -954,7 +954,7 @@ func (pool *LegacyPool) promoteTx(addr common.Address, hash common.Hash, tx *typ pendingReplaceMeter.Mark(1) pool.dropTxFeed.Send(core.DropTxsEvent{ Txs: []*types.Transaction{old}, - Reason: dropReplaced, + Reason: txpool.DropReplaced, }) } else { // Nothing was replaced, bump the pending counter @@ -1185,7 +1185,7 @@ func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool, unreserve bo pendingGauge.Dec(int64(1 + len(invalids))) pool.dropTxFeed.Send(core.DropTxsEvent{ Txs: invalids, - Reason: dropUnexecutable, + Reason: txpool.DropUnexecutable, }) return 1 + len(invalids) } @@ -1509,7 +1509,7 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T log.Trace("Removed old queued transactions", "count", len(forwards)) pool.dropTxFeed.Send(core.DropTxsEvent{ Txs: forwards, - Reason: dropLowNonce, + Reason: txpool.DropLowNonce, }) // Drop all transactions that are too costly (low balance or out of gas) drops, _ := list.Filter(pool.currentState.GetBalance(addr), gasLimit) @@ -1521,7 +1521,7 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T queuedNofundsMeter.Mark(int64(len(drops))) pool.dropTxFeed.Send(core.DropTxsEvent{ Txs: drops, - Reason: dropUnpayable, + Reason: txpool.DropUnpayable, }) // Gather all executable transactions and promote them @@ -1547,7 +1547,7 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T queuedRateLimitMeter.Mark(int64(len(caps))) pool.dropTxFeed.Send(core.DropTxsEvent{ Txs: caps, - Reason: dropAccountCap, + Reason: txpool.DropAccountCap, }) } // Mark all the items dropped as removed @@ -1618,7 +1618,7 @@ func (pool *LegacyPool) truncatePending() { } pool.dropTxFeed.Send(core.DropTxsEvent{ Txs: caps, - Reason: dropAccountCap, + Reason: txpool.DropAccountCap, }) pool.priced.Removed(len(caps)) pendingGauge.Dec(int64(len(caps))) @@ -1693,7 +1693,7 @@ func (pool *LegacyPool) truncateQueue() { } pool.dropTxFeed.Send(core.DropTxsEvent{ Txs: txs, - Reason: dropTruncating, + Reason: txpool.DropTruncating, }) drop -= size queuedRateLimitMeter.Mark(int64(size)) @@ -1707,7 +1707,7 @@ func (pool *LegacyPool) truncateQueue() { queuedRateLimitMeter.Mark(1) pool.dropTxFeed.Send(core.DropTxsEvent{ Txs: []*types.Transaction{txs[i]}, - Reason: dropTruncating, + Reason: txpool.DropTruncating, }) } } @@ -1735,7 +1735,7 @@ func (pool *LegacyPool) demoteUnexecutables() { } pool.dropTxFeed.Send(core.DropTxsEvent{ Txs: olds, - Reason: dropLowNonce, + Reason: txpool.DropLowNonce, }) // Drop all transactions that are too costly (low balance or out of gas), and queue any invalids back for later drops, invalids := list.Filter(pool.currentState.GetBalance(addr), gasLimit) @@ -1746,7 +1746,7 @@ func (pool *LegacyPool) demoteUnexecutables() { } pool.dropTxFeed.Send(core.DropTxsEvent{ Txs: drops, - Reason: dropUnpayable, + Reason: txpool.DropUnpayable, }) pool.priced.Removed(len(olds) + len(drops)) pendingNofundsMeter.Mark(int64(len(drops))) @@ -2043,17 +2043,3 @@ func (t *lookup) RemotesBelowTip(threshold *big.Int) types.Transactions { func numSlots(tx *types.Transaction) int { return int((tx.Size() + txSlotSize - 1) / txSlotSize) } - - -const ( - dropUnderpriced = "underpriced-txs" - dropLowNonce = "low-nonce-txs" - dropUnpayable = "unpayable-txs" - - dropAccountCap = "account-cap-txs" // Accounts exceeding txpool.accountslots transactions - dropReplaced = "replaced-txs" - dropUnexecutable = "unexecutable-txs" - dropTruncating = "truncating-txs" - dropOld = "old-txs" - dropGasPriceUpdated = "updated-gas-price" -)