Skip to content

Commit

Permalink
Added finalized state to replace finalized column
Browse files Browse the repository at this point in the history
  • Loading branch information
amit-momin committed Jul 9, 2024
1 parent 599edac commit 911d342
Show file tree
Hide file tree
Showing 18 changed files with 177 additions and 126 deletions.
2 changes: 1 addition & 1 deletion .changeset/itchy-bugs-clean.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
"chainlink": minor
---

Added a finalizer component to the TXM to mark transactions as finalized #internal
Introduced finalized transaction state. Added a finalizer component to the TXM to mark transactions as finalized. #internal
1 change: 0 additions & 1 deletion common/txmgr/confirmer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1100,7 +1100,6 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) mar
"txID", etx.ID,
"attemptID", attempt.ID,
"nReceipts", len(attempt.Receipts),
"finalized", etx.Finalized,
"id", "confirmer",
}

Expand Down
1 change: 1 addition & 0 deletions common/txmgr/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ const (
TxUnconfirmed = txmgrtypes.TxState("unconfirmed")
TxConfirmed = txmgrtypes.TxState("confirmed")
TxConfirmedMissingReceipt = txmgrtypes.TxState("confirmed_missing_receipt")
TxFinalized = txmgrtypes.TxState("finalized")
)
8 changes: 3 additions & 5 deletions common/txmgr/txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,12 +659,10 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetTransac
// Return unconfirmed for ConfirmedMissingReceipt since a receipt is required to determine if it is finalized
return commontypes.Unconfirmed, nil
case TxConfirmed:
if tx.Finalized {
// Return finalized if tx receipt's block is equal or older than the latest finalized block
return commontypes.Finalized, nil
}
// Return unconfirmed if tx receipt's block is newer than the latest finalized block
// Return unconfirmed for confirmed transactions because they are not yet finalized
return commontypes.Unconfirmed, nil
case TxFinalized:
return commontypes.Finalized, nil
case TxFatalError:
// Use an ErrorClassifier to determine if the transaction is considered Fatal
txErr := b.newErrorClassifier(tx.GetError())
Expand Down
6 changes: 3 additions & 3 deletions common/txmgr/types/mocks/tx_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion common/txmgr/types/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,6 @@ type Tx[
InitialBroadcastAt *time.Time
CreatedAt time.Time
State TxState
Finalized bool
TxAttempts []TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] `json:"-"`
// Marshalled TxMeta
// Used for additional context around transactions which you want to log
Expand Down
2 changes: 1 addition & 1 deletion common/txmgr/types/tx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ type TransactionStore[
FindTransactionsConfirmedInBlockRange(ctx context.Context, highBlockNumber, lowBlockNumber int64, chainID CHAIN_ID) (etxs []*Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
FindEarliestUnconfirmedBroadcastTime(ctx context.Context, chainID CHAIN_ID) (null.Time, error)
FindEarliestUnconfirmedTxAttemptBlock(ctx context.Context, chainID CHAIN_ID) (null.Int, error)
FindConfirmedTxesAwaitingFinalization(ctx context.Context, chainID CHAIN_ID) (etxs []*Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
FindConfirmedTxes(ctx context.Context, chainID CHAIN_ID) (etxs []*Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
GetTxInProgress(ctx context.Context, fromAddress ADDR) (etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
GetInProgressTxAttempts(ctx context.Context, address ADDR, chainID CHAIN_ID) (attempts []TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
GetAbandonedTransactionsByBatch(ctx context.Context, chainID CHAIN_ID, enabledAddrs []ADDR, offset, limit uint) (txs []*Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
Expand Down
44 changes: 0 additions & 44 deletions core/chains/evm/txmgr/confirmer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2828,50 +2828,6 @@ func TestEthConfirmer_EnsureConfirmedTransactionsInLongestChain(t *testing.T) {
assert.Equal(t, txmgrtypes.TxAttemptBroadcast, attempt.State)
assert.Len(t, attempt.Receipts, 1)
})

t.Run("unconfirms, unfinalizes, and rebroadcasts finalized transactions that have receipts within head height of the chain but not included in the chain", func(t *testing.T) {
nonce := evmtypes.Nonce(8)
broadcast := time.Now()
tx := &txmgr.Tx{
Sequence: &nonce,
FromAddress: fromAddress,
EncodedPayload: []byte{1, 2, 3},
State: txmgrcommon.TxConfirmed,
BroadcastAt: &broadcast,
InitialBroadcastAt: &broadcast,
Finalized: true,
}
err := txStore.InsertTx(ctx, tx)
require.NoError(t, err)
etx, err := txStore.FindTxWithAttempts(ctx, tx.ID)
require.NoError(t, err)
attempt := cltest.NewLegacyEthTxAttempt(t, etx.ID)
broadcastBeforeBlockNum := int64(1)
attempt.BroadcastBeforeBlockNum = &broadcastBeforeBlockNum
attempt.State = txmgrtypes.TxAttemptBroadcast
err = txStore.InsertTxAttempt(ctx, &attempt)
require.NoError(t, err)
// Include one within head height but a different block hash
mustInsertEthReceipt(t, txStore, head.Parent.Number, testutils.NewHash(), attempt.Hash)

ethClient.On("SendTransactionReturnCode", mock.Anything, mock.MatchedBy(func(tx *types.Transaction) bool {
atx, signErr := txmgr.GetGethSignedTx(attempt.SignedRawTx)
require.NoError(t, signErr)
// Keeps gas price and nonce the same
return atx.GasPrice().Cmp(tx.GasPrice()) == 0 && atx.Nonce() == tx.Nonce()
}), fromAddress).Return(commonclient.Successful, nil).Once()

// Do the thing
require.NoError(t, ec.EnsureConfirmedTransactionsInLongestChain(tests.Context(t), &head))

etx, err = txStore.FindTxWithAttempts(ctx, etx.ID)
require.NoError(t, err)
assert.Equal(t, txmgrcommon.TxUnconfirmed, etx.State)
require.Len(t, etx.TxAttempts, 1)
attempt = etx.TxAttempts[0]
assert.Equal(t, txmgrtypes.TxAttemptBroadcast, attempt.State)
assert.Equal(t, false, etx.Finalized)
})
}

func TestEthConfirmer_ForceRebroadcast(t *testing.T) {
Expand Down
24 changes: 9 additions & 15 deletions core/chains/evm/txmgr/evm_tx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@ type DbEthTx struct {
// InitialBroadcastAt is recorded once, the first ever time this eth_tx is sent
CreatedAt time.Time
State txmgrtypes.TxState
Finalized bool
// Marshalled EvmTxMeta
// Used for additional context around transactions which you want to log
// at send time.
Expand Down Expand Up @@ -212,7 +211,6 @@ func (db *DbEthTx) FromTx(tx *Tx) {
db.BroadcastAt = tx.BroadcastAt
db.CreatedAt = tx.CreatedAt
db.State = tx.State
db.Finalized = tx.Finalized
db.Meta = tx.Meta
db.Subject = tx.Subject
db.PipelineTaskRunID = tx.PipelineTaskRunID
Expand Down Expand Up @@ -247,7 +245,6 @@ func (db DbEthTx) ToTx(tx *Tx) {
tx.BroadcastAt = db.BroadcastAt
tx.CreatedAt = db.CreatedAt
tx.State = db.State
tx.Finalized = db.Finalized
tx.Meta = db.Meta
tx.Subject = db.Subject
tx.PipelineTaskRunID = db.PipelineTaskRunID
Expand Down Expand Up @@ -532,8 +529,8 @@ func (o *evmTxStore) InsertTx(ctx context.Context, etx *Tx) error {
if etx.CreatedAt == (time.Time{}) {
etx.CreatedAt = time.Now()
}
const insertEthTxSQL = `INSERT INTO evm.txes (nonce, from_address, to_address, encoded_payload, value, gas_limit, error, broadcast_at, initial_broadcast_at, created_at, state, meta, subject, pipeline_task_run_id, min_confirmations, evm_chain_id, transmit_checker, idempotency_key, signal_callback, callback_completed, finalized) VALUES (
:nonce, :from_address, :to_address, :encoded_payload, :value, :gas_limit, :error, :broadcast_at, :initial_broadcast_at, :created_at, :state, :meta, :subject, :pipeline_task_run_id, :min_confirmations, :evm_chain_id, :transmit_checker, :idempotency_key, :signal_callback, :callback_completed, :finalized
const insertEthTxSQL = `INSERT INTO evm.txes (nonce, from_address, to_address, encoded_payload, value, gas_limit, error, broadcast_at, initial_broadcast_at, created_at, state, meta, subject, pipeline_task_run_id, min_confirmations, evm_chain_id, transmit_checker, idempotency_key, signal_callback, callback_completed) VALUES (
:nonce, :from_address, :to_address, :encoded_payload, :value, :gas_limit, :error, :broadcast_at, :initial_broadcast_at, :created_at, :state, :meta, :subject, :pipeline_task_run_id, :min_confirmations, :evm_chain_id, :transmit_checker, :idempotency_key, :signal_callback, :callback_completed
) RETURNING *`
var dbTx DbEthTx
dbTx.FromTx(etx)
Expand Down Expand Up @@ -1119,13 +1116,11 @@ func updateEthTxAttemptUnbroadcast(ctx context.Context, orm *evmTxStore, attempt
return pkgerrors.Wrap(err, "updateEthTxAttemptUnbroadcast failed")
}

// Ensure to mark the transaction as not finalized in case there is a finality violation and a "finalized" transaction
// has been considered re-org'd out
func updateEthTxUnconfirm(ctx context.Context, orm *evmTxStore, etx Tx) error {
if etx.State != txmgr.TxConfirmed {
return errors.New("expected eth_tx state to be confirmed")
return errors.New("expected tx state to be confirmed")
}
_, err := orm.q.ExecContext(ctx, `UPDATE evm.txes SET state = 'unconfirmed', finalized = false WHERE id = $1`, etx.ID)
_, err := orm.q.ExecContext(ctx, `UPDATE evm.txes SET state = 'unconfirmed' WHERE id = $1`, etx.ID)
return pkgerrors.Wrap(err, "updateEthTxUnconfirm failed")
}

Expand Down Expand Up @@ -1884,8 +1879,7 @@ USING old_enough_receipts, evm.tx_attempts
WHERE evm.tx_attempts.eth_tx_id = evm.txes.id
AND evm.tx_attempts.hash = old_enough_receipts.tx_hash
AND evm.txes.created_at < $2
AND evm.txes.state = 'confirmed'
AND evm.txes.finalized = true
AND evm.txes.state = 'finalized'
AND evm_chain_id = $3`, limit, timeThreshold, chainID.String())
if err != nil {
return count, pkgerrors.Wrap(err, "ReapTxes failed to delete old confirmed evm.txes")
Expand Down Expand Up @@ -2047,13 +2041,13 @@ func (o *evmTxStore) UpdateTxAttemptBroadcastBeforeBlockNum(ctx context.Context,
return err
}

// Returns all confirmed transactions not yet marked as finalized
func (o *evmTxStore) FindConfirmedTxesAwaitingFinalization(ctx context.Context, chainID *big.Int) (txes []*Tx, err error) {
// Returns all confirmed transactions
func (o *evmTxStore) FindConfirmedTxes(ctx context.Context, chainID *big.Int) (txes []*Tx, err error) {
var cancel context.CancelFunc
ctx, cancel = o.stopCh.Ctx(ctx)
defer cancel()
err = o.Transact(ctx, true, func(orm *evmTxStore) error {
sql := "SELECT * FROM evm.txes WHERE state = 'confirmed' AND finalized = false AND evm_chain_id = $1"
sql := "SELECT * FROM evm.txes WHERE state = 'confirmed' AND evm_chain_id = $1"
var dbEtxs []DbEthTx
err = o.q.SelectContext(ctx, &dbEtxs, sql, chainID.String())
if len(dbEtxs) == 0 {
Expand All @@ -2077,7 +2071,7 @@ func (o *evmTxStore) UpdateTxesFinalized(ctx context.Context, etxIDs []int64, ch
var cancel context.CancelFunc
ctx, cancel = o.stopCh.Ctx(ctx)
defer cancel()
sql := "UPDATE evm.txes SET finalized = true WHERE id = ANY($1) AND evm_chain_id = $2"
sql := "UPDATE evm.txes SET state = 'finalized' WHERE id = ANY($1) AND evm_chain_id = $2"
_, err := o.q.ExecContext(ctx, sql, pq.Array(etxIDs), chainId.String())
return err
}
21 changes: 2 additions & 19 deletions core/chains/evm/txmgr/evm_tx_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1864,7 +1864,7 @@ func TestORM_FindTransactionsByState(t *testing.T) {
mustInsertConfirmedEthTxWithReceipt(t, txStore, fromAddress, 3, 100)
mustInsertFatalErrorEthTx(t, txStore, fromAddress)

txs, err := txStore.FindConfirmedTxesAwaitingFinalization(ctx, testutils.FixtureChainID)
txs, err := txStore.FindConfirmedTxes(ctx, testutils.FixtureChainID)
require.NoError(t, err)
require.Len(t, txs, 1)
}
Expand Down Expand Up @@ -1895,23 +1895,6 @@ func TestORM_UpdateTxesFinalized(t *testing.T) {
require.NoError(t, err)
etx, err := txStore.FindTxWithAttempts(ctx, tx.ID)
require.NoError(t, err)
require.True(t, etx.Finalized)
})
t.Run("fails to finalize an unconfirmed transaction", func(t *testing.T) {
nonce := evmtypes.Nonce(1)
tx := &txmgr.Tx{
Sequence: &nonce,
FromAddress: fromAddress,
EncodedPayload: []byte{1, 2, 3},
State: txmgrcommon.TxUnconfirmed,
BroadcastAt: &broadcast,
InitialBroadcastAt: &broadcast,
}
err := txStore.InsertTx(ctx, tx)
require.NoError(t, err)
err = txStore.UpdateTxesFinalized(ctx, []int64{tx.ID}, testutils.FixtureChainID)
// Fails due to chk_eth_txes_state_finalized constraint
// Tx Store is poisoned after this
require.ErrorContains(t, err, "chk_eth_txes_state_finalized")
require.Equal(t, txmgrcommon.TxFinalized, etx.State)
})
}
8 changes: 2 additions & 6 deletions core/chains/evm/txmgr/finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ var _ Finalizer = (*evmFinalizer)(nil)
const processHeadTimeout = 10 * time.Minute

type finalizerTxStore interface {
FindConfirmedTxesAwaitingFinalization(ctx context.Context, chainID *big.Int) ([]*Tx, error)
FindConfirmedTxes(ctx context.Context, chainID *big.Int) ([]*Tx, error)
UpdateTxesFinalized(ctx context.Context, txs []int64, chainId *big.Int) error
}

Expand Down Expand Up @@ -152,7 +152,7 @@ func (f *evmFinalizer) processFinalizedHead(ctx context.Context, latestFinalized
f.lggr.Debugw("processing latest finalized head", "block num", latestFinalizedHead.BlockNumber(), "block hash", latestFinalizedHead.BlockHash(), "earliest block num in chain", earliestBlockNumInChain)

// Retrieve all confirmed transactions, loaded with attempts and receipts
unfinalizedTxs, err := f.txStore.FindConfirmedTxesAwaitingFinalization(ctx, f.chainId)
unfinalizedTxs, err := f.txStore.FindConfirmedTxes(ctx, f.chainId)
if err != nil {
return fmt.Errorf("failed to retrieve confirmed transactions: %w", err)
}
Expand All @@ -162,10 +162,6 @@ func (f *evmFinalizer) processFinalizedHead(ctx context.Context, latestFinalized
receiptBlockHashToTx := make(map[common.Hash][]*Tx)
// Find transactions with receipt block nums older than the latest finalized block num and block hashes still in chain
for _, tx := range unfinalizedTxs {
// Only consider transactions not already marked as finalized
if tx.Finalized {
continue
}
receipt := tx.GetReceipt()
if receipt == nil || receipt.IsZero() || receipt.IsUnmined() {
f.lggr.AssumptionViolationw("invalid receipt found for confirmed transaction", "tx", tx, "receipt", receipt)
Expand Down
9 changes: 4 additions & 5 deletions core/chains/evm/txmgr/finalizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestFinalizer_MarkTxFinalized(t *testing.T) {
require.NoError(t, err)
tx, err = txStore.FindTxWithIdempotencyKey(ctx, idempotencyKey, testutils.FixtureChainID)
require.NoError(t, err)
require.Equal(t, false, tx.Finalized)
require.Equal(t, txmgrcommon.TxConfirmed, tx.State)
})

t.Run("returns not finalized for tx with receipt re-org'd out", func(t *testing.T) {
Expand All @@ -101,7 +101,7 @@ func TestFinalizer_MarkTxFinalized(t *testing.T) {
require.NoError(t, err)
tx, err = txStore.FindTxWithIdempotencyKey(ctx, idempotencyKey, testutils.FixtureChainID)
require.NoError(t, err)
require.Equal(t, false, tx.Finalized)
require.Equal(t, txmgrcommon.TxConfirmed, tx.State)
})

t.Run("returns finalized for tx with receipt in a finalized block", func(t *testing.T) {
Expand All @@ -118,7 +118,6 @@ func TestFinalizer_MarkTxFinalized(t *testing.T) {
State: txmgrcommon.TxConfirmed,
BroadcastAt: &broadcast,
InitialBroadcastAt: &broadcast,
Finalized: true,
}
attemptHash := insertTxAndAttemptWithIdempotencyKey(t, txStore, tx, idempotencyKey)
// Insert receipt for finalized block num
Expand All @@ -129,7 +128,7 @@ func TestFinalizer_MarkTxFinalized(t *testing.T) {
require.NoError(t, err)
tx, err = txStore.FindTxWithIdempotencyKey(ctx, idempotencyKey, testutils.FixtureChainID)
require.NoError(t, err)
require.Equal(t, true, tx.Finalized)
require.Equal(t, txmgrcommon.TxFinalized, tx.State)
})

t.Run("returns finalized for tx with receipt older than block history depth", func(t *testing.T) {
Expand Down Expand Up @@ -192,7 +191,7 @@ func TestFinalizer_MarkTxFinalized(t *testing.T) {
require.NoError(t, err)
tx, err = txStore.FindTxWithIdempotencyKey(ctx, idempotencyKey, testutils.FixtureChainID)
require.NoError(t, err)
require.Equal(t, true, tx.Finalized)
require.Equal(t, txmgrcommon.TxFinalized, tx.State)
})

t.Run("returns error if failed to retrieve latest head in headtracker", func(t *testing.T) {
Expand Down
6 changes: 3 additions & 3 deletions core/chains/evm/txmgr/mocks/evm_tx_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions core/chains/evm/txmgr/reaper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func TestReaper_ReapTxes(t *testing.T) {
cltest.AssertCount(t, db, "evm.txes", 1)
})

t.Run("deletes confirmed evm.txes marked as finalized that exceed the age threshold", func(t *testing.T) {
t.Run("deletes finalized evm.txes that exceed the age threshold", func(t *testing.T) {
tc := &reaperConfig{reaperThreshold: 1 * time.Hour}

r := newReaper(t, txStore, tc)
Expand All @@ -100,7 +100,7 @@ func TestReaper_ReapTxes(t *testing.T) {
// Didn't delete because eth_tx although old enough, was not marked as finalized
cltest.AssertCount(t, db, "evm.txes", 1)

pgtest.MustExec(t, db, `UPDATE evm.txes SET finalized=true`)
pgtest.MustExec(t, db, `UPDATE evm.txes SET state='finalized'`)

err = r.ReapTxes(42)
assert.NoError(t, err)
Expand Down
8 changes: 3 additions & 5 deletions core/chains/evm/txmgr/txmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -702,7 +702,7 @@ func TestTxm_GetTransactionStatus(t *testing.T) {
require.Equal(t, commontypes.Unconfirmed, state)
})

t.Run("returns unconfirmed for confirmed state not marked as finalized", func(t *testing.T) {
t.Run("returns unconfirmed for confirmed state", func(t *testing.T) {
idempotencyKey := uuid.New().String()
_, fromAddress := cltest.MustInsertRandomKey(t, ethKeyStore)
nonce := evmtypes.Nonce(0)
Expand All @@ -716,7 +716,6 @@ func TestTxm_GetTransactionStatus(t *testing.T) {
State: txmgrcommon.TxConfirmed,
BroadcastAt: &broadcast,
InitialBroadcastAt: &broadcast,
Finalized: false, // Set to false by default in DB but here for explicitness
}
err := txStore.InsertTx(ctx, tx)
require.NoError(t, err)
Expand All @@ -732,7 +731,7 @@ func TestTxm_GetTransactionStatus(t *testing.T) {
require.Equal(t, commontypes.Unconfirmed, state)
})

t.Run("returns finalized for confirmed state marked as finalized", func(t *testing.T) {
t.Run("returns finalized for finalized state", func(t *testing.T) {
idempotencyKey := uuid.New().String()
_, fromAddress := cltest.MustInsertRandomKey(t, ethKeyStore)
nonce := evmtypes.Nonce(0)
Expand All @@ -743,10 +742,9 @@ func TestTxm_GetTransactionStatus(t *testing.T) {
FromAddress: fromAddress,
EncodedPayload: []byte{1, 2, 3},
FeeLimit: feeLimit,
State: txmgrcommon.TxConfirmed,
State: txmgrcommon.TxFinalized,
BroadcastAt: &broadcast,
InitialBroadcastAt: &broadcast,
Finalized: true,
}
err := txStore.InsertTx(ctx, tx)
require.NoError(t, err)
Expand Down
8 changes: 8 additions & 0 deletions core/store/migrate/migrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -600,3 +600,11 @@ func BenchmarkBackfillingRecordsWithMigration202(b *testing.B) {
require.NoError(b, err)
}
}

func TestRollback_247_TxStateEnumUpdate(t *testing.T) {
_, db := heavyweight.FullTestDBV2(t, nil)
err := goose.DownTo(db.DB, migrationDir, 54)
require.NoError(t, err)
err = goose.UpTo(db.DB, migrationDir, 247)
require.NoError(t, err)
}
Loading

0 comments on commit 911d342

Please sign in to comment.