Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TXM In-memory: step-3-04-ReapTxHistory #12234

Merged
merged 8 commits into from
Apr 1, 2024
85 changes: 68 additions & 17 deletions common/txmgr/address_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,12 @@
}

// countTransactionsByState returns the number of transactions that are in the given state
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) countTransactionsByState(txState txmgrtypes.TxState) int {

Check failure on line 132 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).countTransactionsByState is unused (unused)
return 0
}

// findTxWithIdempotencyKey returns the transaction with the given idempotency key. If no transaction is found, nil is returned.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) findTxWithIdempotencyKey(key string) *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] {

Check failure on line 137 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).findTxWithIdempotencyKey is unused (unused)
return nil
}

Expand All @@ -143,7 +143,7 @@
// If no txIDs are provided, all transactions in the given states are considered.
// If no txStates are provided, all transactions are considered.
// Any transaction states that are unknown will cause a panic.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) applyToTxsByState(

Check failure on line 146 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).applyToTxsByState is unused (unused)
txStates []txmgrtypes.TxState,
fn func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]),
txIDs ...int64,
Expand Down Expand Up @@ -189,7 +189,7 @@
// If no txIDs are provided, all transactions are considered.
// If no txStates are provided, all transactions are considered.
// The txFilter is applied to the transactions and the txAttemptFilter is applied to the attempts.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) findTxAttempts(

Check failure on line 192 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).findTxAttempts is unused (unused)
txStates []txmgrtypes.TxState,
txFilter func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool,
txAttemptFilter func(*txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool,
Expand Down Expand Up @@ -244,24 +244,63 @@
}

// pruneUnstartedTxQueue removes the transactions with the given IDs from the unstarted transaction queue.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) pruneUnstartedTxQueue(ids []int64) {

Check failure on line 247 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).pruneUnstartedTxQueue is unused (unused)
}

// reapConfirmedTxs removes confirmed transactions that are older than the given time threshold.
// It also removes confirmed transactions that are older than the given block number threshold.
poopoothegorilla marked this conversation as resolved.
Show resolved Hide resolved
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) reapConfirmedTxs(minBlockNumberToKeep int64, timeThreshold time.Time) {
as.Lock()
defer as.Unlock()

for _, tx := range as.confirmedTxs {
if len(tx.TxAttempts) == 0 {
continue
}
if tx.CreatedAt.After(timeThreshold) {
continue
}

for i := 0; i < len(tx.TxAttempts); i++ {
if len(tx.TxAttempts[i].Receipts) == 0 {
continue
}
if tx.TxAttempts[i].Receipts[0].GetBlockNumber() == nil || tx.TxAttempts[i].Receipts[0].GetBlockNumber().Int64() >= minBlockNumberToKeep {
continue
}
as._deleteTx(tx.ID)
}
}
}

// reapFatalErroredTxs removes fatal errored transactions that are older than the given time threshold.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) reapFatalErroredTxs(timeThreshold time.Time) {
as.Lock()
defer as.Unlock()

for _, tx := range as.fatalErroredTxs {
if tx.CreatedAt.After(timeThreshold) {
continue
}
as._deleteTx(tx.ID)
}
}

// deleteTxs removes the transactions with the given IDs from the address state.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) deleteTxs(txs ...txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) {
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) deleteTxs(txIDs ...int64) {

Check failure on line 290 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).deleteTxs is unused (unused)
as.Lock()
defer as.Unlock()

as._deleteTxs(txs...)
as._deleteTxs(txIDs...)
}

// peekNextUnstartedTx returns the next unstarted transaction in the queue without removing it from the unstarted queue.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) peekNextUnstartedTx() (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) {

Check failure on line 298 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).peekNextUnstartedTx is unused (unused)
return nil, nil
}

// peekInProgressTx returns the in-progress transaction without removing it from the in-progress state.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) peekInProgressTx() (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) {

Check failure on line 303 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).peekInProgressTx is unused (unused)
return nil, nil
}

Expand Down Expand Up @@ -415,7 +454,7 @@
return nil
}

func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) _applyToTxs(

Check failure on line 457 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE])._applyToTxs is unused (unused)
txIDsToTx map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
fn func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]),
txIDs ...int64,
Expand Down Expand Up @@ -464,22 +503,34 @@
return txs
}

func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) _deleteTxs(txs ...txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) {
for _, tx := range txs {
if tx.IdempotencyKey != nil {
delete(as.idempotencyKeyToTx, *tx.IdempotencyKey)
}
txID := tx.ID
if as.inprogressTx != nil && as.inprogressTx.ID == txID {
as.inprogressTx = nil
}
delete(as.allTxs, txID)
delete(as.unconfirmedTxs, txID)
delete(as.confirmedMissingReceiptTxs, txID)
delete(as.confirmedTxs, txID)
delete(as.fatalErroredTxs, txID)
as.unstartedTxs.RemoveTxByID(txID)
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) _deleteTxs(txIDs ...int64) {

Check failure on line 506 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE])._deleteTxs is unused (unused)
for _, txID := range txIDs {
as._deleteTx(txID)
}
}

func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) _deleteTx(txID int64) {
tx, ok := as.allTxs[txID]
if !ok {
return
}

for i := 0; i < len(tx.TxAttempts); i++ {
txAttemptHash := tx.TxAttempts[i].Hash
delete(as.attemptHashToTxAttempt, txAttemptHash)
}
if tx.IdempotencyKey != nil {
delete(as.idempotencyKeyToTx, *tx.IdempotencyKey)
}
if as.inprogressTx != nil && as.inprogressTx.ID == txID {
as.inprogressTx = nil
}
as.unstartedTxs.RemoveTxByID(txID)
delete(as.unconfirmedTxs, txID)
delete(as.confirmedMissingReceiptTxs, txID)
delete(as.confirmedTxs, txID)
delete(as.fatalErroredTxs, txID)
delete(as.allTxs, txID)
}

func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) _moveTxToFatalError(
Expand Down
17 changes: 17 additions & 0 deletions common/txmgr/inmemory_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,23 @@ func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Prune
}

func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ReapTxHistory(ctx context.Context, minBlockNumberToKeep int64, timeThreshold time.Time, chainID CHAIN_ID) error {
if ms.chainID.String() != chainID.String() {
panic("invalid chain ID")
DylanTinianov marked this conversation as resolved.
Show resolved Hide resolved
}

// Persist to persistent storage
if err := ms.persistentTxStore.ReapTxHistory(ctx, minBlockNumberToKeep, timeThreshold, chainID); err != nil {
return err
}

// Update in memory store
ms.addressStatesLock.RLock()
defer ms.addressStatesLock.RUnlock()
for _, as := range ms.addressStates {
as.reapConfirmedTxs(minBlockNumberToKeep, timeThreshold)
as.reapFatalErroredTxs(timeThreshold)
}

return nil
}
func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CountTransactionsByState(_ context.Context, state txmgrtypes.TxState, chainID CHAIN_ID) (uint32, error) {
Expand Down
143 changes: 143 additions & 0 deletions core/chains/evm/txmgr/evm_inmemory_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/guregu/null.v4"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
commontxmgr "github.com/smartcontractkit/chainlink/v2/common/txmgr"
Expand All @@ -16,12 +17,154 @@ import (
evmgas "github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas"
evmtxmgr "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils"
"github.com/smartcontractkit/chainlink/v2/core/internal/cltest"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
)

func TestInMemoryStore_ReapTxHistory(t *testing.T) {
t.Parallel()

t.Run("reap all confirmed txs", func(t *testing.T) {
db := pgtest.NewSqlxDB(t)
_, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t)
persistentStore := cltest.NewTestTxStore(t, db)
kst := cltest.NewKeyStore(t, db, dbcfg)
_, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth())

ethClient := evmtest.NewEthClientMockWithDefaultChain(t)
lggr := logger.TestSugared(t)
chainID := ethClient.ConfiguredChainID()
ctx := testutils.Context(t)

inMemoryStore, err := commontxmgr.NewInMemoryStore[
*big.Int,
common.Address, common.Hash, common.Hash,
*evmtypes.Receipt,
evmtypes.Nonce,
evmgas.EvmFee,
](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions())
require.NoError(t, err)

// Insert a transaction into persistent store
inTx_0 := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, persistentStore, 7, 1, fromAddress)
r_0 := mustInsertEthReceipt(t, persistentStore, 1, utils.NewHash(), inTx_0.TxAttempts[0].Hash)
inTx_0.TxAttempts[0].Receipts = append(inTx_0.TxAttempts[0].Receipts, evmtxmgr.DbReceiptToEvmReceipt(&r_0))
inTx_1 := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, persistentStore, 8, 2, fromAddress)
r_1 := mustInsertEthReceipt(t, persistentStore, 2, utils.NewHash(), inTx_1.TxAttempts[0].Hash)
inTx_1.TxAttempts[0].Receipts = append(inTx_1.TxAttempts[0].Receipts, evmtxmgr.DbReceiptToEvmReceipt(&r_1))
inTx_2 := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, persistentStore, 9, 3, fromAddress)
r_2 := mustInsertEthReceipt(t, persistentStore, 3, utils.NewHash(), inTx_2.TxAttempts[0].Hash)
inTx_2.TxAttempts[0].Receipts = append(inTx_2.TxAttempts[0].Receipts, evmtxmgr.DbReceiptToEvmReceipt(&r_2))
// Insert the transaction into the in-memory store
require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx_0))
require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx_1))
require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx_2))

minBlockNumberToKeep := int64(3)
timeThreshold := inTx_2.CreatedAt
expErr := persistentStore.ReapTxHistory(ctx, minBlockNumberToKeep, timeThreshold, chainID)
actErr := inMemoryStore.ReapTxHistory(ctx, minBlockNumberToKeep, timeThreshold, chainID)
require.NoError(t, expErr)
require.NoError(t, actErr)

fn := func(tx *evmtxmgr.Tx) bool { return true }
// Check that the transactions were reaped in persistent store
expTx_0, err := persistentStore.FindTxWithAttempts(ctx, inTx_0.ID)
require.Error(t, err)
require.Equal(t, int64(0), expTx_0.ID)
expTx_1, err := persistentStore.FindTxWithAttempts(ctx, inTx_1.ID)
require.Error(t, err)
require.Equal(t, int64(0), expTx_1.ID)
// Check that the transactions were reaped in in-memory store
actTxs_0 := inMemoryStore.XXXTestFindTxs(nil, fn, inTx_0.ID)
require.Equal(t, 0, len(actTxs_0))
actTxs_1 := inMemoryStore.XXXTestFindTxs(nil, fn, inTx_1.ID)
require.Equal(t, 0, len(actTxs_1))

// Check that the transaction was not reaped
expTx_2, err := persistentStore.FindTxWithAttempts(ctx, inTx_2.ID)
require.NoError(t, err)
require.Equal(t, inTx_2.ID, expTx_2.ID)
actTxs_2 := inMemoryStore.XXXTestFindTxs(nil, fn, inTx_2.ID)
require.Equal(t, 1, len(actTxs_2))
assertTxEqual(t, expTx_2, actTxs_2[0])
})
t.Run("reap all fatal error txs", func(t *testing.T) {
db := pgtest.NewSqlxDB(t)
_, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t)
persistentStore := cltest.NewTestTxStore(t, db)
kst := cltest.NewKeyStore(t, db, dbcfg)
_, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth())

ethClient := evmtest.NewEthClientMockWithDefaultChain(t)
lggr := logger.TestSugared(t)
chainID := ethClient.ConfiguredChainID()
ctx := testutils.Context(t)

inMemoryStore, err := commontxmgr.NewInMemoryStore[
*big.Int,
common.Address, common.Hash, common.Hash,
*evmtypes.Receipt,
evmtypes.Nonce,
evmgas.EvmFee,
](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions())
require.NoError(t, err)

// Insert a transaction into persistent store
inTx_0 := cltest.NewEthTx(fromAddress)
inTx_0.Error = null.StringFrom("something exploded")
inTx_0.State = commontxmgr.TxFatalError
inTx_0.CreatedAt = time.Unix(1000, 0)
require.NoError(t, persistentStore.InsertTx(ctx, &inTx_0))
inTx_1 := cltest.NewEthTx(fromAddress)
inTx_1.Error = null.StringFrom("something exploded")
inTx_1.State = commontxmgr.TxFatalError
inTx_1.CreatedAt = time.Unix(2000, 0)
require.NoError(t, persistentStore.InsertTx(ctx, &inTx_1))
inTx_2 := cltest.NewEthTx(fromAddress)
inTx_2.Error = null.StringFrom("something exploded")
inTx_2.State = commontxmgr.TxFatalError
inTx_2.CreatedAt = time.Unix(3000, 0)
require.NoError(t, persistentStore.InsertTx(ctx, &inTx_2))
// Insert the transaction into the in-memory store
require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx_0))
require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx_1))
require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx_2))

minBlockNumberToKeep := int64(3)
timeThreshold := time.Unix(2500, 0) // Only reap txs created before this time
expErr := persistentStore.ReapTxHistory(ctx, minBlockNumberToKeep, timeThreshold, chainID)
actErr := inMemoryStore.ReapTxHistory(ctx, minBlockNumberToKeep, timeThreshold, chainID)
require.NoError(t, expErr)
require.NoError(t, actErr)

fn := func(tx *evmtxmgr.Tx) bool { return true }
// Check that the transactions were reaped in persistent store
expTx_0, err := persistentStore.FindTxWithAttempts(ctx, inTx_0.ID)
require.Error(t, err)
require.Equal(t, int64(0), expTx_0.ID)
expTx_1, err := persistentStore.FindTxWithAttempts(ctx, inTx_1.ID)
require.Error(t, err)
require.Equal(t, int64(0), expTx_1.ID)
// Check that the transactions were reaped in in-memory store
actTxs_0 := inMemoryStore.XXXTestFindTxs(nil, fn, inTx_0.ID)
require.Equal(t, 0, len(actTxs_0))
actTxs_1 := inMemoryStore.XXXTestFindTxs(nil, fn, inTx_1.ID)
require.Equal(t, 0, len(actTxs_1))

// Check that the transaction was not reaped
expTx_2, err := persistentStore.FindTxWithAttempts(ctx, inTx_2.ID)
require.NoError(t, err)
require.Equal(t, inTx_2.ID, expTx_2.ID)
actTxs_2 := inMemoryStore.XXXTestFindTxs(nil, fn, inTx_2.ID)
require.Equal(t, 1, len(actTxs_2))
assertTxEqual(t, expTx_2, actTxs_2[0])
})
}

func TestInMemoryStore_MarkOldTxesMissingReceiptAsErrored(t *testing.T) {
t.Parallel()
blockNum := int64(10)
Expand Down
Loading