diff --git a/common/txmgr/types/tx_store.go b/common/txmgr/types/tx_store.go index 72461e76100..f577ddf8166 100644 --- a/common/txmgr/types/tx_store.go +++ b/common/txmgr/types/tx_store.go @@ -84,7 +84,7 @@ type TransactionStore[ FindTransactionsConfirmedInBlockRange(ctx context.Context, highBlockNumber, lowBlockNumber int64, chainID CHAIN_ID) (etxs []*Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) FindEarliestUnconfirmedBroadcastTime(ctx context.Context, chainID CHAIN_ID) (null.Time, error) FindEarliestUnconfirmedTxAttemptBlock(ctx context.Context, chainID CHAIN_ID) (null.Int, error) - FindConfirmedTxes(ctx context.Context, chainID CHAIN_ID) (etxs []*Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) + FindTxesToMarkFinalized(ctx context.Context, finalizedBlockNum int64, chainID CHAIN_ID) (etxs []*Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) GetTxInProgress(ctx context.Context, fromAddress ADDR) (etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) GetInProgressTxAttempts(ctx context.Context, address ADDR, chainID CHAIN_ID) (attempts []TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) GetAbandonedTransactionsByBatch(ctx context.Context, chainID CHAIN_ID, enabledAddrs []ADDR, offset, limit uint) (txs []*Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) diff --git a/core/chains/evm/txmgr/evm_tx_store.go b/core/chains/evm/txmgr/evm_tx_store.go index c1c05dbcebf..325401ff063 100644 --- a/core/chains/evm/txmgr/evm_tx_store.go +++ b/core/chains/evm/txmgr/evm_tx_store.go @@ -2041,15 +2041,18 @@ func (o *evmTxStore) UpdateTxAttemptBroadcastBeforeBlockNum(ctx context.Context, return err } -// Returns all confirmed transactions -func (o *evmTxStore) FindConfirmedTxes(ctx context.Context, chainID *big.Int) (txes []*Tx, err error) { +// Returns all confirmed transactions with receipt block nums older than or equal to the finalized block number +func (o *evmTxStore) FindTxesToMarkFinalized(ctx context.Context, finalizedBlockNum int64, chainID *big.Int) (txes []*Tx, err error) { var cancel context.CancelFunc ctx, cancel = o.stopCh.Ctx(ctx) defer cancel() err = o.Transact(ctx, true, func(orm *evmTxStore) error { - sql := "SELECT * FROM evm.txes WHERE state = 'confirmed' AND evm_chain_id = $1" + sql := `SELECT evm.txes.* FROM evm.txes + INNER JOIN evm.tx_attempts ON evm.txes.id = evm.tx_attempts.eth_tx_id + INNER JOIN evm.receipts ON evm.tx_attempts.hash = evm.receipts.tx_hash + WHERE evm.txes.state = 'confirmed' AND evm.receipts.block_number <= $1 AND evm.txes.evm_chain_id = $2` var dbEtxs []DbEthTx - err = o.q.SelectContext(ctx, &dbEtxs, sql, chainID.String()) + err = o.q.SelectContext(ctx, &dbEtxs, sql, finalizedBlockNum, chainID.String()) if len(dbEtxs) == 0 { return nil } diff --git a/core/chains/evm/txmgr/evm_tx_store_test.go b/core/chains/evm/txmgr/evm_tx_store_test.go index fee2fc2d377..be5b63c83ff 100644 --- a/core/chains/evm/txmgr/evm_tx_store_test.go +++ b/core/chains/evm/txmgr/evm_tx_store_test.go @@ -1856,15 +1856,17 @@ func TestORM_FindTransactionsByState(t *testing.T) { txStore := cltest.NewTestTxStore(t, db) kst := cltest.NewKeyStore(t, db) _, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth()) + finalizedBlockNum := int64(100) mustInsertUnstartedTx(t, txStore, fromAddress) mustInsertInProgressEthTxWithAttempt(t, txStore, 0, fromAddress) mustInsertUnconfirmedEthTxWithAttemptState(t, txStore, 1, fromAddress, txmgrtypes.TxAttemptBroadcast) - mustInsertConfirmedMissingReceiptEthTxWithLegacyAttempt(t, txStore, 2, 100, time.Now(), fromAddress) - mustInsertConfirmedEthTxWithReceipt(t, txStore, fromAddress, 3, 100) + mustInsertConfirmedMissingReceiptEthTxWithLegacyAttempt(t, txStore, 2, finalizedBlockNum, time.Now(), fromAddress) + mustInsertConfirmedEthTxWithReceipt(t, txStore, fromAddress, 3, finalizedBlockNum + 1) + mustInsertConfirmedEthTxWithReceipt(t, txStore, fromAddress, 4, finalizedBlockNum) mustInsertFatalErrorEthTx(t, txStore, fromAddress) - txs, err := txStore.FindConfirmedTxes(ctx, testutils.FixtureChainID) + txs, err := txStore.FindTxesToMarkFinalized(ctx, finalizedBlockNum, testutils.FixtureChainID) require.NoError(t, err) require.Len(t, txs, 1) } diff --git a/core/chains/evm/txmgr/finalizer.go b/core/chains/evm/txmgr/finalizer.go index 94c54e534fa..9df979629fd 100644 --- a/core/chains/evm/txmgr/finalizer.go +++ b/core/chains/evm/txmgr/finalizer.go @@ -7,7 +7,7 @@ import ( "sync" "time" - "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/rpc" "github.com/smartcontractkit/chainlink-common/pkg/logger" @@ -19,12 +19,11 @@ import ( var _ Finalizer = (*evmFinalizer)(nil) -// processHeadTimeout represents a sanity limit on how long ProcessHead -// should take to complete +// processHeadTimeout represents a sanity limit on how long ProcessHead should take to complete const processHeadTimeout = 10 * time.Minute type finalizerTxStore interface { - FindConfirmedTxes(ctx context.Context, chainID *big.Int) ([]*Tx, error) + FindTxesToMarkFinalized(ctx context.Context, finalizedBlockNum int64, chainID *big.Int) ([]*Tx, error) UpdateTxesFinalized(ctx context.Context, txs []int64, chainId *big.Int) error } @@ -151,34 +150,38 @@ func (f *evmFinalizer) processFinalizedHead(ctx context.Context, latestFinalized earliestBlockNumInChain := latestFinalizedHead.EarliestHeadInChain().BlockNumber() f.lggr.Debugw("processing latest finalized head", "block num", latestFinalizedHead.BlockNumber(), "block hash", latestFinalizedHead.BlockHash(), "earliest block num in chain", earliestBlockNumInChain) - // Retrieve all confirmed transactions, loaded with attempts and receipts - unfinalizedTxs, err := f.txStore.FindConfirmedTxes(ctx, f.chainId) + // Retrieve all confirmed transactions with receipts older than or equal to the finalized block, loaded with attempts and receipts + unfinalizedTxs, err := f.txStore.FindTxesToMarkFinalized(ctx, latestFinalizedHead.BlockNumber(), f.chainId) if err != nil { return fmt.Errorf("failed to retrieve confirmed transactions: %w", err) } var finalizedTxs []*Tx // Group by block hash transactions whose receipts cannot be validated using the cached heads - receiptBlockHashToTx := make(map[common.Hash][]*Tx) + blockNumToTxesMap := make(map[int64][]*Tx) // Find transactions with receipt block nums older than the latest finalized block num and block hashes still in chain for _, tx := range unfinalizedTxs { receipt := tx.GetReceipt() + // The tx store query ensures transactions have receipts but leaving this check here for a belts and braces approach if receipt == nil || receipt.IsZero() || receipt.IsUnmined() { f.lggr.AssumptionViolationw("invalid receipt found for confirmed transaction", "tx", tx, "receipt", receipt) continue } - // Receipt newer than latest finalized head block num + // The tx store query only returns transactions with receipts older than or equal to the finalized block but leaving this check here for a belts and braces approach if receipt.GetBlockNumber().Cmp(big.NewInt(latestFinalizedHead.BlockNumber())) > 0 { continue } // Receipt block num older than earliest head in chain. Validate hash using RPC call later - if receipt.GetBlockNumber().Int64() < earliestBlockNumInChain { - receiptBlockHashToTx[receipt.GetBlockHash()] = append(receiptBlockHashToTx[receipt.GetBlockHash()], tx) + if receipt.GetBlockNumber().Cmp(big.NewInt(earliestBlockNumInChain)) < 0 { + blockNumToTxesMap[receipt.GetBlockNumber().Int64()] = append(blockNumToTxesMap[receipt.GetBlockNumber().Int64()], tx) continue } blockHashInChain := latestFinalizedHead.HashAtHeight(receipt.GetBlockNumber().Int64()) // Receipt block hash does not match the block hash in chain. Transaction has been re-org'd out but DB state has not been updated yet if blockHashInChain.String() != receipt.GetBlockHash().String() { + // Log a critical error if a transaction is marked as confirmed with a receipt older than the finalized block + // This scenario could potentially point to a re-org'd transaction the Confirmer has lost track of + f.lggr.Criticalw("found confirmed transaction with re-org'd receipt older than finalized block", "tx", tx, "receipt", receipt, "onchainBlockHash", blockHashInChain.String()) continue } finalizedTxs = append(finalizedTxs, tx) @@ -186,7 +189,7 @@ func (f *evmFinalizer) processFinalizedHead(ctx context.Context, latestFinalized // Check if block hashes exist for receipts on-chain older than the earliest cached head // Transactions are grouped by their receipt block hash to avoid repeat requests on the same hash in case transactions were confirmed in the same block - validatedReceiptTxs, err := f.batchCheckReceiptHashesOnchain(ctx, receiptBlockHashToTx, latestFinalizedHead.BlockNumber()) + validatedReceiptTxs, err := f.batchCheckReceiptHashesOnchain(ctx, blockNumToTxesMap) if err != nil { // Do not error out to allow transactions that did not need RPC validation to still be marked as finalized // The transactions failed to be validated will be checked again in the next round @@ -203,18 +206,18 @@ func (f *evmFinalizer) processFinalizedHead(ctx context.Context, latestFinalized return nil } -func (f *evmFinalizer) batchCheckReceiptHashesOnchain(ctx context.Context, receiptMap map[common.Hash][]*Tx, latestFinalizedBlockNum int64) ([]*Tx, error) { - if len(receiptMap) == 0 { +func (f *evmFinalizer) batchCheckReceiptHashesOnchain(ctx context.Context, blockNumToTxesMap map[int64][]*Tx) ([]*Tx, error) { + if len(blockNumToTxesMap) == 0 { return nil, nil } // Group the RPC batch calls in groups of rpcBatchSize var rpcBatchGroups [][]rpc.BatchElem var rpcBatch []rpc.BatchElem - for hash := range receiptMap { + for blockNum := range blockNumToTxesMap { elem := rpc.BatchElem{ - Method: "eth_getBlockByHash", + Method: "eth_getBlockByNumber", Args: []any{ - hash, + hexutil.EncodeBig(big.NewInt(blockNum)), false, }, Result: new(evmtypes.Head), @@ -225,6 +228,9 @@ func (f *evmFinalizer) batchCheckReceiptHashesOnchain(ctx context.Context, recei rpcBatch = []rpc.BatchElem{} } } + if len(rpcBatch) > 0 { + rpcBatchGroups = append(rpcBatchGroups, rpcBatch) + } var finalizedTxs []*Tx for _, rpcBatch := range rpcBatchGroups { @@ -237,20 +243,28 @@ func (f *evmFinalizer) batchCheckReceiptHashesOnchain(ctx context.Context, recei for _, req := range rpcBatch { if req.Error != nil { // Continue if particular RPC call failed so other txs can still be considered for finalization - f.lggr.Debugw("failed to find block by hash", "hash", req.Args[0]) + f.lggr.Debugw("failed to find block by number", "blockNum", req.Args[0]) continue } head := req.Result.(*evmtypes.Head) if head == nil { - // Continue if particular RPC call yielded a nil head so other txs can still be considered for finalization - f.lggr.Debugw("failed to find block by hash", "hash", req.Args[0]) + // Continue if particular RPC call yielded a nil block so other txs can still be considered for finalization + f.lggr.Debugw("failed to find block by number", "blockNum", req.Args[0]) continue } - // Confirmed receipt's block hash exists on-chain - // Add to finalized list if block num less than or equal to the latest finalized head block num - if head.BlockNumber() <= latestFinalizedBlockNum { - txs := receiptMap[head.BlockHash()] - finalizedTxs = append(finalizedTxs, txs...) + txs := blockNumToTxesMap[head.BlockNumber()] + // Check if transaction receipts match the block hash at the given block num + // If they do not, the transactions may have been re-org'd out + // The expectation is for the Confirmer to pick up on these re-orgs and get the transaction included + for _, tx := range txs { + receipt := tx.GetReceipt() + if receipt.GetBlockHash().String() == head.BlockHash().String() { + finalizedTxs = append(finalizedTxs, tx) + } else { + // Log a critical error if a transaction is marked as confirmed with a receipt older than the finalized block + // This scenario could potentially point to a re-org'd transaction the Confirmer has lost track of + f.lggr.Criticalw("found confirmed transaction with re-org'd receipt older than finalized block", "tx", tx, "receipt", receipt, "onchainBlockHash", head.BlockHash().String()) + } } } } diff --git a/core/chains/evm/txmgr/finalizer_test.go b/core/chains/evm/txmgr/finalizer_test.go index 50d46a61c08..a6f0d13f99f 100644 --- a/core/chains/evm/txmgr/finalizer_test.go +++ b/core/chains/evm/txmgr/finalizer_test.go @@ -2,10 +2,12 @@ package txmgr_test import ( "errors" + "math/big" "testing" "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/rpc" "github.com/google/uuid" "github.com/stretchr/testify/mock" @@ -171,14 +173,16 @@ func TestFinalizer_MarkTxFinalized(t *testing.T) { rpcElements := args.Get(1).([]rpc.BatchElem) require.Equal(t, 1, len(rpcElements)) - require.Equal(t, "eth_getBlockByHash", rpcElements[0].Method) + require.Equal(t, "eth_getBlockByNumber", rpcElements[0].Method) require.Equal(t, false, rpcElements[0].Args[1]) - reqHash := rpcElements[0].Args[0].(common.Hash).String() + reqBlockNum := rpcElements[0].Args[0].(string) + req1BlockNum := hexutil.EncodeBig(big.NewInt(head.Parent.Number - 2)) + req2BlockNum := hexutil.EncodeBig(big.NewInt(head.Parent.Number - 1)) var headResult evmtypes.Head - if receiptHash1.String() == reqHash { + if req1BlockNum == reqBlockNum { headResult = evmtypes.Head{Number: head.Parent.Number - 2, Hash: receiptHash1} - } else if receiptHash2.String() == reqHash { + } else if req2BlockNum == reqBlockNum { headResult = evmtypes.Head{Number: head.Parent.Number - 1, Hash: receiptHash2} } else { require.Fail(t, "unrecognized block hash")