Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

take pruning logic out of create_transaction logic #11845

Merged
merged 17 commits into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions common/txmgr/strategies.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ func NewSendEveryStrategy() txmgrtypes.TxStrategy {
type SendEveryStrategy struct{}

func (SendEveryStrategy) Subject() uuid.NullUUID { return uuid.NullUUID{} }
func (SendEveryStrategy) PruneQueue(ctx context.Context, pruneService txmgrtypes.UnstartedTxQueuePruner) (int64, error) {
return 0, nil
func (SendEveryStrategy) PruneQueue(ctx context.Context, pruneService txmgrtypes.UnstartedTxQueuePruner) ([]int64, error) {
return nil, nil
}

var _ txmgrtypes.TxStrategy = DropOldestStrategy{}
Expand All @@ -56,14 +56,15 @@ func (s DropOldestStrategy) Subject() uuid.NullUUID {
return uuid.NullUUID{UUID: s.subject, Valid: true}
}

func (s DropOldestStrategy) PruneQueue(ctx context.Context, pruneService txmgrtypes.UnstartedTxQueuePruner) (n int64, err error) {
func (s DropOldestStrategy) PruneQueue(ctx context.Context, pruneService txmgrtypes.UnstartedTxQueuePruner) (ids []int64, err error) {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, s.queryTimeout)
defer cancel()

n, err = pruneService.PruneUnstartedTxQueue(ctx, s.queueSize, s.subject)
// NOTE: We prune one less than the queue size to prevent the queue from exceeding the max queue size. Which could occur if a new transaction is added to the queue right after we prune.
ids, err = pruneService.PruneUnstartedTxQueue(ctx, s.queueSize-1, s.subject)
poopoothegorilla marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return 0, fmt.Errorf("DropOldestStrategy#PruneQueue failed: %w", err)
return ids, fmt.Errorf("DropOldestStrategy#PruneQueue failed: %w", err)
}
return
}
37 changes: 35 additions & 2 deletions common/txmgr/txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,7 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CreateTran
return tx, fmt.Errorf("Txm#CreateTransaction: %w", err)
}

tx, err = b.txStore.CreateTransaction(ctx, txRequest, b.chainID)
tx, err = b.pruneQueueAndCreateTxn(ctx, txRequest, b.chainID)
if err != nil {
return tx, err
}
Expand Down Expand Up @@ -562,7 +562,7 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SendNative
FeeLimit: gasLimit,
Strategy: NewSendEveryStrategy(),
}
etx, err = b.txStore.CreateTransaction(ctx, txRequest, chainID)
etx, err = b.pruneQueueAndCreateTxn(ctx, txRequest, chainID)
if err != nil {
return etx, fmt.Errorf("SendNativeToken failed to insert tx: %w", err)
}
Expand Down Expand Up @@ -682,3 +682,36 @@ func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Fin
func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) CountTransactionsByState(ctx context.Context, state txmgrtypes.TxState) (count uint32, err error) {
return count, errors.New(n.ErrMsg)
}

func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) pruneQueueAndCreateTxn(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It occurred to me during our call that both PruneQueue and createTransaction touch the queue, but there's no locking on this function. As a result, its possible there could be a race condition that occurs when multiple pruneQueueAndCreateTxn calls are done at the same time. For example, if thread A calls pruneQueue and then gets pre-empted, then thread B could run pruneQueueAndCreate putting the queueSize to the max queue size, and then when thread A finishes, it could then call createTransaction when the queueSize is at maximum, thereby allowing the queue to grow beyond the max queue size.

My question here is: do we need a lock to guard the interactions with the queue within this function?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that's a good point!

We do have a constraint in our other loops where only 1 thread is operating for each fromAddress. Thus no thread safety issues.
However the CreateTransaction method is called by the client, and can be called concurrently.
So the situation you described can potentially happen.
Note that the only harm in this case would be that our queue will momentarily grow to a larger size. As soon as there's yet another call from client, that prune will again cut down the queue size to right values.

So this isn't a big problem, except that theoretically we could have a larger queue.

I won't mind adding a new lock to prevent this situation, but even if we don't, it won't break/affect anything, according to me.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

great catch patrick. Added the Lock

ctx context.Context,
txRequest txmgrtypes.TxRequest[ADDR, TX_HASH],
chainID CHAIN_ID,
) (
tx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
err error,
) {
pruned, err := txRequest.Strategy.PruneQueue(ctx, b.txStore)
poopoothegorilla marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return tx, err
}
if len(pruned) > 0 {
b.logger.Warnw(fmt.Sprintf("Pruned %d old unstarted transactions", len(pruned)),
"subject", txRequest.Strategy.Subject(),
"pruned-tx-ids", pruned,
)
}

tx, err = b.txStore.CreateTransaction(ctx, txRequest, chainID)
if err != nil {
return tx, err
}
b.logger.Debugw("Created transaction",
"fromAddress", txRequest.FromAddress,
"toAddress", txRequest.ToAddress,
"meta", txRequest.Meta,
"transactionID", tx.ID,
)

return tx, nil
}
12 changes: 7 additions & 5 deletions common/txmgr/types/mocks/tx_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 7 additions & 5 deletions common/txmgr/types/mocks/tx_strategy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion common/txmgr/types/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type TxStrategy interface {
// PruneQueue is called after tx insertion
// It accepts the service responsible for deleting
// unstarted txs and deletion options
PruneQueue(ctx context.Context, pruneService UnstartedTxQueuePruner) (n int64, err error)
PruneQueue(ctx context.Context, pruneService UnstartedTxQueuePruner) (ids []int64, err error)
}

type TxAttemptState int8
Expand Down
2 changes: 1 addition & 1 deletion common/txmgr/types/tx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ type TxHistoryReaper[CHAIN_ID types.ID] interface {
}

type UnstartedTxQueuePruner interface {
PruneUnstartedTxQueue(ctx context.Context, queueSize uint32, subject uuid.UUID) (n int64, err error)
PruneUnstartedTxQueue(ctx context.Context, queueSize uint32, subject uuid.UUID) (ids []int64, err error)
}

// R is the raw unparsed transaction receipt
Expand Down
20 changes: 7 additions & 13 deletions core/chains/evm/txmgr/evm_tx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1841,28 +1841,20 @@ RETURNING "txes".*
if err != nil {
return pkgerrors.Wrap(err, "CreateEthTransaction failed to insert evm tx")
}
var pruned int64
pruned, err = txRequest.Strategy.PruneQueue(ctx, o)
if err != nil {
return pkgerrors.Wrap(err, "CreateEthTransaction failed to prune evm.txes")
}
if pruned > 0 {
o.logger.Warnw(fmt.Sprintf("Dropped %d old transactions from transaction queue", pruned), "fromAddress", txRequest.FromAddress, "toAddress", txRequest.ToAddress, "meta", txRequest.Meta, "subject", txRequest.Strategy.Subject(), "replacementID", dbEtx.ID)
}
poopoothegorilla marked this conversation as resolved.
Show resolved Hide resolved
return nil
})
var etx Tx
dbEtx.ToTx(&etx)
return etx, err
}

func (o *evmTxStore) PruneUnstartedTxQueue(ctx context.Context, queueSize uint32, subject uuid.UUID) (n int64, err error) {
func (o *evmTxStore) PruneUnstartedTxQueue(ctx context.Context, queueSize uint32, subject uuid.UUID) (ids []int64, err error) {
var cancel context.CancelFunc
ctx, cancel = o.mergeContexts(ctx)
defer cancel()
qq := o.q.WithOpts(pg.WithParentCtx(ctx))
err = qq.Transaction(func(tx pg.Queryer) error {
res, err := qq.Exec(`
err := qq.Select(&ids, `
DELETE FROM evm.txes
WHERE state = 'unstarted' AND subject = $1 AND
id < (
Expand All @@ -1873,11 +1865,13 @@ id < (
ORDER BY id DESC
LIMIT $3
) numbers
)`, subject, subject, queueSize)
) RETURNING id`, subject, subject, queueSize)
if err != nil {
return pkgerrors.Wrap(err, "DeleteUnstartedEthTx failed")
if errors.Is(err, sql.ErrNoRows) {
return nil
}
return fmt.Errorf("PruneUnstartedTxQueue failed: %w", err)
}
n, err = res.RowsAffected()
return err
})
return
Expand Down
11 changes: 4 additions & 7 deletions core/chains/evm/txmgr/evm_tx_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (

"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"gopkg.in/guregu/null.v4"
)
Expand Down Expand Up @@ -1723,7 +1722,6 @@ func TestORM_CreateTransaction(t *testing.T) {
subject := uuid.New()
strategy := newMockTxStrategy(t)
strategy.On("Subject").Return(uuid.NullUUID{UUID: subject, Valid: true})
strategy.On("PruneQueue", mock.Anything, mock.AnythingOfType("*txmgr.evmTxStore")).Return(int64(0), nil)
etx, err := txStore.CreateTransaction(testutils.Context(t), txmgr.TxRequest{
FromAddress: fromAddress,
ToAddress: toAddress,
Expand Down Expand Up @@ -1780,7 +1778,6 @@ func TestORM_CreateTransaction(t *testing.T) {
subject := uuid.New()
strategy := newMockTxStrategy(t)
strategy.On("Subject").Return(uuid.NullUUID{UUID: subject, Valid: true})
strategy.On("PruneQueue", mock.Anything, mock.AnythingOfType("*txmgr.evmTxStore")).Return(int64(0), nil)
etx, err := txStore.CreateTransaction(testutils.Context(t), txmgr.TxRequest{
FromAddress: fromAddress,
ToAddress: toAddress,
Expand Down Expand Up @@ -1816,22 +1813,22 @@ func TestORM_PruneUnstartedTxQueue(t *testing.T) {
evmtest.NewEthClientMockWithDefaultChain(t)
_, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore)

t.Run("does not prune if queue has not exceeded capacity", func(t *testing.T) {
t.Run("does not prune if queue has not exceeded capacity-1", func(t *testing.T) {
subject1 := uuid.New()
strategy1 := txmgrcommon.NewDropOldestStrategy(subject1, uint32(5), cfg.Database().DefaultQueryTimeout())
for i := 0; i < 5; i++ {
mustCreateUnstartedGeneratedTx(t, txStore, fromAddress, &cltest.FixtureChainID, txRequestWithStrategy(strategy1))
}
AssertCountPerSubject(t, txStore, int64(5), subject1)
AssertCountPerSubject(t, txStore, int64(4), subject1)
})

t.Run("prunes if queue has exceeded capacity", func(t *testing.T) {
t.Run("prunes if queue has exceeded capacity-1", func(t *testing.T) {
subject2 := uuid.New()
strategy2 := txmgrcommon.NewDropOldestStrategy(subject2, uint32(3), cfg.Database().DefaultQueryTimeout())
for i := 0; i < 5; i++ {
mustCreateUnstartedGeneratedTx(t, txStore, fromAddress, &cltest.FixtureChainID, txRequestWithStrategy(strategy2))
}
AssertCountPerSubject(t, txStore, int64(3), subject2)
AssertCountPerSubject(t, txStore, int64(2), subject2)
})
}

Expand Down
12 changes: 7 additions & 5 deletions core/chains/evm/txmgr/mocks/evm_tx_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions core/chains/evm/txmgr/strategies_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ func Test_SendEveryStrategy(t *testing.T) {

assert.Equal(t, uuid.NullUUID{}, s.Subject())

n, err := s.PruneQueue(testutils.Context(t), nil)
ids, err := s.PruneQueue(testutils.Context(t), nil)
assert.NoError(t, err)
assert.Equal(t, int64(0), n)
assert.Len(t, ids, 0)
}

func Test_DropOldestStrategy_Subject(t *testing.T) {
Expand All @@ -47,9 +47,9 @@ func Test_DropOldestStrategy_PruneQueue(t *testing.T) {

t.Run("calls PrineUnstartedTxQueue for the given subject and queueSize, ignoring fromAddress", func(t *testing.T) {
strategy1 := txmgrcommon.NewDropOldestStrategy(subject, queueSize, queryTimeout)
mockTxStore.On("PruneUnstartedTxQueue", mock.Anything, queueSize, subject, mock.Anything, mock.Anything).Once().Return(int64(2), nil)
n, err := strategy1.PruneQueue(testutils.Context(t), mockTxStore)
mockTxStore.On("PruneUnstartedTxQueue", mock.Anything, queueSize-1, subject, mock.Anything, mock.Anything).Once().Return([]int64{1, 2}, nil)
patrick-dowell marked this conversation as resolved.
Show resolved Hide resolved
ids, err := strategy1.PruneQueue(testutils.Context(t), mockTxStore)
require.NoError(t, err)
assert.Equal(t, int64(2), n)
assert.Equal(t, []int64{1, 2}, ids)
})
}
12 changes: 8 additions & 4 deletions core/chains/evm/txmgr/txmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func TestTxm_CreateTransaction(t *testing.T) {
subject := uuid.New()
strategy := newMockTxStrategy(t)
strategy.On("Subject").Return(uuid.NullUUID{UUID: subject, Valid: true})
strategy.On("PruneQueue", mock.Anything, mock.Anything).Return(int64(0), nil)
strategy.On("PruneQueue", mock.Anything, mock.Anything).Return(nil, nil)
evmConfig.MaxQueued = uint64(1)
etx, err := txm.CreateTransaction(testutils.Context(t), txmgr.TxRequest{
FromAddress: fromAddress,
Expand Down Expand Up @@ -406,7 +406,7 @@ func TestTxm_CreateTransaction_OutOfEth(t *testing.T) {
mustInsertUnconfirmedEthTxWithInsufficientEthAttempt(t, txStore, 0, otherKey.Address)
strategy := newMockTxStrategy(t)
strategy.On("Subject").Return(uuid.NullUUID{})
strategy.On("PruneQueue", mock.Anything, mock.Anything).Return(int64(0), nil)
strategy.On("PruneQueue", mock.Anything, mock.Anything).Return(nil, nil)

etx, err := txm.CreateTransaction(testutils.Context(t), txmgr.TxRequest{
FromAddress: evmFromAddress,
Expand All @@ -430,7 +430,7 @@ func TestTxm_CreateTransaction_OutOfEth(t *testing.T) {
mustInsertUnconfirmedEthTxWithInsufficientEthAttempt(t, txStore, 0, thisKey.Address)
strategy := newMockTxStrategy(t)
strategy.On("Subject").Return(uuid.NullUUID{})
strategy.On("PruneQueue", mock.Anything, mock.Anything).Return(int64(0), nil)
strategy.On("PruneQueue", mock.Anything, mock.Anything).Return(nil, nil)

etx, err := txm.CreateTransaction(testutils.Context(t), txmgr.TxRequest{
FromAddress: evmFromAddress,
Expand All @@ -451,7 +451,7 @@ func TestTxm_CreateTransaction_OutOfEth(t *testing.T) {
cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, 0, 42, thisKey.Address)
strategy := newMockTxStrategy(t)
strategy.On("Subject").Return(uuid.NullUUID{})
strategy.On("PruneQueue", mock.Anything, mock.Anything).Return(int64(0), nil)
strategy.On("PruneQueue", mock.Anything, mock.Anything).Return(nil, nil)

evmConfig.MaxQueued = uint64(1)
etx, err := txm.CreateTransaction(testutils.Context(t), txmgr.TxRequest{
Expand Down Expand Up @@ -799,6 +799,10 @@ func mustCreateUnstartedTx(t testing.TB, txStore txmgr.EvmTxStore, fromAddress c
func mustCreateUnstartedTxFromEvmTxRequest(t testing.TB, txStore txmgr.EvmTxStore, txRequest txmgr.TxRequest, chainID *big.Int) (tx txmgr.Tx) {
tx, err := txStore.CreateTransaction(testutils.Context(t), txRequest, chainID)
require.NoError(t, err)

_, err = txRequest.Strategy.PruneQueue(testutils.Context(t), txStore)
require.NoError(t, err)

return tx
}

Expand Down
2 changes: 1 addition & 1 deletion core/web/eth_keys_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ func TestETHKeysController_ChainSuccess_ResetWithAbandon(t *testing.T) {
subject := uuid.New()
strategy := commontxmmocks.NewTxStrategy(t)
strategy.On("Subject").Return(uuid.NullUUID{UUID: subject, Valid: true})
strategy.On("PruneQueue", mock.Anything, mock.AnythingOfType("*txmgr.evmTxStore")).Return(int64(0), nil)
strategy.On("PruneQueue", mock.Anything, mock.AnythingOfType("*txmgr.evmTxStore")).Return(nil, nil)
_, err := chain.TxManager().CreateTransaction(testutils.Context(t), txmgr.TxRequest{
FromAddress: addr,
ToAddress: testutils.NewAddress(),
Expand Down
Loading