diff --git a/common/txmgr/mocks/tx_manager.go b/common/txmgr/mocks/tx_manager.go index 89abf1dea51..27077218f6e 100644 --- a/common/txmgr/mocks/tx_manager.go +++ b/common/txmgr/mocks/tx_manager.go @@ -9,6 +9,8 @@ import ( feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types" mock "github.com/stretchr/testify/mock" + null "gopkg.in/guregu/null.v4" + txmgr "github.com/smartcontractkit/chainlink/v2/common/txmgr" txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types" @@ -35,6 +37,30 @@ func (_m *TxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Close( return r0 } +// CountTransactionsByState provides a mock function with given fields: ctx, state +func (_m *TxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) CountTransactionsByState(ctx context.Context, state txmgrtypes.TxState) (uint32, error) { + ret := _m.Called(ctx, state) + + var r0 uint32 + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, txmgrtypes.TxState) (uint32, error)); ok { + return rf(ctx, state) + } + if rf, ok := ret.Get(0).(func(context.Context, txmgrtypes.TxState) uint32); ok { + r0 = rf(ctx, state) + } else { + r0 = ret.Get(0).(uint32) + } + + if rf, ok := ret.Get(1).(func(context.Context, txmgrtypes.TxState) error); ok { + r1 = rf(ctx, state) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // CreateTransaction provides a mock function with given fields: ctx, txRequest func (_m *TxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) CreateTransaction(ctx context.Context, txRequest txmgrtypes.TxRequest[ADDR, TX_HASH]) (txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { ret := _m.Called(ctx, txRequest) @@ -59,6 +85,54 @@ func (_m *TxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Create return r0, r1 } +// FindEarliestUnconfirmedBroadcastTime provides a mock function with given fields: ctx +func (_m *TxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) FindEarliestUnconfirmedBroadcastTime(ctx context.Context) (null.Time, error) { + ret := _m.Called(ctx) + + var r0 null.Time + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (null.Time, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) null.Time); ok { + r0 = rf(ctx) + } else { + r0 = ret.Get(0).(null.Time) + } + + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// FindEarliestUnconfirmedTxAttemptBlock provides a mock function with given fields: ctx +func (_m *TxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) FindEarliestUnconfirmedTxAttemptBlock(ctx context.Context) (null.Int, error) { + ret := _m.Called(ctx) + + var r0 null.Int + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (null.Int, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) null.Int); ok { + r0 = rf(ctx) + } else { + r0 = ret.Get(0).(null.Int) + } + + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // FindTxesByMetaFieldAndStates provides a mock function with given fields: ctx, metaField, metaValue, states, chainID func (_m *TxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) FindTxesByMetaFieldAndStates(ctx context.Context, metaField string, metaValue string, states []txmgrtypes.TxState, chainID *big.Int) ([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { ret := _m.Called(ctx, metaField, metaValue, states, chainID) diff --git a/common/txmgr/txmgr.go b/common/txmgr/txmgr.go index 0a50bb66385..3fbdb852f8b 100644 --- a/common/txmgr/txmgr.go +++ b/common/txmgr/txmgr.go @@ -10,10 +10,10 @@ import ( "time" "github.com/google/uuid" + nullv4 "gopkg.in/guregu/null.v4" "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" - feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types" txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types" "github.com/smartcontractkit/chainlink/v2/common/types" @@ -55,6 +55,9 @@ type TxManager[ FindTxesWithMetaFieldByReceiptBlockNum(ctx context.Context, metaField string, blockNum int64, chainID *big.Int) (txes []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) // Find transactions loaded with transaction attempts and receipts by transaction IDs and states FindTxesWithAttemptsAndReceiptsByIdsAndState(ctx context.Context, ids []big.Int, states []txmgrtypes.TxState, chainID *big.Int) (txes []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) + FindEarliestUnconfirmedBroadcastTime(ctx context.Context) (nullv4.Time, error) + FindEarliestUnconfirmedTxAttemptBlock(ctx context.Context) (nullv4.Int, error) + CountTransactionsByState(ctx context.Context, state txmgrtypes.TxState) (count uint32, err error) } type reset struct { @@ -576,6 +579,18 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxesWi return } +func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindEarliestUnconfirmedBroadcastTime(ctx context.Context) (nullv4.Time, error) { + return b.txStore.FindEarliestUnconfirmedBroadcastTime(ctx, b.chainID) +} + +func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindEarliestUnconfirmedTxAttemptBlock(ctx context.Context) (nullv4.Int, error) { + return b.txStore.FindEarliestUnconfirmedTxAttemptBlock(ctx, b.chainID) +} + +func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CountTransactionsByState(ctx context.Context, state txmgrtypes.TxState) (count uint32, err error) { + return b.txStore.CountTransactionsByState(ctx, state, b.chainID) +} + type NullTxManager[ CHAIN_ID types.ID, HEAD types.Head[BLOCK_HASH], @@ -642,3 +657,15 @@ func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Fin func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) FindTxesWithAttemptsAndReceiptsByIdsAndState(ctx context.Context, ids []big.Int, states []txmgrtypes.TxState, chainID *big.Int) (txes []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) { return txes, errors.New(n.ErrMsg) } + +func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) FindEarliestUnconfirmedBroadcastTime(ctx context.Context) (nullv4.Time, error) { + return nullv4.Time{}, errors.New(n.ErrMsg) +} + +func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) FindEarliestUnconfirmedTxAttemptBlock(ctx context.Context) (nullv4.Int, error) { + return nullv4.Int{}, errors.New(n.ErrMsg) +} + +func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) CountTransactionsByState(ctx context.Context, state txmgrtypes.TxState) (count uint32, err error) { + return count, errors.New(n.ErrMsg) +} diff --git a/common/txmgr/types/mocks/tx_store.go b/common/txmgr/types/mocks/tx_store.go index 0a7738fd68a..df1528a4c24 100644 --- a/common/txmgr/types/mocks/tx_store.go +++ b/common/txmgr/types/mocks/tx_store.go @@ -9,6 +9,8 @@ import ( feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types" mock "github.com/stretchr/testify/mock" + null "gopkg.in/guregu/null.v4" + time "time" txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types" @@ -56,6 +58,30 @@ func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Close() { _m.Called() } +// CountTransactionsByState provides a mock function with given fields: ctx, state, chainID +func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CountTransactionsByState(ctx context.Context, state txmgrtypes.TxState, chainID CHAIN_ID) (uint32, error) { + ret := _m.Called(ctx, state, chainID) + + var r0 uint32 + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, txmgrtypes.TxState, CHAIN_ID) (uint32, error)); ok { + return rf(ctx, state, chainID) + } + if rf, ok := ret.Get(0).(func(context.Context, txmgrtypes.TxState, CHAIN_ID) uint32); ok { + r0 = rf(ctx, state, chainID) + } else { + r0 = ret.Get(0).(uint32) + } + + if rf, ok := ret.Get(1).(func(context.Context, txmgrtypes.TxState, CHAIN_ID) error); ok { + r1 = rf(ctx, state, chainID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // CountUnconfirmedTransactions provides a mock function with given fields: ctx, fromAddress, chainID func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CountUnconfirmedTransactions(ctx context.Context, fromAddress ADDR, chainID CHAIN_ID) (uint32, error) { ret := _m.Called(ctx, fromAddress, chainID) @@ -142,6 +168,54 @@ func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) DeleteInPro return r0 } +// FindEarliestUnconfirmedBroadcastTime provides a mock function with given fields: ctx, chainID +func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindEarliestUnconfirmedBroadcastTime(ctx context.Context, chainID CHAIN_ID) (null.Time, error) { + ret := _m.Called(ctx, chainID) + + var r0 null.Time + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, CHAIN_ID) (null.Time, error)); ok { + return rf(ctx, chainID) + } + if rf, ok := ret.Get(0).(func(context.Context, CHAIN_ID) null.Time); ok { + r0 = rf(ctx, chainID) + } else { + r0 = ret.Get(0).(null.Time) + } + + if rf, ok := ret.Get(1).(func(context.Context, CHAIN_ID) error); ok { + r1 = rf(ctx, chainID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// FindEarliestUnconfirmedTxAttemptBlock provides a mock function with given fields: ctx, chainID +func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindEarliestUnconfirmedTxAttemptBlock(ctx context.Context, chainID CHAIN_ID) (null.Int, error) { + ret := _m.Called(ctx, chainID) + + var r0 null.Int + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, CHAIN_ID) (null.Int, error)); ok { + return rf(ctx, chainID) + } + if rf, ok := ret.Get(0).(func(context.Context, CHAIN_ID) null.Int); ok { + r0 = rf(ctx, chainID) + } else { + r0 = ret.Get(0).(null.Int) + } + + if rf, ok := ret.Get(1).(func(context.Context, CHAIN_ID) error); ok { + r1 = rf(ctx, chainID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // FindLatestSequence provides a mock function with given fields: ctx, fromAddress, chainId func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindLatestSequence(ctx context.Context, fromAddress ADDR, chainId CHAIN_ID) (SEQ, error) { ret := _m.Called(ctx, fromAddress, chainId) diff --git a/common/txmgr/types/tx_store.go b/common/txmgr/types/tx_store.go index 251135795fd..57ecf28d589 100644 --- a/common/txmgr/types/tx_store.go +++ b/common/txmgr/types/tx_store.go @@ -6,6 +6,7 @@ import ( "time" "github.com/google/uuid" + "gopkg.in/guregu/null.v4" feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types" "github.com/smartcontractkit/chainlink/v2/common/types" @@ -64,6 +65,7 @@ type TransactionStore[ FEE feetypes.Fee, ] interface { CountUnconfirmedTransactions(ctx context.Context, fromAddress ADDR, chainID CHAIN_ID) (count uint32, err error) + CountTransactionsByState(ctx context.Context, state TxState, chainID CHAIN_ID) (count uint32, err error) CountUnstartedTransactions(ctx context.Context, fromAddress ADDR, chainID CHAIN_ID) (count uint32, err error) CreateTransaction(ctx context.Context, txRequest TxRequest[ADDR, TX_HASH], chainID CHAIN_ID) (tx Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) DeleteInProgressAttempt(ctx context.Context, attempt TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error @@ -79,6 +81,8 @@ type TransactionStore[ FindTxWithSequence(ctx context.Context, fromAddress ADDR, seq SEQ) (etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) FindNextUnstartedTransactionFromAddress(ctx context.Context, etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], fromAddress ADDR, chainID CHAIN_ID) error FindTransactionsConfirmedInBlockRange(ctx context.Context, highBlockNumber, lowBlockNumber int64, chainID CHAIN_ID) (etxs []*Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) + FindEarliestUnconfirmedBroadcastTime(ctx context.Context, chainID CHAIN_ID) (null.Time, error) + FindEarliestUnconfirmedTxAttemptBlock(ctx context.Context, chainID CHAIN_ID) (null.Int, error) GetTxInProgress(ctx context.Context, fromAddress ADDR) (etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) GetInProgressTxAttempts(ctx context.Context, address ADDR, chainID CHAIN_ID) (attempts []TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) GetNonFatalTransactions(ctx context.Context, chainID CHAIN_ID) (txs []*Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) diff --git a/core/chains/evm/txmgr/evm_tx_store.go b/core/chains/evm/txmgr/evm_tx_store.go index e36ca611e16..730809e8dda 100644 --- a/core/chains/evm/txmgr/evm_tx_store.go +++ b/core/chains/evm/txmgr/evm_tx_store.go @@ -1131,6 +1131,39 @@ ORDER BY nonce ASC return etxs, pkgerrors.Wrap(err, "FindTransactionsConfirmedInBlockRange failed") } +func (o *evmTxStore) FindEarliestUnconfirmedBroadcastTime(ctx context.Context, chainID *big.Int) (broadcastAt nullv4.Time, err error) { + var cancel context.CancelFunc + ctx, cancel = o.mergeContexts(ctx) + defer cancel() + qq := o.q.WithOpts(pg.WithParentCtx(ctx)) + err = qq.Transaction(func(tx pg.Queryer) error { + if err = qq.QueryRowContext(ctx, `SELECT min(initial_broadcast_at) FROM evm.txes WHERE state = 'unconfirmed' AND evm_chain_id = $1`, chainID.String()).Scan(&broadcastAt); err != nil { + return fmt.Errorf("failed to query for unconfirmed eth_tx count: %w", err) + } + return nil + }, pg.OptReadOnlyTx()) + return broadcastAt, err +} + +func (o *evmTxStore) FindEarliestUnconfirmedTxAttemptBlock(ctx context.Context, chainID *big.Int) (earliestUnconfirmedTxBlock nullv4.Int, err error) { + var cancel context.CancelFunc + ctx, cancel = o.mergeContexts(ctx) + defer cancel() + qq := o.q.WithOpts(pg.WithParentCtx(ctx)) + err = qq.Transaction(func(tx pg.Queryer) error { + err = qq.QueryRowContext(ctx, ` +SELECT MIN(broadcast_before_block_num) FROM evm.tx_attempts +JOIN evm.txes ON evm.txes.id = evm.tx_attempts.eth_tx_id +WHERE evm.txes.state = 'unconfirmed' +AND evm_chain_id = $1`, chainID.String()).Scan(&earliestUnconfirmedTxBlock) + if err != nil { + return fmt.Errorf("failed to query for earliest unconfirmed tx block: %w", err) + } + return nil + }, pg.OptReadOnlyTx()) + return earliestUnconfirmedTxBlock, err +} + func (o *evmTxStore) IsTxFinalized(ctx context.Context, blockHeight int64, txID int64, chainID *big.Int) (finalized bool, err error) { var cancel context.CancelFunc ctx, cancel = o.mergeContexts(ctx) @@ -1733,6 +1766,20 @@ func (o *evmTxStore) CountUnconfirmedTransactions(ctx context.Context, fromAddre return o.countTransactionsWithState(ctx, fromAddress, txmgr.TxUnconfirmed, chainID) } +// CountTransactionsByState returns the number of transactions with any fromAddress in the given state +func (o *evmTxStore) CountTransactionsByState(ctx context.Context, state txmgrtypes.TxState, chainID *big.Int) (count uint32, err error) { + var cancel context.CancelFunc + ctx, cancel = o.mergeContexts(ctx) + defer cancel() + qq := o.q.WithOpts(pg.WithParentCtx(ctx)) + err = qq.Get(&count, `SELECT count(*) FROM evm.txes WHERE state = $1 AND evm_chain_id = $2`, + state, chainID.String()) + if err != nil { + return 0, fmt.Errorf("failed to CountTransactionsByState: %w", err) + } + return count, nil +} + // CountUnstartedTransactions returns the number of unconfirmed transactions func (o *evmTxStore) CountUnstartedTransactions(ctx context.Context, fromAddress common.Address, chainID *big.Int) (count uint32, err error) { return o.countTransactionsWithState(ctx, fromAddress, txmgr.TxUnstarted, chainID) diff --git a/core/chains/evm/txmgr/evm_tx_store_test.go b/core/chains/evm/txmgr/evm_tx_store_test.go index e68641735ee..0b4287b6f6c 100644 --- a/core/chains/evm/txmgr/evm_tx_store_test.go +++ b/core/chains/evm/txmgr/evm_tx_store_test.go @@ -847,6 +847,61 @@ func TestORM_FindTransactionsConfirmedInBlockRange(t *testing.T) { }) } +func TestORM_FindEarliestUnconfirmedBroadcastTime(t *testing.T) { + t.Parallel() + + db := pgtest.NewSqlxDB(t) + cfg := newTestChainScopedConfig(t) + txStore := cltest.NewTestTxStore(t, db, cfg.Database()) + ethKeyStore := cltest.NewKeyStore(t, db, cfg.Database()).Eth() + ethClient := evmtest.NewEthClientMockWithDefaultChain(t) + _, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore) + + t.Run("no unconfirmed eth txes", func(t *testing.T) { + broadcastAt, err := txStore.FindEarliestUnconfirmedBroadcastTime(testutils.Context(t), ethClient.ConfiguredChainID()) + require.NoError(t, err) + require.False(t, broadcastAt.Valid) + }) + + t.Run("verify broadcast time", func(t *testing.T) { + tx := cltest.MustInsertUnconfirmedEthTx(t, txStore, 123, fromAddress) + broadcastAt, err := txStore.FindEarliestUnconfirmedBroadcastTime(testutils.Context(t), ethClient.ConfiguredChainID()) + require.NoError(t, err) + require.True(t, broadcastAt.Ptr().Equal(*tx.BroadcastAt)) + }) +} + +func TestORM_FindEarliestUnconfirmedTxAttemptBlock(t *testing.T) { + t.Parallel() + + db := pgtest.NewSqlxDB(t) + cfg := newTestChainScopedConfig(t) + txStore := cltest.NewTestTxStore(t, db, cfg.Database()) + ethKeyStore := cltest.NewKeyStore(t, db, cfg.Database()).Eth() + ethClient := evmtest.NewEthClientMockWithDefaultChain(t) + _, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore) + _, fromAddress2 := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore) + + t.Run("no earliest unconfirmed tx block", func(t *testing.T) { + earliestBlock, err := txStore.FindEarliestUnconfirmedTxAttemptBlock(testutils.Context(t), ethClient.ConfiguredChainID()) + require.NoError(t, err) + require.False(t, earliestBlock.Valid) + }) + + t.Run("verify earliest unconfirmed tx block", func(t *testing.T) { + var blockNum int64 = 2 + tx := mustInsertConfirmedMissingReceiptEthTxWithLegacyAttempt(t, txStore, 123, blockNum, time.Now(), fromAddress) + _ = mustInsertConfirmedMissingReceiptEthTxWithLegacyAttempt(t, txStore, 123, blockNum, time.Now().Add(time.Minute), fromAddress2) + err := txStore.UpdateTxsUnconfirmed(testutils.Context(t), []int64{tx.ID}) + require.NoError(t, err) + + earliestBlock, err := txStore.FindEarliestUnconfirmedTxAttemptBlock(testutils.Context(t), ethClient.ConfiguredChainID()) + require.NoError(t, err) + require.True(t, earliestBlock.Valid) + require.Equal(t, blockNum, earliestBlock.Int64) + }) +} + func TestORM_SaveInsufficientEthAttempt(t *testing.T) { t.Parallel() @@ -1511,6 +1566,27 @@ func TestORM_CountUnconfirmedTransactions(t *testing.T) { assert.Equal(t, int(count), 3) } +func TestORM_CountTransactionsByState(t *testing.T) { + t.Parallel() + + db := pgtest.NewSqlxDB(t) + cfg := configtest.NewGeneralConfig(t, nil) + txStore := cltest.NewTestTxStore(t, db, cfg.Database()) + ethKeyStore := cltest.NewKeyStore(t, db, cfg.Database()).Eth() + + _, fromAddress1 := cltest.MustInsertRandomKey(t, ethKeyStore) + _, fromAddress2 := cltest.MustInsertRandomKey(t, ethKeyStore) + _, fromAddress3 := cltest.MustInsertRandomKey(t, ethKeyStore) + + cltest.MustInsertUnconfirmedEthTxWithBroadcastLegacyAttempt(t, txStore, 0, fromAddress1) + cltest.MustInsertUnconfirmedEthTxWithBroadcastLegacyAttempt(t, txStore, 1, fromAddress2) + cltest.MustInsertUnconfirmedEthTxWithBroadcastLegacyAttempt(t, txStore, 2, fromAddress3) + + count, err := txStore.CountTransactionsByState(testutils.Context(t), txmgrcommon.TxUnconfirmed, &cltest.FixtureChainID) + require.NoError(t, err) + assert.Equal(t, int(count), 3) +} + func TestORM_CountUnstartedTransactions(t *testing.T) { t.Parallel() diff --git a/core/chains/evm/txmgr/mocks/evm_tx_store.go b/core/chains/evm/txmgr/mocks/evm_tx_store.go index 00efc1add98..bf3088b2aa1 100644 --- a/core/chains/evm/txmgr/mocks/evm_tx_store.go +++ b/core/chains/evm/txmgr/mocks/evm_tx_store.go @@ -14,6 +14,8 @@ import ( mock "github.com/stretchr/testify/mock" + null "gopkg.in/guregu/null.v4" + time "time" types "github.com/smartcontractkit/chainlink/v2/common/txmgr/types" @@ -59,6 +61,30 @@ func (_m *EvmTxStore) Close() { _m.Called() } +// CountTransactionsByState provides a mock function with given fields: ctx, state, chainID +func (_m *EvmTxStore) CountTransactionsByState(ctx context.Context, state types.TxState, chainID *big.Int) (uint32, error) { + ret := _m.Called(ctx, state, chainID) + + var r0 uint32 + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, types.TxState, *big.Int) (uint32, error)); ok { + return rf(ctx, state, chainID) + } + if rf, ok := ret.Get(0).(func(context.Context, types.TxState, *big.Int) uint32); ok { + r0 = rf(ctx, state, chainID) + } else { + r0 = ret.Get(0).(uint32) + } + + if rf, ok := ret.Get(1).(func(context.Context, types.TxState, *big.Int) error); ok { + r1 = rf(ctx, state, chainID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // CountUnconfirmedTransactions provides a mock function with given fields: ctx, fromAddress, chainID func (_m *EvmTxStore) CountUnconfirmedTransactions(ctx context.Context, fromAddress common.Address, chainID *big.Int) (uint32, error) { ret := _m.Called(ctx, fromAddress, chainID) @@ -145,6 +171,54 @@ func (_m *EvmTxStore) DeleteInProgressAttempt(ctx context.Context, attempt types return r0 } +// FindEarliestUnconfirmedBroadcastTime provides a mock function with given fields: ctx, chainID +func (_m *EvmTxStore) FindEarliestUnconfirmedBroadcastTime(ctx context.Context, chainID *big.Int) (null.Time, error) { + ret := _m.Called(ctx, chainID) + + var r0 null.Time + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *big.Int) (null.Time, error)); ok { + return rf(ctx, chainID) + } + if rf, ok := ret.Get(0).(func(context.Context, *big.Int) null.Time); ok { + r0 = rf(ctx, chainID) + } else { + r0 = ret.Get(0).(null.Time) + } + + if rf, ok := ret.Get(1).(func(context.Context, *big.Int) error); ok { + r1 = rf(ctx, chainID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// FindEarliestUnconfirmedTxAttemptBlock provides a mock function with given fields: ctx, chainID +func (_m *EvmTxStore) FindEarliestUnconfirmedTxAttemptBlock(ctx context.Context, chainID *big.Int) (null.Int, error) { + ret := _m.Called(ctx, chainID) + + var r0 null.Int + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *big.Int) (null.Int, error)); ok { + return rf(ctx, chainID) + } + if rf, ok := ret.Get(0).(func(context.Context, *big.Int) null.Int); ok { + r0 = rf(ctx, chainID) + } else { + r0 = ret.Get(0).(null.Int) + } + + if rf, ok := ret.Get(1).(func(context.Context, *big.Int) error); ok { + r1 = rf(ctx, chainID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // FindLatestSequence provides a mock function with given fields: ctx, fromAddress, chainId func (_m *EvmTxStore) FindLatestSequence(ctx context.Context, fromAddress common.Address, chainId *big.Int) (evmtypes.Nonce, error) { ret := _m.Called(ctx, fromAddress, chainId) diff --git a/core/internal/cltest/mocks.go b/core/internal/cltest/mocks.go index d49e94557e3..073b3ba246c 100644 --- a/core/internal/cltest/mocks.go +++ b/core/internal/cltest/mocks.go @@ -11,6 +11,8 @@ import ( "testing" "time" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr" + "github.com/jmoiron/sqlx" evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" @@ -409,6 +411,18 @@ func NewLegacyChainsWithMockChain(t testing.TB, ethClient evmclient.Client, cfg } +func NewLegacyChainsWithMockChainAndTxManager(t testing.TB, ethClient evmclient.Client, cfg legacyevm.AppConfig, txm txmgr.TxManager) legacyevm.LegacyChainContainer { + ch := new(evmmocks.Chain) + ch.On("Client").Return(ethClient) + ch.On("Logger").Return(logger.TestLogger(t)) + scopedCfg := evmtest.NewChainScopedConfig(t, cfg) + ch.On("ID").Return(scopedCfg.EVM().ChainID()) + ch.On("Config").Return(scopedCfg) + ch.On("TxManager").Return(txm) + + return NewLegacyChainsWithChain(ch, cfg) +} + func NewLegacyChainsWithChain(ch legacyevm.Chain, cfg legacyevm.AppConfig) legacyevm.LegacyChainContainer { m := map[string]legacyevm.Chain{ch.ID().String(): ch} return legacyevm.NewLegacyChains(m, cfg.EVMConfigs()) diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index 3d06789e7bc..1c9cd106ba9 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -238,11 +238,6 @@ func NewApplication(opts ApplicationOpts) (Application, error) { globalLogger.Info("DatabaseBackup: periodic database backups are disabled. To enable automatic backups, set Database.Backup.Mode=lite or Database.Backup.Mode=full") } - srvcs = append(srvcs, eventBroadcaster, mailMon) - srvcs = append(srvcs, relayerChainInterops.Services()...) - promReporter := promreporter.NewPromReporter(db.DB, globalLogger) - srvcs = append(srvcs, promReporter) - // pool must be started before all relayers and stopped after them srvcs = append(srvcs, opts.MercuryPool) @@ -254,6 +249,11 @@ func NewApplication(opts ApplicationOpts) (Application, error) { return nil, fmt.Errorf("no evm chains found") } + srvcs = append(srvcs, eventBroadcaster, mailMon) + srvcs = append(srvcs, relayerChainInterops.Services()...) + promReporter := promreporter.NewPromReporter(db.DB, legacyEVMChains, globalLogger) + srvcs = append(srvcs, promReporter) + // Initialize Local Users ORM and Authentication Provider specified in config // BasicAdminUsersORM is initialized and required regardless of separate Authentication Provider localAdminUsersORM := localauth.NewORM(db, cfg.WebServer().SessionTimeout().Duration(), globalLogger, cfg.Database(), auditLogger) diff --git a/core/services/promreporter/prom_reporter.go b/core/services/promreporter/prom_reporter.go index d115b1022c1..3e1444a6da1 100644 --- a/core/services/promreporter/prom_reporter.go +++ b/core/services/promreporter/prom_reporter.go @@ -3,18 +3,22 @@ package promreporter import ( "context" "database/sql" + "fmt" "math/big" "sync" "time" + txmgrcommon "github.com/smartcontractkit/chainlink/v2/common/txmgr" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "go.uber.org/multierr" - "gopkg.in/guregu/null.v4" "github.com/smartcontractkit/chainlink-common/pkg/services" evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" + "github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/utils" ) @@ -24,6 +28,7 @@ type ( promReporter struct { services.StateMachine db *sql.DB + chains legacyevm.LegacyChainContainer lggr logger.Logger backend PrometheusBackend newHeads *utils.Mailbox[*evmtypes.Head] @@ -86,7 +91,7 @@ func (defaultBackend) SetPipelineTaskRunsQueued(n int) { promPipelineRunsQueued.Set(float64(n)) } -func NewPromReporter(db *sql.DB, lggr logger.Logger, opts ...interface{}) *promReporter { +func NewPromReporter(db *sql.DB, chainContainer legacyevm.LegacyChainContainer, lggr logger.Logger, opts ...interface{}) *promReporter { var backend PrometheusBackend = defaultBackend{} period := 15 * time.Second for _, opt := range opts { @@ -101,6 +106,7 @@ func NewPromReporter(db *sql.DB, lggr logger.Logger, opts ...interface{}) *promR chStop := make(chan struct{}) return &promReporter{ db: db, + chains: chainContainer, lggr: lggr.Named("PromReporter"), backend: backend, newHeads: utils.NewSingleMailbox[*evmtypes.Head](), @@ -161,6 +167,14 @@ func (pr *promReporter) eventLoop() { } } +func (pr *promReporter) getTxm(evmChainID *big.Int) (txmgr.TxManager, error) { + chain, err := pr.chains.Get(evmChainID.String()) + if err != nil { + return nil, fmt.Errorf("failed to get chain: %w", err) + } + return chain.TxManager(), nil +} + func (pr *promReporter) reportHeadMetrics(ctx context.Context, head *evmtypes.Head) { evmChainID := head.EVMChainID.ToInt() err := multierr.Combine( @@ -175,40 +189,49 @@ func (pr *promReporter) reportHeadMetrics(ctx context.Context, head *evmtypes.He } func (pr *promReporter) reportPendingEthTxes(ctx context.Context, evmChainID *big.Int) (err error) { - var unconfirmed int64 - if err := pr.db.QueryRowContext(ctx, `SELECT count(*) FROM evm.txes WHERE state = 'unconfirmed' AND evm_chain_id = $1`, evmChainID.String()).Scan(&unconfirmed); err != nil { - return errors.Wrap(err, "failed to query for unconfirmed eth_tx count") + txm, err := pr.getTxm(evmChainID) + if err != nil { + return fmt.Errorf("failed to get txm: %w", err) + } + + unconfirmed, err := txm.CountTransactionsByState(ctx, txmgrcommon.TxUnconfirmed) + if err != nil { + return fmt.Errorf("failed to query for unconfirmed eth_tx count: %w", err) } - pr.backend.SetUnconfirmedTransactions(evmChainID, unconfirmed) + pr.backend.SetUnconfirmedTransactions(evmChainID, int64(unconfirmed)) return nil } func (pr *promReporter) reportMaxUnconfirmedAge(ctx context.Context, evmChainID *big.Int) (err error) { - var broadcastAt null.Time - now := time.Now() - if err := pr.db.QueryRowContext(ctx, `SELECT min(initial_broadcast_at) FROM evm.txes WHERE state = 'unconfirmed' AND evm_chain_id = $1`, evmChainID.String()).Scan(&broadcastAt); err != nil { - return errors.Wrap(err, "failed to query for unconfirmed eth_tx count") + txm, err := pr.getTxm(evmChainID) + if err != nil { + return fmt.Errorf("failed to get txm: %w", err) } + + broadcastAt, err := txm.FindEarliestUnconfirmedBroadcastTime(ctx) + if err != nil { + return fmt.Errorf("failed to query for min broadcast time: %w", err) + } + var seconds float64 if broadcastAt.Valid { - nanos := now.Sub(broadcastAt.ValueOrZero()) - seconds = float64(nanos) / 1000000000 + seconds = time.Since(broadcastAt.ValueOrZero()).Seconds() } pr.backend.SetMaxUnconfirmedAge(evmChainID, seconds) return nil } func (pr *promReporter) reportMaxUnconfirmedBlocks(ctx context.Context, head *evmtypes.Head) (err error) { - var earliestUnconfirmedTxBlock null.Int - err = pr.db.QueryRowContext(ctx, ` -SELECT MIN(broadcast_before_block_num) FROM evm.tx_attempts -JOIN evm.txes ON evm.txes.id = evm.tx_attempts.eth_tx_id -WHERE evm.txes.state = 'unconfirmed' -AND evm_chain_id = $1 -AND evm.txes.state = 'unconfirmed'`, head.EVMChainID.String()).Scan(&earliestUnconfirmedTxBlock) + txm, err := pr.getTxm(head.EVMChainID.ToInt()) if err != nil { - return errors.Wrap(err, "failed to query for min broadcast_before_block_num") + return fmt.Errorf("failed to get txm: %w", err) } + + earliestUnconfirmedTxBlock, err := txm.FindEarliestUnconfirmedTxAttemptBlock(ctx) + if err != nil { + return fmt.Errorf("failed to query for earliest unconfirmed tx block: %w", err) + } + var blocksUnconfirmed int64 if !earliestUnconfirmedTxBlock.IsZero() { blocksUnconfirmed = head.Number - earliestUnconfirmedTxBlock.ValueOrZero() diff --git a/core/services/promreporter/prom_reporter_test.go b/core/services/promreporter/prom_reporter_test.go index 1f15d94418d..54e3a5d3fab 100644 --- a/core/services/promreporter/prom_reporter_test.go +++ b/core/services/promreporter/prom_reporter_test.go @@ -6,15 +6,21 @@ import ( "testing" "time" + "github.com/jmoiron/sqlx" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr" evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" + "github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm" "github.com/smartcontractkit/chainlink/v2/core/internal/cltest" "github.com/smartcontractkit/chainlink/v2/core/internal/mocks" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/configtest" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/promreporter" @@ -25,12 +31,38 @@ func newHead() evmtypes.Head { return evmtypes.Head{Number: 42, EVMChainID: utils.NewBigI(0)} } +func newLegacyChainContainer(t *testing.T, db *sqlx.DB) legacyevm.LegacyChainContainer { + config, dbConfig, evmConfig := txmgr.MakeTestConfigs(t) + keyStore := cltest.NewKeyStore(t, db, dbConfig).Eth() + ethClient := evmtest.NewEthClientMockWithDefaultChain(t) + estimator := gas.NewEstimator(logger.TestLogger(t), ethClient, config, evmConfig.GasEstimator()) + lggr := logger.TestLogger(t) + lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.FixtureChainID, db, lggr, pgtest.NewQConfig(true)), ethClient, lggr, 100*time.Millisecond, false, 2, 3, 2, 1000) + + txm, err := txmgr.NewTxm( + db, + evmConfig, + evmConfig.GasEstimator(), + evmConfig.Transactions(), + dbConfig, + dbConfig.Listener(), + ethClient, + lggr, + lp, + keyStore, + estimator) + require.NoError(t, err) + + cfg := configtest.NewGeneralConfig(t, nil) + return cltest.NewLegacyChainsWithMockChainAndTxManager(t, ethClient, cfg, txm) +} + func Test_PromReporter_OnNewLongestChain(t *testing.T) { t.Run("with nothing in the database", func(t *testing.T) { - d := pgtest.NewSqlDB(t) + db := pgtest.NewSqlxDB(t) backend := mocks.NewPrometheusBackend(t) - reporter := promreporter.NewPromReporter(d, logger.TestLogger(t), backend, 10*time.Millisecond) + reporter := promreporter.NewPromReporter(db.DB, newLegacyChainContainer(t, db), logger.TestLogger(t), backend, 10*time.Millisecond) var subscribeCalls atomic.Int32 @@ -74,7 +106,7 @@ func Test_PromReporter_OnNewLongestChain(t *testing.T) { subscribeCalls.Add(1) }). Return() - reporter := promreporter.NewPromReporter(db.DB, logger.TestLogger(t), backend, 10*time.Millisecond) + reporter := promreporter.NewPromReporter(db.DB, newLegacyChainContainer(t, db), logger.TestLogger(t), backend, 10*time.Millisecond) require.NoError(t, reporter.Start(testutils.Context(t))) defer func() { assert.NoError(t, reporter.Close()) }() @@ -91,11 +123,10 @@ func Test_PromReporter_OnNewLongestChain(t *testing.T) { t.Run("with unfinished pipeline task runs", func(t *testing.T) { db := pgtest.NewSqlxDB(t) - pgtest.MustExec(t, db, `SET CONSTRAINTS pipeline_task_runs_pipeline_run_id_fkey DEFERRED`) backend := mocks.NewPrometheusBackend(t) - reporter := promreporter.NewPromReporter(db.DB, logger.TestLogger(t), backend, 10*time.Millisecond) + reporter := promreporter.NewPromReporter(db.DB, newLegacyChainContainer(t, db), logger.TestLogger(t), backend, 10*time.Millisecond) cltest.MustInsertUnfinishedPipelineTaskRun(t, db, 1) cltest.MustInsertUnfinishedPipelineTaskRun(t, db, 1) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 929fec5a3e6..6af224157a7 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -56,6 +56,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed +- `PromReporter` no longer directly reads txm related status from the db, and instead uses the txStore API. - `L2Suggested` mode is now called `SuggestedPrice` - Console logs will now escape (non-whitespace) control characters