diff --git a/core/chains/evm/config/toml/config.go b/core/chains/evm/config/toml/config.go
index 0f8b1eceee5..36c7d0f052b 100644
--- a/core/chains/evm/config/toml/config.go
+++ b/core/chains/evm/config/toml/config.go
@@ -6,6 +6,7 @@ import (
 	"net/url"
 	"slices"
 	"strconv"
+	"time"
 
 	"github.com/ethereum/go-ethereum/core/txpool/legacypool"
 	"github.com/pelletier/go-toml/v2"
@@ -451,6 +452,20 @@ func (c *Chain) ValidateConfig() (err error) {
 					err = multierr.Append(err, commonconfig.ErrInvalid{Name: "GasEstimator.BumpThreshold", Value: 0, Msg: fmt.Sprintf("cannot be 0 if Transactions.AutoPurge.MinAttempts is set for %s", chainType)})
 				}
 			}
+		case chaintype.ChainDualBroadcast:
+			if c.Transactions.AutoPurge.DetectionApiUrl == nil {
+				err = multierr.Append(err, commonconfig.ErrMissing{Name: "Transactions.AutoPurge.DetectionApiUrl", Msg: fmt.Sprintf("must be set for %s", chainType)})
+			}
+			if c.Transactions.AutoPurge.Threshold == nil {
+				err = multierr.Append(err, commonconfig.ErrMissing{Name: "Transactions.AutoPurge.Threshold", Msg: fmt.Sprintf("needs to be set if auto-purge feature is enabled for %s", chainType)})
+			} else if *c.Transactions.AutoPurge.Threshold == 0 {
+				err = multierr.Append(err, commonconfig.ErrInvalid{Name: "Transactions.AutoPurge.Threshold", Value: 0, Msg: fmt.Sprintf("cannot be 0 if auto-purge feature is enabled for %s", chainType)})
+			}
+			if c.TxmV2.Enabled != nil && *c.TxmV2.Enabled {
+				if c.TxmV2.CustomURL == nil {
+					err = multierr.Append(err, commonconfig.ErrMissing{Name: "TxmV2.CustomURL", Msg: fmt.Sprintf("must be set for %s", chainType)})
+				}
+			}
 		default:
 			// Bump Threshold is required because the stuck tx heuristic relies on a minimum number of bump attempts to exist
 			if c.GasEstimator.BumpThreshold == nil {
@@ -494,6 +509,20 @@ func (t *TxmV2) setFrom(f *TxmV2) {
 	}
 }
 
+// ValidateConfig reports TxmV2 misconfiguration. Checks only run when Enabled is set to true:
+// BlockTime must then be present and at least 2 seconds. It returns nil when nothing is wrong.
+func (t *TxmV2) ValidateConfig() (err error) {
+	if t.Enabled != nil && *t.Enabled {
+		// else-if keeps the range check from touching a nil BlockTime.
+		if t.BlockTime == nil {
+			err = multierr.Append(err, commonconfig.ErrMissing{Name: "TxmV2.BlockTime", Msg: "must be set if txmv2 feature is enabled"})
+		} else if t.BlockTime.Duration() < 2*time.Second {
+			err = multierr.Append(err, commonconfig.ErrInvalid{Name: "TxmV2.BlockTime", Msg: "must be equal to or greater than 2 seconds"})
+		}
+	}
+	return
+}
+
 type Transactions struct {
 	ForwardersEnabled    *bool
 	MaxInFlight          *uint32
diff --git a/core/chains/evm/txm/orchestrator.go b/core/chains/evm/txm/orchestrator.go
index 914a14f981b..aa8c5e9e2c6 100644
--- a/core/chains/evm/txm/orchestrator.go
+++ b/core/chains/evm/txm/orchestrator.go
@@ -34,6 +34,7 @@ type OrchestratorTxStore interface {
 }
 
 type OrchestratorKeystore interface {
+	CheckEnabled(ctx context.Context, address common.Address, chainID *big.Int) error
 	EnabledAddressesForChain(ctx context.Context, chainID *big.Int) (addresses []common.Address, err error)
 }
 
@@ -175,6 +176,10 @@ func (o *Orchestrator[BLOCK_HASH, HEAD]) CreateTransaction(ctx context.Context,
 	if wrappedTx != nil {
 		o.lggr.Infof("Found Tx with IdempotencyKey: %v. Returning existing Tx without creating a new one.", *wrappedTx.IdempotencyKey)
 	} else {
+		if kErr := o.keystore.CheckEnabled(ctx, request.FromAddress, o.chainID); kErr != nil {
+			return tx, fmt.Errorf("cannot send transaction from %s on chain ID %s: %w", request.FromAddress, o.chainID.String(), kErr)
+		}
+
 		var pipelineTaskRunID uuid.NullUUID
 		if request.PipelineTaskRunID != nil {
 			pipelineTaskRunID.UUID = *request.PipelineTaskRunID
diff --git a/core/chains/evm/txm/stuck_tx_detector.go b/core/chains/evm/txm/stuck_tx_detector.go
index 87b78d5cc04..33905ead80c 100644
--- a/core/chains/evm/txm/stuck_tx_detector.go
+++ b/core/chains/evm/txm/stuck_tx_detector.go
@@ -23,22 +23,23 @@ type StuckTxDetectorConfig struct {
 }
 
 type stuckTxDetector struct {
-	lggr      logger.Logger
-	chainType chaintype.ChainType
-	config    StuckTxDetectorConfig
+	lggr         logger.Logger
+	chainType    chaintype.ChainType
+	config       StuckTxDetectorConfig
+	lastPurgeMap map[common.Address]time.Time
 }
 
 func NewStuckTxDetector(lggr logger.Logger, chaintype chaintype.ChainType, config StuckTxDetectorConfig) *stuckTxDetector {
 	return &stuckTxDetector{
-		lggr:      lggr,
-		chainType: chaintype,
-		config:    config,
+		lggr:         lggr,
+		chainType:    chaintype,
+		config:       config,
+		lastPurgeMap: make(map[common.Address]time.Time),
 	}
 }
 
 func (s *stuckTxDetector) DetectStuckTransaction(ctx context.Context, tx *types.Transaction) (bool, error) {
 	switch s.chainType {
-	// TODO: rename
 	case chaintype.ChainDualBroadcast:
 		result, err := s.dualBroadcastDetection(ctx, tx)
 		if result || err != nil {
@@ -50,11 +51,20 @@ func (s *stuckTxDetector) DetectStuckTransaction(ctx context.Context, tx *types.
 	}
 }
 
+// timeBasedDetection marks a transaction if all the following conditions are met:
+// - LastBroadcastAt is not nil
+// - Time since last broadcast is above the threshold
+// - Time since last purge is above threshold
+//
+// NOTE: Potentially we can use a subset of threshold for last purge check, because the transaction would have already been broadcast to the mempool
+// so it is more likely to be picked up compared to a transaction that hasn't been broadcast before. This would avoid slowing down TXM for subsequent transactions
+// in case the current one is stuck.
 func (s *stuckTxDetector) timeBasedDetection(tx *types.Transaction) bool {
 	threshold := (s.config.BlockTime * time.Duration(s.config.StuckTxBlockThreshold))
-	if tx.LastBroadcastAt != nil && time.Since(*tx.LastBroadcastAt) > threshold {
-		s.lggr.Debugf("TxID: %v last broadcast was: %v which is more than the max configured duration: %v. Transaction is now considered stuck and will be purged.",
-			tx.ID, tx.LastBroadcastAt, threshold)
+	if tx.LastBroadcastAt != nil && min(time.Since(*tx.LastBroadcastAt), time.Since(s.lastPurgeMap[tx.FromAddress])) > threshold {
+		s.lggr.Debugf("TxID: %v last broadcast was: %v and last purge: %v which is more than the max configured duration: %v. Transaction is now considered stuck and will be purged.",
+			tx.ID, tx.LastBroadcastAt, s.lastPurgeMap[tx.FromAddress], threshold)
+		s.lastPurgeMap[tx.FromAddress] = time.Now()
 		return true
 	}
 	return false
diff --git a/core/chains/evm/txm/stuck_tx_detector_test.go b/core/chains/evm/txm/stuck_tx_detector_test.go
new file mode 100644
index 00000000000..af5a765dcdb
--- /dev/null
+++ b/core/chains/evm/txm/stuck_tx_detector_test.go
@@ -0,0 +1,80 @@
+package txm
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+
+	"github.com/smartcontractkit/chainlink-common/pkg/logger"
+
+	"github.com/smartcontractkit/chainlink/v2/core/chains/evm/testutils"
+	"github.com/smartcontractkit/chainlink/v2/core/chains/evm/txm/types"
+)
+
+func TestTimeBasedDetection(t *testing.T) {
+	t.Parallel()
+
+	t.Run("returns false if transaction is not stuck", func(t *testing.T) {
+		config := StuckTxDetectorConfig{
+			BlockTime:             10 * time.Second,
+			StuckTxBlockThreshold: 5,
+		}
+		fromAddress := testutils.NewAddress()
+		s := NewStuckTxDetector(logger.Test(t), "", config)
+
+		// No previous broadcast
+		tx := &types.Transaction{
+			ID:              1,
+			LastBroadcastAt: nil,
+			FromAddress:     fromAddress,
+		}
+		assert.False(t, s.timeBasedDetection(tx))
+		// Not enough time has passed since last broadcast
+		now := time.Now()
+		tx.LastBroadcastAt = &now
+		assert.False(t, s.timeBasedDetection(tx))
+		// Not enough time has passed since last purge
+		tx.LastBroadcastAt = &time.Time{}
+		s.lastPurgeMap[fromAddress] = now
+		assert.False(t, s.timeBasedDetection(tx))
+	})
+
+	t.Run("returns true if transaction is stuck", func(t *testing.T) {
+		config := StuckTxDetectorConfig{
+			BlockTime:             10 * time.Second,
+			StuckTxBlockThreshold: 5,
+		}
+		fromAddress := testutils.NewAddress()
+		s := NewStuckTxDetector(logger.Test(t), "", config)
+
+		tx := &types.Transaction{
+			ID:              1,
+			LastBroadcastAt: &time.Time{},
+			FromAddress:     fromAddress,
+		}
+		assert.True(t, s.timeBasedDetection(tx))
+	})
+
+	t.Run("marks first tx as stuck, updates purge time for address, and returns false for the second tx with the same broadcast time", func(t *testing.T) {
+		config := StuckTxDetectorConfig{
+			BlockTime:             1 * time.Second,
+			StuckTxBlockThreshold: 10,
+		}
+		fromAddress := testutils.NewAddress()
+		s := NewStuckTxDetector(logger.Test(t), "", config)
+
+		tx1 := &types.Transaction{
+			ID:              1,
+			LastBroadcastAt: &time.Time{},
+			FromAddress:     fromAddress,
+		}
+		tx2 := &types.Transaction{
+			ID:              2,
+			LastBroadcastAt: &time.Time{},
+			FromAddress:     fromAddress,
+		}
+		assert.True(t, s.timeBasedDetection(tx1))
+		assert.False(t, s.timeBasedDetection(tx2))
+	})
+}
diff --git a/core/chains/evm/txm/txm.go b/core/chains/evm/txm/txm.go
index 054e31aeb79..bf53e00e81a 100644
--- a/core/chains/evm/txm/txm.go
+++ b/core/chains/evm/txm/txm.go
@@ -187,6 +187,7 @@ func (t *Txm) Trigger(address common.Address) {
 }
 
 func (t *Txm) Abandon(address common.Address) error {
+	// TODO: restart txm
 	t.lggr.Infof("Dropping unstarted and unconfirmed transactions for address: %v", address)
 	return t.txStore.AbandonPendingTransactions(context.TODO(), address)
 }
@@ -313,7 +314,7 @@ func (t *Txm) broadcastTransaction(ctx context.Context, address common.Address)
 			t.setNonce(address, nonce+1)
 
 			if err := t.createAndSendAttempt(ctx, tx, address); err != nil {
-				return true, err
+				return false, err
 			}
 		}
 	}
@@ -352,8 +353,7 @@ func (t *Txm) sendTransactionWithError(ctx context.Context, tx *types.Transactio
 			return err
 		}
 		if pendingNonce <= *tx.Nonce {
-			t.lggr.Debugf("Pending nonce for txID: %v didn't increase. PendingNonce: %d, TxNonce: %d", tx.ID, pendingNonce, *tx.Nonce)
-			return nil
+			return fmt.Errorf("Pending nonce for txID: %v didn't increase. PendingNonce: %d, TxNonce: %d. TxErr: %w", tx.ID, pendingNonce, *tx.Nonce, txErr)
 		}
 	}
 