From 0b2b537ea7edfcf75c3009ca0139c61d5e38ae59 Mon Sep 17 00:00:00 2001 From: James Walker Date: Mon, 22 Jan 2024 15:08:58 -0500 Subject: [PATCH 01/10] take pruning logic out of create_transaction logic --- common/txmgr/txmgr.go | 23 +++++++++++++++++++++++ core/chains/evm/txmgr/evm_tx_store.go | 8 -------- 2 files changed, 23 insertions(+), 8 deletions(-) diff --git a/common/txmgr/txmgr.go b/common/txmgr/txmgr.go index 358554947cc..84c03cad6aa 100644 --- a/common/txmgr/txmgr.go +++ b/common/txmgr/txmgr.go @@ -519,6 +519,9 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CreateTran if err != nil { return tx, err } + if err := b.pruneQueue(ctx, txRequest, tx); err != nil { + return tx, fmt.Errorf("CreateTransaction failed to prune queue: %w", err) + } // Trigger the Broadcaster to check for new transaction b.broadcaster.Trigger(txRequest.FromAddress) @@ -559,12 +562,32 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SendNative if err != nil { return etx, fmt.Errorf("SendNativeToken failed to insert tx: %w", err) } + if err := b.pruneQueue(ctx, txRequest, etx); err != nil { + return etx, fmt.Errorf("SendNativeToken failed to prune queue: %w", err) + } // Trigger the Broadcaster to check for new transaction b.broadcaster.Trigger(from) return etx, nil } +func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) pruneQueue(ctx context.Context, txRequest txmgrtypes.TxRequest[ADDR, TX_HASH], etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { + pruned, err := txRequest.Strategy.PruneQueue(ctx, b.txStore) + if err != nil { + return err + } + if pruned > 0 { + b.logger.Warnw(fmt.Sprintf("Dropped %d old transactions from transaction queue", pruned), + "fromAddress", txRequest.FromAddress, + "toAddress", txRequest.ToAddress, + "meta", txRequest.Meta, + "subject", txRequest.Strategy.Subject(), + "replacementID", etx.ID) + } + + return nil +} + func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxesByMetaFieldAndStates(ctx context.Context, metaField string, metaValue string, states []txmgrtypes.TxState, chainID *big.Int) (txes []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) { txes, err = b.txStore.FindTxesByMetaFieldAndStates(ctx, metaField, metaValue, states, chainID) return diff --git a/core/chains/evm/txmgr/evm_tx_store.go b/core/chains/evm/txmgr/evm_tx_store.go index add4d915809..062f88cb2e2 100644 --- a/core/chains/evm/txmgr/evm_tx_store.go +++ b/core/chains/evm/txmgr/evm_tx_store.go @@ -1841,14 +1841,6 @@ RETURNING "txes".* if err != nil { return pkgerrors.Wrap(err, "CreateEthTransaction failed to insert evm tx") } - var pruned int64 - pruned, err = txRequest.Strategy.PruneQueue(ctx, o) - if err != nil { - return pkgerrors.Wrap(err, "CreateEthTransaction failed to prune evm.txes") - } - if pruned > 0 { - o.logger.Warnw(fmt.Sprintf("Dropped %d old transactions from transaction queue", pruned), "fromAddress", txRequest.FromAddress, "toAddress", txRequest.ToAddress, "meta", txRequest.Meta, "subject", txRequest.Strategy.Subject(), "replacementID", dbEtx.ID) - } return nil }) var etx Tx From 257153ed1b52767503a2c26f4f24b76bcf88feb4 Mon Sep 17 00:00:00 2001 From: James Walker Date: Mon, 22 Jan 2024 16:28:05 -0500 Subject: [PATCH 02/10] change prune queue function signature to return array of ids rather than count --- common/txmgr/strategies.go | 10 +++--- common/txmgr/txmgr.go | 34 ++++++++++----------- common/txmgr/types/mocks/tx_store.go | 12 +++++--- common/txmgr/types/mocks/tx_strategy.go | 12 +++++--- common/txmgr/types/tx.go | 2 +- common/txmgr/types/tx_store.go | 2 +- core/chains/evm/txmgr/evm_tx_store.go | 10 +++--- core/chains/evm/txmgr/evm_tx_store_test.go | 3 -- core/chains/evm/txmgr/mocks/evm_tx_store.go | 12 +++++--- core/chains/evm/txmgr/strategies_test.go | 10 +++--- core/chains/evm/txmgr/txmgr_test.go | 12 +++++--- core/web/eth_keys_controller_test.go | 2 +- 12 files changed, 65 insertions(+), 56 deletions(-) diff --git a/common/txmgr/strategies.go b/common/txmgr/strategies.go index faba2ba97bc..aa4abf650c5 100644 --- a/common/txmgr/strategies.go +++ b/common/txmgr/strategies.go @@ -32,8 +32,8 @@ func NewSendEveryStrategy() txmgrtypes.TxStrategy { type SendEveryStrategy struct{} func (SendEveryStrategy) Subject() uuid.NullUUID { return uuid.NullUUID{} } -func (SendEveryStrategy) PruneQueue(ctx context.Context, pruneService txmgrtypes.UnstartedTxQueuePruner) (int64, error) { - return 0, nil +func (SendEveryStrategy) PruneQueue(ctx context.Context, pruneService txmgrtypes.UnstartedTxQueuePruner) ([]int64, error) { + return nil, nil } var _ txmgrtypes.TxStrategy = DropOldestStrategy{} @@ -56,14 +56,14 @@ func (s DropOldestStrategy) Subject() uuid.NullUUID { return uuid.NullUUID{UUID: s.subject, Valid: true} } -func (s DropOldestStrategy) PruneQueue(ctx context.Context, pruneService txmgrtypes.UnstartedTxQueuePruner) (n int64, err error) { +func (s DropOldestStrategy) PruneQueue(ctx context.Context, pruneService txmgrtypes.UnstartedTxQueuePruner) (ids []int64, err error) { var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, s.queryTimeout) defer cancel() - n, err = pruneService.PruneUnstartedTxQueue(ctx, s.queueSize, s.subject) + ids, err = pruneService.PruneUnstartedTxQueue(ctx, s.queueSize, s.subject) if err != nil { - return 0, fmt.Errorf("DropOldestStrategy#PruneQueue failed: %w", err) + return ids, fmt.Errorf("DropOldestStrategy#PruneQueue failed: %w", err) } return } diff --git a/common/txmgr/txmgr.go b/common/txmgr/txmgr.go index 84c03cad6aa..61a46bfda99 100644 --- a/common/txmgr/txmgr.go +++ b/common/txmgr/txmgr.go @@ -571,23 +571,6 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SendNative return etx, nil } -func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) pruneQueue(ctx context.Context, txRequest txmgrtypes.TxRequest[ADDR, TX_HASH], etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { - pruned, err := txRequest.Strategy.PruneQueue(ctx, b.txStore) - if err != nil { - return err - } - if pruned > 0 { - b.logger.Warnw(fmt.Sprintf("Dropped %d old transactions from transaction queue", pruned), - "fromAddress", txRequest.FromAddress, - "toAddress", txRequest.ToAddress, - "meta", txRequest.Meta, - "subject", txRequest.Strategy.Subject(), - "replacementID", etx.ID) - } - - return nil -} - func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxesByMetaFieldAndStates(ctx context.Context, metaField string, metaValue string, states []txmgrtypes.TxState, chainID *big.Int) (txes []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) { txes, err = b.txStore.FindTxesByMetaFieldAndStates(ctx, metaField, metaValue, states, chainID) return @@ -698,3 +681,20 @@ func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Fin func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) CountTransactionsByState(ctx context.Context, state txmgrtypes.TxState) (count uint32, err error) { return count, errors.New(n.ErrMsg) } + +func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) pruneQueue(ctx context.Context, txRequest txmgrtypes.TxRequest[ADDR, TX_HASH], etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { + pruned, err := txRequest.Strategy.PruneQueue(ctx, b.txStore) + if err != nil { + return err + } + if len(pruned) > 0 { + b.logger.Warnw(fmt.Sprintf("Dropped %d old transactions from transaction queue", len(pruned)), + "fromAddress", txRequest.FromAddress, + "toAddress", txRequest.ToAddress, + "meta", txRequest.Meta, + "subject", txRequest.Strategy.Subject(), + "replacementID", etx.ID) + } + + return nil +} diff --git a/common/txmgr/types/mocks/tx_store.go b/common/txmgr/types/mocks/tx_store.go index 16c20df31d7..353f398316d 100644 --- a/common/txmgr/types/mocks/tx_store.go +++ b/common/txmgr/types/mocks/tx_store.go @@ -937,22 +937,24 @@ func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PreloadTxes } // PruneUnstartedTxQueue provides a mock function with given fields: ctx, queueSize, subject -func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PruneUnstartedTxQueue(ctx context.Context, queueSize uint32, subject uuid.UUID) (int64, error) { +func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PruneUnstartedTxQueue(ctx context.Context, queueSize uint32, subject uuid.UUID) ([]int64, error) { ret := _m.Called(ctx, queueSize, subject) if len(ret) == 0 { panic("no return value specified for PruneUnstartedTxQueue") } - var r0 int64 + var r0 []int64 var r1 error - if rf, ok := ret.Get(0).(func(context.Context, uint32, uuid.UUID) (int64, error)); ok { + if rf, ok := ret.Get(0).(func(context.Context, uint32, uuid.UUID) ([]int64, error)); ok { return rf(ctx, queueSize, subject) } - if rf, ok := ret.Get(0).(func(context.Context, uint32, uuid.UUID) int64); ok { + if rf, ok := ret.Get(0).(func(context.Context, uint32, uuid.UUID) []int64); ok { r0 = rf(ctx, queueSize, subject) } else { - r0 = ret.Get(0).(int64) + if ret.Get(0) != nil { + r0 = ret.Get(0).([]int64) + } } if rf, ok := ret.Get(1).(func(context.Context, uint32, uuid.UUID) error); ok { diff --git a/common/txmgr/types/mocks/tx_strategy.go b/common/txmgr/types/mocks/tx_strategy.go index 7992c3fe05f..92d4b7da569 100644 --- a/common/txmgr/types/mocks/tx_strategy.go +++ b/common/txmgr/types/mocks/tx_strategy.go @@ -17,22 +17,24 @@ type TxStrategy struct { } // PruneQueue provides a mock function with given fields: ctx, pruneService -func (_m *TxStrategy) PruneQueue(ctx context.Context, pruneService types.UnstartedTxQueuePruner) (int64, error) { +func (_m *TxStrategy) PruneQueue(ctx context.Context, pruneService types.UnstartedTxQueuePruner) ([]int64, error) { ret := _m.Called(ctx, pruneService) if len(ret) == 0 { panic("no return value specified for PruneQueue") } - var r0 int64 + var r0 []int64 var r1 error - if rf, ok := ret.Get(0).(func(context.Context, types.UnstartedTxQueuePruner) (int64, error)); ok { + if rf, ok := ret.Get(0).(func(context.Context, types.UnstartedTxQueuePruner) ([]int64, error)); ok { return rf(ctx, pruneService) } - if rf, ok := ret.Get(0).(func(context.Context, types.UnstartedTxQueuePruner) int64); ok { + if rf, ok := ret.Get(0).(func(context.Context, types.UnstartedTxQueuePruner) []int64); ok { r0 = rf(ctx, pruneService) } else { - r0 = ret.Get(0).(int64) + if ret.Get(0) != nil { + r0 = ret.Get(0).([]int64) + } } if rf, ok := ret.Get(1).(func(context.Context, types.UnstartedTxQueuePruner) error); ok { diff --git a/common/txmgr/types/tx.go b/common/txmgr/types/tx.go index 0f5d651ae29..31f568b99a2 100644 --- a/common/txmgr/types/tx.go +++ b/common/txmgr/types/tx.go @@ -30,7 +30,7 @@ type TxStrategy interface { // PruneQueue is called after tx insertion // It accepts the service responsible for deleting // unstarted txs and deletion options - PruneQueue(ctx context.Context, pruneService UnstartedTxQueuePruner) (n int64, err error) + PruneQueue(ctx context.Context, pruneService UnstartedTxQueuePruner) (ids []int64, err error) } type TxAttemptState int8 diff --git a/common/txmgr/types/tx_store.go b/common/txmgr/types/tx_store.go index 57ecf28d589..742a1740033 100644 --- a/common/txmgr/types/tx_store.go +++ b/common/txmgr/types/tx_store.go @@ -114,7 +114,7 @@ type TxHistoryReaper[CHAIN_ID types.ID] interface { } type UnstartedTxQueuePruner interface { - PruneUnstartedTxQueue(ctx context.Context, queueSize uint32, subject uuid.UUID) (n int64, err error) + PruneUnstartedTxQueue(ctx context.Context, queueSize uint32, subject uuid.UUID) (ids []int64, err error) } // R is the raw unparsed transaction receipt diff --git a/core/chains/evm/txmgr/evm_tx_store.go b/core/chains/evm/txmgr/evm_tx_store.go index 062f88cb2e2..b78bd79e667 100644 --- a/core/chains/evm/txmgr/evm_tx_store.go +++ b/core/chains/evm/txmgr/evm_tx_store.go @@ -1848,13 +1848,13 @@ RETURNING "txes".* return etx, err } -func (o *evmTxStore) PruneUnstartedTxQueue(ctx context.Context, queueSize uint32, subject uuid.UUID) (n int64, err error) { +func (o *evmTxStore) PruneUnstartedTxQueue(ctx context.Context, queueSize uint32, subject uuid.UUID) (ids []int64, err error) { var cancel context.CancelFunc ctx, cancel = o.mergeContexts(ctx) defer cancel() qq := o.q.WithOpts(pg.WithParentCtx(ctx)) err = qq.Transaction(func(tx pg.Queryer) error { - res, err := qq.Exec(` + err := qq.Select(&ids, ` DELETE FROM evm.txes WHERE state = 'unstarted' AND subject = $1 AND id < ( @@ -1865,11 +1865,13 @@ id < ( ORDER BY id DESC LIMIT $3 ) numbers -)`, subject, subject, queueSize) +) RETURNING id`, subject, subject, queueSize) if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil + } return pkgerrors.Wrap(err, "DeleteUnstartedEthTx failed") } - n, err = res.RowsAffected() return err }) return diff --git a/core/chains/evm/txmgr/evm_tx_store_test.go b/core/chains/evm/txmgr/evm_tx_store_test.go index b5da5527448..ed5d53cf05a 100644 --- a/core/chains/evm/txmgr/evm_tx_store_test.go +++ b/core/chains/evm/txmgr/evm_tx_store_test.go @@ -27,7 +27,6 @@ import ( "github.com/google/uuid" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "gopkg.in/guregu/null.v4" ) @@ -1723,7 +1722,6 @@ func TestORM_CreateTransaction(t *testing.T) { subject := uuid.New() strategy := newMockTxStrategy(t) strategy.On("Subject").Return(uuid.NullUUID{UUID: subject, Valid: true}) - strategy.On("PruneQueue", mock.Anything, mock.AnythingOfType("*txmgr.evmTxStore")).Return(int64(0), nil) etx, err := txStore.CreateTransaction(testutils.Context(t), txmgr.TxRequest{ FromAddress: fromAddress, ToAddress: toAddress, @@ -1780,7 +1778,6 @@ func TestORM_CreateTransaction(t *testing.T) { subject := uuid.New() strategy := newMockTxStrategy(t) strategy.On("Subject").Return(uuid.NullUUID{UUID: subject, Valid: true}) - strategy.On("PruneQueue", mock.Anything, mock.AnythingOfType("*txmgr.evmTxStore")).Return(int64(0), nil) etx, err := txStore.CreateTransaction(testutils.Context(t), txmgr.TxRequest{ FromAddress: fromAddress, ToAddress: toAddress, diff --git a/core/chains/evm/txmgr/mocks/evm_tx_store.go b/core/chains/evm/txmgr/mocks/evm_tx_store.go index a9a7023ac1f..9690bf9728d 100644 --- a/core/chains/evm/txmgr/mocks/evm_tx_store.go +++ b/core/chains/evm/txmgr/mocks/evm_tx_store.go @@ -1058,22 +1058,24 @@ func (_m *EvmTxStore) PreloadTxes(ctx context.Context, attempts []types.TxAttemp } // PruneUnstartedTxQueue provides a mock function with given fields: ctx, queueSize, subject -func (_m *EvmTxStore) PruneUnstartedTxQueue(ctx context.Context, queueSize uint32, subject uuid.UUID) (int64, error) { +func (_m *EvmTxStore) PruneUnstartedTxQueue(ctx context.Context, queueSize uint32, subject uuid.UUID) ([]int64, error) { ret := _m.Called(ctx, queueSize, subject) if len(ret) == 0 { panic("no return value specified for PruneUnstartedTxQueue") } - var r0 int64 + var r0 []int64 var r1 error - if rf, ok := ret.Get(0).(func(context.Context, uint32, uuid.UUID) (int64, error)); ok { + if rf, ok := ret.Get(0).(func(context.Context, uint32, uuid.UUID) ([]int64, error)); ok { return rf(ctx, queueSize, subject) } - if rf, ok := ret.Get(0).(func(context.Context, uint32, uuid.UUID) int64); ok { + if rf, ok := ret.Get(0).(func(context.Context, uint32, uuid.UUID) []int64); ok { r0 = rf(ctx, queueSize, subject) } else { - r0 = ret.Get(0).(int64) + if ret.Get(0) != nil { + r0 = ret.Get(0).([]int64) + } } if rf, ok := ret.Get(1).(func(context.Context, uint32, uuid.UUID) error); ok { diff --git a/core/chains/evm/txmgr/strategies_test.go b/core/chains/evm/txmgr/strategies_test.go index 765b43e78f2..d38b03d9dd2 100644 --- a/core/chains/evm/txmgr/strategies_test.go +++ b/core/chains/evm/txmgr/strategies_test.go @@ -21,9 +21,9 @@ func Test_SendEveryStrategy(t *testing.T) { assert.Equal(t, uuid.NullUUID{}, s.Subject()) - n, err := s.PruneQueue(testutils.Context(t), nil) + ids, err := s.PruneQueue(testutils.Context(t), nil) assert.NoError(t, err) - assert.Equal(t, int64(0), n) + assert.Equal(t, 0, len(ids)) } func Test_DropOldestStrategy_Subject(t *testing.T) { @@ -47,9 +47,9 @@ func Test_DropOldestStrategy_PruneQueue(t *testing.T) { t.Run("calls PrineUnstartedTxQueue for the given subject and queueSize, ignoring fromAddress", func(t *testing.T) { strategy1 := txmgrcommon.NewDropOldestStrategy(subject, queueSize, queryTimeout) - mockTxStore.On("PruneUnstartedTxQueue", mock.Anything, queueSize, subject, mock.Anything, mock.Anything).Once().Return(int64(2), nil) - n, err := strategy1.PruneQueue(testutils.Context(t), mockTxStore) + mockTxStore.On("PruneUnstartedTxQueue", mock.Anything, queueSize, subject, mock.Anything, mock.Anything).Once().Return([]int64{1, 2}, nil) + ids, err := strategy1.PruneQueue(testutils.Context(t), mockTxStore) require.NoError(t, err) - assert.Equal(t, int64(2), n) + assert.Equal(t, []int64{1, 2}, ids) }) } diff --git a/core/chains/evm/txmgr/txmgr_test.go b/core/chains/evm/txmgr/txmgr_test.go index 6fafff1a5c1..0e28f2948ee 100644 --- a/core/chains/evm/txmgr/txmgr_test.go +++ b/core/chains/evm/txmgr/txmgr_test.go @@ -121,7 +121,7 @@ func TestTxm_CreateTransaction(t *testing.T) { subject := uuid.New() strategy := newMockTxStrategy(t) strategy.On("Subject").Return(uuid.NullUUID{UUID: subject, Valid: true}) - strategy.On("PruneQueue", mock.Anything, mock.Anything).Return(int64(0), nil) + strategy.On("PruneQueue", mock.Anything, mock.Anything).Return(nil, nil) evmConfig.MaxQueued = uint64(1) etx, err := txm.CreateTransaction(testutils.Context(t), txmgr.TxRequest{ FromAddress: fromAddress, @@ -406,7 +406,7 @@ func TestTxm_CreateTransaction_OutOfEth(t *testing.T) { mustInsertUnconfirmedEthTxWithInsufficientEthAttempt(t, txStore, 0, otherKey.Address) strategy := newMockTxStrategy(t) strategy.On("Subject").Return(uuid.NullUUID{}) - strategy.On("PruneQueue", mock.Anything, mock.Anything).Return(int64(0), nil) + strategy.On("PruneQueue", mock.Anything, mock.Anything).Return(nil, nil) etx, err := txm.CreateTransaction(testutils.Context(t), txmgr.TxRequest{ FromAddress: evmFromAddress, @@ -430,7 +430,7 @@ func TestTxm_CreateTransaction_OutOfEth(t *testing.T) { mustInsertUnconfirmedEthTxWithInsufficientEthAttempt(t, txStore, 0, thisKey.Address) strategy := newMockTxStrategy(t) strategy.On("Subject").Return(uuid.NullUUID{}) - strategy.On("PruneQueue", mock.Anything, mock.Anything).Return(int64(0), nil) + strategy.On("PruneQueue", mock.Anything, mock.Anything).Return(nil, nil) etx, err := txm.CreateTransaction(testutils.Context(t), txmgr.TxRequest{ FromAddress: evmFromAddress, @@ -451,7 +451,7 @@ func TestTxm_CreateTransaction_OutOfEth(t *testing.T) { cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, 0, 42, thisKey.Address) strategy := newMockTxStrategy(t) strategy.On("Subject").Return(uuid.NullUUID{}) - strategy.On("PruneQueue", mock.Anything, mock.Anything).Return(int64(0), nil) + strategy.On("PruneQueue", mock.Anything, mock.Anything).Return(nil, nil) evmConfig.MaxQueued = uint64(1) etx, err := txm.CreateTransaction(testutils.Context(t), txmgr.TxRequest{ @@ -799,6 +799,10 @@ func mustCreateUnstartedTx(t testing.TB, txStore txmgr.EvmTxStore, fromAddress c func mustCreateUnstartedTxFromEvmTxRequest(t testing.TB, txStore txmgr.EvmTxStore, txRequest txmgr.TxRequest, chainID *big.Int) (tx txmgr.Tx) { tx, err := txStore.CreateTransaction(testutils.Context(t), txRequest, chainID) require.NoError(t, err) + + _, err = txRequest.Strategy.PruneQueue(testutils.Context(t), txStore) + require.NoError(t, err) + return tx } diff --git a/core/web/eth_keys_controller_test.go b/core/web/eth_keys_controller_test.go index 739af5820c9..a9be5517bcc 100644 --- a/core/web/eth_keys_controller_test.go +++ b/core/web/eth_keys_controller_test.go @@ -401,7 +401,7 @@ func TestETHKeysController_ChainSuccess_ResetWithAbandon(t *testing.T) { subject := uuid.New() strategy := commontxmmocks.NewTxStrategy(t) strategy.On("Subject").Return(uuid.NullUUID{UUID: subject, Valid: true}) - strategy.On("PruneQueue", mock.Anything, mock.AnythingOfType("*txmgr.evmTxStore")).Return(int64(0), nil) + strategy.On("PruneQueue", mock.Anything, mock.AnythingOfType("*txmgr.evmTxStore")).Return(nil, nil) _, err := chain.TxManager().CreateTransaction(testutils.Context(t), txmgr.TxRequest{ FromAddress: addr, ToAddress: testutils.NewAddress(), From d4e5a03a472aa1dc7046ebd319cd27477700ff91 Mon Sep 17 00:00:00 2001 From: James Walker Date: Tue, 23 Jan 2024 11:05:39 -0500 Subject: [PATCH 03/10] refactor txmgr a little --- common/txmgr/txmgr.go | 29 +++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/common/txmgr/txmgr.go b/common/txmgr/txmgr.go index 61a46bfda99..246ad167409 100644 --- a/common/txmgr/txmgr.go +++ b/common/txmgr/txmgr.go @@ -515,13 +515,10 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CreateTran return tx, fmt.Errorf("Txm#CreateTransaction: %w", err) } - tx, err = b.txStore.CreateTransaction(ctx, txRequest, b.chainID) + tx, err = b.createTxnAndPruneQueue(ctx, txRequest, b.chainID) if err != nil { return tx, err } - if err := b.pruneQueue(ctx, txRequest, tx); err != nil { - return tx, fmt.Errorf("CreateTransaction failed to prune queue: %w", err) - } // Trigger the Broadcaster to check for new transaction b.broadcaster.Trigger(txRequest.FromAddress) @@ -558,13 +555,10 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SendNative FeeLimit: gasLimit, Strategy: NewSendEveryStrategy(), } - etx, err = b.txStore.CreateTransaction(ctx, txRequest, chainID) + etx, err = b.createTxnAndPruneQueue(ctx, txRequest, chainID) if err != nil { return etx, fmt.Errorf("SendNativeToken failed to insert tx: %w", err) } - if err := b.pruneQueue(ctx, txRequest, etx); err != nil { - return etx, fmt.Errorf("SendNativeToken failed to prune queue: %w", err) - } // Trigger the Broadcaster to check for new transaction b.broadcaster.Trigger(from) @@ -682,10 +676,21 @@ func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Cou return count, errors.New(n.ErrMsg) } -func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) pruneQueue(ctx context.Context, txRequest txmgrtypes.TxRequest[ADDR, TX_HASH], etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { +func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) createTxnAndPruneQueue( + ctx context.Context, + txRequest txmgrtypes.TxRequest[ADDR, TX_HASH], + chainID CHAIN_ID, +) ( + txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], + error, +) { + tx, err := b.txStore.CreateTransaction(ctx, txRequest, chainID) + if err != nil { + return tx, err + } pruned, err := txRequest.Strategy.PruneQueue(ctx, b.txStore) if err != nil { - return err + return tx, err } if len(pruned) > 0 { b.logger.Warnw(fmt.Sprintf("Dropped %d old transactions from transaction queue", len(pruned)), @@ -693,8 +698,8 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) pruneQueue "toAddress", txRequest.ToAddress, "meta", txRequest.Meta, "subject", txRequest.Strategy.Subject(), - "replacementID", etx.ID) + "replacementID", tx.ID) } - return nil + return tx, nil } From 939d610a17466cb70cd77cc32bc77cd7c30fe902 Mon Sep 17 00:00:00 2001 From: James Walker Date: Tue, 23 Jan 2024 14:11:42 -0500 Subject: [PATCH 04/10] change order of pruning and creating transaction --- common/txmgr/txmgr.go | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/common/txmgr/txmgr.go b/common/txmgr/txmgr.go index 246ad167409..3a3ccfa2442 100644 --- a/common/txmgr/txmgr.go +++ b/common/txmgr/txmgr.go @@ -681,24 +681,35 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) createTxnA txRequest txmgrtypes.TxRequest[ADDR, TX_HASH], chainID CHAIN_ID, ) ( - txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], - error, + tx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], + err error, ) { - tx, err := b.txStore.CreateTransaction(ctx, txRequest, chainID) + pruned, err := txRequest.Strategy.PruneQueue(ctx, b.txStore) if err != nil { return tx, err } - pruned, err := txRequest.Strategy.PruneQueue(ctx, b.txStore) + + tx, err = b.txStore.CreateTransaction(ctx, txRequest, chainID) if err != nil { + if len(pruned) > 0 { + b.logger.Warnw(fmt.Sprintf("Dropped %d old transactions from transaction queue", len(pruned)), + "fromAddress", txRequest.FromAddress, + "toAddress", txRequest.ToAddress, + "meta", txRequest.Meta, + "subject", txRequest.Strategy.Subject(), + ) + } return tx, err } + if len(pruned) > 0 { - b.logger.Warnw(fmt.Sprintf("Dropped %d old transactions from transaction queue", len(pruned)), + b.logger.Warnw("Dropped transaction replaced by new transaction", "fromAddress", txRequest.FromAddress, "toAddress", txRequest.ToAddress, "meta", txRequest.Meta, "subject", txRequest.Strategy.Subject(), - "replacementID", tx.ID) + "replacementID", tx.ID, + ) } return tx, nil From fc2bbc5cf93050925e1fb1d8a9056bf26bcb4ea5 Mon Sep 17 00:00:00 2001 From: Jim W Date: Tue, 23 Jan 2024 14:31:16 -0500 Subject: [PATCH 05/10] Apply suggestions from code review Co-authored-by: amit-momin <108959691+amit-momin@users.noreply.github.com> --- core/chains/evm/txmgr/evm_tx_store.go | 2 +- core/chains/evm/txmgr/strategies_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/chains/evm/txmgr/evm_tx_store.go b/core/chains/evm/txmgr/evm_tx_store.go index b78bd79e667..ee0b147a8d0 100644 --- a/core/chains/evm/txmgr/evm_tx_store.go +++ b/core/chains/evm/txmgr/evm_tx_store.go @@ -1870,7 +1870,7 @@ id < ( if errors.Is(err, sql.ErrNoRows) { return nil } - return pkgerrors.Wrap(err, "DeleteUnstartedEthTx failed") + return fmt.Errorf("DeleteUnstartedEthTx failed: %w", err) } return err }) diff --git a/core/chains/evm/txmgr/strategies_test.go b/core/chains/evm/txmgr/strategies_test.go index d38b03d9dd2..343c1f734ae 100644 --- a/core/chains/evm/txmgr/strategies_test.go +++ b/core/chains/evm/txmgr/strategies_test.go @@ -23,7 +23,7 @@ func Test_SendEveryStrategy(t *testing.T) { ids, err := s.PruneQueue(testutils.Context(t), nil) assert.NoError(t, err) - assert.Equal(t, 0, len(ids)) + assert.Len(t, ids, 0) } func Test_DropOldestStrategy_Subject(t *testing.T) { From 13d35dd4ef67f4b4676a8f4bd649661e27504d91 Mon Sep 17 00:00:00 2001 From: James Walker Date: Tue, 23 Jan 2024 16:24:42 -0500 Subject: [PATCH 06/10] address comments --- common/txmgr/strategies.go | 2 +- common/txmgr/txmgr.go | 30 +++++++++++---------------- core/chains/evm/txmgr/evm_tx_store.go | 2 +- 3 files changed, 14 insertions(+), 20 deletions(-) diff --git a/common/txmgr/strategies.go b/common/txmgr/strategies.go index aa4abf650c5..f134dbcbbb4 100644 --- a/common/txmgr/strategies.go +++ b/common/txmgr/strategies.go @@ -61,7 +61,7 @@ func (s DropOldestStrategy) PruneQueue(ctx context.Context, pruneService txmgrty ctx, cancel = context.WithTimeout(ctx, s.queryTimeout) defer cancel() - ids, err = pruneService.PruneUnstartedTxQueue(ctx, s.queueSize, s.subject) + ids, err = pruneService.PruneUnstartedTxQueue(ctx, s.queueSize-1, s.subject) if err != nil { return ids, fmt.Errorf("DropOldestStrategy#PruneQueue failed: %w", err) } diff --git a/common/txmgr/txmgr.go b/common/txmgr/txmgr.go index 3a3ccfa2442..60d3058fda1 100644 --- a/common/txmgr/txmgr.go +++ b/common/txmgr/txmgr.go @@ -688,29 +688,23 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) createTxnA if err != nil { return tx, err } + if len(pruned) > 0 { + b.logger.Warnw(fmt.Sprintf("Pruned %d old unstarted transactions", len(pruned)), + "subject", txRequest.Strategy.Subject(), + "pruned", pruned, + ) + } tx, err = b.txStore.CreateTransaction(ctx, txRequest, chainID) if err != nil { - if len(pruned) > 0 { - b.logger.Warnw(fmt.Sprintf("Dropped %d old transactions from transaction queue", len(pruned)), - "fromAddress", txRequest.FromAddress, - "toAddress", txRequest.ToAddress, - "meta", txRequest.Meta, - "subject", txRequest.Strategy.Subject(), - ) - } return tx, err } - - if len(pruned) > 0 { - b.logger.Warnw("Dropped transaction replaced by new transaction", - "fromAddress", txRequest.FromAddress, - "toAddress", txRequest.ToAddress, - "meta", txRequest.Meta, - "subject", txRequest.Strategy.Subject(), - "replacementID", tx.ID, - ) - } + b.logger.Debugw("Created transaction", + "fromAddress", txRequest.FromAddress, + "toAddress", txRequest.ToAddress, + "meta", txRequest.Meta, + "replacementID", tx.ID, + ) return tx, nil } diff --git a/core/chains/evm/txmgr/evm_tx_store.go b/core/chains/evm/txmgr/evm_tx_store.go index ee0b147a8d0..ae986acee27 100644 --- a/core/chains/evm/txmgr/evm_tx_store.go +++ b/core/chains/evm/txmgr/evm_tx_store.go @@ -1870,7 +1870,7 @@ id < ( if errors.Is(err, sql.ErrNoRows) { return nil } - return fmt.Errorf("DeleteUnstartedEthTx failed: %w", err) + return fmt.Errorf("PruneUnstartedTxQueue failed: %w", err) } return err }) From 8052d49ea60ac570fd41664edb4806abf4e0ea28 Mon Sep 17 00:00:00 2001 From: James Walker Date: Tue, 23 Jan 2024 17:18:28 -0500 Subject: [PATCH 07/10] fix tests --- core/chains/evm/txmgr/evm_tx_store_test.go | 8 ++++---- core/chains/evm/txmgr/strategies_test.go | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/core/chains/evm/txmgr/evm_tx_store_test.go b/core/chains/evm/txmgr/evm_tx_store_test.go index ed5d53cf05a..35d684727d1 100644 --- a/core/chains/evm/txmgr/evm_tx_store_test.go +++ b/core/chains/evm/txmgr/evm_tx_store_test.go @@ -1813,22 +1813,22 @@ func TestORM_PruneUnstartedTxQueue(t *testing.T) { evmtest.NewEthClientMockWithDefaultChain(t) _, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore) - t.Run("does not prune if queue has not exceeded capacity", func(t *testing.T) { + t.Run("does not prune if queue has not exceeded capacity-1", func(t *testing.T) { subject1 := uuid.New() strategy1 := txmgrcommon.NewDropOldestStrategy(subject1, uint32(5), cfg.Database().DefaultQueryTimeout()) for i := 0; i < 5; i++ { mustCreateUnstartedGeneratedTx(t, txStore, fromAddress, &cltest.FixtureChainID, txRequestWithStrategy(strategy1)) } - AssertCountPerSubject(t, txStore, int64(5), subject1) + AssertCountPerSubject(t, txStore, int64(4), subject1) }) - t.Run("prunes if queue has exceeded capacity", func(t *testing.T) { + t.Run("prunes if queue has exceeded capacity-1", func(t *testing.T) { subject2 := uuid.New() strategy2 := txmgrcommon.NewDropOldestStrategy(subject2, uint32(3), cfg.Database().DefaultQueryTimeout()) for i := 0; i < 5; i++ { mustCreateUnstartedGeneratedTx(t, txStore, fromAddress, &cltest.FixtureChainID, txRequestWithStrategy(strategy2)) } - AssertCountPerSubject(t, txStore, int64(3), subject2) + AssertCountPerSubject(t, txStore, int64(2), subject2) }) } diff --git a/core/chains/evm/txmgr/strategies_test.go b/core/chains/evm/txmgr/strategies_test.go index 343c1f734ae..19f5f197289 100644 --- a/core/chains/evm/txmgr/strategies_test.go +++ b/core/chains/evm/txmgr/strategies_test.go @@ -47,7 +47,7 @@ func Test_DropOldestStrategy_PruneQueue(t *testing.T) { t.Run("calls PrineUnstartedTxQueue for the given subject and queueSize, ignoring fromAddress", func(t *testing.T) { strategy1 := txmgrcommon.NewDropOldestStrategy(subject, queueSize, queryTimeout) - mockTxStore.On("PruneUnstartedTxQueue", mock.Anything, queueSize, subject, mock.Anything, mock.Anything).Once().Return([]int64{1, 2}, nil) + mockTxStore.On("PruneUnstartedTxQueue", mock.Anything, queueSize-1, subject, mock.Anything, mock.Anything).Once().Return([]int64{1, 2}, nil) ids, err := strategy1.PruneQueue(testutils.Context(t), mockTxStore) require.NoError(t, err) assert.Equal(t, []int64{1, 2}, ids) From 55d9ace386a876a05bfbe44b46624f9bdd9384c6 Mon Sep 17 00:00:00 2001 From: James Walker Date: Wed, 24 Jan 2024 15:19:00 -0500 Subject: [PATCH 08/10] address comments --- common/txmgr/strategies.go | 1 + common/txmgr/txmgr.go | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/common/txmgr/strategies.go b/common/txmgr/strategies.go index f134dbcbbb4..2fff3b7febc 100644 --- a/common/txmgr/strategies.go +++ b/common/txmgr/strategies.go @@ -61,6 +61,7 @@ func (s DropOldestStrategy) PruneQueue(ctx context.Context, pruneService txmgrty ctx, cancel = context.WithTimeout(ctx, s.queryTimeout) defer cancel() + // NOTE: We prune one less than the queue size because we will be adding a new transaction to the queue right after this PruneQueue call ids, err = pruneService.PruneUnstartedTxQueue(ctx, s.queueSize-1, s.subject) if err != nil { return ids, fmt.Errorf("DropOldestStrategy#PruneQueue failed: %w", err) diff --git a/common/txmgr/txmgr.go b/common/txmgr/txmgr.go index 33b3af2e3e8..bc4cc02e1f8 100644 --- a/common/txmgr/txmgr.go +++ b/common/txmgr/txmgr.go @@ -698,7 +698,7 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) createTxnA if len(pruned) > 0 { b.logger.Warnw(fmt.Sprintf("Pruned %d old unstarted transactions", len(pruned)), "subject", txRequest.Strategy.Subject(), - "pruned", pruned, + "pruned-tx-ids", pruned, ) } @@ -710,7 +710,7 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) createTxnA "fromAddress", txRequest.FromAddress, "toAddress", txRequest.ToAddress, "meta", txRequest.Meta, - "replacementID", tx.ID, + "transactionID", tx.ID, ) return tx, nil From 676b07cd23be050c1a04891e86b500d8d13d4eb0 Mon Sep 17 00:00:00 2001 From: James Walker Date: Wed, 24 Jan 2024 17:43:06 -0500 Subject: [PATCH 09/10] address comments --- common/txmgr/strategies.go | 2 +- common/txmgr/txmgr.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/common/txmgr/strategies.go b/common/txmgr/strategies.go index 2fff3b7febc..3772e6d1d20 100644 --- a/common/txmgr/strategies.go +++ b/common/txmgr/strategies.go @@ -61,7 +61,7 @@ func (s DropOldestStrategy) PruneQueue(ctx context.Context, pruneService txmgrty ctx, cancel = context.WithTimeout(ctx, s.queryTimeout) defer cancel() - // NOTE: We prune one less than the queue size because we will be adding a new transaction to the queue right after this PruneQueue call + // NOTE: We prune one less than the queue size to prevent the queue from exceeding the max queue size. Which could occur if a new transaction is added to the queue right after we prune. ids, err = pruneService.PruneUnstartedTxQueue(ctx, s.queueSize-1, s.subject) if err != nil { return ids, fmt.Errorf("DropOldestStrategy#PruneQueue failed: %w", err) diff --git a/common/txmgr/txmgr.go b/common/txmgr/txmgr.go index bc4cc02e1f8..583ee217d2f 100644 --- a/common/txmgr/txmgr.go +++ b/common/txmgr/txmgr.go @@ -522,7 +522,7 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CreateTran return tx, fmt.Errorf("Txm#CreateTransaction: %w", err) } - tx, err = b.createTxnAndPruneQueue(ctx, txRequest, b.chainID) + tx, err = b.pruneQueueAndCreateTxn(ctx, txRequest, b.chainID) if err != nil { return tx, err } @@ -562,7 +562,7 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SendNative FeeLimit: gasLimit, Strategy: NewSendEveryStrategy(), } - etx, err = b.createTxnAndPruneQueue(ctx, txRequest, chainID) + etx, err = b.pruneQueueAndCreateTxn(ctx, txRequest, chainID) if err != nil { return etx, fmt.Errorf("SendNativeToken failed to insert tx: %w", err) } @@ -683,7 +683,7 @@ func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Cou return count, errors.New(n.ErrMsg) } -func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) createTxnAndPruneQueue( +func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) pruneQueueAndCreateTxn( ctx context.Context, txRequest txmgrtypes.TxRequest[ADDR, TX_HASH], chainID CHAIN_ID, From b02600304a4688f5ad2a2918ec495de53e551c95 Mon Sep 17 00:00:00 2001 From: James Walker Date: Thu, 25 Jan 2024 09:21:36 -0500 Subject: [PATCH 10/10] add pruneQueueAndCreateLock --- common/txmgr/txmgr.go | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/common/txmgr/txmgr.go b/common/txmgr/txmgr.go index 583ee217d2f..3e3fa9a20db 100644 --- a/common/txmgr/txmgr.go +++ b/common/txmgr/txmgr.go @@ -82,13 +82,14 @@ type Txm[ FEE feetypes.Fee, ] struct { services.StateMachine - logger logger.SugaredLogger - txStore txmgrtypes.TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] - config txmgrtypes.TransactionManagerChainConfig - txConfig txmgrtypes.TransactionManagerTransactionsConfig - keyStore txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ] - chainID CHAIN_ID - checkerFactory TransmitCheckerFactory[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + logger logger.SugaredLogger + txStore txmgrtypes.TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] + config txmgrtypes.TransactionManagerChainConfig + txConfig txmgrtypes.TransactionManagerTransactionsConfig + keyStore txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ] + chainID CHAIN_ID + checkerFactory TransmitCheckerFactory[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + pruneQueueAndCreateLock sync.Mutex chHeads chan HEAD trigger chan ADDR @@ -691,6 +692,9 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) pruneQueue tx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error, ) { + b.pruneQueueAndCreateLock.Lock() + defer b.pruneQueueAndCreateLock.Unlock() + pruned, err := txRequest.Strategy.PruneQueue(ctx, b.txStore) if err != nil { return tx, err