Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvements #15695

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions core/chains/evm/config/toml/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net/url"
"slices"
"strconv"
"time"

"github.com/ethereum/go-ethereum/core/txpool/legacypool"
"github.com/pelletier/go-toml/v2"
Expand Down Expand Up @@ -451,6 +452,20 @@ func (c *Chain) ValidateConfig() (err error) {
err = multierr.Append(err, commonconfig.ErrInvalid{Name: "GasEstimator.BumpThreshold", Value: 0, Msg: fmt.Sprintf("cannot be 0 if Transactions.AutoPurge.MinAttempts is set for %s", chainType)})
}
}
case chaintype.ChainDualBroadcast:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please update the PR description? Right now it just says "improvements", whereas, at least it takes care of the ChainDualBroadcast chain type.

Please, also specify what is the old vs new behavior after this PR is merged.

if c.Transactions.AutoPurge.DetectionApiUrl == nil {
err = multierr.Append(err, commonconfig.ErrMissing{Name: "Transactions.AutoPurge.DetectionApiUrl", Msg: fmt.Sprintf("must be set for %s", chainType)})
}
if c.Transactions.AutoPurge.Threshold == nil {
err = multierr.Append(err, commonconfig.ErrMissing{Name: "Transactions.AutoPurge.Threshold", Msg: fmt.Sprintf("needs to be set if auto-purge feature is enabled for %s", chainType)})
} else if *c.Transactions.AutoPurge.Threshold == 0 {
err = multierr.Append(err, commonconfig.ErrInvalid{Name: "Transactions.AutoPurge.Threshold", Value: 0, Msg: fmt.Sprintf("cannot be 0 if auto-purge feature is enabled for %s", chainType)})
}
if c.TxmV2.Enabled != nil && *c.TxmV2.Enabled {
if c.TxmV2.CustomURL == nil {
err = multierr.Append(err, commonconfig.ErrMissing{Name: "TxmV2.CustomURL", Msg: fmt.Sprintf("must be set for %s", chainType)})
}
}
default:
// Bump Threshold is required because the stuck tx heuristic relies on a minimum number of bump attempts to exist
if c.GasEstimator.BumpThreshold == nil {
Expand Down Expand Up @@ -494,6 +509,19 @@ func (t *TxmV2) setFrom(f *TxmV2) {
}
}

func (t *TxmV2) ValidateConfig() (err error) {
if t.Enabled != nil && *t.Enabled {
if t.BlockTime == nil {
err = multierr.Append(err, commonconfig.ErrMissing{Name: "TxmV2.BlockTime", Msg: "must be set if txmv2 feature is enabled"})
return
}
if t.BlockTime.Duration() < 2*time.Second {
err = multierr.Append(err, commonconfig.ErrInvalid{Name: "TxmV2.BlockTime", Msg: "must be equal to or greater than 2 seconds"})
}
}
return
}

type Transactions struct {
ForwardersEnabled *bool
MaxInFlight *uint32
Expand Down
15 changes: 8 additions & 7 deletions core/chains/evm/txm/attempt_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,23 @@ type AttemptBuilderKeystore interface {
}

type attemptBuilder struct {
chainID *big.Int
priceMax *assets.Wei
gas.EvmFeeEstimator
keystore AttemptBuilderKeystore
chainID *big.Int
priceMaxKey func(common.Address) *assets.Wei
keystore AttemptBuilderKeystore
}

func NewAttemptBuilder(chainID *big.Int, priceMax *assets.Wei, estimator gas.EvmFeeEstimator, keystore AttemptBuilderKeystore) *attemptBuilder {
func NewAttemptBuilder(chainID *big.Int, priceMaxKey func(common.Address) *assets.Wei, estimator gas.EvmFeeEstimator, keystore AttemptBuilderKeystore) *attemptBuilder {
return &attemptBuilder{
chainID: chainID,
priceMax: priceMax,
priceMaxKey: priceMaxKey,
EvmFeeEstimator: estimator,
keystore: keystore,
}
}

func (a *attemptBuilder) NewAttempt(ctx context.Context, lggr logger.Logger, tx *types.Transaction, dynamic bool) (*types.Attempt, error) {
fee, estimatedGasLimit, err := a.EvmFeeEstimator.GetFee(ctx, tx.Data, tx.SpecifiedGasLimit, a.priceMax, &tx.FromAddress, &tx.ToAddress)
fee, estimatedGasLimit, err := a.EvmFeeEstimator.GetFee(ctx, tx.Data, tx.SpecifiedGasLimit, a.priceMaxKey(tx.FromAddress), &tx.FromAddress, &tx.ToAddress)
if err != nil {
return nil, err
}
Expand All @@ -48,7 +48,7 @@ func (a *attemptBuilder) NewAttempt(ctx context.Context, lggr logger.Logger, tx
}

func (a *attemptBuilder) NewBumpAttempt(ctx context.Context, lggr logger.Logger, tx *types.Transaction, previousAttempt types.Attempt) (*types.Attempt, error) {
bumpedFee, bumpedFeeLimit, err := a.EvmFeeEstimator.BumpFee(ctx, previousAttempt.Fee, tx.SpecifiedGasLimit, a.priceMax, nil)
bumpedFee, bumpedFeeLimit, err := a.EvmFeeEstimator.BumpFee(ctx, previousAttempt.Fee, tx.SpecifiedGasLimit, a.priceMaxKey(tx.FromAddress), nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -114,6 +114,7 @@ func (a *attemptBuilder) newLegacyAttempt(ctx context.Context, tx *types.Transac
Fee: gas.EvmFee{GasPrice: gasPrice},
Hash: signedTx.Hash(),
GasLimit: estimatedGasLimit,
Type: evmtypes.LegacyTxType,
SignedTransaction: signedTx,
}

Expand Down
93 changes: 93 additions & 0 deletions core/chains/evm/txm/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package txm

import (
"context"
"fmt"
"math/big"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.opentelemetry.io/otel/metric"

"github.com/smartcontractkit/chainlink-common/pkg/beholder"
"github.com/smartcontractkit/chainlink-common/pkg/metrics"
)

var (
promNumBroadcastedTxs = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "txm_num_broadcasted_transactions",
Help: "Total number of successful broadcasted transactions.",
}, []string{"chainID"})
promNumConfirmedTxs = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "txm_num_confirmed_transactions",
Help: "Total number of confirmed transactions. Note that this can happen multiple times per transaction in the case of re-orgs or when filling the nonce for untracked transactions.",
}, []string{"chainID"})
promNumNonceGaps = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "txm_num_nonce_gaps",
Help: "Total number of nonce gaps created that the transaction manager had to fill.",
}, []string{"chainID"})
promTimeUntilTxConfirmed = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "txm_time_until_tx_confirmed",
Help: "The amount of time elapsed from a transaction being broadcast to being included in a block.",
}, []string{"chainID"})
)

type txmMetrics struct {
metrics.Labeler
chainID *big.Int
numBroadcastedTxs metric.Int64Counter
numConfirmedTxs metric.Int64Counter
numNonceGaps metric.Int64Counter
timeUntilTxConfirmed metric.Float64Histogram
}

func NewTxmMetrics(chainID *big.Int) (*txmMetrics, error) {
numBroadcastedTxs, err := beholder.GetMeter().Int64Counter("txm_num_broadcasted_transactions")
if err != nil {
return nil, fmt.Errorf("failed to register broadcasted txs number: %w", err)
}

numConfirmedTxs, err := beholder.GetMeter().Int64Counter("txm_num_confirmed_transactions")
if err != nil {
return nil, fmt.Errorf("failed to register confirmed txs number: %w", err)
}

numNonceGaps, err := beholder.GetMeter().Int64Counter("txm_num_nonce_gaps")
if err != nil {
return nil, fmt.Errorf("failed to register nonce gaps number: %w", err)
}

timeUntilTxConfirmed, err := beholder.GetMeter().Float64Histogram("txm_time_until_tx_confirmed")
if err != nil {
return nil, fmt.Errorf("failed to register time until tx confirmed: %w", err)
}

return &txmMetrics{
chainID: chainID,
Labeler: metrics.NewLabeler().With("chainID", chainID.String()),
numBroadcastedTxs: numBroadcastedTxs,
numConfirmedTxs: numConfirmedTxs,
numNonceGaps: numNonceGaps,
timeUntilTxConfirmed: timeUntilTxConfirmed,
}, nil
}

func (m *txmMetrics) IncrementNumBroadcastedTxs(ctx context.Context) {
promNumBroadcastedTxs.WithLabelValues(m.chainID.String()).Add(float64(1))
m.numBroadcastedTxs.Add(ctx, 1)
}

func (m *txmMetrics) IncrementNumConfirmedTxs(ctx context.Context, confirmedTransactions int) {
promNumConfirmedTxs.WithLabelValues(m.chainID.String()).Add(float64(confirmedTransactions))
m.numConfirmedTxs.Add(ctx, int64(confirmedTransactions))
}

func (m *txmMetrics) IncrementNumNonceGaps(ctx context.Context) {
promNumNonceGaps.WithLabelValues(m.chainID.String()).Add(float64(1))
m.numNonceGaps.Add(ctx, 1)
}

func (m *txmMetrics) RecordTimeUntilTxConfirmed(ctx context.Context, duration float64) {
promTimeUntilTxConfirmed.WithLabelValues(m.chainID.String()).Observe(duration)
m.timeUntilTxConfirmed.Record(ctx, duration)
}
23 changes: 15 additions & 8 deletions core/chains/evm/txm/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ import (
type OrchestratorTxStore interface {
Add(addresses ...common.Address) error
FetchUnconfirmedTransactionAtNonceWithCount(context.Context, uint64, common.Address) (*txmtypes.Transaction, int, error)
FindTxWithIdempotencyKey(context.Context, *string) (*txmtypes.Transaction, error)
FindTxWithIdempotencyKey(context.Context, string) (*txmtypes.Transaction, error)
}

type OrchestratorKeystore interface {
CheckEnabled(ctx context.Context, address common.Address, chainID *big.Int) error
EnabledAddressesForChain(ctx context.Context, chainID *big.Int) (addresses []common.Address, err error)
}

Expand Down Expand Up @@ -120,15 +121,15 @@ func (o *Orchestrator[BLOCK_HASH, HEAD]) Close() (merr error) {
merr = errors.Join(merr, fmt.Errorf("Orchestrator failed to stop ForwarderManager: %w", err))
}
}
if err := o.txm.Close(); err != nil {
merr = errors.Join(merr, fmt.Errorf("Orchestrator failed to stop Txm: %w", err))
}
if err := o.attemptBuilder.Close(); err != nil {
// TODO: hacky fix for DualBroadcast
if !strings.Contains(err.Error(), "already been stopped") {
merr = errors.Join(merr, fmt.Errorf("Orchestrator failed to stop AttemptBuilder: %w", err))
}
}
if err := o.txm.Close(); err != nil {
merr = errors.Join(merr, fmt.Errorf("Orchestrator failed to stop Txm: %w", err))
}
return merr
})
}
Expand Down Expand Up @@ -172,14 +173,20 @@ func (o *Orchestrator[BLOCK_HASH, HEAD]) OnNewLongestChain(ctx context.Context,

func (o *Orchestrator[BLOCK_HASH, HEAD]) CreateTransaction(ctx context.Context, request txmgrtypes.TxRequest[common.Address, common.Hash]) (tx txmgrtypes.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee], err error) {
var wrappedTx *txmtypes.Transaction
wrappedTx, err = o.txStore.FindTxWithIdempotencyKey(ctx, request.IdempotencyKey)
if err != nil {
return
if request.IdempotencyKey != nil {
wrappedTx, err = o.txStore.FindTxWithIdempotencyKey(ctx, *request.IdempotencyKey)
if err != nil {
return
}
}

if wrappedTx != nil {
o.lggr.Infof("Found Tx with IdempotencyKey: %v. Returning existing Tx without creating a new one.", *wrappedTx.IdempotencyKey)
} else {
if kErr := o.keystore.CheckEnabled(ctx, request.FromAddress, o.chainID); kErr != nil {
return tx, fmt.Errorf("cannot send transaction from %s on chain ID %s: %w", request.FromAddress, o.chainID.String(), kErr)
}

var pipelineTaskRunID uuid.NullUUID
if request.PipelineTaskRunID != nil {
pipelineTaskRunID.UUID = *request.PipelineTaskRunID
Expand Down Expand Up @@ -324,7 +331,7 @@ func (o *Orchestrator[BLOCK_HASH, HEAD]) GetForwarderForEOAOCR2Feeds(ctx context

func (o *Orchestrator[BLOCK_HASH, HEAD]) GetTransactionStatus(ctx context.Context, transactionID string) (status commontypes.TransactionStatus, err error) {
// Loads attempts and receipts in the transaction
tx, err := o.txStore.FindTxWithIdempotencyKey(ctx, &transactionID)
tx, err := o.txStore.FindTxWithIdempotencyKey(ctx, transactionID)
if err != nil || tx == nil {
return status, fmt.Errorf("failed to find transaction with IdempotencyKey %s: %w", transactionID, err)
}
Expand Down
Loading
Loading