Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

take pruning logic out of create_transaction logic #11845

Merged
merged 17 commits into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions common/txmgr/strategies.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ func NewSendEveryStrategy() txmgrtypes.TxStrategy {
type SendEveryStrategy struct{}

func (SendEveryStrategy) Subject() uuid.NullUUID { return uuid.NullUUID{} }
func (SendEveryStrategy) PruneQueue(ctx context.Context, pruneService txmgrtypes.UnstartedTxQueuePruner) (int64, error) {
return 0, nil
func (SendEveryStrategy) PruneQueue(ctx context.Context, pruneService txmgrtypes.UnstartedTxQueuePruner) ([]int64, error) {
return nil, nil
}

var _ txmgrtypes.TxStrategy = DropOldestStrategy{}
Expand All @@ -56,14 +56,14 @@ func (s DropOldestStrategy) Subject() uuid.NullUUID {
return uuid.NullUUID{UUID: s.subject, Valid: true}
}

func (s DropOldestStrategy) PruneQueue(ctx context.Context, pruneService txmgrtypes.UnstartedTxQueuePruner) (n int64, err error) {
func (s DropOldestStrategy) PruneQueue(ctx context.Context, pruneService txmgrtypes.UnstartedTxQueuePruner) (ids []int64, err error) {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, s.queryTimeout)
defer cancel()

n, err = pruneService.PruneUnstartedTxQueue(ctx, s.queueSize, s.subject)
ids, err = pruneService.PruneUnstartedTxQueue(ctx, s.queueSize, s.subject)
poopoothegorilla marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return 0, fmt.Errorf("DropOldestStrategy#PruneQueue failed: %w", err)
return ids, fmt.Errorf("DropOldestStrategy#PruneQueue failed: %w", err)
}
return
}
23 changes: 23 additions & 0 deletions common/txmgr/txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,9 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CreateTran
if err != nil {
return tx, err
}
if err := b.pruneQueue(ctx, txRequest, tx); err != nil {
return tx, fmt.Errorf("CreateTransaction failed to prune queue: %w", err)
}

// Trigger the Broadcaster to check for new transaction
b.broadcaster.Trigger(txRequest.FromAddress)
Expand Down Expand Up @@ -559,6 +562,9 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SendNative
if err != nil {
return etx, fmt.Errorf("SendNativeToken failed to insert tx: %w", err)
}
if err := b.pruneQueue(ctx, txRequest, etx); err != nil {
return etx, fmt.Errorf("SendNativeToken failed to prune queue: %w", err)
}
poopoothegorilla marked this conversation as resolved.
Show resolved Hide resolved

// Trigger the Broadcaster to check for new transaction
b.broadcaster.Trigger(from)
Expand Down Expand Up @@ -675,3 +681,20 @@ func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Fin
func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) CountTransactionsByState(ctx context.Context, state txmgrtypes.TxState) (count uint32, err error) {
return count, errors.New(n.ErrMsg)
}

func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) pruneQueue(ctx context.Context, txRequest txmgrtypes.TxRequest[ADDR, TX_HASH], etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error {
pruned, err := txRequest.Strategy.PruneQueue(ctx, b.txStore)
poopoothegorilla marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
if len(pruned) > 0 {
b.logger.Warnw(fmt.Sprintf("Dropped %d old transactions from transaction queue", len(pruned)),
"fromAddress", txRequest.FromAddress,
"toAddress", txRequest.ToAddress,
"meta", txRequest.Meta,
"subject", txRequest.Strategy.Subject(),
"replacementID", etx.ID)
}

return nil
}
12 changes: 7 additions & 5 deletions common/txmgr/types/mocks/tx_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 7 additions & 5 deletions common/txmgr/types/mocks/tx_strategy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion common/txmgr/types/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type TxStrategy interface {
// PruneQueue is called after tx insertion
// It accepts the service responsible for deleting
// unstarted txs and deletion options
PruneQueue(ctx context.Context, pruneService UnstartedTxQueuePruner) (n int64, err error)
PruneQueue(ctx context.Context, pruneService UnstartedTxQueuePruner) (ids []int64, err error)
}

type TxAttemptState int8
Expand Down
2 changes: 1 addition & 1 deletion common/txmgr/types/tx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ type TxHistoryReaper[CHAIN_ID types.ID] interface {
}

type UnstartedTxQueuePruner interface {
PruneUnstartedTxQueue(ctx context.Context, queueSize uint32, subject uuid.UUID) (n int64, err error)
PruneUnstartedTxQueue(ctx context.Context, queueSize uint32, subject uuid.UUID) (ids []int64, err error)
}

// R is the raw unparsed transaction receipt
Expand Down
18 changes: 6 additions & 12 deletions core/chains/evm/txmgr/evm_tx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1841,28 +1841,20 @@ RETURNING "txes".*
if err != nil {
return pkgerrors.Wrap(err, "CreateEthTransaction failed to insert evm tx")
}
var pruned int64
pruned, err = txRequest.Strategy.PruneQueue(ctx, o)
if err != nil {
return pkgerrors.Wrap(err, "CreateEthTransaction failed to prune evm.txes")
}
if pruned > 0 {
o.logger.Warnw(fmt.Sprintf("Dropped %d old transactions from transaction queue", pruned), "fromAddress", txRequest.FromAddress, "toAddress", txRequest.ToAddress, "meta", txRequest.Meta, "subject", txRequest.Strategy.Subject(), "replacementID", dbEtx.ID)
}
poopoothegorilla marked this conversation as resolved.
Show resolved Hide resolved
return nil
})
var etx Tx
dbEtx.ToTx(&etx)
return etx, err
}

func (o *evmTxStore) PruneUnstartedTxQueue(ctx context.Context, queueSize uint32, subject uuid.UUID) (n int64, err error) {
func (o *evmTxStore) PruneUnstartedTxQueue(ctx context.Context, queueSize uint32, subject uuid.UUID) (ids []int64, err error) {
var cancel context.CancelFunc
ctx, cancel = o.mergeContexts(ctx)
defer cancel()
qq := o.q.WithOpts(pg.WithParentCtx(ctx))
err = qq.Transaction(func(tx pg.Queryer) error {
res, err := qq.Exec(`
err := qq.Select(&ids, `
DELETE FROM evm.txes
WHERE state = 'unstarted' AND subject = $1 AND
id < (
Expand All @@ -1873,11 +1865,13 @@ id < (
ORDER BY id DESC
LIMIT $3
) numbers
)`, subject, subject, queueSize)
) RETURNING id`, subject, subject, queueSize)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil
}
return pkgerrors.Wrap(err, "DeleteUnstartedEthTx failed")
poopoothegorilla marked this conversation as resolved.
Show resolved Hide resolved
}
n, err = res.RowsAffected()
return err
})
return
Expand Down
3 changes: 0 additions & 3 deletions core/chains/evm/txmgr/evm_tx_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (

"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"gopkg.in/guregu/null.v4"
)
Expand Down Expand Up @@ -1723,7 +1722,6 @@ func TestORM_CreateTransaction(t *testing.T) {
subject := uuid.New()
strategy := newMockTxStrategy(t)
strategy.On("Subject").Return(uuid.NullUUID{UUID: subject, Valid: true})
strategy.On("PruneQueue", mock.Anything, mock.AnythingOfType("*txmgr.evmTxStore")).Return(int64(0), nil)
etx, err := txStore.CreateTransaction(testutils.Context(t), txmgr.TxRequest{
FromAddress: fromAddress,
ToAddress: toAddress,
Expand Down Expand Up @@ -1780,7 +1778,6 @@ func TestORM_CreateTransaction(t *testing.T) {
subject := uuid.New()
strategy := newMockTxStrategy(t)
strategy.On("Subject").Return(uuid.NullUUID{UUID: subject, Valid: true})
strategy.On("PruneQueue", mock.Anything, mock.AnythingOfType("*txmgr.evmTxStore")).Return(int64(0), nil)
etx, err := txStore.CreateTransaction(testutils.Context(t), txmgr.TxRequest{
FromAddress: fromAddress,
ToAddress: toAddress,
Expand Down
12 changes: 7 additions & 5 deletions core/chains/evm/txmgr/mocks/evm_tx_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions core/chains/evm/txmgr/strategies_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ func Test_SendEveryStrategy(t *testing.T) {

assert.Equal(t, uuid.NullUUID{}, s.Subject())

n, err := s.PruneQueue(testutils.Context(t), nil)
ids, err := s.PruneQueue(testutils.Context(t), nil)
assert.NoError(t, err)
assert.Equal(t, int64(0), n)
assert.Equal(t, 0, len(ids))
poopoothegorilla marked this conversation as resolved.
Show resolved Hide resolved
}

func Test_DropOldestStrategy_Subject(t *testing.T) {
Expand All @@ -47,9 +47,9 @@ func Test_DropOldestStrategy_PruneQueue(t *testing.T) {

t.Run("calls PrineUnstartedTxQueue for the given subject and queueSize, ignoring fromAddress", func(t *testing.T) {
strategy1 := txmgrcommon.NewDropOldestStrategy(subject, queueSize, queryTimeout)
mockTxStore.On("PruneUnstartedTxQueue", mock.Anything, queueSize, subject, mock.Anything, mock.Anything).Once().Return(int64(2), nil)
n, err := strategy1.PruneQueue(testutils.Context(t), mockTxStore)
mockTxStore.On("PruneUnstartedTxQueue", mock.Anything, queueSize, subject, mock.Anything, mock.Anything).Once().Return([]int64{1, 2}, nil)
ids, err := strategy1.PruneQueue(testutils.Context(t), mockTxStore)
require.NoError(t, err)
assert.Equal(t, int64(2), n)
assert.Equal(t, []int64{1, 2}, ids)
})
}
12 changes: 8 additions & 4 deletions core/chains/evm/txmgr/txmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func TestTxm_CreateTransaction(t *testing.T) {
subject := uuid.New()
strategy := newMockTxStrategy(t)
strategy.On("Subject").Return(uuid.NullUUID{UUID: subject, Valid: true})
strategy.On("PruneQueue", mock.Anything, mock.Anything).Return(int64(0), nil)
strategy.On("PruneQueue", mock.Anything, mock.Anything).Return(nil, nil)
evmConfig.MaxQueued = uint64(1)
etx, err := txm.CreateTransaction(testutils.Context(t), txmgr.TxRequest{
FromAddress: fromAddress,
Expand Down Expand Up @@ -406,7 +406,7 @@ func TestTxm_CreateTransaction_OutOfEth(t *testing.T) {
mustInsertUnconfirmedEthTxWithInsufficientEthAttempt(t, txStore, 0, otherKey.Address)
strategy := newMockTxStrategy(t)
strategy.On("Subject").Return(uuid.NullUUID{})
strategy.On("PruneQueue", mock.Anything, mock.Anything).Return(int64(0), nil)
strategy.On("PruneQueue", mock.Anything, mock.Anything).Return(nil, nil)

etx, err := txm.CreateTransaction(testutils.Context(t), txmgr.TxRequest{
FromAddress: evmFromAddress,
Expand All @@ -430,7 +430,7 @@ func TestTxm_CreateTransaction_OutOfEth(t *testing.T) {
mustInsertUnconfirmedEthTxWithInsufficientEthAttempt(t, txStore, 0, thisKey.Address)
strategy := newMockTxStrategy(t)
strategy.On("Subject").Return(uuid.NullUUID{})
strategy.On("PruneQueue", mock.Anything, mock.Anything).Return(int64(0), nil)
strategy.On("PruneQueue", mock.Anything, mock.Anything).Return(nil, nil)

etx, err := txm.CreateTransaction(testutils.Context(t), txmgr.TxRequest{
FromAddress: evmFromAddress,
Expand All @@ -451,7 +451,7 @@ func TestTxm_CreateTransaction_OutOfEth(t *testing.T) {
cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, 0, 42, thisKey.Address)
strategy := newMockTxStrategy(t)
strategy.On("Subject").Return(uuid.NullUUID{})
strategy.On("PruneQueue", mock.Anything, mock.Anything).Return(int64(0), nil)
strategy.On("PruneQueue", mock.Anything, mock.Anything).Return(nil, nil)

evmConfig.MaxQueued = uint64(1)
etx, err := txm.CreateTransaction(testutils.Context(t), txmgr.TxRequest{
Expand Down Expand Up @@ -799,6 +799,10 @@ func mustCreateUnstartedTx(t testing.TB, txStore txmgr.EvmTxStore, fromAddress c
func mustCreateUnstartedTxFromEvmTxRequest(t testing.TB, txStore txmgr.EvmTxStore, txRequest txmgr.TxRequest, chainID *big.Int) (tx txmgr.Tx) {
tx, err := txStore.CreateTransaction(testutils.Context(t), txRequest, chainID)
require.NoError(t, err)

_, err = txRequest.Strategy.PruneQueue(testutils.Context(t), txStore)
require.NoError(t, err)

return tx
}

Expand Down
2 changes: 1 addition & 1 deletion core/web/eth_keys_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ func TestETHKeysController_ChainSuccess_ResetWithAbandon(t *testing.T) {
subject := uuid.New()
strategy := commontxmmocks.NewTxStrategy(t)
strategy.On("Subject").Return(uuid.NullUUID{UUID: subject, Valid: true})
strategy.On("PruneQueue", mock.Anything, mock.AnythingOfType("*txmgr.evmTxStore")).Return(int64(0), nil)
strategy.On("PruneQueue", mock.Anything, mock.AnythingOfType("*txmgr.evmTxStore")).Return(nil, nil)
_, err := chain.TxManager().CreateTransaction(testutils.Context(t), txmgr.TxRequest{
FromAddress: addr,
ToAddress: testutils.NewAddress(),
Expand Down
Loading