From 66be6d3cb610afdf9d143cfde3161ec37a639582 Mon Sep 17 00:00:00 2001 From: James Walker Date: Mon, 26 Feb 2024 21:25:00 -0500 Subject: [PATCH 1/7] implement UpdateTxFatalError --- common/txmgr/inmemory_store.go | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/common/txmgr/inmemory_store.go b/common/txmgr/inmemory_store.go index cd783a25210..b82d6ff0be0 100644 --- a/common/txmgr/inmemory_store.go +++ b/common/txmgr/inmemory_store.go @@ -178,6 +178,37 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveR // UpdateTxFatalError updates a transaction to fatal_error. func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) UpdateTxFatalError(ctx context.Context, tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { + if tx.State != TxInProgress && tx.State != TxUnstarted { + return fmt.Errorf("update_tx_fatal_error: can only transition to fatal_error from in_progress, transaction is currently %s", tx.State) + } + if !tx.Error.Valid { + return fmt.Errorf("update_tx_fatal_error: expected error field to be set") + } + + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + as, ok := ms.addressStates[tx.FromAddress] + if !ok { + return fmt.Errorf("update_tx_fatal_error: %w", ErrAddressNotFound) + } + + // Persist to persistent storage + if err := ms.txStore.UpdateTxFatalError(ctx, tx); err != nil { + return fmt.Errorf("update_tx_fatal_error: %w", err) + } + + // Update in memory store + switch tx.State { + case TxInProgress: + if err := as.MoveInProgressToFatalError(tx.Error); err != nil { + return fmt.Errorf("update_tx_fatal_error: %w", err) + } + case TxUnstarted: + if err := as.MoveUnstartedToFatalError(tx.ID, tx.Error); err != nil { + return fmt.Errorf("update_tx_fatal_error: %w", err) + } + } + return nil } From b971ee651c37b7f6c94547f759812e1c4a9c4d71 Mon Sep 17 00:00:00 2001 From: James Walker Date: Tue, 27 Feb 2024 15:33:30 -0500 Subject: [PATCH 2/7] implement SaveReplacementInProgressAttempt --- common/txmgr/inmemory_store.go | 36 ++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/common/txmgr/inmemory_store.go b/common/txmgr/inmemory_store.go index cd783a25210..dea20161dad 100644 --- a/common/txmgr/inmemory_store.go +++ b/common/txmgr/inmemory_store.go @@ -173,6 +173,42 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveR oldAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], replacementAttempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], ) error { + if oldAttempt.State != txmgrtypes.TxAttemptInProgress || replacementAttempt.State != txmgrtypes.TxAttemptInProgress { + return fmt.Errorf("save_replacement_in_progress_attempt: expected attempts to be in_progress") + } + if oldAttempt.ID == 0 { + return fmt.Errorf("save_replacement_in_progress_attempt: expected oldattempt to have an ID") + } + + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + as, ok := ms.addressStates[oldAttempt.Tx.FromAddress] + if !ok { + return fmt.Errorf("save_replacement_in_progress_attempt: %w", ErrAddressNotFound) + } + + // Persist to persistent storage + if err := ms.txStore.SaveReplacementInProgressAttempt(ctx, oldAttempt, replacementAttempt); err != nil { + return fmt.Errorf("save_replacement_in_progress_attempt: %w", err) + } + + // Update in memory store + tx, err := as.PeekInProgressTx() + if tx == nil { + return fmt.Errorf("save_replacement_in_progress_attempt: %w", err) + } + + var found bool + for i := 0; i < len(tx.TxAttempts); i++ { + if tx.TxAttempts[i].ID == oldAttempt.ID { + tx.TxAttempts[i] = *replacementAttempt + found = true + } + } + if !found { + tx.TxAttempts = append(tx.TxAttempts, *replacementAttempt) + } + return nil } From afe628079d743dd6a50a18841bfbc0bf72aa59e3 Mon Sep 17 00:00:00 2001 From: James Walker Date: Wed, 13 Mar 2024 11:00:42 -0400 Subject: [PATCH 3/7] initial implementation of SaveReplacementInProgressAttempt --- common/txmgr/inmemory_store.go | 8 ++- .../evm/txmgr/evm_inmemory_store_test.go | 62 +++++++++++++++++++ 2 files changed, 67 insertions(+), 3 deletions(-) diff --git a/common/txmgr/inmemory_store.go b/common/txmgr/inmemory_store.go index 9f6373a68ab..757a6ee0507 100644 --- a/common/txmgr/inmemory_store.go +++ b/common/txmgr/inmemory_store.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "math/big" + "sync" "time" "github.com/google/uuid" @@ -46,7 +47,8 @@ type inMemoryStore[ keyStore txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ] persistentTxStore txmgrtypes.TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] - addressStates map[ADDR]*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] + addressStatesLock sync.RWMutex + addressStates map[ADDR]*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] } // NewInMemoryStore returns a new inMemoryStore @@ -186,12 +188,12 @@ func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveR } // Persist to persistent storage - if err := ms.txStore.SaveReplacementInProgressAttempt(ctx, oldAttempt, replacementAttempt); err != nil { + if err := ms.persistentTxStore.SaveReplacementInProgressAttempt(ctx, oldAttempt, replacementAttempt); err != nil { return fmt.Errorf("save_replacement_in_progress_attempt: %w", err) } // Update in memory store - tx, err := as.PeekInProgressTx() + tx, err := as.peekInProgressTx() if tx == nil { return fmt.Errorf("save_replacement_in_progress_attempt: %w", err) } diff --git a/core/chains/evm/txmgr/evm_inmemory_store_test.go b/core/chains/evm/txmgr/evm_inmemory_store_test.go index a102ee1c996..b6a858a2c99 100644 --- a/core/chains/evm/txmgr/evm_inmemory_store_test.go +++ b/core/chains/evm/txmgr/evm_inmemory_store_test.go @@ -1,14 +1,76 @@ package txmgr_test import ( + "context" + "math/big" "testing" + "github.com/ethereum/go-ethereum/common" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + commontxmgr "github.com/smartcontractkit/chainlink/v2/common/txmgr" + txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types" + + evmgas "github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas" evmtxmgr "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr" + evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" + "github.com/smartcontractkit/chainlink/v2/core/internal/cltest" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" ) +func TestInMemoryStore_SaveReplacementInProgressAttempt(t *testing.T) { + t.Parallel() + + db := pgtest.NewSqlxDB(t) + _, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t) + persistentStore := cltest.NewTestTxStore(t, db, dbcfg) + kst := cltest.NewKeyStore(t, db, dbcfg) + _, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth()) + + ethClient := evmtest.NewEthClientMockWithDefaultChain(t) + lggr := logger.TestSugared(t) + chainID := ethClient.ConfiguredChainID() + ctx := context.Background() + + inMemoryStore, err := commontxmgr.NewInMemoryStore[ + *big.Int, + common.Address, common.Hash, common.Hash, + *evmtypes.Receipt, + evmtypes.Nonce, + evmgas.EvmFee, + ](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions()) + require.NoError(t, err) + + t.Run("successfully replace tx attempt", func(t *testing.T) { + // Insert a transaction into persistent store + inTx := mustInsertInProgressEthTxWithAttempt(t, persistentStore, 123, fromAddress) + oldAttempt := inTx.TxAttempts[0] + newAttempt := cltest.NewDynamicFeeEthTxAttempt(t, inTx.ID) + // Insert the transaction into the in-memory store + require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx)) + + err := inMemoryStore.SaveReplacementInProgressAttempt( + testutils.Context(t), + oldAttempt, + &newAttempt, + ) + require.NoError(t, err) + + expTx, err := persistentStore.FindTxWithAttempts(inTx.ID) + require.NoError(t, err) + fn := func(tx *evmtxmgr.Tx) bool { return true } + actTxs := inMemoryStore.XXXTestFindTxs(nil, fn, inTx.ID) + require.Equal(t, 1, len(actTxs)) + actTx := actTxs[0] + assertTxEqual(t, expTx, actTx) + assert.Equal(t, txmgrtypes.TxAttemptBroadcast, actTx.TxAttempts[0].State) + }) +} + // assertTxEqual asserts that two transactions are equal func assertTxEqual(t *testing.T, exp, act evmtxmgr.Tx) { assert.Equal(t, exp.ID, act.ID) From 5af210628f8aaf2011c3bae26759a4a514551a9c Mon Sep 17 00:00:00 2001 From: James Walker Date: Wed, 13 Mar 2024 16:24:24 -0400 Subject: [PATCH 4/7] update tests for SaveReplacementInProgressAttempt --- common/txmgr/address_state.go | 55 ++++++++- common/txmgr/inmemory_store.go | 41 +++---- .../evm/txmgr/evm_inmemory_store_test.go | 105 ++++++++++++++---- 3 files changed, 156 insertions(+), 45 deletions(-) diff --git a/common/txmgr/address_state.go b/common/txmgr/address_state.go index 791b16b2d42..9e488d789b6 100644 --- a/common/txmgr/address_state.go +++ b/common/txmgr/address_state.go @@ -1,6 +1,7 @@ package txmgr import ( + "errors" "fmt" "sync" "time" @@ -244,14 +245,64 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) delete as._deleteTxs(txs...) } +// deleteTxAttempts removes the attempts with the given IDs from the address state. +// It removes the attempts from the hash lookup map and from the transaction. +// If an attempt is not found in the hash lookup map, it is ignored. +// If a transaction is not found in the allTxs map, it is ignored. +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) deleteTxAttempts(txAttempts ...txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) { + as.Lock() + defer as.Unlock() + + for _, txAttempt := range txAttempts { + // remove the attempt from the hash lookup map + delete(as.attemptHashToTxAttempt, txAttempt.Hash) + // remove the attempt from the transaction + if tx := as.allTxs[txAttempt.TxID]; tx != nil { + var removeIndex int + for i := 0; i < len(tx.TxAttempts); i++ { + if tx.TxAttempts[i].ID == txAttempt.ID { + removeIndex = i + break + } + } + tx.TxAttempts = append(tx.TxAttempts[:removeIndex], tx.TxAttempts[removeIndex+1:]...) + } + } +} + +// addTxAttempt adds the given attempt to the transaction which matches its TxID. +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) addTxAttempts(txAttempts ...txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { + as.Lock() + defer as.Unlock() + + var errs error + for _, txAttempt := range txAttempts { + tx := as.allTxs[txAttempt.TxID] + if tx == nil { + errs = errors.Join(errs, fmt.Errorf("no transaction with ID %d", txAttempt.TxID)) + continue + } + + // add the attempt to the transaction + if tx.TxAttempts == nil { + tx.TxAttempts = []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{} + } + tx.TxAttempts = append(tx.TxAttempts, txAttempt) + // add the attempt to the hash lookup map + as.attemptHashToTxAttempt[txAttempt.Hash] = &txAttempt + } + + return errs +} + // peekNextUnstartedTx returns the next unstarted transaction in the queue without removing it from the unstarted queue. func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) peekNextUnstartedTx() (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { return nil, nil } // peekInProgressTx returns the in-progress transaction without removing it from the in-progress state. -func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) peekInProgressTx() (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { - return nil, nil +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) peekInProgressTx() *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] { + return nil } // addTxToUnstarted adds the given transaction to the unstarted queue. diff --git a/common/txmgr/inmemory_store.go b/common/txmgr/inmemory_store.go index 757a6ee0507..e48fb068e61 100644 --- a/common/txmgr/inmemory_store.go +++ b/common/txmgr/inmemory_store.go @@ -174,17 +174,29 @@ func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveR replacementAttempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], ) error { if oldAttempt.State != txmgrtypes.TxAttemptInProgress || replacementAttempt.State != txmgrtypes.TxAttemptInProgress { - return fmt.Errorf("save_replacement_in_progress_attempt: expected attempts to be in_progress") + return fmt.Errorf("expected attempts to be in_progress") } if oldAttempt.ID == 0 { - return fmt.Errorf("save_replacement_in_progress_attempt: expected oldattempt to have an ID") + return fmt.Errorf("expected oldAttempt to have an ID") } ms.addressStatesLock.RLock() defer ms.addressStatesLock.RUnlock() - as, ok := ms.addressStates[oldAttempt.Tx.FromAddress] - if !ok { - return fmt.Errorf("save_replacement_in_progress_attempt: %w", ErrAddressNotFound) + filter := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { + return true + } + var as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] + var tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + for _, vas := range ms.addressStates { + txs := vas.findTxs(nil, filter, oldAttempt.TxID) + if len(txs) == 1 { + tx = &txs[0] + as = vas + break + } + } + if tx == nil { + return fmt.Errorf("save_replacement_in_progress_attempt: %w", ErrTxnNotFound) } // Persist to persistent storage @@ -193,20 +205,11 @@ func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveR } // Update in memory store - tx, err := as.peekInProgressTx() - if tx == nil { - return fmt.Errorf("save_replacement_in_progress_attempt: %w", err) - } - - var found bool - for i := 0; i < len(tx.TxAttempts); i++ { - if tx.TxAttempts[i].ID == oldAttempt.ID { - tx.TxAttempts[i] = *replacementAttempt - found = true - } - } - if !found { - tx.TxAttempts = append(tx.TxAttempts, *replacementAttempt) + // delete the old attempt + as.deleteTxAttempts(oldAttempt) + // add the new attempt + if err := as.addTxAttempts(*replacementAttempt); err != nil { + return fmt.Errorf("save_replacement_in_progress_attempt: failed to add a replacement transaction attempt: %w", err) } return nil diff --git a/core/chains/evm/txmgr/evm_inmemory_store_test.go b/core/chains/evm/txmgr/evm_inmemory_store_test.go index b6a858a2c99..9b20ed5da0d 100644 --- a/core/chains/evm/txmgr/evm_inmemory_store_test.go +++ b/core/chains/evm/txmgr/evm_inmemory_store_test.go @@ -25,35 +25,35 @@ import ( func TestInMemoryStore_SaveReplacementInProgressAttempt(t *testing.T) { t.Parallel() - db := pgtest.NewSqlxDB(t) - _, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t) - persistentStore := cltest.NewTestTxStore(t, db, dbcfg) - kst := cltest.NewKeyStore(t, db, dbcfg) - _, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth()) - - ethClient := evmtest.NewEthClientMockWithDefaultChain(t) - lggr := logger.TestSugared(t) - chainID := ethClient.ConfiguredChainID() - ctx := context.Background() - - inMemoryStore, err := commontxmgr.NewInMemoryStore[ - *big.Int, - common.Address, common.Hash, common.Hash, - *evmtypes.Receipt, - evmtypes.Nonce, - evmgas.EvmFee, - ](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions()) - require.NoError(t, err) - t.Run("successfully replace tx attempt", func(t *testing.T) { + db := pgtest.NewSqlxDB(t) + _, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t) + persistentStore := cltest.NewTestTxStore(t, db, dbcfg) + kst := cltest.NewKeyStore(t, db, dbcfg) + _, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth()) + + ethClient := evmtest.NewEthClientMockWithDefaultChain(t) + lggr := logger.TestSugared(t) + chainID := ethClient.ConfiguredChainID() + ctx := context.Background() + + inMemoryStore, err := commontxmgr.NewInMemoryStore[ + *big.Int, + common.Address, common.Hash, common.Hash, + *evmtypes.Receipt, + evmtypes.Nonce, + evmgas.EvmFee, + ](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions()) + require.NoError(t, err) + // Insert a transaction into persistent store inTx := mustInsertInProgressEthTxWithAttempt(t, persistentStore, 123, fromAddress) - oldAttempt := inTx.TxAttempts[0] - newAttempt := cltest.NewDynamicFeeEthTxAttempt(t, inTx.ID) // Insert the transaction into the in-memory store require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx)) - err := inMemoryStore.SaveReplacementInProgressAttempt( + oldAttempt := inTx.TxAttempts[0] + newAttempt := cltest.NewDynamicFeeEthTxAttempt(t, inTx.ID) + err = inMemoryStore.SaveReplacementInProgressAttempt( testutils.Context(t), oldAttempt, &newAttempt, @@ -67,7 +67,64 @@ func TestInMemoryStore_SaveReplacementInProgressAttempt(t *testing.T) { require.Equal(t, 1, len(actTxs)) actTx := actTxs[0] assertTxEqual(t, expTx, actTx) - assert.Equal(t, txmgrtypes.TxAttemptBroadcast, actTx.TxAttempts[0].State) + assert.Equal(t, txmgrtypes.TxAttemptInProgress, actTx.TxAttempts[0].State) + assert.Equal(t, newAttempt.Hash, actTx.TxAttempts[0].Hash) + assert.NotEqual(t, oldAttempt.ID, actTx.TxAttempts[0].ID) + }) + + t.Run("error parity for in-memory vs persistent store", func(t *testing.T) { + db := pgtest.NewSqlxDB(t) + _, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t) + persistentStore := cltest.NewTestTxStore(t, db, dbcfg) + kst := cltest.NewKeyStore(t, db, dbcfg) + _, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth()) + + ethClient := evmtest.NewEthClientMockWithDefaultChain(t) + lggr := logger.TestSugared(t) + chainID := ethClient.ConfiguredChainID() + ctx := context.Background() + + inMemoryStore, err := commontxmgr.NewInMemoryStore[ + *big.Int, + common.Address, common.Hash, common.Hash, + *evmtypes.Receipt, + evmtypes.Nonce, + evmgas.EvmFee, + ](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions()) + require.NoError(t, err) + + // Insert a transaction into persistent store + inTx := mustInsertInProgressEthTxWithAttempt(t, persistentStore, 124, fromAddress) + // Insert the transaction into the in-memory store + require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx)) + + oldAttempt := inTx.TxAttempts[0] + newAttempt := cltest.NewDynamicFeeEthTxAttempt(t, inTx.ID) + + t.Run("error when old attempt is not in progress", func(t *testing.T) { + oldAttempt.State = txmgrtypes.TxAttemptBroadcast + expErr := persistentStore.SaveReplacementInProgressAttempt(testutils.Context(t), oldAttempt, &newAttempt) + actErr := inMemoryStore.SaveReplacementInProgressAttempt(testutils.Context(t), oldAttempt, &newAttempt) + assert.Equal(t, expErr, actErr) + oldAttempt.State = txmgrtypes.TxAttemptInProgress + }) + + t.Run("error when new attempt is not in progress", func(t *testing.T) { + newAttempt.State = txmgrtypes.TxAttemptBroadcast + expErr := persistentStore.SaveReplacementInProgressAttempt(testutils.Context(t), oldAttempt, &newAttempt) + actErr := inMemoryStore.SaveReplacementInProgressAttempt(testutils.Context(t), oldAttempt, &newAttempt) + assert.Equal(t, expErr, actErr) + newAttempt.State = txmgrtypes.TxAttemptInProgress + }) + + t.Run("error when old attempt id is 0", func(t *testing.T) { + originalID := oldAttempt.ID + oldAttempt.ID = 0 + expErr := persistentStore.SaveReplacementInProgressAttempt(testutils.Context(t), oldAttempt, &newAttempt) + actErr := inMemoryStore.SaveReplacementInProgressAttempt(testutils.Context(t), oldAttempt, &newAttempt) + assert.Equal(t, expErr, actErr) + oldAttempt.ID = originalID + }) }) } From cbacd55f3a47dc59dc93fbce7e79ffc869c865d8 Mon Sep 17 00:00:00 2001 From: James Walker Date: Wed, 20 Mar 2024 23:06:33 -0400 Subject: [PATCH 5/7] fix linter issue --- common/txmgr/address_state.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/common/txmgr/address_state.go b/common/txmgr/address_state.go index 9e488d789b6..46f96ab8bfd 100644 --- a/common/txmgr/address_state.go +++ b/common/txmgr/address_state.go @@ -276,7 +276,8 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) addTxA defer as.Unlock() var errs error - for _, txAttempt := range txAttempts { + for i := 0; i < len(txAttempts); i++ { + txAttempt := txAttempts[i] tx := as.allTxs[txAttempt.TxID] if tx == nil { errs = errors.Join(errs, fmt.Errorf("no transaction with ID %d", txAttempt.TxID)) From 9978e9ba4f3530d2b470acfe92fc41efca9469fb Mon Sep 17 00:00:00 2001 From: James Walker Date: Wed, 3 Apr 2024 14:48:31 -0400 Subject: [PATCH 6/7] address comments --- common/txmgr/address_state.go | 30 ++++++++++++------------------ common/txmgr/inmemory_store.go | 2 +- 2 files changed, 13 insertions(+), 19 deletions(-) diff --git a/common/txmgr/address_state.go b/common/txmgr/address_state.go index 1d73e8a58f1..17d7dc0fe28 100644 --- a/common/txmgr/address_state.go +++ b/common/txmgr/address_state.go @@ -1,7 +1,6 @@ package txmgr import ( - "errors" "fmt" "sync" "time" @@ -330,29 +329,24 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) delete } // addTxAttempt adds the given attempt to the transaction which matches its TxID. -func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) addTxAttempts(txAttempts ...txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) addTxAttempt(txAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { as.Lock() defer as.Unlock() - var errs error - for i := 0; i < len(txAttempts); i++ { - txAttempt := txAttempts[i] - tx := as.allTxs[txAttempt.TxID] - if tx == nil { - errs = errors.Join(errs, fmt.Errorf("no transaction with ID %d", txAttempt.TxID)) - continue - } + tx, ok := as.allTxs[txAttempt.TxID] + if !ok || tx == nil { + return fmt.Errorf("no transaction with ID %d", txAttempt.TxID) + } - // add the attempt to the transaction - if tx.TxAttempts == nil { - tx.TxAttempts = []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{} - } - tx.TxAttempts = append(tx.TxAttempts, txAttempt) - // add the attempt to the hash lookup map - as.attemptHashToTxAttempt[txAttempt.Hash] = &txAttempt + // add the attempt to the transaction + if tx.TxAttempts == nil { + tx.TxAttempts = []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{} } + tx.TxAttempts = append(tx.TxAttempts, txAttempt) + // add the attempt to the hash lookup map + as.attemptHashToTxAttempt[txAttempt.Hash] = &txAttempt - return errs + return nil } // peekNextUnstartedTx returns the next unstarted transaction in the queue without removing it from the unstarted queue. diff --git a/common/txmgr/inmemory_store.go b/common/txmgr/inmemory_store.go index 2b13ba058ad..7bcac570888 100644 --- a/common/txmgr/inmemory_store.go +++ b/common/txmgr/inmemory_store.go @@ -219,7 +219,7 @@ func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveR // delete the old attempt as.deleteTxAttempt(oldAttempt.TxID, oldAttempt.ID) // add the new attempt - if err := as.addTxAttempts(*replacementAttempt); err != nil { + if err := as.addTxAttempt(*replacementAttempt); err != nil { return fmt.Errorf("save_replacement_in_progress_attempt: failed to add a replacement transaction attempt: %w", err) } From 4a48dc42e5bd3169c842440b1cd717ff09089c93 Mon Sep 17 00:00:00 2001 From: James Walker Date: Thu, 4 Apr 2024 09:17:28 -0400 Subject: [PATCH 7/7] check if transaction exists --- common/txmgr/inmemory_store.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/common/txmgr/inmemory_store.go b/common/txmgr/inmemory_store.go index 38c41c8fd08..f66e32f83cc 100644 --- a/common/txmgr/inmemory_store.go +++ b/common/txmgr/inmemory_store.go @@ -208,6 +208,9 @@ func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Updat if !ok { return fmt.Errorf("update_tx_fatal_error: %w", ErrAddressNotFound) } + if !as.hasTx(tx.ID) { + return fmt.Errorf("update_tx_fatal_error: %w: %q", ErrTxnNotFound, tx.ID) + } // Persist to persistent storage if err := ms.persistentTxStore.UpdateTxFatalError(ctx, tx); err != nil {