diff --git a/.changeset/itchy-bugs-clean.md b/.changeset/itchy-bugs-clean.md index a09117f4ed9..beeed8ace1e 100644 --- a/.changeset/itchy-bugs-clean.md +++ b/.changeset/itchy-bugs-clean.md @@ -2,4 +2,4 @@ "chainlink": minor --- -Added a finalizer component to the TXM to mark transactions as finalized #internal +Introduced finalized transaction state. Added a finalizer component to the TXM to mark transactions as finalized. #internal diff --git a/common/txmgr/confirmer.go b/common/txmgr/confirmer.go index a5c2af0d4d7..a9e30ffff1e 100644 --- a/common/txmgr/confirmer.go +++ b/common/txmgr/confirmer.go @@ -1100,7 +1100,6 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) mar "txID", etx.ID, "attemptID", attempt.ID, "nReceipts", len(attempt.Receipts), - "finalized", etx.Finalized, "id", "confirmer", } diff --git a/common/txmgr/models.go b/common/txmgr/models.go index dd121a2c7c4..ca5e7d4f251 100644 --- a/common/txmgr/models.go +++ b/common/txmgr/models.go @@ -11,4 +11,5 @@ const ( TxUnconfirmed = txmgrtypes.TxState("unconfirmed") TxConfirmed = txmgrtypes.TxState("confirmed") TxConfirmedMissingReceipt = txmgrtypes.TxState("confirmed_missing_receipt") + TxFinalized = txmgrtypes.TxState("finalized") ) diff --git a/common/txmgr/txmgr.go b/common/txmgr/txmgr.go index 1ceb4df3424..077e663edae 100644 --- a/common/txmgr/txmgr.go +++ b/common/txmgr/txmgr.go @@ -659,12 +659,10 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetTransac // Return unconfirmed for ConfirmedMissingReceipt since a receipt is required to determine if it is finalized return commontypes.Unconfirmed, nil case TxConfirmed: - if tx.Finalized { - // Return finalized if tx receipt's block is equal or older than the latest finalized block - return commontypes.Finalized, nil - } - // Return unconfirmed if tx receipt's block is newer than the latest finalized block + // Return unconfirmed for confirmed transactions because they are not yet finalized return commontypes.Unconfirmed, nil + case TxFinalized: + return commontypes.Finalized, nil case TxFatalError: // Use an ErrorClassifier to determine if the transaction is considered Fatal txErr := b.newErrorClassifier(tx.GetError()) diff --git a/common/txmgr/types/mocks/tx_store.go b/common/txmgr/types/mocks/tx_store.go index 9087491dc03..e3a2ab7aefa 100644 --- a/common/txmgr/types/mocks/tx_store.go +++ b/common/txmgr/types/mocks/tx_store.go @@ -196,12 +196,12 @@ func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) DeleteInPro return r0 } -// FindConfirmedTxesAwaitingFinalization provides a mock function with given fields: ctx, chainID -func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindConfirmedTxesAwaitingFinalization(ctx context.Context, chainID CHAIN_ID) ([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { +// FindConfirmedTxes provides a mock function with given fields: ctx, chainID +func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindConfirmedTxes(ctx context.Context, chainID CHAIN_ID) ([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { ret := _m.Called(ctx, chainID) if len(ret) == 0 { - panic("no return value specified for FindConfirmedTxesAwaitingFinalization") + panic("no return value specified for FindConfirmedTxes") } var r0 []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] diff --git a/common/txmgr/types/tx.go b/common/txmgr/types/tx.go index 72c13b450ff..ee5e23a3bf9 100644 --- a/common/txmgr/types/tx.go +++ b/common/txmgr/types/tx.go @@ -215,7 +215,6 @@ type Tx[ InitialBroadcastAt *time.Time CreatedAt time.Time State TxState - Finalized bool TxAttempts []TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] `json:"-"` // Marshalled TxMeta // Used for additional context around transactions which you want to log diff --git a/common/txmgr/types/tx_store.go b/common/txmgr/types/tx_store.go index 487b1b0ba82..72461e76100 100644 --- a/common/txmgr/types/tx_store.go +++ b/common/txmgr/types/tx_store.go @@ -84,7 +84,7 @@ type TransactionStore[ FindTransactionsConfirmedInBlockRange(ctx context.Context, highBlockNumber, lowBlockNumber int64, chainID CHAIN_ID) (etxs []*Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) FindEarliestUnconfirmedBroadcastTime(ctx context.Context, chainID CHAIN_ID) (null.Time, error) FindEarliestUnconfirmedTxAttemptBlock(ctx context.Context, chainID CHAIN_ID) (null.Int, error) - FindConfirmedTxesAwaitingFinalization(ctx context.Context, chainID CHAIN_ID) (etxs []*Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) + FindConfirmedTxes(ctx context.Context, chainID CHAIN_ID) (etxs []*Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) GetTxInProgress(ctx context.Context, fromAddress ADDR) (etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) GetInProgressTxAttempts(ctx context.Context, address ADDR, chainID CHAIN_ID) (attempts []TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) GetAbandonedTransactionsByBatch(ctx context.Context, chainID CHAIN_ID, enabledAddrs []ADDR, offset, limit uint) (txs []*Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) diff --git a/core/chains/evm/txmgr/confirmer_test.go b/core/chains/evm/txmgr/confirmer_test.go index 73b4c946cb3..6b107b222a6 100644 --- a/core/chains/evm/txmgr/confirmer_test.go +++ b/core/chains/evm/txmgr/confirmer_test.go @@ -2828,50 +2828,6 @@ func TestEthConfirmer_EnsureConfirmedTransactionsInLongestChain(t *testing.T) { assert.Equal(t, txmgrtypes.TxAttemptBroadcast, attempt.State) assert.Len(t, attempt.Receipts, 1) }) - - t.Run("unconfirms, unfinalizes, and rebroadcasts finalized transactions that have receipts within head height of the chain but not included in the chain", func(t *testing.T) { - nonce := evmtypes.Nonce(8) - broadcast := time.Now() - tx := &txmgr.Tx{ - Sequence: &nonce, - FromAddress: fromAddress, - EncodedPayload: []byte{1, 2, 3}, - State: txmgrcommon.TxConfirmed, - BroadcastAt: &broadcast, - InitialBroadcastAt: &broadcast, - Finalized: true, - } - err := txStore.InsertTx(ctx, tx) - require.NoError(t, err) - etx, err := txStore.FindTxWithAttempts(ctx, tx.ID) - require.NoError(t, err) - attempt := cltest.NewLegacyEthTxAttempt(t, etx.ID) - broadcastBeforeBlockNum := int64(1) - attempt.BroadcastBeforeBlockNum = &broadcastBeforeBlockNum - attempt.State = txmgrtypes.TxAttemptBroadcast - err = txStore.InsertTxAttempt(ctx, &attempt) - require.NoError(t, err) - // Include one within head height but a different block hash - mustInsertEthReceipt(t, txStore, head.Parent.Number, testutils.NewHash(), attempt.Hash) - - ethClient.On("SendTransactionReturnCode", mock.Anything, mock.MatchedBy(func(tx *types.Transaction) bool { - atx, signErr := txmgr.GetGethSignedTx(attempt.SignedRawTx) - require.NoError(t, signErr) - // Keeps gas price and nonce the same - return atx.GasPrice().Cmp(tx.GasPrice()) == 0 && atx.Nonce() == tx.Nonce() - }), fromAddress).Return(commonclient.Successful, nil).Once() - - // Do the thing - require.NoError(t, ec.EnsureConfirmedTransactionsInLongestChain(tests.Context(t), &head)) - - etx, err = txStore.FindTxWithAttempts(ctx, etx.ID) - require.NoError(t, err) - assert.Equal(t, txmgrcommon.TxUnconfirmed, etx.State) - require.Len(t, etx.TxAttempts, 1) - attempt = etx.TxAttempts[0] - assert.Equal(t, txmgrtypes.TxAttemptBroadcast, attempt.State) - assert.Equal(t, false, etx.Finalized) - }) } func TestEthConfirmer_ForceRebroadcast(t *testing.T) { diff --git a/core/chains/evm/txmgr/evm_tx_store.go b/core/chains/evm/txmgr/evm_tx_store.go index 0b97bf78770..c1c05dbcebf 100644 --- a/core/chains/evm/txmgr/evm_tx_store.go +++ b/core/chains/evm/txmgr/evm_tx_store.go @@ -181,7 +181,6 @@ type DbEthTx struct { // InitialBroadcastAt is recorded once, the first ever time this eth_tx is sent CreatedAt time.Time State txmgrtypes.TxState - Finalized bool // Marshalled EvmTxMeta // Used for additional context around transactions which you want to log // at send time. @@ -212,7 +211,6 @@ func (db *DbEthTx) FromTx(tx *Tx) { db.BroadcastAt = tx.BroadcastAt db.CreatedAt = tx.CreatedAt db.State = tx.State - db.Finalized = tx.Finalized db.Meta = tx.Meta db.Subject = tx.Subject db.PipelineTaskRunID = tx.PipelineTaskRunID @@ -247,7 +245,6 @@ func (db DbEthTx) ToTx(tx *Tx) { tx.BroadcastAt = db.BroadcastAt tx.CreatedAt = db.CreatedAt tx.State = db.State - tx.Finalized = db.Finalized tx.Meta = db.Meta tx.Subject = db.Subject tx.PipelineTaskRunID = db.PipelineTaskRunID @@ -532,8 +529,8 @@ func (o *evmTxStore) InsertTx(ctx context.Context, etx *Tx) error { if etx.CreatedAt == (time.Time{}) { etx.CreatedAt = time.Now() } - const insertEthTxSQL = `INSERT INTO evm.txes (nonce, from_address, to_address, encoded_payload, value, gas_limit, error, broadcast_at, initial_broadcast_at, created_at, state, meta, subject, pipeline_task_run_id, min_confirmations, evm_chain_id, transmit_checker, idempotency_key, signal_callback, callback_completed, finalized) VALUES ( -:nonce, :from_address, :to_address, :encoded_payload, :value, :gas_limit, :error, :broadcast_at, :initial_broadcast_at, :created_at, :state, :meta, :subject, :pipeline_task_run_id, :min_confirmations, :evm_chain_id, :transmit_checker, :idempotency_key, :signal_callback, :callback_completed, :finalized + const insertEthTxSQL = `INSERT INTO evm.txes (nonce, from_address, to_address, encoded_payload, value, gas_limit, error, broadcast_at, initial_broadcast_at, created_at, state, meta, subject, pipeline_task_run_id, min_confirmations, evm_chain_id, transmit_checker, idempotency_key, signal_callback, callback_completed) VALUES ( +:nonce, :from_address, :to_address, :encoded_payload, :value, :gas_limit, :error, :broadcast_at, :initial_broadcast_at, :created_at, :state, :meta, :subject, :pipeline_task_run_id, :min_confirmations, :evm_chain_id, :transmit_checker, :idempotency_key, :signal_callback, :callback_completed ) RETURNING *` var dbTx DbEthTx dbTx.FromTx(etx) @@ -1119,13 +1116,11 @@ func updateEthTxAttemptUnbroadcast(ctx context.Context, orm *evmTxStore, attempt return pkgerrors.Wrap(err, "updateEthTxAttemptUnbroadcast failed") } -// Ensure to mark the transaction as not finalized in case there is a finality violation and a "finalized" transaction -// has been considered re-org'd out func updateEthTxUnconfirm(ctx context.Context, orm *evmTxStore, etx Tx) error { if etx.State != txmgr.TxConfirmed { - return errors.New("expected eth_tx state to be confirmed") + return errors.New("expected tx state to be confirmed") } - _, err := orm.q.ExecContext(ctx, `UPDATE evm.txes SET state = 'unconfirmed', finalized = false WHERE id = $1`, etx.ID) + _, err := orm.q.ExecContext(ctx, `UPDATE evm.txes SET state = 'unconfirmed' WHERE id = $1`, etx.ID) return pkgerrors.Wrap(err, "updateEthTxUnconfirm failed") } @@ -1884,8 +1879,7 @@ USING old_enough_receipts, evm.tx_attempts WHERE evm.tx_attempts.eth_tx_id = evm.txes.id AND evm.tx_attempts.hash = old_enough_receipts.tx_hash AND evm.txes.created_at < $2 -AND evm.txes.state = 'confirmed' -AND evm.txes.finalized = true +AND evm.txes.state = 'finalized' AND evm_chain_id = $3`, limit, timeThreshold, chainID.String()) if err != nil { return count, pkgerrors.Wrap(err, "ReapTxes failed to delete old confirmed evm.txes") @@ -2047,13 +2041,13 @@ func (o *evmTxStore) UpdateTxAttemptBroadcastBeforeBlockNum(ctx context.Context, return err } -// Returns all confirmed transactions not yet marked as finalized -func (o *evmTxStore) FindConfirmedTxesAwaitingFinalization(ctx context.Context, chainID *big.Int) (txes []*Tx, err error) { +// Returns all confirmed transactions +func (o *evmTxStore) FindConfirmedTxes(ctx context.Context, chainID *big.Int) (txes []*Tx, err error) { var cancel context.CancelFunc ctx, cancel = o.stopCh.Ctx(ctx) defer cancel() err = o.Transact(ctx, true, func(orm *evmTxStore) error { - sql := "SELECT * FROM evm.txes WHERE state = 'confirmed' AND finalized = false AND evm_chain_id = $1" + sql := "SELECT * FROM evm.txes WHERE state = 'confirmed' AND evm_chain_id = $1" var dbEtxs []DbEthTx err = o.q.SelectContext(ctx, &dbEtxs, sql, chainID.String()) if len(dbEtxs) == 0 { @@ -2077,7 +2071,7 @@ func (o *evmTxStore) UpdateTxesFinalized(ctx context.Context, etxIDs []int64, ch var cancel context.CancelFunc ctx, cancel = o.stopCh.Ctx(ctx) defer cancel() - sql := "UPDATE evm.txes SET finalized = true WHERE id = ANY($1) AND evm_chain_id = $2" + sql := "UPDATE evm.txes SET state = 'finalized' WHERE id = ANY($1) AND evm_chain_id = $2" _, err := o.q.ExecContext(ctx, sql, pq.Array(etxIDs), chainId.String()) return err } diff --git a/core/chains/evm/txmgr/evm_tx_store_test.go b/core/chains/evm/txmgr/evm_tx_store_test.go index dde8e047bfd..fee2fc2d377 100644 --- a/core/chains/evm/txmgr/evm_tx_store_test.go +++ b/core/chains/evm/txmgr/evm_tx_store_test.go @@ -1864,7 +1864,7 @@ func TestORM_FindTransactionsByState(t *testing.T) { mustInsertConfirmedEthTxWithReceipt(t, txStore, fromAddress, 3, 100) mustInsertFatalErrorEthTx(t, txStore, fromAddress) - txs, err := txStore.FindConfirmedTxesAwaitingFinalization(ctx, testutils.FixtureChainID) + txs, err := txStore.FindConfirmedTxes(ctx, testutils.FixtureChainID) require.NoError(t, err) require.Len(t, txs, 1) } @@ -1895,23 +1895,6 @@ func TestORM_UpdateTxesFinalized(t *testing.T) { require.NoError(t, err) etx, err := txStore.FindTxWithAttempts(ctx, tx.ID) require.NoError(t, err) - require.True(t, etx.Finalized) - }) - t.Run("fails to finalize an unconfirmed transaction", func(t *testing.T) { - nonce := evmtypes.Nonce(1) - tx := &txmgr.Tx{ - Sequence: &nonce, - FromAddress: fromAddress, - EncodedPayload: []byte{1, 2, 3}, - State: txmgrcommon.TxUnconfirmed, - BroadcastAt: &broadcast, - InitialBroadcastAt: &broadcast, - } - err := txStore.InsertTx(ctx, tx) - require.NoError(t, err) - err = txStore.UpdateTxesFinalized(ctx, []int64{tx.ID}, testutils.FixtureChainID) - // Fails due to chk_eth_txes_state_finalized constraint - // Tx Store is poisoned after this - require.ErrorContains(t, err, "chk_eth_txes_state_finalized") + require.Equal(t, txmgrcommon.TxFinalized, etx.State) }) } diff --git a/core/chains/evm/txmgr/finalizer.go b/core/chains/evm/txmgr/finalizer.go index 08e3c8d8f42..94c54e534fa 100644 --- a/core/chains/evm/txmgr/finalizer.go +++ b/core/chains/evm/txmgr/finalizer.go @@ -24,7 +24,7 @@ var _ Finalizer = (*evmFinalizer)(nil) const processHeadTimeout = 10 * time.Minute type finalizerTxStore interface { - FindConfirmedTxesAwaitingFinalization(ctx context.Context, chainID *big.Int) ([]*Tx, error) + FindConfirmedTxes(ctx context.Context, chainID *big.Int) ([]*Tx, error) UpdateTxesFinalized(ctx context.Context, txs []int64, chainId *big.Int) error } @@ -152,7 +152,7 @@ func (f *evmFinalizer) processFinalizedHead(ctx context.Context, latestFinalized f.lggr.Debugw("processing latest finalized head", "block num", latestFinalizedHead.BlockNumber(), "block hash", latestFinalizedHead.BlockHash(), "earliest block num in chain", earliestBlockNumInChain) // Retrieve all confirmed transactions, loaded with attempts and receipts - unfinalizedTxs, err := f.txStore.FindConfirmedTxesAwaitingFinalization(ctx, f.chainId) + unfinalizedTxs, err := f.txStore.FindConfirmedTxes(ctx, f.chainId) if err != nil { return fmt.Errorf("failed to retrieve confirmed transactions: %w", err) } @@ -162,10 +162,6 @@ func (f *evmFinalizer) processFinalizedHead(ctx context.Context, latestFinalized receiptBlockHashToTx := make(map[common.Hash][]*Tx) // Find transactions with receipt block nums older than the latest finalized block num and block hashes still in chain for _, tx := range unfinalizedTxs { - // Only consider transactions not already marked as finalized - if tx.Finalized { - continue - } receipt := tx.GetReceipt() if receipt == nil || receipt.IsZero() || receipt.IsUnmined() { f.lggr.AssumptionViolationw("invalid receipt found for confirmed transaction", "tx", tx, "receipt", receipt) diff --git a/core/chains/evm/txmgr/finalizer_test.go b/core/chains/evm/txmgr/finalizer_test.go index 24dc97b4cbd..50d46a61c08 100644 --- a/core/chains/evm/txmgr/finalizer_test.go +++ b/core/chains/evm/txmgr/finalizer_test.go @@ -74,7 +74,7 @@ func TestFinalizer_MarkTxFinalized(t *testing.T) { require.NoError(t, err) tx, err = txStore.FindTxWithIdempotencyKey(ctx, idempotencyKey, testutils.FixtureChainID) require.NoError(t, err) - require.Equal(t, false, tx.Finalized) + require.Equal(t, txmgrcommon.TxConfirmed, tx.State) }) t.Run("returns not finalized for tx with receipt re-org'd out", func(t *testing.T) { @@ -101,7 +101,7 @@ func TestFinalizer_MarkTxFinalized(t *testing.T) { require.NoError(t, err) tx, err = txStore.FindTxWithIdempotencyKey(ctx, idempotencyKey, testutils.FixtureChainID) require.NoError(t, err) - require.Equal(t, false, tx.Finalized) + require.Equal(t, txmgrcommon.TxConfirmed, tx.State) }) t.Run("returns finalized for tx with receipt in a finalized block", func(t *testing.T) { @@ -118,7 +118,6 @@ func TestFinalizer_MarkTxFinalized(t *testing.T) { State: txmgrcommon.TxConfirmed, BroadcastAt: &broadcast, InitialBroadcastAt: &broadcast, - Finalized: true, } attemptHash := insertTxAndAttemptWithIdempotencyKey(t, txStore, tx, idempotencyKey) // Insert receipt for finalized block num @@ -129,7 +128,7 @@ func TestFinalizer_MarkTxFinalized(t *testing.T) { require.NoError(t, err) tx, err = txStore.FindTxWithIdempotencyKey(ctx, idempotencyKey, testutils.FixtureChainID) require.NoError(t, err) - require.Equal(t, true, tx.Finalized) + require.Equal(t, txmgrcommon.TxFinalized, tx.State) }) t.Run("returns finalized for tx with receipt older than block history depth", func(t *testing.T) { @@ -192,7 +191,7 @@ func TestFinalizer_MarkTxFinalized(t *testing.T) { require.NoError(t, err) tx, err = txStore.FindTxWithIdempotencyKey(ctx, idempotencyKey, testutils.FixtureChainID) require.NoError(t, err) - require.Equal(t, true, tx.Finalized) + require.Equal(t, txmgrcommon.TxFinalized, tx.State) }) t.Run("returns error if failed to retrieve latest head in headtracker", func(t *testing.T) { diff --git a/core/chains/evm/txmgr/mocks/evm_tx_store.go b/core/chains/evm/txmgr/mocks/evm_tx_store.go index 454d6ab8f20..a8683505b5c 100644 --- a/core/chains/evm/txmgr/mocks/evm_tx_store.go +++ b/core/chains/evm/txmgr/mocks/evm_tx_store.go @@ -199,12 +199,12 @@ func (_m *EvmTxStore) DeleteInProgressAttempt(ctx context.Context, attempt types return r0 } -// FindConfirmedTxesAwaitingFinalization provides a mock function with given fields: ctx, chainID -func (_m *EvmTxStore) FindConfirmedTxesAwaitingFinalization(ctx context.Context, chainID *big.Int) ([]*types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee], error) { +// FindConfirmedTxes provides a mock function with given fields: ctx, chainID +func (_m *EvmTxStore) FindConfirmedTxes(ctx context.Context, chainID *big.Int) ([]*types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee], error) { ret := _m.Called(ctx, chainID) if len(ret) == 0 { - panic("no return value specified for FindConfirmedTxesAwaitingFinalization") + panic("no return value specified for FindConfirmedTxes") } var r0 []*types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee] diff --git a/core/chains/evm/txmgr/reaper_test.go b/core/chains/evm/txmgr/reaper_test.go index eaa7eecb252..a8fe81ae1b9 100644 --- a/core/chains/evm/txmgr/reaper_test.go +++ b/core/chains/evm/txmgr/reaper_test.go @@ -83,7 +83,7 @@ func TestReaper_ReapTxes(t *testing.T) { cltest.AssertCount(t, db, "evm.txes", 1) }) - t.Run("deletes confirmed evm.txes marked as finalized that exceed the age threshold", func(t *testing.T) { + t.Run("deletes finalized evm.txes that exceed the age threshold", func(t *testing.T) { tc := &reaperConfig{reaperThreshold: 1 * time.Hour} r := newReaper(t, txStore, tc) @@ -100,7 +100,7 @@ func TestReaper_ReapTxes(t *testing.T) { // Didn't delete because eth_tx although old enough, was not marked as finalized cltest.AssertCount(t, db, "evm.txes", 1) - pgtest.MustExec(t, db, `UPDATE evm.txes SET finalized=true`) + pgtest.MustExec(t, db, `UPDATE evm.txes SET state='finalized'`) err = r.ReapTxes(42) assert.NoError(t, err) diff --git a/core/chains/evm/txmgr/txmgr_test.go b/core/chains/evm/txmgr/txmgr_test.go index b0e80b396bf..5f932db8720 100644 --- a/core/chains/evm/txmgr/txmgr_test.go +++ b/core/chains/evm/txmgr/txmgr_test.go @@ -702,7 +702,7 @@ func TestTxm_GetTransactionStatus(t *testing.T) { require.Equal(t, commontypes.Unconfirmed, state) }) - t.Run("returns unconfirmed for confirmed state not marked as finalized", func(t *testing.T) { + t.Run("returns unconfirmed for confirmed state", func(t *testing.T) { idempotencyKey := uuid.New().String() _, fromAddress := cltest.MustInsertRandomKey(t, ethKeyStore) nonce := evmtypes.Nonce(0) @@ -716,7 +716,6 @@ func TestTxm_GetTransactionStatus(t *testing.T) { State: txmgrcommon.TxConfirmed, BroadcastAt: &broadcast, InitialBroadcastAt: &broadcast, - Finalized: false, // Set to false by default in DB but here for explicitness } err := txStore.InsertTx(ctx, tx) require.NoError(t, err) @@ -732,7 +731,7 @@ func TestTxm_GetTransactionStatus(t *testing.T) { require.Equal(t, commontypes.Unconfirmed, state) }) - t.Run("returns finalized for confirmed state marked as finalized", func(t *testing.T) { + t.Run("returns finalized for finalized state", func(t *testing.T) { idempotencyKey := uuid.New().String() _, fromAddress := cltest.MustInsertRandomKey(t, ethKeyStore) nonce := evmtypes.Nonce(0) @@ -743,10 +742,9 @@ func TestTxm_GetTransactionStatus(t *testing.T) { FromAddress: fromAddress, EncodedPayload: []byte{1, 2, 3}, FeeLimit: feeLimit, - State: txmgrcommon.TxConfirmed, + State: txmgrcommon.TxFinalized, BroadcastAt: &broadcast, InitialBroadcastAt: &broadcast, - Finalized: true, } err := txStore.InsertTx(ctx, tx) require.NoError(t, err) diff --git a/core/store/migrate/migrate_test.go b/core/store/migrate/migrate_test.go index 8a7d1628a4c..7a03273551d 100644 --- a/core/store/migrate/migrate_test.go +++ b/core/store/migrate/migrate_test.go @@ -600,3 +600,11 @@ func BenchmarkBackfillingRecordsWithMigration202(b *testing.B) { require.NoError(b, err) } } + +func TestRollback_247_TxStateEnumUpdate(t *testing.T) { + _, db := heavyweight.FullTestDBV2(t, nil) + err := goose.DownTo(db.DB, migrationDir, 54) + require.NoError(t, err) + err = goose.UpTo(db.DB, migrationDir, 247) + require.NoError(t, err) +} diff --git a/core/store/migrate/migrations/0247_add_tx_finalized_column.sql b/core/store/migrate/migrations/0247_add_tx_finalized_column.sql deleted file mode 100644 index 5f2c5c5ffb2..00000000000 --- a/core/store/migrate/migrations/0247_add_tx_finalized_column.sql +++ /dev/null @@ -1,15 +0,0 @@ --- +goose Up --- +goose StatementBegin -ALTER TABLE evm.txes ADD COLUMN finalized boolean NOT NULL DEFAULT false; -ALTER TABLE evm.txes ADD CONSTRAINT chk_eth_txes_state_finalized CHECK ( - state <> 'confirmed'::evm.txes_state AND finalized = false - OR - state = 'confirmed'::evm.txes_state -) NOT VALID; --- +goose StatementEnd - --- +goose Down --- +goose StatementBegin -ALTER TABLE evm.txes DROP CONSTRAINT chk_eth_txes_state_finalized; -ALTER TABLE evm.txes DROP COLUMN finalized; --- +goose StatementEnd diff --git a/core/store/migrate/migrations/0247_add_tx_finalized_state.sql b/core/store/migrate/migrations/0247_add_tx_finalized_state.sql new file mode 100644 index 00000000000..dcfe8eec734 --- /dev/null +++ b/core/store/migrate/migrations/0247_add_tx_finalized_state.sql @@ -0,0 +1,135 @@ +-- +goose Up +-- Creating new column and enum instead of just adding new value to the existing enum so the migration changes match the rollback logic +-- Otherwise, migration will complain about mismatching column order + +-- +goose StatementBegin +-- Rename the existing enum with finalized state to mark it as old +ALTER TYPE evm.txes_state RENAME TO txes_state_old; + +-- Create new enum without finalized state +CREATE TYPE evm.txes_state AS ENUM ( + 'unstarted', + 'in_progress', + 'fatal_error', + 'unconfirmed', + 'confirmed_missing_receipt', + 'confirmed', + 'finalized' +); + +-- Add a new state column with the new enum type to the txes table +ALTER TABLE evm.txes ADD COLUMN state_new evm.txes_state; + +-- Copy data from the old column to the new +UPDATE evm.txes SET state_new = state::text::evm.txes_state; + +-- Drop constraints referring to old enum type on the old state column +ALTER TABLE evm.txes ALTER COLUMN state DROP DEFAULT; +ALTER TABLE evm.txes DROP CONSTRAINT chk_eth_txes_fsm; +DROP INDEX IF EXISTS idx_eth_txes_state_from_address_evm_chain_id; +DROP INDEX IF EXISTS idx_eth_txes_min_unconfirmed_nonce_for_key_evm_chain_id; +DROP INDEX IF EXISTS idx_only_one_in_progress_tx_per_account_id_per_evm_chain_id; +DROP INDEX IF EXISTS idx_eth_txes_unstarted_subject_id_evm_chain_id; + +-- Drop the old state column +ALTER TABLE evm.txes DROP state; + +-- Drop the old enum type +DROP TYPE evm.txes_state_old; + +-- Rename the new column name state to replace the old column +ALTER TABLE evm.txes RENAME state_new TO state; + +-- Reset the state column's default +ALTER TABLE evm.txes ALTER COLUMN state SET DEFAULT 'unstarted'::evm.txes_state, ALTER COLUMN state SET NOT NULL; + +-- Recreate constraint with finalized state +ALTER TABLE evm.txes ADD CONSTRAINT chk_eth_txes_fsm CHECK ( + state = 'unstarted'::evm.txes_state AND nonce IS NULL AND error IS NULL AND broadcast_at IS NULL AND initial_broadcast_at IS NULL + OR + state = 'in_progress'::evm.txes_state AND nonce IS NOT NULL AND error IS NULL AND broadcast_at IS NULL AND initial_broadcast_at IS NULL + OR + state = 'fatal_error'::evm.txes_state AND error IS NOT NULL + OR + state = 'unconfirmed'::evm.txes_state AND nonce IS NOT NULL AND error IS NULL AND broadcast_at IS NOT NULL AND initial_broadcast_at IS NOT NULL + OR + state = 'confirmed'::evm.txes_state AND nonce IS NOT NULL AND error IS NULL AND broadcast_at IS NOT NULL AND initial_broadcast_at IS NOT NULL + OR + state = 'confirmed_missing_receipt'::evm.txes_state AND nonce IS NOT NULL AND error IS NULL AND broadcast_at IS NOT NULL AND initial_broadcast_at IS NOT NULL + OR + state = 'finalized'::evm.txes_state AND nonce IS NOT NULL AND error IS NULL AND broadcast_at IS NOT NULL AND initial_broadcast_at IS NOT NULL +) NOT VALID; + +-- Recreate index to include finalized state +CREATE INDEX idx_eth_txes_state_from_address_evm_chain_id ON evm.txes(evm_chain_id, from_address, state) WHERE state <> 'confirmed'::evm.txes_state AND state <> 'finalized'::evm.txes_state; +CREATE INDEX idx_eth_txes_min_unconfirmed_nonce_for_key_evm_chain_id ON evm.txes(evm_chain_id, from_address, nonce) WHERE state = 'unconfirmed'::evm.txes_state; +CREATE UNIQUE INDEX idx_only_one_in_progress_tx_per_account_id_per_evm_chain_id ON evm.txes(evm_chain_id, from_address) WHERE state = 'in_progress'::evm.txes_state; +CREATE INDEX idx_eth_txes_unstarted_subject_id_evm_chain_id ON evm.txes(evm_chain_id, subject, id) WHERE subject IS NOT NULL AND state = 'unstarted'::evm.txes_state; +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin + +-- Rename the existing enum with finalized state to mark it as old +ALTER TYPE evm.txes_state RENAME TO txes_state_old; + +-- Create new enum without finalized state +CREATE TYPE evm.txes_state AS ENUM ( + 'unstarted', + 'in_progress', + 'fatal_error', + 'unconfirmed', + 'confirmed_missing_receipt', + 'confirmed' +); + +-- Add a new state column with the new enum type to the txes table +ALTER TABLE evm.txes ADD COLUMN state_new evm.txes_state; + +-- Update all transactions with finalized state to confirmed in the old state column +UPDATE evm.txes SET state = 'confirmed'::evm.txes_state_old WHERE state = 'finalized'::evm.txes_state_old; + +-- Copy data from the old column to the new +UPDATE evm.txes SET state_new = state::text::evm.txes_state; + +-- Drop constraints referring to old enum type on the old state column +ALTER TABLE evm.txes ALTER COLUMN state DROP DEFAULT; +ALTER TABLE evm.txes DROP CONSTRAINT chk_eth_txes_fsm; +DROP INDEX IF EXISTS idx_eth_txes_state_from_address_evm_chain_id; +DROP INDEX IF EXISTS idx_eth_txes_min_unconfirmed_nonce_for_key_evm_chain_id; +DROP INDEX IF EXISTS idx_only_one_in_progress_tx_per_account_id_per_evm_chain_id; +DROP INDEX IF EXISTS idx_eth_txes_unstarted_subject_id_evm_chain_id; + +-- Drop the old state column +ALTER TABLE evm.txes DROP state; + +-- Drop the old enum type +DROP TYPE evm.txes_state_old; + +-- Rename the new column name state to replace the old column +ALTER TABLE evm.txes RENAME state_new TO state; + +-- Reset the state column's default +ALTER TABLE evm.txes ALTER COLUMN state SET DEFAULT 'unstarted'::evm.txes_state, ALTER COLUMN state SET NOT NULL; + +-- Recereate constraint without finalized state +ALTER TABLE evm.txes ADD CONSTRAINT chk_eth_txes_fsm CHECK ( + state = 'unstarted'::evm.txes_state AND nonce IS NULL AND error IS NULL AND broadcast_at IS NULL AND initial_broadcast_at IS NULL + OR + state = 'in_progress'::evm.txes_state AND nonce IS NOT NULL AND error IS NULL AND broadcast_at IS NULL AND initial_broadcast_at IS NULL + OR + state = 'fatal_error'::evm.txes_state AND error IS NOT NULL + OR + state = 'unconfirmed'::evm.txes_state AND nonce IS NOT NULL AND error IS NULL AND broadcast_at IS NOT NULL AND initial_broadcast_at IS NOT NULL + OR + state = 'confirmed'::evm.txes_state AND nonce IS NOT NULL AND error IS NULL AND broadcast_at IS NOT NULL AND initial_broadcast_at IS NOT NULL + OR + state = 'confirmed_missing_receipt'::evm.txes_state AND nonce IS NOT NULL AND error IS NULL AND broadcast_at IS NOT NULL AND initial_broadcast_at IS NOT NULL +) NOT VALID; + +-- Recreate index with new enum type +CREATE INDEX idx_eth_txes_state_from_address_evm_chain_id ON evm.txes(evm_chain_id, from_address, state) WHERE state <> 'confirmed'::evm.txes_state; +CREATE INDEX idx_eth_txes_min_unconfirmed_nonce_for_key_evm_chain_id ON evm.txes(evm_chain_id, from_address, nonce) WHERE state = 'unconfirmed'::evm.txes_state; +CREATE UNIQUE INDEX idx_only_one_in_progress_tx_per_account_id_per_evm_chain_id ON evm.txes(evm_chain_id, from_address) WHERE state = 'in_progress'::evm.txes_state; +CREATE INDEX idx_eth_txes_unstarted_subject_id_evm_chain_id ON evm.txes(evm_chain_id, subject, id) WHERE subject IS NOT NULL AND state = 'unstarted'::evm.txes_state; +-- +goose StatementEnd