Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BCI-3573] - Remove dependence on FinalityDepth in EVM TXM code #13755

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
7539c94
Added a finalizer component that assesses confirmed transactions for …
amit-momin Jun 12, 2024
d91d2f2
Moved Finalizer component into EVM code and addressed feedback
amit-momin Jun 24, 2024
c5d1b24
Merge branch 'develop' into BCI-3486-implement-finality-check-for-the…
amit-momin Jun 24, 2024
8c881e0
Fixed linting and renumbered sql migration
amit-momin Jun 24, 2024
36d8c70
Added limit to Finalizer RPC batch calls
amit-momin Jun 25, 2024
43eb5a5
Merge branch 'develop' into BCI-3486-implement-finality-check-for-the…
amit-momin Jun 26, 2024
ff82c1f
Merge branch 'develop' into BCI-3486-implement-finality-check-for-the…
amit-momin Jun 26, 2024
d9daeb2
Cleaned up unneeded code
amit-momin Jun 26, 2024
162a322
Merge branch 'develop' into BCI-3486-implement-finality-check-for-the…
amit-momin Jun 28, 2024
7f145bc
Renumbered sql migration
amit-momin Jun 28, 2024
0ca0ccb
Merge branch 'develop' into BCI-3486-implement-finality-check-for-the…
amit-momin Jun 28, 2024
be914fc
Merge branch 'develop' into BCI-3486-implement-finality-check-for-the…
amit-momin Jul 1, 2024
c90a000
Updated Finalizer to use LatestAndFinalizedBlock method from HeadTracker
amit-momin Jul 1, 2024
9f56b4e
Fixed health check tests and fixed linting
amit-momin Jul 1, 2024
aa8f0dc
Fixed lint error
amit-momin Jul 1, 2024
745a0e0
Fixed lint error
amit-momin Jul 1, 2024
ec6f51d
Merge branch 'develop' into BCI-3486-implement-finality-check-for-the…
amit-momin Jul 1, 2024
0ea4733
removing finality within common/txmgr
Farber98 Jul 3, 2024
fcac24a
remove finality from common/txmgr comment
Farber98 Jul 3, 2024
732bfd7
latestFinalizedBlockNum support within evm/txmgr txstore
Farber98 Jul 3, 2024
3a97140
refactor tests after evm/txmgr txstore changes
Farber98 Jul 3, 2024
0ccedfb
mocks for both common and core/evm
Farber98 Jul 3, 2024
96d630f
Merge branch 'develop' into BCI-3572-utilize-finality-tags-in-evm-txm
Farber98 Jul 3, 2024
f27399a
remove comments that are still referencing to finalityDepth within ev…
Farber98 Jul 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/itchy-bugs-clean.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

Added a finalizer component to the TXM to mark transactions as finalized #internal
19 changes: 10 additions & 9 deletions common/txmgr/confirmer.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) pro
return fmt.Errorf("CheckConfirmedMissingReceipt failed: %w", err)
}

if err := ec.CheckForReceipts(ctx, head.BlockNumber()); err != nil {
if err := ec.CheckForReceipts(ctx, head.BlockNumber(), head.LatestFinalizedHead().BlockNumber()); err != nil {
return fmt.Errorf("CheckForReceipts failed: %w", err)
}

Expand Down Expand Up @@ -395,8 +395,8 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Che
return
}

// CheckForReceipts finds attempts that are still pending and checks to see if a receipt is present for the given block number
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CheckForReceipts(ctx context.Context, blockNum int64) error {
// CheckForReceipts finds attempts that are still pending and checks to see if a receipt is present for the given block number.
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CheckForReceipts(ctx context.Context, blockNum int64, latestFinalizedBlockNum int64) error {
attempts, err := ec.txStore.FindTxAttemptsRequiringReceiptFetch(ctx, ec.chainID)
if err != nil {
return fmt.Errorf("FindTxAttemptsRequiringReceiptFetch failed: %w", err)
Expand Down Expand Up @@ -443,7 +443,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Che
return fmt.Errorf("unable to mark txes as 'confirmed_missing_receipt': %w", err)
}

if err := ec.txStore.MarkOldTxesMissingReceiptAsErrored(ctx, blockNum, ec.chainConfig.FinalityDepth(), ec.chainID); err != nil {
if err := ec.txStore.MarkOldTxesMissingReceiptAsErrored(ctx, blockNum, latestFinalizedBlockNum, ec.chainID); err != nil {
return fmt.Errorf("unable to confirm buried unconfirmed txes': %w", err)
}
return nil
Expand Down Expand Up @@ -1000,22 +1000,22 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) han
}
}

// EnsureConfirmedTransactionsInLongestChain finds all confirmed txes up to the depth
// EnsureConfirmedTransactionsInLongestChain finds all confirmed txes up to the LatestFinalizedHead
// of the given chain and ensures that every one has a receipt with a block hash that is
// in the given chain.
//
// If any of the confirmed transactions does not have a receipt in the chain, it has been
// re-org'd out and will be rebroadcast.
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) EnsureConfirmedTransactionsInLongestChain(ctx context.Context, head types.Head[BLOCK_HASH]) error {
if head.ChainLength() < ec.chainConfig.FinalityDepth() {
if head.ChainLength() < uint32(head.LatestFinalizedHead().BlockNumber()) {
logArgs := []interface{}{
"chainLength", head.ChainLength(), "finalityDepth", ec.chainConfig.FinalityDepth(),
"chainLength", head.ChainLength(), "latestFinalizedHead", head.LatestFinalizedHead().BlockNumber(),
}
if ec.nConsecutiveBlocksChainTooShort > logAfterNConsecutiveBlocksChainTooShort {
warnMsg := "Chain length supplied for re-org detection was shorter than FinalityDepth. Re-org protection is not working properly. This could indicate a problem with the remote RPC endpoint, a compatibility issue with a particular blockchain, a bug with this particular blockchain, heads table being truncated too early, remote node out of sync, or something else. If this happens a lot please raise a bug with the Chainlink team including a log output sample and details of the chain and RPC endpoint you are using."
warnMsg := "Chain length supplied for re-org detection was shorter than LatestFinalizedHead. Re-org protection is not working properly. This could indicate a problem with the remote RPC endpoint, a compatibility issue with a particular blockchain, a bug with this particular blockchain, heads table being truncated too early, remote node out of sync, or something else. If this happens a lot please raise a bug with the Chainlink team including a log output sample and details of the chain and RPC endpoint you are using."
ec.lggr.Warnw(warnMsg, append(logArgs, "nConsecutiveBlocksChainTooShort", ec.nConsecutiveBlocksChainTooShort)...)
} else {
logMsg := "Chain length supplied for re-org detection was shorter than FinalityDepth"
logMsg := "Chain length supplied for re-org detection was shorter than LatestFinalizedHead"
ec.lggr.Debugw(logMsg, append(logArgs, "nConsecutiveBlocksChainTooShort", ec.nConsecutiveBlocksChainTooShort)...)
}
ec.nConsecutiveBlocksChainTooShort++
Expand Down Expand Up @@ -1100,6 +1100,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) mar
"txID", etx.ID,
"attemptID", attempt.ID,
"nReceipts", len(attempt.Receipts),
"finalized", etx.Finalized,
"id", "confirmer",
}

Expand Down
2 changes: 1 addition & 1 deletion common/txmgr/mocks/tx_manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 3 additions & 6 deletions common/txmgr/reaper.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
// Reaper handles periodic database cleanup for Txm
type Reaper[CHAIN_ID types.ID] struct {
store txmgrtypes.TxHistoryReaper[CHAIN_ID]
config txmgrtypes.ReaperChainConfig
txConfig txmgrtypes.ReaperTransactionsConfig
chainID CHAIN_ID
log logger.Logger
Expand All @@ -27,10 +26,9 @@ type Reaper[CHAIN_ID types.ID] struct {
}

// NewReaper instantiates a new reaper object
func NewReaper[CHAIN_ID types.ID](lggr logger.Logger, store txmgrtypes.TxHistoryReaper[CHAIN_ID], config txmgrtypes.ReaperChainConfig, txConfig txmgrtypes.ReaperTransactionsConfig, chainID CHAIN_ID) *Reaper[CHAIN_ID] {
func NewReaper[CHAIN_ID types.ID](lggr logger.Logger, store txmgrtypes.TxHistoryReaper[CHAIN_ID], txConfig txmgrtypes.ReaperTransactionsConfig, chainID CHAIN_ID) *Reaper[CHAIN_ID] {
r := &Reaper[CHAIN_ID]{
store,
config,
txConfig,
chainID,
logger.Named(lggr, "Reaper"),
Expand Down Expand Up @@ -106,13 +104,12 @@ func (r *Reaper[CHAIN_ID]) ReapTxes(headNum int64) error {
r.log.Debug("Transactions.ReaperThreshold set to 0; skipping ReapTxes")
return nil
}
minBlockNumberToKeep := headNum - int64(r.config.FinalityDepth())
mark := time.Now()
timeThreshold := mark.Add(-threshold)

r.log.Debugw(fmt.Sprintf("reaping old txes created before %s", timeThreshold.Format(time.RFC3339)), "ageThreshold", threshold, "timeThreshold", timeThreshold, "minBlockNumberToKeep", minBlockNumberToKeep)
r.log.Debugw(fmt.Sprintf("reaping old txes created before %s", timeThreshold.Format(time.RFC3339)), "ageThreshold", threshold, "timeThreshold", timeThreshold)

if err := r.store.ReapTxHistory(ctx, minBlockNumberToKeep, timeThreshold, r.chainID); err != nil {
if err := r.store.ReapTxHistory(ctx, timeThreshold, r.chainID); err != nil {
return err
}

Expand Down
20 changes: 18 additions & 2 deletions common/txmgr/txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ type Txm[
broadcaster *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
confirmer *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]
tracker *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]
finalizer txmgrtypes.Finalizer[BLOCK_HASH, HEAD]
fwdMgr txmgrtypes.ForwarderManager[ADDR]
txAttemptBuilder txmgrtypes.TxAttemptBuilder[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
newErrorClassifier NewErrorClassifier
Expand Down Expand Up @@ -145,6 +146,7 @@ func NewTxm[
confirmer *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE],
resender *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE],
tracker *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE],
finalizer txmgrtypes.Finalizer[BLOCK_HASH, HEAD],
newErrorClassifierFunc NewErrorClassifier,
) *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] {
b := Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{
Expand All @@ -167,13 +169,14 @@ func NewTxm[
resender: resender,
tracker: tracker,
newErrorClassifier: newErrorClassifierFunc,
finalizer: finalizer,
}

if txCfg.ResendAfterThreshold() <= 0 {
b.logger.Info("Resender: Disabled")
}
if txCfg.ReaperThreshold() > 0 && txCfg.ReaperInterval() > 0 {
b.reaper = NewReaper[CHAIN_ID](lggr, b.txStore, cfg, txCfg, chainId)
b.reaper = NewReaper[CHAIN_ID](lggr, b.txStore, txCfg, chainId)
} else {
b.logger.Info("TxReaper: Disabled")
}
Expand Down Expand Up @@ -201,6 +204,10 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Start(ctx
return fmt.Errorf("Txm: Tracker failed to start: %w", err)
}

if err := ms.Start(ctx, b.finalizer); err != nil {
return fmt.Errorf("Txm: Finalizer failed to start: %w", err)
}

b.logger.Info("Txm starting runLoop")
b.wg.Add(1)
go b.runLoop()
Expand Down Expand Up @@ -295,6 +302,7 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) HealthRepo
services.CopyHealth(report, b.broadcaster.HealthReport())
services.CopyHealth(report, b.confirmer.HealthReport())
services.CopyHealth(report, b.txAttemptBuilder.HealthReport())
services.CopyHealth(report, b.finalizer.HealthReport())
})

if b.txConfig.ForwardersEnabled() {
Expand Down Expand Up @@ -417,6 +425,7 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop()
case head := <-b.chHeads:
b.confirmer.mb.Deliver(head)
b.tracker.mb.Deliver(head.BlockNumber())
b.finalizer.DeliverLatestHead(head)
case reset := <-b.reset:
// This check prevents the weird edge-case where you can select
// into this block after chStop has already been closed and the
Expand Down Expand Up @@ -448,6 +457,10 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop()
if err != nil && (!errors.Is(err, services.ErrAlreadyStopped) || !errors.Is(err, services.ErrCannotStopUnstarted)) {
b.logger.Errorw(fmt.Sprintf("Failed to Close Tracker: %v", err), "err", err)
}
err = b.finalizer.Close()
if err != nil && (!errors.Is(err, services.ErrAlreadyStopped) || !errors.Is(err, services.ErrCannotStopUnstarted)) {
b.logger.Errorw(fmt.Sprintf("Failed to Close Finalizer: %v", err), "err", err)
}
return
case <-keysChanged:
// This check prevents the weird edge-case where you can select
Expand Down Expand Up @@ -646,7 +659,10 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetTransac
// Return unconfirmed for ConfirmedMissingReceipt since a receipt is required to determine if it is finalized
return commontypes.Unconfirmed, nil
case TxConfirmed:
// TODO: Check for finality and return finalized status
if tx.Finalized {
// Return finalized if tx receipt's block is equal or older than the latest finalized block
return commontypes.Finalized, nil
}
// Return unconfirmed if tx receipt's block is newer than the latest finalized block
return commontypes.Unconfirmed, nil
case TxFatalError:
Expand Down
8 changes: 0 additions & 8 deletions common/txmgr/types/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import "time"
type TransactionManagerChainConfig interface {
BroadcasterChainConfig
ConfirmerChainConfig
ReaperChainConfig
}

type TransactionManagerFeeConfig interface {
Expand Down Expand Up @@ -74,13 +73,6 @@ type ResenderTransactionsConfig interface {
MaxInFlight() uint32
}

// ReaperConfig is the config subset used by the reaper
//
//go:generate mockery --quiet --name ReaperChainConfig --structname ReaperConfig --output ./mocks/ --case=underscore
type ReaperChainConfig interface {
FinalityDepth() uint32
}

type ReaperTransactionsConfig interface {
ReaperInterval() time.Duration
ReaperThreshold() time.Duration
Expand Down
12 changes: 12 additions & 0 deletions common/txmgr/types/finalizer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package types

import (
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink/v2/common/types"
)

type Finalizer[BLOCK_HASH types.Hashable, HEAD types.Head[BLOCK_HASH]] interface {
// interfaces for running the underlying estimator
services.Service
DeliverLatestHead(head HEAD) bool
}
42 changes: 0 additions & 42 deletions common/txmgr/types/mocks/reaper_chain_config.go

This file was deleted.

86 changes: 53 additions & 33 deletions common/txmgr/types/mocks/tx_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading