Skip to content

Commit

Permalink
Upgrades
Browse files Browse the repository at this point in the history
  • Loading branch information
dimriou committed Dec 13, 2024
1 parent 5b73b73 commit 08f4851
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 13 deletions.
27 changes: 27 additions & 0 deletions core/chains/evm/config/toml/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net/url"
"slices"
"strconv"
"time"

"github.com/ethereum/go-ethereum/core/txpool/legacypool"
"github.com/pelletier/go-toml/v2"
Expand Down Expand Up @@ -451,6 +452,20 @@ func (c *Chain) ValidateConfig() (err error) {
err = multierr.Append(err, commonconfig.ErrInvalid{Name: "GasEstimator.BumpThreshold", Value: 0, Msg: fmt.Sprintf("cannot be 0 if Transactions.AutoPurge.MinAttempts is set for %s", chainType)})
}
}
case chaintype.ChainDualBroadcast:
if c.Transactions.AutoPurge.DetectionApiUrl == nil {
err = multierr.Append(err, commonconfig.ErrMissing{Name: "Transactions.AutoPurge.DetectionApiUrl", Msg: fmt.Sprintf("must be set for %s", chainType)})
}
if c.Transactions.AutoPurge.Threshold == nil {
err = multierr.Append(err, commonconfig.ErrMissing{Name: "Transactions.AutoPurge.Threshold", Msg: fmt.Sprintf("needs to be set if auto-purge feature is enabled for %s", chainType)})
} else if *c.Transactions.AutoPurge.Threshold == 0 {
err = multierr.Append(err, commonconfig.ErrInvalid{Name: "Transactions.AutoPurge.Threshold", Value: 0, Msg: fmt.Sprintf("cannot be 0 if auto-purge feature is enabled for %s", chainType)})
}
if c.TxmV2.Enabled != nil && *c.TxmV2.Enabled {
if c.TxmV2.CustomURL == nil {
err = multierr.Append(err, commonconfig.ErrMissing{Name: "TxmV2.CustomURL", Msg: fmt.Sprintf("must be set for %s", chainType)})
}
}
default:
// Bump Threshold is required because the stuck tx heuristic relies on a minimum number of bump attempts to exist
if c.GasEstimator.BumpThreshold == nil {
Expand Down Expand Up @@ -494,6 +509,18 @@ func (t *TxmV2) setFrom(f *TxmV2) {
}
}

// ValidateConfig checks the TxmV2 settings.
//
// Nothing is validated unless Enabled is set and true. When the feature is
// enabled, BlockTime must be present and must be at least 2 seconds. The
// returned error is nil when the configuration is acceptable.
func (t *TxmV2) ValidateConfig() (err error) {
	if t.Enabled == nil || !*t.Enabled {
		return nil
	}
	if t.BlockTime == nil {
		// Stop here: the minimum-duration check below reads through BlockTime,
		// so continuing with a nil pointer would dereference nil instead of
		// reporting a configuration error.
		return multierr.Append(err, commonconfig.ErrMissing{Name: "TxmV2.BlockTime", Msg: "must be set if txmv2 feature is enabled"})
	}
	if t.BlockTime.Duration() < 2*time.Second {
		err = multierr.Append(err, commonconfig.ErrInvalid{Name: "TxmV2.BlockTime", Msg: "must be equal to or greater than 2 seconds"})
	}
	return err
}

type Transactions struct {
ForwardersEnabled *bool
MaxInFlight *uint32
Expand Down
5 changes: 5 additions & 0 deletions core/chains/evm/txm/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type OrchestratorTxStore interface {
}

// OrchestratorKeystore is the subset of keystore behaviour the Orchestrator
// depends on.
type OrchestratorKeystore interface {
	// CheckEnabled returns a non-nil error when the given address must not be
	// used on the given chain; CreateTransaction aborts with that error wrapped.
	CheckEnabled(ctx context.Context, address common.Address, chainID *big.Int) error
	// EnabledAddressesForChain returns a list of addresses for the given chain
	// ID (presumably the enabled ones, judging by the name; confirm against
	// the implementation).
	EnabledAddressesForChain(ctx context.Context, chainID *big.Int) ([]common.Address, error)
}

Expand Down Expand Up @@ -175,6 +176,10 @@ func (o *Orchestrator[BLOCK_HASH, HEAD]) CreateTransaction(ctx context.Context,
if wrappedTx != nil {
o.lggr.Infof("Found Tx with IdempotencyKey: %v. Returning existing Tx without creating a new one.", *wrappedTx.IdempotencyKey)
} else {
if kErr := o.keystore.CheckEnabled(ctx, request.FromAddress, o.chainID); kErr != nil {
return tx, fmt.Errorf("cannot send transaction from %s on chain ID %s: %w", request.FromAddress, o.chainID.String(), kErr)
}

var pipelineTaskRunID uuid.NullUUID
if request.PipelineTaskRunID != nil {
pipelineTaskRunID.UUID = *request.PipelineTaskRunID
Expand Down
30 changes: 20 additions & 10 deletions core/chains/evm/txm/stuck_tx_detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,23 @@ type StuckTxDetectorConfig struct {
}

// stuckTxDetector holds the logger, chain type and configuration used to
// decide whether a transaction is stuck. lastPurgeMap records, per sender
// address, the time at which timeBasedDetection last flagged a transaction as
// stuck.
type stuckTxDetector struct {
	lggr         logger.Logger
	chainType    chaintype.ChainType
	config       StuckTxDetectorConfig
	lastPurgeMap map[common.Address]time.Time
}

// NewStuckTxDetector builds a stuckTxDetector from the given logger, chain
// type and configuration. lastPurgeMap is allocated up front so that it can be
// written to without a nil-map panic.
func NewStuckTxDetector(lggr logger.Logger, chaintype chaintype.ChainType, config StuckTxDetectorConfig) *stuckTxDetector {
	return &stuckTxDetector{
		lggr:         lggr,
		chainType:    chaintype,
		config:       config,
		lastPurgeMap: make(map[common.Address]time.Time),
	}
}

func (s *stuckTxDetector) DetectStuckTransaction(ctx context.Context, tx *types.Transaction) (bool, error) {
switch s.chainType {
// TODO: rename
case chaintype.ChainDualBroadcast:
result, err := s.dualBroadcastDetection(ctx, tx)
if result || err != nil {
Expand All @@ -50,11 +51,20 @@ func (s *stuckTxDetector) DetectStuckTransaction(ctx context.Context, tx *types.
}
}

// timeBasedDetection marks a transaction if all the following conditions are met:
// - LastBroadcastAt is not nil
// - Time since last broadcast is above the threshold
// - Time since last purge is above threshold
//
// NOTE: Potentially we can use a fraction of the threshold for the last purge check, because the transaction would have already been broadcast to the mempool,
// so it is more likely to be picked up compared to a transaction that hasn't been broadcast before. This would avoid slowing down TXM for subsequent transactions
// in case the current one is stuck.
func (s *stuckTxDetector) timeBasedDetection(tx *types.Transaction) bool {
threshold := (s.config.BlockTime * time.Duration(s.config.StuckTxBlockThreshold))
if tx.LastBroadcastAt != nil && time.Since(*tx.LastBroadcastAt) > threshold {
s.lggr.Debugf("TxID: %v last broadcast was: %v which is more than the max configured duration: %v. Transaction is now considered stuck and will be purged.",
tx.ID, tx.LastBroadcastAt, threshold)
if tx.LastBroadcastAt != nil && min(time.Since(*tx.LastBroadcastAt), time.Since(s.lastPurgeMap[tx.FromAddress])) > threshold {
s.lggr.Debugf("TxID: %v last broadcast was: %v and last purge: %v which is more than the max configured duration: %v. Transaction is now considered stuck and will be purged.",
tx.ID, tx.LastBroadcastAt, s.lastPurgeMap[tx.FromAddress], threshold)
s.lastPurgeMap[tx.FromAddress] = time.Now()
return true
}
return false
Expand Down
80 changes: 80 additions & 0 deletions core/chains/evm/txm/stuck_tx_detector_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package txm

import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/smartcontractkit/chainlink-common/pkg/logger"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/testutils"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/txm/types"
)

func TestTimeBasedDetection(t *testing.T) {
t.Parallel()

t.Run("returns false if transaction is not stuck", func(t *testing.T) {
config := StuckTxDetectorConfig{
BlockTime: 10 * time.Second,
StuckTxBlockThreshold: 5,
}
fromAddress := testutils.NewAddress()
s := NewStuckTxDetector(logger.Test(t), "", config)

// No previous broadcast
tx := &types.Transaction{
ID: 1,
LastBroadcastAt: nil,
FromAddress: fromAddress,
}
assert.False(t, s.timeBasedDetection(tx))
// Not enough time has passed since last broadcast
now := time.Now()
tx.LastBroadcastAt = &now
assert.False(t, s.timeBasedDetection(tx))
// Not enough time has passed since last purge
tx.LastBroadcastAt = &time.Time{}
s.lastPurgeMap[fromAddress] = now
assert.False(t, s.timeBasedDetection(tx))
})

t.Run("returns true if transaction is stuck", func(t *testing.T) {
config := StuckTxDetectorConfig{
BlockTime: 10 * time.Second,
StuckTxBlockThreshold: 5,
}
fromAddress := testutils.NewAddress()
s := NewStuckTxDetector(logger.Test(t), "", config)

tx := &types.Transaction{
ID: 1,
LastBroadcastAt: &time.Time{},
FromAddress: fromAddress,
}
assert.True(t, s.timeBasedDetection(tx))
})

t.Run("marks first tx as stuck, updates purge time for address, and returns false for the second tx with the same broadcast time", func(t *testing.T) {
config := StuckTxDetectorConfig{
BlockTime: 1 * time.Second,
StuckTxBlockThreshold: 10,
}
fromAddress := testutils.NewAddress()
s := NewStuckTxDetector(logger.Test(t), "", config)

tx1 := &types.Transaction{
ID: 1,
LastBroadcastAt: &time.Time{},
FromAddress: fromAddress,
}
tx2 := &types.Transaction{
ID: 2,
LastBroadcastAt: &time.Time{},
FromAddress: fromAddress,
}
assert.True(t, s.timeBasedDetection(tx1))
assert.False(t, s.timeBasedDetection(tx2))
})
}
6 changes: 3 additions & 3 deletions core/chains/evm/txm/txm.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ func (t *Txm) Trigger(address common.Address) {
}

// Abandon logs the request and asks the transaction store to abandon the
// pending transactions of the given address, returning the store's error
// unchanged.
func (t *Txm) Abandon(address common.Address) error {
	// TODO: restart txm
	t.lggr.Infof("Dropping unstarted and unconfirmed transactions for address: %v", address)

	// No caller context is available here, so a placeholder context is used.
	ctx := context.TODO()
	return t.txStore.AbandonPendingTransactions(ctx, address)
}
Expand Down Expand Up @@ -313,7 +314,7 @@ func (t *Txm) broadcastTransaction(ctx context.Context, address common.Address)
t.setNonce(address, nonce+1)

if err := t.createAndSendAttempt(ctx, tx, address); err != nil {
return true, err
return false, err
}
}
}
Expand Down Expand Up @@ -352,8 +353,7 @@ func (t *Txm) sendTransactionWithError(ctx context.Context, tx *types.Transactio
return err
}
if pendingNonce <= *tx.Nonce {
t.lggr.Debugf("Pending nonce for txID: %v didn't increase. PendingNonce: %d, TxNonce: %d", tx.ID, pendingNonce, *tx.Nonce)
return nil
return fmt.Errorf("Pending nonce for txID: %v didn't increase. PendingNonce: %d, TxNonce: %d. TxErr: %w", tx.ID, pendingNonce, *tx.Nonce, txErr)
}
}

Expand Down

0 comments on commit 08f4851

Please sign in to comment.