From 1e664abab8a27dfad850f23ad365aec8082a1fd3 Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Thu, 2 Nov 2023 18:07:59 -0400 Subject: [PATCH] Query for non-finalized txes --- common/txmgr/abandoned_tracker.go | 194 -------------------------- common/txmgr/confirmer.go | 14 +- common/txmgr/tracker.go | 163 ++++++++++++++++++++++ common/txmgr/types/tx_store.go | 1 + core/chains/evm/txmgr/evm_tx_store.go | 10 ++ 5 files changed, 180 insertions(+), 202 deletions(-) delete mode 100644 common/txmgr/abandoned_tracker.go create mode 100644 common/txmgr/tracker.go diff --git a/common/txmgr/abandoned_tracker.go b/common/txmgr/abandoned_tracker.go deleted file mode 100644 index 080d50f0bef..00000000000 --- a/common/txmgr/abandoned_tracker.go +++ /dev/null @@ -1,194 +0,0 @@ -package txmgr - -import ( - "context" - "fmt" - "time" - - "golang.org/x/exp/slices" - - feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types" - txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types" - "github.com/smartcontractkit/chainlink/v2/common/types" - "github.com/smartcontractkit/chainlink/v2/core/logger" -) - -// TTL is the default time to live for abandoned transactions (6hrs) -const TTL = 6 * time.Hour - -// AbandonedErrorMsg occurs when an abandoned tx exceeds its time to live -var AbandonedErrorMsg = fmt.Sprintf( - "abandoned transaction exceeded time to live of %d hours", int(TTL.Hours())) - -// AbandonedTx is a transaction who's fromAddress was removed from the Confirmer's enabledAddresses list -type AbandonedTx[ - CHAIN_ID types.ID, - ADDR types.Hashable, - TX_HASH types.Hashable, - BLOCK_HASH types.Hashable, - SEQ types.Sequence, - FEE feetypes.Fee, -] struct { - tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] - // fatalTime represents the time at which this transaction is to be marked fatal - fatalTime time.Time -} - -// AbandonedTracker tracks and handles abandoned transactions -type AbandonedTracker[ - CHAIN_ID types.ID, - ADDR types.Hashable, - TX_HASH types.Hashable, - BLOCK_HASH types.Hashable, - R txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH], - SEQ types.Sequence, - FEE feetypes.Fee, -] struct { - txStore *txmgrtypes.TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] - ks *txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ] - client *txmgrtypes.TxmClient[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] - chainID CHAIN_ID - lggr logger.Logger - txes []AbandonedTx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] -} - -// NewAbandonedTracker creates a new AbandonedTracker -func NewAbandonedTracker[ - CHAIN_ID types.ID, - ADDR types.Hashable, - TX_HASH types.Hashable, - BLOCK_HASH types.Hashable, - R txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH], - SEQ types.Sequence, - FEE feetypes.Fee, -]( - txStore *txmgrtypes.TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE], - keystore *txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ], - client *txmgrtypes.TxmClient[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE], - chainID CHAIN_ID, - lggr logger.Logger, -) AbandonedTracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] { - return AbandonedTracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{ - txStore: txStore, - ks: keystore, - chainID: chainID, - client: client, - lggr: lggr.Named("AbandonedTracker"), - txes: make([]AbandonedTx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], 0), - } -} - -func (tracker *AbandonedTracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) containsTx( - tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { - for _, atx := range tracker.txes { - if atx.tx.ID == tx.ID { - return true - } - } - return false -} - -// insertAbandonedTx inserts a transaction into the tracker -func (tracker *AbandonedTracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) insertAbandonedTx( - tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) { - if tracker.containsTx(tx) { - return - } - - tracker.lggr.Debugw(fmt.Sprintf("inserting tx %v", tx.ID)) - tracker.txes = append(tracker.txes, AbandonedTx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{ - tx: tx, - fatalTime: time.Now().Add(TTL), - }) -} - -// markFatal sets a transaction's state to fatal_error -func (tracker *AbandonedTracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) markFatal( - ctx context.Context, - atx AbandonedTx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) { - tracker.lggr.Infow(fmt.Sprintf("tx %v marked as fatal for exceeding ttl", atx.tx.ID)) - - atx.tx.Error.SetValid(AbandonedErrorMsg) - - err := (*tracker.txStore).UpdateTxFatalError(ctx, atx.tx) - if err != nil { - tracker.lggr.Errorw(fmt.Sprintf("failed to mark tx %v as fatal", atx.tx.ID)) - // TODO: Handle error - } -} - -// getAbandonedAddresses retrieves fromAddress’s in evm.txes that are not present in the Confirmer's enabledAddresses list -func (tracker *AbandonedTracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) getAbandonedAddresses(enabledAddrs []ADDR) ([]ADDR, error) { - fromAddresses, err := (*tracker.ks).EnabledAddressesForChain(tracker.chainID) - if err != nil { - return nil, err - } - - var abandoned []ADDR - for _, addr := range fromAddresses { - if !slices.Contains(enabledAddrs, addr) { - abandoned = append(abandoned, addr) - } - } - - return abandoned, nil -} - -// HandleAbandonedTransactions is called by the Confirmer to track and handle all abandoned transactions -func (tracker *AbandonedTracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) HandleAbandonedTransactions(ctx context.Context, enabledAddrs []ADDR) { - // Find abandoned addresses - abandonedAddrs, err := tracker.getAbandonedAddresses(enabledAddrs) - if err != nil { - // TODO handle error - } - - // Get abandoned txes from addresses and insert into the tracker - for _, addr := range abandonedAddrs { - seq, err := (*tracker.client).SequenceAt(ctx, addr, nil) - if err != nil { - // TODO handle error - } - - tx, err := (*tracker.txStore).FindTxWithSequence(ctx, addr, seq) - if err != nil { - // TODO handle error - } - - tracker.insertAbandonedTx(tx) - } - - // Check states of all current abandoned transactions and update tracking - tracker.handleTransactionStates(ctx) -} - -// handleTransactionStates handles all abandoned transactions based on their current state. -// Transactions with finalized states are no longer tracked, while transactions which -// exceed their ttl are marked as fatal. -func (tracker *AbandonedTracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) handleTransactionStates(ctx context.Context) { - temp := make([]AbandonedTx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], 0) - - for _, atx := range tracker.txes { - switch atx.tx.State { - case TxConfirmed, TxConfirmedMissingReceipt, TxFatalError: - // Stop tracking tx when finalized state is obtained - continue - case TxInProgress: - if time.Now().After(atx.fatalTime) { - tracker.markFatal(ctx, atx) - continue - } - temp = append(temp, atx) - case TxUnstarted, TxUnconfirmed: - if time.Now().After(atx.fatalTime) { - // TODO: Handle cancelling TxUnstarted, TxUnconfirmed - continue - } - temp = append(temp, atx) - default: - // This should never happen unless a new transaction state is added - tracker.lggr.Panicw(fmt.Sprintf("unhandled transaction state: %v", atx.tx.State)) - } - } - - tracker.txes = temp -} diff --git a/common/txmgr/confirmer.go b/common/txmgr/confirmer.go index 46d07f9c779..184deacd6b4 100644 --- a/common/txmgr/confirmer.go +++ b/common/txmgr/confirmer.go @@ -127,7 +127,7 @@ type Confirmer[ ks txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ] enabledAddresses []ADDR - abandonedTracker AbandonedTracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] + abandonedTracker Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] mb *utils.Mailbox[HEAD] ctx context.Context @@ -162,21 +162,19 @@ func NewConfirmer[ isReceiptNil func(R) bool, ) *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] { lggr = lggr.Named("Confirmer") - chainID := client.ConfiguredChainID() - return &Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{ txStore: txStore, lggr: lggr, client: client, TxAttemptBuilder: txAttemptBuilder, - abandonedTracker: NewAbandonedTracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]( - &txStore, &keystore, &client, chainID, lggr), + tracker: NewTracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]( + &txStore, lggr), resumeCallback: nil, chainConfig: chainConfig, feeConfig: feeConfig, txConfig: txConfig, dbConfig: dbConfig, - chainID: chainID, + chainID: client.ConfiguredChainID(), ks: keystore, mb: utils.NewSingleMailbox[HEAD](), isReceiptNil: isReceiptNil, @@ -312,8 +310,8 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) pro ec.lggr.Debugw("Finished EnsureConfirmedTransactionsInLongestChain", "headNum", head.BlockNumber(), "time", time.Since(mark), "id", "confirmer") mark = time.Now() - ec.abandonedTracker.HandleAbandonedTransactions(ctx, ec.enabledAddresses) - ec.lggr.Debugw("Finished HandleAbandonedTransactions", "headNum", head.BlockNumber(), "time", time.Since(mark), "id", "confirmer") + ec.abandonedTracker.HandleAbandonedTxes(ctx, ec.enabledAddresses) + ec.lggr.Debugw("Finished HandleAbandonedTxes", "headNum", head.BlockNumber(), "time", time.Since(mark), "id", "confirmer") if ec.resumeCallback != nil { mark = time.Now() diff --git a/common/txmgr/tracker.go b/common/txmgr/tracker.go new file mode 100644 index 00000000000..6b26c915b46 --- /dev/null +++ b/common/txmgr/tracker.go @@ -0,0 +1,163 @@ +package txmgr + +import ( + "context" + "fmt" + "github.com/pkg/errors" + "slices" + "time" + + feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types" + txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types" + "github.com/smartcontractkit/chainlink/v2/common/types" + "github.com/smartcontractkit/chainlink/v2/core/logger" +) + +const ( + // TTL is the default time to live for abandoned transactions (6hrs) + TTL = 6 * time.Hour + // MaxCacheSize is the max number of transactions to track at once + MaxCacheSize = 100 +) + +// AbandonedErrorMsg occurs when an abandoned tx exceeds its time to live +var AbandonedErrorMsg = fmt.Sprintf( + "abandoned transaction exceeded time to live of %d hours", int(TTL.Hours())) + +// AbandonedTx is a transaction who's 'FromAddress' was removed from Confirmer's enabled addresses list +type AbandonedTx[ + CHAIN_ID types.ID, + ADDR types.Hashable, + TX_HASH types.Hashable, + BLOCK_HASH types.Hashable, + SEQ types.Sequence, + FEE feetypes.Fee, +] struct { + tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + // fatalTime represents the time at which this transaction is to be marked fatal + fatalTime time.Time +} + +// Tracker tracks and finalizes abandoned transactions +type Tracker[ + CHAIN_ID types.ID, + ADDR types.Hashable, + TX_HASH types.Hashable, + BLOCK_HASH types.Hashable, + R txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH], + SEQ types.Sequence, + FEE feetypes.Fee, +] struct { + txStore *txmgrtypes.TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] + lggr logger.Logger + // txCache stores abandoned transactions by ID + txCache map[int64]AbandonedTx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] +} + +// NewTracker creates a new Tracker +func NewTracker[ + CHAIN_ID types.ID, + ADDR types.Hashable, + TX_HASH types.Hashable, + BLOCK_HASH types.Hashable, + R txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH], + SEQ types.Sequence, + FEE feetypes.Fee, +]( + txStore *txmgrtypes.TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE], + lggr logger.Logger, +) Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] { + return Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{ + txStore: txStore, + lggr: lggr.Named("Tracker"), + txCache: map[int64]AbandonedTx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{}, + } +} + +// HandleAbandonedTxes is called by the Confirmer to track and finalize abandoned transactions +func (tracker *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) HandleAbandonedTxes(ctx context.Context, enabledAddrs []ADDR) { + // Try to finalize tracked transactions + for id, atx := range tracker.txCache { + if finalized := tracker.finalizeTx(ctx, atx); finalized { + delete(tracker.txCache, id) + } + } + + // Track more abandoned transactions if there's space + if len(tracker.txCache) == MaxCacheSize { + return + } + + nonFinalizedTxes, err := (*tracker.txStore).GetNonFinalizedTransactions(ctx, MaxCacheSize) + if err != nil { + tracker.lggr.Errorw("failed to get non finalized txes from txStore") + return + } + + for _, tx := range nonFinalizedTxes { + if _, contains := tracker.txCache[tx.ID]; contains { + continue + } + + // Check if tx is abandoned + if !slices.Contains(enabledAddrs, tx.FromAddress) { + tracker.insertTx(tx) + if len(tracker.txCache) == MaxCacheSize { + break + } + } + } +} + +// insertTx inserts a transaction into the tracker as an AbandonedTx +func (tracker *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) insertTx( + tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) { + if _, contains := tracker.txCache[tx.ID]; contains { + return + } + + tracker.txCache[tx.ID] = AbandonedTx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{ + tx: tx, + fatalTime: time.Now().Add(TTL), + } + tracker.lggr.Debugw(fmt.Sprintf("inserted tx %v", tx.ID)) +} + +// finalizeTx tries to finalize a transaction based on its current state. +// Returns true if the transaction was finalized. +func (tracker *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) finalizeTx( + ctx context.Context, atx AbandonedTx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { + switch atx.tx.State { + case TxConfirmed, TxConfirmedMissingReceipt, TxFatalError: + return true + case TxInProgress: + if time.Now().After(atx.fatalTime) { + // TODO: Confirm tx status on chain in case it was confirmed + if err := tracker.finalizeFatal(ctx, atx.tx); err != nil { + tracker.lggr.Errorw(err.Error()) + } + return true + } + case TxUnstarted, TxUnconfirmed: + // TODO: Handle TxUnstarted, TxUnconfirmed + default: + tracker.lggr.Panicw(fmt.Sprintf("unhandled transaction state: %v", atx.tx.State)) + } + + return false +} + +// finalizeFatal sets a transaction's state to fatal_error +func (tracker *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) finalizeFatal( + ctx context.Context, + tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { + tx.Error.SetValid(AbandonedErrorMsg) + + err := (*tracker.txStore).UpdateTxFatalError(ctx, tx) + if err != nil { + return errors.Wrap(err, fmt.Sprintf("failed to mark tx %v as fatal", tx.ID)) + } + + tracker.lggr.Infow(fmt.Sprintf("tx %v marked fatal for exceeding ttl", tx.ID)) + return nil +} diff --git a/common/txmgr/types/tx_store.go b/common/txmgr/types/tx_store.go index 059a87d7ab2..829c7db7c10 100644 --- a/common/txmgr/types/tx_store.go +++ b/common/txmgr/types/tx_store.go @@ -59,6 +59,7 @@ type TransactionStore[ CreateTransaction(ctx context.Context, txRequest TxRequest[ADDR, TX_HASH], chainID CHAIN_ID) (tx Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) DeleteInProgressAttempt(ctx context.Context, attempt TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error FindLatestSequence(ctx context.Context, fromAddress ADDR, chainId CHAIN_ID) (SEQ, error) + GetNonFinalizedTransactions(ctx context.Context, limit int) (txs []*Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) FindTxsRequiringGasBump(ctx context.Context, address ADDR, blockNum, gasBumpThreshold, depth int64, chainID CHAIN_ID) (etxs []*Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) FindTxsRequiringResubmissionDueToInsufficientFunds(ctx context.Context, address ADDR, chainID CHAIN_ID) (etxs []*Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) FindTxAttemptsConfirmedMissingReceipt(ctx context.Context, chainID CHAIN_ID) (attempts []TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) diff --git a/core/chains/evm/txmgr/evm_tx_store.go b/core/chains/evm/txmgr/evm_tx_store.go index 86a06b60250..7294d03b9e1 100644 --- a/core/chains/evm/txmgr/evm_tx_store.go +++ b/core/chains/evm/txmgr/evm_tx_store.go @@ -1195,6 +1195,16 @@ func (o *evmTxStore) SaveInProgressAttempt(ctx context.Context, attempt *TxAttem return nil } +func (o *evmTxStore) GetNonFinalizedTransactions(ctx context.Context, limit int) (txes []*Tx, err error) { + qq := o.q.WithOpts(pg.WithParentCtx(ctx)) + queryStr := fmt.Sprintf(` +SELECT * FROM evm.txes +WHERE state = 'unconfirmed' OR state = 'unstarted' OR state = 'in_progress' +LIMIT %d`, limit) + err = qq.Get(txes, queryStr) + return txes, err +} + // FindTxsRequiringGasBump returns transactions that have all // attempts which are unconfirmed for at least gasBumpThreshold blocks, // limited by limit pending transactions