Skip to content

Commit

Permalink
Updated finalizer batch RPC validation to use blockByNumber and added…
Browse files Browse the repository at this point in the history
… filter to DB query
  • Loading branch information
amit-momin committed Jul 10, 2024
1 parent 911d342 commit 005d972
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 36 deletions.
2 changes: 1 addition & 1 deletion common/txmgr/types/tx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ type TransactionStore[
FindTransactionsConfirmedInBlockRange(ctx context.Context, highBlockNumber, lowBlockNumber int64, chainID CHAIN_ID) (etxs []*Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
FindEarliestUnconfirmedBroadcastTime(ctx context.Context, chainID CHAIN_ID) (null.Time, error)
FindEarliestUnconfirmedTxAttemptBlock(ctx context.Context, chainID CHAIN_ID) (null.Int, error)
FindConfirmedTxes(ctx context.Context, chainID CHAIN_ID) (etxs []*Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
FindTxesToMarkFinalized(ctx context.Context, finalizedBlockNum int64, chainID CHAIN_ID) (etxs []*Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
GetTxInProgress(ctx context.Context, fromAddress ADDR) (etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
GetInProgressTxAttempts(ctx context.Context, address ADDR, chainID CHAIN_ID) (attempts []TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
GetAbandonedTransactionsByBatch(ctx context.Context, chainID CHAIN_ID, enabledAddrs []ADDR, offset, limit uint) (txs []*Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
Expand Down
11 changes: 7 additions & 4 deletions core/chains/evm/txmgr/evm_tx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2041,15 +2041,18 @@ func (o *evmTxStore) UpdateTxAttemptBroadcastBeforeBlockNum(ctx context.Context,
return err
}

// Returns all confirmed transactions
func (o *evmTxStore) FindConfirmedTxes(ctx context.Context, chainID *big.Int) (txes []*Tx, err error) {
// Returns all confirmed transactions with receipt block nums older than or equal to the finalized block number
func (o *evmTxStore) FindTxesToMarkFinalized(ctx context.Context, finalizedBlockNum int64, chainID *big.Int) (txes []*Tx, err error) {
var cancel context.CancelFunc
ctx, cancel = o.stopCh.Ctx(ctx)
defer cancel()
err = o.Transact(ctx, true, func(orm *evmTxStore) error {
sql := "SELECT * FROM evm.txes WHERE state = 'confirmed' AND evm_chain_id = $1"
sql := `SELECT evm.txes.* FROM evm.txes
INNER JOIN evm.tx_attempts ON evm.txes.id = evm.tx_attempts.eth_tx_id
INNER JOIN evm.receipts ON evm.tx_attempts.hash = evm.receipts.tx_hash
WHERE evm.txes.state = 'confirmed' AND evm.receipts.block_number <= $1 AND evm.txes.evm_chain_id = $2`
var dbEtxs []DbEthTx
err = o.q.SelectContext(ctx, &dbEtxs, sql, chainID.String())
err = o.q.SelectContext(ctx, &dbEtxs, sql, finalizedBlockNum, chainID.String())
if len(dbEtxs) == 0 {
return nil
}
Expand Down
8 changes: 5 additions & 3 deletions core/chains/evm/txmgr/evm_tx_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1856,15 +1856,17 @@ func TestORM_FindTransactionsByState(t *testing.T) {
txStore := cltest.NewTestTxStore(t, db)
kst := cltest.NewKeyStore(t, db)
_, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth())
finalizedBlockNum := int64(100)

mustInsertUnstartedTx(t, txStore, fromAddress)
mustInsertInProgressEthTxWithAttempt(t, txStore, 0, fromAddress)
mustInsertUnconfirmedEthTxWithAttemptState(t, txStore, 1, fromAddress, txmgrtypes.TxAttemptBroadcast)
mustInsertConfirmedMissingReceiptEthTxWithLegacyAttempt(t, txStore, 2, 100, time.Now(), fromAddress)
mustInsertConfirmedEthTxWithReceipt(t, txStore, fromAddress, 3, 100)
mustInsertConfirmedMissingReceiptEthTxWithLegacyAttempt(t, txStore, 2, finalizedBlockNum, time.Now(), fromAddress)
mustInsertConfirmedEthTxWithReceipt(t, txStore, fromAddress, 3, finalizedBlockNum + 1)
mustInsertConfirmedEthTxWithReceipt(t, txStore, fromAddress, 4, finalizedBlockNum)
mustInsertFatalErrorEthTx(t, txStore, fromAddress)

txs, err := txStore.FindConfirmedTxes(ctx, testutils.FixtureChainID)
txs, err := txStore.FindTxesToMarkFinalized(ctx, finalizedBlockNum, testutils.FixtureChainID)
require.NoError(t, err)
require.Len(t, txs, 1)
}
Expand Down
62 changes: 38 additions & 24 deletions core/chains/evm/txmgr/finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/rpc"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
Expand All @@ -19,12 +19,11 @@ import (

var _ Finalizer = (*evmFinalizer)(nil)

// processHeadTimeout represents a sanity limit on how long ProcessHead
// should take to complete
// processHeadTimeout represents a sanity limit on how long ProcessHead should take to complete
const processHeadTimeout = 10 * time.Minute

type finalizerTxStore interface {
FindConfirmedTxes(ctx context.Context, chainID *big.Int) ([]*Tx, error)
FindTxesToMarkFinalized(ctx context.Context, finalizedBlockNum int64, chainID *big.Int) ([]*Tx, error)
UpdateTxesFinalized(ctx context.Context, txs []int64, chainId *big.Int) error
}

Expand Down Expand Up @@ -151,42 +150,46 @@ func (f *evmFinalizer) processFinalizedHead(ctx context.Context, latestFinalized
earliestBlockNumInChain := latestFinalizedHead.EarliestHeadInChain().BlockNumber()
f.lggr.Debugw("processing latest finalized head", "block num", latestFinalizedHead.BlockNumber(), "block hash", latestFinalizedHead.BlockHash(), "earliest block num in chain", earliestBlockNumInChain)

// Retrieve all confirmed transactions, loaded with attempts and receipts
unfinalizedTxs, err := f.txStore.FindConfirmedTxes(ctx, f.chainId)
// Retrieve all confirmed transactions with receipts older than or equal to the finalized block, loaded with attempts and receipts
unfinalizedTxs, err := f.txStore.FindTxesToMarkFinalized(ctx, latestFinalizedHead.BlockNumber(), f.chainId)
if err != nil {
return fmt.Errorf("failed to retrieve confirmed transactions: %w", err)
}

var finalizedTxs []*Tx
// Group by block hash transactions whose receipts cannot be validated using the cached heads
receiptBlockHashToTx := make(map[common.Hash][]*Tx)
blockNumToTxesMap := make(map[int64][]*Tx)
// Find transactions with receipt block nums older than the latest finalized block num and block hashes still in chain
for _, tx := range unfinalizedTxs {
receipt := tx.GetReceipt()
// The tx store query ensures transactions have receipts but leaving this check here for a belts and braces approach
if receipt == nil || receipt.IsZero() || receipt.IsUnmined() {
f.lggr.AssumptionViolationw("invalid receipt found for confirmed transaction", "tx", tx, "receipt", receipt)
continue
}
// Receipt newer than latest finalized head block num
// The tx store query only returns transactions with receipts older than or equal to the finalized block but leaving this check here for a belts and braces approach
if receipt.GetBlockNumber().Cmp(big.NewInt(latestFinalizedHead.BlockNumber())) > 0 {
continue
}
// Receipt block num older than earliest head in chain. Validate hash using RPC call later
if receipt.GetBlockNumber().Int64() < earliestBlockNumInChain {
receiptBlockHashToTx[receipt.GetBlockHash()] = append(receiptBlockHashToTx[receipt.GetBlockHash()], tx)
if receipt.GetBlockNumber().Cmp(big.NewInt(earliestBlockNumInChain)) < 0 {
blockNumToTxesMap[receipt.GetBlockNumber().Int64()] = append(blockNumToTxesMap[receipt.GetBlockNumber().Int64()], tx)
continue
}
blockHashInChain := latestFinalizedHead.HashAtHeight(receipt.GetBlockNumber().Int64())
// Receipt block hash does not match the block hash in chain. Transaction has been re-org'd out but DB state has not been updated yet
if blockHashInChain.String() != receipt.GetBlockHash().String() {
// Log a critical error if a transaction is marked as confirmed with a receipt older than the finalized block
// This scenario could potentially point to a re-org'd transaction the Confirmer has lost track of
f.lggr.Criticalw("found confirmed transaction with re-org'd receipt older than finalized block", "tx", tx, "receipt", receipt, "onchainBlockHash", blockHashInChain.String())
continue
}
finalizedTxs = append(finalizedTxs, tx)
}

// Check if block hashes exist for receipts on-chain older than the earliest cached head
// Transactions are grouped by their receipt block hash to avoid repeat requests on the same hash in case transactions were confirmed in the same block
validatedReceiptTxs, err := f.batchCheckReceiptHashesOnchain(ctx, receiptBlockHashToTx, latestFinalizedHead.BlockNumber())
validatedReceiptTxs, err := f.batchCheckReceiptHashesOnchain(ctx, blockNumToTxesMap)
if err != nil {
// Do not error out to allow transactions that did not need RPC validation to still be marked as finalized
// The transactions failed to be validated will be checked again in the next round
Expand All @@ -203,18 +206,18 @@ func (f *evmFinalizer) processFinalizedHead(ctx context.Context, latestFinalized
return nil
}

func (f *evmFinalizer) batchCheckReceiptHashesOnchain(ctx context.Context, receiptMap map[common.Hash][]*Tx, latestFinalizedBlockNum int64) ([]*Tx, error) {
if len(receiptMap) == 0 {
func (f *evmFinalizer) batchCheckReceiptHashesOnchain(ctx context.Context, blockNumToTxesMap map[int64][]*Tx) ([]*Tx, error) {
if len(blockNumToTxesMap) == 0 {
return nil, nil
}
// Group the RPC batch calls in groups of rpcBatchSize
var rpcBatchGroups [][]rpc.BatchElem
var rpcBatch []rpc.BatchElem
for hash := range receiptMap {
for blockNum := range blockNumToTxesMap {
elem := rpc.BatchElem{
Method: "eth_getBlockByHash",
Method: "eth_getBlockByNumber",
Args: []any{
hash,
hexutil.EncodeBig(big.NewInt(blockNum)),
false,
},
Result: new(evmtypes.Head),
Expand All @@ -225,6 +228,9 @@ func (f *evmFinalizer) batchCheckReceiptHashesOnchain(ctx context.Context, recei
rpcBatch = []rpc.BatchElem{}
}
}
if len(rpcBatch) > 0 {
rpcBatchGroups = append(rpcBatchGroups, rpcBatch)
}

var finalizedTxs []*Tx
for _, rpcBatch := range rpcBatchGroups {
Expand All @@ -237,20 +243,28 @@ func (f *evmFinalizer) batchCheckReceiptHashesOnchain(ctx context.Context, recei
for _, req := range rpcBatch {
if req.Error != nil {
// Continue if particular RPC call failed so other txs can still be considered for finalization
f.lggr.Debugw("failed to find block by hash", "hash", req.Args[0])
f.lggr.Debugw("failed to find block by number", "blockNum", req.Args[0])
continue
}
head := req.Result.(*evmtypes.Head)
if head == nil {
// Continue if particular RPC call yielded a nil head so other txs can still be considered for finalization
f.lggr.Debugw("failed to find block by hash", "hash", req.Args[0])
// Continue if particular RPC call yielded a nil block so other txs can still be considered for finalization
f.lggr.Debugw("failed to find block by number", "blockNum", req.Args[0])
continue
}
// Confirmed receipt's block hash exists on-chain
// Add to finalized list if block num less than or equal to the latest finalized head block num
if head.BlockNumber() <= latestFinalizedBlockNum {
txs := receiptMap[head.BlockHash()]
finalizedTxs = append(finalizedTxs, txs...)
txs := blockNumToTxesMap[head.BlockNumber()]
// Check if transaction receipts match the block hash at the given block num
// If they do not, the transactions may have been re-org'd out
// The expectation is for the Confirmer to pick up on these re-orgs and get the transaction included
for _, tx := range txs {
receipt := tx.GetReceipt()
if receipt.GetBlockHash().String() == head.BlockHash().String() {
finalizedTxs = append(finalizedTxs, tx)
} else {
// Log a critical error if a transaction is marked as confirmed with a receipt older than the finalized block
// This scenario could potentially point to a re-org'd transaction the Confirmer has lost track of
f.lggr.Criticalw("found confirmed transaction with re-org'd receipt older than finalized block", "tx", tx, "receipt", receipt, "onchainBlockHash", head.BlockHash().String())
}
}
}
}
Expand Down
12 changes: 8 additions & 4 deletions core/chains/evm/txmgr/finalizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package txmgr_test

import (
"errors"
"math/big"
"testing"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/rpc"
"github.com/google/uuid"
"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -171,14 +173,16 @@ func TestFinalizer_MarkTxFinalized(t *testing.T) {
rpcElements := args.Get(1).([]rpc.BatchElem)
require.Equal(t, 1, len(rpcElements))

require.Equal(t, "eth_getBlockByHash", rpcElements[0].Method)
require.Equal(t, "eth_getBlockByNumber", rpcElements[0].Method)
require.Equal(t, false, rpcElements[0].Args[1])

reqHash := rpcElements[0].Args[0].(common.Hash).String()
reqBlockNum := rpcElements[0].Args[0].(string)
req1BlockNum := hexutil.EncodeBig(big.NewInt(head.Parent.Number - 2))
req2BlockNum := hexutil.EncodeBig(big.NewInt(head.Parent.Number - 1))
var headResult evmtypes.Head
if receiptHash1.String() == reqHash {
if req1BlockNum == reqBlockNum {
headResult = evmtypes.Head{Number: head.Parent.Number - 2, Hash: receiptHash1}
} else if receiptHash2.String() == reqHash {
} else if req2BlockNum == reqBlockNum {
headResult = evmtypes.Head{Number: head.Parent.Number - 1, Hash: receiptHash2}
} else {
require.Fail(t, "unrecognized block hash")
Expand Down

0 comments on commit 005d972

Please sign in to comment.