diff --git a/common/txmgr/broadcaster.go b/common/txmgr/broadcaster.go index d68b5091011..00522abf229 100644 --- a/common/txmgr/broadcaster.go +++ b/common/txmgr/broadcaster.go @@ -775,12 +775,17 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) save // Now we have an errored pipeline even though the tx succeeded. This case // is relatively benign and probably nobody will ever run into it in // practice, but something to be aware of. - if etx.PipelineTaskRunID.Valid && eb.resumeCallback != nil { + if etx.PipelineTaskRunID.Valid && eb.resumeCallback != nil && etx.SignalCallback { err := eb.resumeCallback(etx.PipelineTaskRunID.UUID, nil, errors.Errorf("fatal error while sending transaction: %s", etx.Error.String)) if errors.Is(err, sql.ErrNoRows) { lgr.Debugw("callback missing or already resumed", "etxID", etx.ID) } else if err != nil { return errors.Wrap(err, "failed to resume pipeline") + } else { + // Mark tx as having completed callback + if err := eb.txStore.UpdateTxCallbackCompleted(ctx, etx.PipelineTaskRunID.UUID, eb.chainID); err != nil { + return err + } } } return eb.txStore.UpdateTxFatalError(ctx, etx) diff --git a/common/txmgr/confirmer.go b/common/txmgr/confirmer.go index 1d7446d9d2d..afb2b3003a1 100644 --- a/common/txmgr/confirmer.go +++ b/common/txmgr/confirmer.go @@ -1083,7 +1083,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) sen // ResumePendingTaskRuns issues callbacks to task runs that are pending waiting for receipts func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ResumePendingTaskRuns(ctx context.Context, head types.Head[BLOCK_HASH]) error { - receiptsPlus, err := ec.txStore.FindReceiptsPendingConfirmation(ctx, head.BlockNumber(), ec.chainID) + receiptsPlus, err := ec.txStore.FindTxesPendingCallback(ctx, head.BlockNumber(), ec.chainID) if err != nil { return err @@ -1105,6 +1105,10 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Res ec.lggr.Debugw("Callback: resuming tx with receipt", "output", output, "taskErr", taskErr, "pipelineTaskRunID", data.ID) if err := ec.resumeCallback(data.ID, output, taskErr); err != nil { + return fmt.Errorf("failed to resume suspended pipeline run: %w", err) + } + // Mark tx as having completed callback + if err := ec.txStore.UpdateTxCallbackCompleted(ctx, data.ID, ec.chainID); err != nil { return err } } diff --git a/common/txmgr/types/mocks/tx_store.go b/common/txmgr/types/mocks/tx_store.go index 7da51de606b..0e344b9b6f9 100644 --- a/common/txmgr/types/mocks/tx_store.go +++ b/common/txmgr/types/mocks/tx_store.go @@ -180,32 +180,6 @@ func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindNextUns return r0 } -// FindReceiptsPendingConfirmation provides a mock function with given fields: ctx, blockNum, chainID -func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindReceiptsPendingConfirmation(ctx context.Context, blockNum int64, chainID CHAIN_ID) ([]txmgrtypes.ReceiptPlus[R], error) { - ret := _m.Called(ctx, blockNum, chainID) - - var r0 []txmgrtypes.ReceiptPlus[R] - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, int64, CHAIN_ID) ([]txmgrtypes.ReceiptPlus[R], error)); ok { - return rf(ctx, blockNum, chainID) - } - if rf, ok := ret.Get(0).(func(context.Context, int64, CHAIN_ID) []txmgrtypes.ReceiptPlus[R]); ok { - r0 = rf(ctx, blockNum, chainID) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]txmgrtypes.ReceiptPlus[R]) - } - } - - if rf, ok := ret.Get(1).(func(context.Context, int64, CHAIN_ID) error); ok { - r1 = rf(ctx, blockNum, chainID) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - // FindTransactionsConfirmedInBlockRange provides a mock function with given fields: ctx, highBlockNumber, lowBlockNumber, chainID func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTransactionsConfirmedInBlockRange(ctx context.Context, highBlockNumber int64, lowBlockNumber int64, chainID CHAIN_ID) ([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { ret := _m.Called(ctx, highBlockNumber, lowBlockNumber, chainID) @@ -388,6 +362,32 @@ func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxesByM return r0, r1 } +// FindTxesPendingCallback provides a mock function with given fields: ctx, blockNum, chainID +func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxesPendingCallback(ctx context.Context, blockNum int64, chainID CHAIN_ID) ([]txmgrtypes.ReceiptPlus[R], error) { + ret := _m.Called(ctx, blockNum, chainID) + + var r0 []txmgrtypes.ReceiptPlus[R] + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, int64, CHAIN_ID) ([]txmgrtypes.ReceiptPlus[R], error)); ok { + return rf(ctx, blockNum, chainID) + } + if rf, ok := ret.Get(0).(func(context.Context, int64, CHAIN_ID) []txmgrtypes.ReceiptPlus[R]); ok { + r0 = rf(ctx, blockNum, chainID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]txmgrtypes.ReceiptPlus[R]) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, int64, CHAIN_ID) error); ok { + r1 = rf(ctx, blockNum, chainID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // FindTxesWithAttemptsAndReceiptsByIdsAndState provides a mock function with given fields: ctx, ids, states, chainID func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxesWithAttemptsAndReceiptsByIdsAndState(ctx context.Context, ids []big.Int, states []txmgrtypes.TxState, chainID *big.Int) ([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { ret := _m.Called(ctx, ids, states, chainID) @@ -814,6 +814,20 @@ func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) UpdateTxAtt return r0 } +// UpdateTxCallbackCompleted provides a mock function with given fields: ctx, pipelineTaskRunRid, chainId +func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) UpdateTxCallbackCompleted(ctx context.Context, pipelineTaskRunRid uuid.UUID, chainId CHAIN_ID) error { + ret := _m.Called(ctx, pipelineTaskRunRid, chainId) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, uuid.UUID, CHAIN_ID) error); ok { + r0 = rf(ctx, pipelineTaskRunRid, chainId) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // UpdateTxFatalError provides a mock function with given fields: ctx, etx func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) UpdateTxFatalError(ctx context.Context, etx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { ret := _m.Called(ctx, etx) diff --git a/common/txmgr/types/tx.go b/common/txmgr/types/tx.go index d95f07afabc..11017bd0325 100644 --- a/common/txmgr/types/tx.go +++ b/common/txmgr/types/tx.go @@ -91,6 +91,9 @@ type TxRequest[ADDR types.Hashable, TX_HASH types.Hashable] struct { // Checker defines the check that should be run before a transaction is submitted on chain. Checker TransmitCheckerSpec[ADDR] + + // Mark tx requiring callback + SignalCallback bool } // TransmitCheckerSpec defines the check that should be performed before a transaction is submitted @@ -217,6 +220,11 @@ type Tx[ // TransmitChecker defines the check that should be performed before a transaction is submitted on // chain. TransmitChecker *datatypes.JSON + + // Marks tx requiring callback + SignalCallback bool + // Marks tx callback as signaled + CallbackCompleted bool } func (e *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) GetError() error { diff --git a/common/txmgr/types/tx_store.go b/common/txmgr/types/tx_store.go index 83cb4b85ee6..c2dfeee4146 100644 --- a/common/txmgr/types/tx_store.go +++ b/common/txmgr/types/tx_store.go @@ -35,8 +35,10 @@ type TxStore[ TxHistoryReaper[CHAIN_ID] TransactionStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, SEQ, FEE] - // methods for saving & retreiving receipts - FindReceiptsPendingConfirmation(ctx context.Context, blockNum int64, chainID CHAIN_ID) (receiptsPlus []ReceiptPlus[R], err error) + // Find confirmed txes beyond the minConfirmations param that require callback but have not yet been signaled + FindTxesPendingCallback(ctx context.Context, blockNum int64, chainID CHAIN_ID) (receiptsPlus []ReceiptPlus[R], err error) + // Update tx to mark that its callback has been signaled + UpdateTxCallbackCompleted(ctx context.Context, pipelineTaskRunRid uuid.UUID, chainId CHAIN_ID) error SaveFetchedReceipts(ctx context.Context, receipts []R, chainID CHAIN_ID) (err error) // additional methods for tx store management @@ -93,6 +95,8 @@ type TransactionStore[ SetBroadcastBeforeBlockNum(ctx context.Context, blockNum int64, chainID CHAIN_ID) error UpdateBroadcastAts(ctx context.Context, now time.Time, etxIDs []int64) error UpdateTxAttemptInProgressToBroadcast(ctx context.Context, etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], NewAttemptState TxAttemptState) error + // Update tx to mark that its callback has been signaled + UpdateTxCallbackCompleted(ctx context.Context, pipelineTaskRunRid uuid.UUID, chainId CHAIN_ID) error UpdateTxsUnconfirmed(ctx context.Context, ids []int64) error UpdateTxUnstartedToInProgress(ctx context.Context, etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt *TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error UpdateTxFatalError(ctx context.Context, etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error diff --git a/core/chains/evm/txmgr/broadcaster_test.go b/core/chains/evm/txmgr/broadcaster_test.go index fcbc7a1f4c2..460f9629fb8 100644 --- a/core/chains/evm/txmgr/broadcaster_test.go +++ b/core/chains/evm/txmgr/broadcaster_test.go @@ -1055,6 +1055,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { FeeLimit: gasLimit, State: txmgrcommon.TxUnstarted, PipelineTaskRunID: uuid.NullUUID{UUID: tr.ID, Valid: true}, + SignalCallback: true, } t.Run("with erroring callback bails out", func(t *testing.T) { diff --git a/core/chains/evm/txmgr/confirmer_test.go b/core/chains/evm/txmgr/confirmer_test.go index 1385250a206..3a0d33f7ba0 100644 --- a/core/chains/evm/txmgr/confirmer_test.go +++ b/core/chains/evm/txmgr/confirmer_test.go @@ -2933,11 +2933,12 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) { etx := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, 1, 1, fromAddress) cltest.MustInsertEthReceipt(t, txStore, head.Number-minConfirmations, head.Hash, etx.TxAttempts[0].Hash) - pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2 WHERE id = $3`, &tr.ID, minConfirmations, etx.ID) + // Setting both signal_callback and callback_completed to TRUE to simulate a completed pipeline task + // It would only be in a state past suspended if the resume callback was called and callback_completed was set to TRUE + pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2, signal_callback = TRUE, callback_completed = TRUE WHERE id = $3`, &tr.ID, minConfirmations, etx.ID) err := ec.ResumePendingTaskRuns(testutils.Context(t), &head) require.NoError(t, err) - }) t.Run("doesn't process task runs where the receipt is younger than minConfirmations", func(t *testing.T) { @@ -2952,15 +2953,15 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) { etx := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, 2, 1, fromAddress) cltest.MustInsertEthReceipt(t, txStore, head.Number, head.Hash, etx.TxAttempts[0].Hash) - pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2 WHERE id = $3`, &tr.ID, minConfirmations, etx.ID) + pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2, signal_callback = TRUE WHERE id = $3`, &tr.ID, minConfirmations, etx.ID) err := ec.ResumePendingTaskRuns(testutils.Context(t), &head) require.NoError(t, err) - }) t.Run("processes eth_txes with receipts older than minConfirmations", func(t *testing.T) { ch := make(chan interface{}) + nonce := evmtypes.Nonce(3) var err error ec := cltest.NewEthConfirmer(t, txStore, ethClient, evmcfg, ethKeyStore, func(id uuid.UUID, value interface{}, thisErr error) error { err = thisErr @@ -2972,15 +2973,19 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) { tr := cltest.MustInsertUnfinishedPipelineTaskRun(t, db, run.ID) pgtest.MustExec(t, db, `UPDATE pipeline_runs SET state = 'suspended' WHERE id = $1`, run.ID) - etx := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, 3, 1, fromAddress) + etx := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, int64(nonce), 1, fromAddress) pgtest.MustExec(t, db, `UPDATE evm.txes SET meta='{"FailOnRevert": true}'`) receipt := cltest.MustInsertEthReceipt(t, txStore, head.Number-minConfirmations, head.Hash, etx.TxAttempts[0].Hash) - pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2 WHERE id = $3`, &tr.ID, minConfirmations, etx.ID) + pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2, signal_callback = TRUE WHERE id = $3`, &tr.ID, minConfirmations, etx.ID) go func() { err2 := ec.ResumePendingTaskRuns(testutils.Context(t), &head) require.NoError(t, err2) + // Retrieve Tx to check if callback completed flag was set to true + updateTx, err3 := txStore.FindTxWithSequence(testutils.Context(t), fromAddress, nonce) + require.NoError(t, err3) + require.Equal(t, true, updateTx.CallbackCompleted) }() select { @@ -3000,6 +3005,7 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) { t.Run("processes eth_txes with receipt older than minConfirmations that reverted", func(t *testing.T) { ch := make(chan interface{}) + nonce := evmtypes.Nonce(4) var err error ec := cltest.NewEthConfirmer(t, txStore, ethClient, evmcfg, ethKeyStore, func(id uuid.UUID, value interface{}, thisErr error) error { err = thisErr @@ -3011,17 +3017,21 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) { tr := cltest.MustInsertUnfinishedPipelineTaskRun(t, db, run.ID) pgtest.MustExec(t, db, `UPDATE pipeline_runs SET state = 'suspended' WHERE id = $1`, run.ID) - etx := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, 4, 1, fromAddress) + etx := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, int64(nonce), 1, fromAddress) pgtest.MustExec(t, db, `UPDATE evm.txes SET meta='{"FailOnRevert": true}'`) // receipt is not passed through as a value since it reverted and caused an error cltest.MustInsertRevertedEthReceipt(t, txStore, head.Number-minConfirmations, head.Hash, etx.TxAttempts[0].Hash) - pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2 WHERE id = $3`, &tr.ID, minConfirmations, etx.ID) + pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2, signal_callback = TRUE WHERE id = $3`, &tr.ID, minConfirmations, etx.ID) go func() { err2 := ec.ResumePendingTaskRuns(testutils.Context(t), &head) require.NoError(t, err2) + // Retrieve Tx to check if callback completed flag was set to true + updateTx, err3 := txStore.FindTxWithSequence(testutils.Context(t), fromAddress, nonce) + require.NoError(t, err3) + require.Equal(t, true, updateTx.CallbackCompleted) }() select { @@ -3036,6 +3046,28 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) { t.Fatal("no value received") } }) + + t.Run("does not mark callback complete if callback fails", func(t *testing.T) { + nonce := evmtypes.Nonce(5) + ec := cltest.NewEthConfirmer(t, txStore, ethClient, evmcfg, ethKeyStore, func(uuid.UUID, interface{}, error) error { + return errors.New("error") + }) + + run := cltest.MustInsertPipelineRun(t, db) + tr := cltest.MustInsertUnfinishedPipelineTaskRun(t, db, run.ID) + + etx := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, int64(nonce), 1, fromAddress) + cltest.MustInsertEthReceipt(t, txStore, head.Number-minConfirmations, head.Hash, etx.TxAttempts[0].Hash) + pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2, signal_callback = TRUE WHERE id = $3`, &tr.ID, minConfirmations, etx.ID) + + err := ec.ResumePendingTaskRuns(testutils.Context(t), &head) + require.Error(t, err) + + // Retrieve Tx to check if callback completed flag was left unchanged + updateTx, err := txStore.FindTxWithSequence(testutils.Context(t), fromAddress, nonce) + require.NoError(t, err) + require.Equal(t, false, updateTx.CallbackCompleted) + }) } func ptr[T any](t T) *T { return &t } diff --git a/core/chains/evm/txmgr/evm_tx_store.go b/core/chains/evm/txmgr/evm_tx_store.go index 4db7989b466..c3371fee80b 100644 --- a/core/chains/evm/txmgr/evm_tx_store.go +++ b/core/chains/evm/txmgr/evm_tx_store.go @@ -115,7 +115,7 @@ type rawOnchainReceipt = evmtypes.Receipt // Does not map to a single database table. // It's comprised of fields from different tables. type dbReceiptPlus struct { - ID uuid.UUID `db:"id"` + ID uuid.UUID `db:"pipeline_task_run_id"` Receipt evmtypes.Receipt `db:"receipt"` FailOnRevert bool `db:"FailOnRevert"` } @@ -180,6 +180,10 @@ type DbEthTx struct { // chain. TransmitChecker *datatypes.JSON InitialBroadcastAt *time.Time + // Marks tx requiring callback + SignalCallback bool + // Marks tx callback as signaled + CallbackCompleted bool } func (db *DbEthTx) FromTx(tx *Tx) { @@ -200,6 +204,8 @@ func (db *DbEthTx) FromTx(tx *Tx) { db.MinConfirmations = tx.MinConfirmations db.TransmitChecker = tx.TransmitChecker db.InitialBroadcastAt = tx.InitialBroadcastAt + db.SignalCallback = tx.SignalCallback + db.CallbackCompleted = tx.CallbackCompleted if tx.ChainID != nil { db.EVMChainID = *utils.NewBig(tx.ChainID) @@ -233,6 +239,8 @@ func (db DbEthTx) ToTx(tx *Tx) { tx.ChainID = db.EVMChainID.ToInt() tx.TransmitChecker = db.TransmitChecker tx.InitialBroadcastAt = db.InitialBroadcastAt + tx.SignalCallback = db.SignalCallback + tx.CallbackCompleted = db.CallbackCompleted } func dbEthTxsToEvmEthTxs(dbEthTxs []DbEthTx) []Tx { @@ -512,8 +520,8 @@ func (o *evmTxStore) InsertTx(etx *Tx) error { if etx.CreatedAt == (time.Time{}) { etx.CreatedAt = time.Now() } - const insertEthTxSQL = `INSERT INTO evm.txes (nonce, from_address, to_address, encoded_payload, value, gas_limit, error, broadcast_at, initial_broadcast_at, created_at, state, meta, subject, pipeline_task_run_id, min_confirmations, evm_chain_id, transmit_checker, idempotency_key) VALUES ( -:nonce, :from_address, :to_address, :encoded_payload, :value, :gas_limit, :error, :broadcast_at, :initial_broadcast_at, :created_at, :state, :meta, :subject, :pipeline_task_run_id, :min_confirmations, :evm_chain_id, :transmit_checker, :idempotency_key + const insertEthTxSQL = `INSERT INTO evm.txes (nonce, from_address, to_address, encoded_payload, value, gas_limit, error, broadcast_at, initial_broadcast_at, created_at, state, meta, subject, pipeline_task_run_id, min_confirmations, evm_chain_id, transmit_checker, idempotency_key, signal_callback, callback_completed) VALUES ( +:nonce, :from_address, :to_address, :encoded_payload, :value, :gas_limit, :error, :broadcast_at, :initial_broadcast_at, :created_at, :state, :meta, :subject, :pipeline_task_run_id, :min_confirmations, :evm_chain_id, :transmit_checker, :idempotency_key, :signal_callback, :callback_completed ) RETURNING *` var dbTx DbEthTx dbTx.FromTx(etx) @@ -941,25 +949,40 @@ WHERE evm.tx_attempts.state = 'in_progress' AND evm.txes.from_address = $1 AND e return attempts, pkgerrors.Wrap(err, "getInProgressEthTxAttempts failed") } -func (o *evmTxStore) FindReceiptsPendingConfirmation(ctx context.Context, blockNum int64, chainID *big.Int) (receiptsPlus []ReceiptPlus, err error) { +// Find confirmed txes requiring callback but have not yet been signaled +func (o *evmTxStore) FindTxesPendingCallback(ctx context.Context, blockNum int64, chainID *big.Int) (receiptsPlus []ReceiptPlus, err error) { var rs []dbReceiptPlus var cancel context.CancelFunc ctx, cancel = o.mergeContexts(ctx) defer cancel() err = o.q.SelectContext(ctx, &rs, ` - SELECT pipeline_task_runs.id, evm.receipts.receipt, COALESCE((evm.txes.meta->>'FailOnRevert')::boolean, false) "FailOnRevert" FROM pipeline_task_runs - INNER JOIN pipeline_runs ON pipeline_runs.id = pipeline_task_runs.pipeline_run_id - INNER JOIN evm.txes ON evm.txes.pipeline_task_run_id = pipeline_task_runs.id + SELECT evm.txes.pipeline_task_run_id, evm.receipts.receipt, COALESCE((evm.txes.meta->>'FailOnRevert')::boolean, false) "FailOnRevert" FROM evm.txes INNER JOIN evm.tx_attempts ON evm.txes.id = evm.tx_attempts.eth_tx_id INNER JOIN evm.receipts ON evm.tx_attempts.hash = evm.receipts.tx_hash - WHERE pipeline_runs.state = 'suspended' AND evm.receipts.block_number <= ($1 - evm.txes.min_confirmations) AND evm.txes.evm_chain_id = $2 + WHERE evm.txes.pipeline_task_run_id IS NOT NULL AND evm.txes.signal_callback = TRUE AND evm.txes.callback_completed = FALSE + AND evm.receipts.block_number <= ($1 - evm.txes.min_confirmations) AND evm.txes.evm_chain_id = $2 `, blockNum, chainID.String()) - + if err != nil { + return nil, fmt.Errorf("failed to retrieve transactions pending pipeline resume callback: %w", err) + } receiptsPlus = fromDBReceiptsPlus(rs) return } +// Update tx to mark that its callback has been signaled +func (o *evmTxStore) UpdateTxCallbackCompleted(ctx context.Context, pipelineTaskRunId uuid.UUID, chainId *big.Int) error { + var cancel context.CancelFunc + ctx, cancel = o.mergeContexts(ctx) + defer cancel() + qq := o.q.WithOpts(pg.WithParentCtx(ctx)) + _, err := qq.Exec(`UPDATE evm.txes SET callback_completed = TRUE WHERE pipeline_task_run_id = $1 AND evm_chain_id = $2`, pipelineTaskRunId, chainId.String()) + if err != nil { + return fmt.Errorf("failed to mark callback completed for transaction: %w", err) + } + return nil +} + func (o *evmTxStore) FindLatestSequence(ctx context.Context, fromAddress common.Address, chainId *big.Int) (nonce evmtypes.Nonce, err error) { var cancel context.CancelFunc ctx, cancel = o.mergeContexts(ctx) @@ -1661,12 +1684,12 @@ func (o *evmTxStore) CreateTransaction(ctx context.Context, txRequest TxRequest, } } err = tx.Get(&dbEtx, ` -INSERT INTO evm.txes (from_address, to_address, encoded_payload, value, gas_limit, state, created_at, meta, subject, evm_chain_id, min_confirmations, pipeline_task_run_id, transmit_checker, idempotency_key) +INSERT INTO evm.txes (from_address, to_address, encoded_payload, value, gas_limit, state, created_at, meta, subject, evm_chain_id, min_confirmations, pipeline_task_run_id, transmit_checker, idempotency_key, signal_callback) VALUES ( -$1,$2,$3,$4,$5,'unstarted',NOW(),$6,$7,$8,$9,$10,$11,$12 +$1,$2,$3,$4,$5,'unstarted',NOW(),$6,$7,$8,$9,$10,$11,$12,$13 ) RETURNING "txes".* -`, txRequest.FromAddress, txRequest.ToAddress, txRequest.EncodedPayload, assets.Eth(txRequest.Value), txRequest.FeeLimit, txRequest.Meta, txRequest.Strategy.Subject(), chainID.String(), txRequest.MinConfirmations, txRequest.PipelineTaskRunID, txRequest.Checker, txRequest.IdempotencyKey) +`, txRequest.FromAddress, txRequest.ToAddress, txRequest.EncodedPayload, assets.Eth(txRequest.Value), txRequest.FeeLimit, txRequest.Meta, txRequest.Strategy.Subject(), chainID.String(), txRequest.MinConfirmations, txRequest.PipelineTaskRunID, txRequest.Checker, txRequest.IdempotencyKey, txRequest.SignalCallback) if err != nil { return pkgerrors.Wrap(err, "CreateEthTransaction failed to insert evm tx") } diff --git a/core/chains/evm/txmgr/evm_tx_store_test.go b/core/chains/evm/txmgr/evm_tx_store_test.go index ba02f118cf5..f8798f9f836 100644 --- a/core/chains/evm/txmgr/evm_tx_store_test.go +++ b/core/chains/evm/txmgr/evm_tx_store_test.go @@ -21,6 +21,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/chainlink" "github.com/smartcontractkit/chainlink/v2/core/services/pg" + "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" "github.com/smartcontractkit/chainlink/v2/core/store/models" "github.com/smartcontractkit/chainlink/v2/core/utils" @@ -617,7 +618,7 @@ func TestORM_GetInProgressTxAttempts(t *testing.T) { assert.Equal(t, etx.TxAttempts[0].ID, attempts[0].ID) } -func TestORM_FindReceiptsPendingConfirmation(t *testing.T) { +func TestORM_FindTxesPendingCallback(t *testing.T) { t.Parallel() db := pgtest.NewSqlxDB(t) @@ -645,21 +646,50 @@ func TestORM_FindReceiptsPendingConfirmation(t *testing.T) { minConfirmations := int64(2) - run := cltest.MustInsertPipelineRun(t, db) - tr := cltest.MustInsertUnfinishedPipelineTaskRun(t, db, run.ID) - pgtest.MustExec(t, db, `UPDATE pipeline_runs SET state = 'suspended' WHERE id = $1`, run.ID) - - etx := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, 3, 1, fromAddress) + // Suspended run waiting for callback + run1 := cltest.MustInsertPipelineRun(t, db) + tr1 := cltest.MustInsertUnfinishedPipelineTaskRun(t, db, run1.ID) + pgtest.MustExec(t, db, `UPDATE pipeline_runs SET state = 'suspended' WHERE id = $1`, run1.ID) + etx1 := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, 3, 1, fromAddress) pgtest.MustExec(t, db, `UPDATE evm.txes SET meta='{"FailOnRevert": true}'`) - attempt := etx.TxAttempts[0] - cltest.MustInsertEthReceipt(t, txStore, head.Number-minConfirmations, head.Hash, attempt.Hash) - - pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2 WHERE id = $3`, &tr.ID, minConfirmations, etx.ID) - - receiptsPlus, err := txStore.FindReceiptsPendingConfirmation(testutils.Context(t), head.Number, ethClient.ConfiguredChainID()) + attempt1 := etx1.TxAttempts[0] + cltest.MustInsertEthReceipt(t, txStore, head.Number-minConfirmations, head.Hash, attempt1.Hash) + pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2, signal_callback = TRUE WHERE id = $3`, &tr1.ID, minConfirmations, etx1.ID) + + // Callback to pipeline service completed. Should be ignored + run2 := cltest.MustInsertPipelineRunWithStatus(t, db, 0, pipeline.RunStatusCompleted) + tr2 := cltest.MustInsertUnfinishedPipelineTaskRun(t, db, run2.ID) + etx2 := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, 4, 1, fromAddress) + pgtest.MustExec(t, db, `UPDATE evm.txes SET meta='{"FailOnRevert": false}'`) + attempt2 := etx2.TxAttempts[0] + cltest.MustInsertEthReceipt(t, txStore, head.Number-minConfirmations, head.Hash, attempt2.Hash) + pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2, signal_callback = TRUE, callback_completed = TRUE WHERE id = $3`, &tr2.ID, minConfirmations, etx2.ID) + + // Suspended run younger than minConfirmations. Should be ignored + run3 := cltest.MustInsertPipelineRun(t, db) + tr3 := cltest.MustInsertUnfinishedPipelineTaskRun(t, db, run3.ID) + pgtest.MustExec(t, db, `UPDATE pipeline_runs SET state = 'suspended' WHERE id = $1`, run3.ID) + etx3 := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, 5, 1, fromAddress) + pgtest.MustExec(t, db, `UPDATE evm.txes SET meta='{"FailOnRevert": false}'`) + attempt3 := etx3.TxAttempts[0] + cltest.MustInsertEthReceipt(t, txStore, head.Number, head.Hash, attempt3.Hash) + pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2, signal_callback = TRUE WHERE id = $3`, &tr3.ID, minConfirmations, etx3.ID) + + // Tx not marked for callback. Should be ignore + etx4 := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, 6, 1, fromAddress) + attempt4 := etx4.TxAttempts[0] + cltest.MustInsertEthReceipt(t, txStore, head.Number, head.Hash, attempt4.Hash) + pgtest.MustExec(t, db, `UPDATE evm.txes SET min_confirmations = $1 WHERE id = $2`, minConfirmations, etx4.ID) + + // Unconfirmed Tx without receipts. Should be ignored + etx5 := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, 7, 1, fromAddress) + pgtest.MustExec(t, db, `UPDATE evm.txes SET min_confirmations = $1 WHERE id = $2`, minConfirmations, etx5.ID) + + // Search evm.txes table for tx requiring callback + receiptsPlus, err := txStore.FindTxesPendingCallback(testutils.Context(t), head.Number, ethClient.ConfiguredChainID()) require.NoError(t, err) assert.Len(t, receiptsPlus, 1) - assert.Equal(t, tr.ID, receiptsPlus[0].ID) + assert.Equal(t, tr1.ID, receiptsPlus[0].ID) } func Test_FindTxWithIdempotencyKey(t *testing.T) { @@ -1569,6 +1599,35 @@ func TestORM_CreateTransaction(t *testing.T) { assert.Equal(t, tx1.GetID(), tx2.GetID()) }) + + t.Run("sets signal callback flag", func(t *testing.T) { + subject := uuid.New() + strategy := newMockTxStrategy(t) + strategy.On("Subject").Return(uuid.NullUUID{UUID: subject, Valid: true}) + strategy.On("PruneQueue", mock.Anything, mock.AnythingOfType("*txmgr.evmTxStore")).Return(int64(0), nil) + etx, err := txStore.CreateTransaction(testutils.Context(t), txmgr.TxRequest{ + FromAddress: fromAddress, + ToAddress: toAddress, + EncodedPayload: payload, + FeeLimit: gasLimit, + Meta: nil, + Strategy: strategy, + SignalCallback: true, + }, ethClient.ConfiguredChainID()) + assert.NoError(t, err) + + assert.Greater(t, etx.ID, int64(0)) + assert.Equal(t, fromAddress, etx.FromAddress) + assert.Equal(t, true, etx.SignalCallback) + + cltest.AssertCount(t, db, "evm.txes", 3) + + var dbEthTx txmgr.DbEthTx + require.NoError(t, db.Get(&dbEthTx, `SELECT * FROM evm.txes ORDER BY id DESC LIMIT 1`)) + + assert.Equal(t, fromAddress, dbEthTx.FromAddress) + assert.Equal(t, true, dbEthTx.SignalCallback) + }) } func TestORM_PruneUnstartedTxQueue(t *testing.T) { diff --git a/core/chains/evm/txmgr/mocks/evm_tx_store.go b/core/chains/evm/txmgr/mocks/evm_tx_store.go index 4632a8ae342..f491bda40bb 100644 --- a/core/chains/evm/txmgr/mocks/evm_tx_store.go +++ b/core/chains/evm/txmgr/mocks/evm_tx_store.go @@ -183,32 +183,6 @@ func (_m *EvmTxStore) FindNextUnstartedTransactionFromAddress(ctx context.Contex return r0 } -// FindReceiptsPendingConfirmation provides a mock function with given fields: ctx, blockNum, chainID -func (_m *EvmTxStore) FindReceiptsPendingConfirmation(ctx context.Context, blockNum int64, chainID *big.Int) ([]types.ReceiptPlus[*evmtypes.Receipt], error) { - ret := _m.Called(ctx, blockNum, chainID) - - var r0 []types.ReceiptPlus[*evmtypes.Receipt] - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, int64, *big.Int) ([]types.ReceiptPlus[*evmtypes.Receipt], error)); ok { - return rf(ctx, blockNum, chainID) - } - if rf, ok := ret.Get(0).(func(context.Context, int64, *big.Int) []types.ReceiptPlus[*evmtypes.Receipt]); ok { - r0 = rf(ctx, blockNum, chainID) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]types.ReceiptPlus[*evmtypes.Receipt]) - } - } - - if rf, ok := ret.Get(1).(func(context.Context, int64, *big.Int) error); ok { - r1 = rf(ctx, blockNum, chainID) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - // FindTransactionsConfirmedInBlockRange provides a mock function with given fields: ctx, highBlockNumber, lowBlockNumber, chainID func (_m *EvmTxStore) FindTransactionsConfirmedInBlockRange(ctx context.Context, highBlockNumber int64, lowBlockNumber int64, chainID *big.Int) ([]*types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee], error) { ret := _m.Called(ctx, highBlockNumber, lowBlockNumber, chainID) @@ -493,6 +467,32 @@ func (_m *EvmTxStore) FindTxesByMetaFieldAndStates(ctx context.Context, metaFiel return r0, r1 } +// FindTxesPendingCallback provides a mock function with given fields: ctx, blockNum, chainID +func (_m *EvmTxStore) FindTxesPendingCallback(ctx context.Context, blockNum int64, chainID *big.Int) ([]types.ReceiptPlus[*evmtypes.Receipt], error) { + ret := _m.Called(ctx, blockNum, chainID) + + var r0 []types.ReceiptPlus[*evmtypes.Receipt] + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, int64, *big.Int) ([]types.ReceiptPlus[*evmtypes.Receipt], error)); ok { + return rf(ctx, blockNum, chainID) + } + if rf, ok := ret.Get(0).(func(context.Context, int64, *big.Int) []types.ReceiptPlus[*evmtypes.Receipt]); ok { + r0 = rf(ctx, blockNum, chainID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]types.ReceiptPlus[*evmtypes.Receipt]) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, int64, *big.Int) error); ok { + r1 = rf(ctx, blockNum, chainID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // FindTxesWithAttemptsAndReceiptsByIdsAndState provides a mock function with given fields: ctx, ids, states, chainID func (_m *EvmTxStore) FindTxesWithAttemptsAndReceiptsByIdsAndState(ctx context.Context, ids []big.Int, states []types.TxState, chainID *big.Int) ([]*types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee], error) { ret := _m.Called(ctx, ids, states, chainID) @@ -1018,6 +1018,20 @@ func (_m *EvmTxStore) UpdateTxAttemptInProgressToBroadcast(ctx context.Context, return r0 } +// UpdateTxCallbackCompleted provides a mock function with given fields: ctx, pipelineTaskRunRid, chainId +func (_m *EvmTxStore) UpdateTxCallbackCompleted(ctx context.Context, pipelineTaskRunRid uuid.UUID, chainId *big.Int) error { + ret := _m.Called(ctx, pipelineTaskRunRid, chainId) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, uuid.UUID, *big.Int) error); ok { + r0 = rf(ctx, pipelineTaskRunRid, chainId) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // UpdateTxFatalError provides a mock function with given fields: ctx, etx func (_m *EvmTxStore) UpdateTxFatalError(ctx context.Context, etx *types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee]) error { ret := _m.Called(ctx, etx) diff --git a/core/services/pipeline/orm.go b/core/services/pipeline/orm.go index d60050700f7..056a7deab28 100644 --- a/core/services/pipeline/orm.go +++ b/core/services/pipeline/orm.go @@ -306,13 +306,13 @@ func (o *orm) UpdateTaskRunResult(taskID uuid.UUID, result Result) (run Run, sta WHERE pipeline_task_runs.id = $1 AND pipeline_runs.state in ('running', 'suspended') FOR UPDATE` if err = tx.Get(&run, sql, taskID); err != nil { - return err + return fmt.Errorf("failed to find pipeline run for ID %s: %w", taskID.String(), err) } // Update the task with result sql = `UPDATE pipeline_task_runs SET output = $2, error = $3, finished_at = $4 WHERE id = $1` if _, err = tx.Exec(sql, taskID, result.OutputDB(), result.ErrorDB(), time.Now()); err != nil { - return errors.Wrap(err, "UpdateTaskRunResult") + return fmt.Errorf("failed to update pipeline task run: %w", err) } if run.State == RunStatusSuspended { @@ -321,7 +321,7 @@ func (o *orm) UpdateTaskRunResult(taskID uuid.UUID, result Result) (run Run, sta sql = `UPDATE pipeline_runs SET state = $2 WHERE id = $1` if _, err = tx.Exec(sql, run.ID, run.State); err != nil { - return errors.Wrap(err, "UpdateTaskRunResult") + return fmt.Errorf("failed to update pipeline run state: %w", err) } } diff --git a/core/services/pipeline/runner.go b/core/services/pipeline/runner.go index 3dbe94747e7..d33913b4753 100644 --- a/core/services/pipeline/runner.go +++ b/core/services/pipeline/runner.go @@ -609,7 +609,7 @@ func (r *runner) ResumeRun(taskID uuid.UUID, value interface{}, err error) error Error: err, }) if err != nil { - return err + return fmt.Errorf("failed to update task run result: %w", err) } // TODO: Should probably replace this with a listener to update events diff --git a/core/services/pipeline/task.eth_tx.go b/core/services/pipeline/task.eth_tx.go index 57f1c0a7ed8..384c86446e7 100644 --- a/core/services/pipeline/task.eth_tx.go +++ b/core/services/pipeline/task.eth_tx.go @@ -155,6 +155,7 @@ func (t *ETHTxTask) Run(ctx context.Context, lggr logger.Logger, vars Vars, inpu ForwarderAddress: forwarderAddress, Strategy: strategy, Checker: transmitChecker, + SignalCallback: true, } if minOutgoingConfirmations > 0 { diff --git a/core/services/pipeline/task.eth_tx_test.go b/core/services/pipeline/task.eth_tx_test.go index e5f50bc29e5..a0ff54d4448 100644 --- a/core/services/pipeline/task.eth_tx_test.go +++ b/core/services/pipeline/task.eth_tx_test.go @@ -95,6 +95,7 @@ func TestETHTxTask(t *testing.T) { CheckerType: txmgr.TransmitCheckerTypeVRFV2, VRFCoordinatorAddress: &addr, }, + SignalCallback: true, }).Return(txmgr.Tx{}, nil) }, nil, nil, "", pipeline.RunInfo{}, @@ -138,6 +139,7 @@ func TestETHTxTask(t *testing.T) { FeeLimit: gasLimit, Meta: txMeta, Strategy: txmgrcommon.NewSendEveryStrategy(), + SignalCallback: true, }).Return(txmgr.Tx{}, nil) }, nil, nil, "", pipeline.RunInfo{}, @@ -215,6 +217,7 @@ func TestETHTxTask(t *testing.T) { FeeLimit: gasLimit, Meta: txMeta, Strategy: txmgrcommon.NewSendEveryStrategy(), + SignalCallback: true, }).Return(txmgr.Tx{}, nil) }, nil, nil, "", pipeline.RunInfo{}, @@ -260,6 +263,7 @@ func TestETHTxTask(t *testing.T) { FeeLimit: gasLimit, Meta: txMeta, Strategy: txmgrcommon.NewSendEveryStrategy(), + SignalCallback: true, }).Return(txmgr.Tx{}, nil) }, nil, nil, "", pipeline.RunInfo{}, @@ -290,6 +294,7 @@ func TestETHTxTask(t *testing.T) { FeeLimit: gasLimit, Meta: txMeta, Strategy: txmgrcommon.NewSendEveryStrategy(), + SignalCallback: true, }).Return(txmgr.Tx{}, nil) }, nil, nil, "", pipeline.RunInfo{}, @@ -324,6 +329,7 @@ func TestETHTxTask(t *testing.T) { FeeLimit: drJobTypeGasLimit, Meta: txMeta, Strategy: txmgrcommon.NewSendEveryStrategy(), + SignalCallback: true, }).Return(txmgr.Tx{}, nil) }, nil, nil, "", pipeline.RunInfo{}, @@ -358,6 +364,7 @@ func TestETHTxTask(t *testing.T) { FeeLimit: specGasLimit, Meta: txMeta, Strategy: txmgrcommon.NewSendEveryStrategy(), + SignalCallback: true, }).Return(txmgr.Tx{}, nil) }, nil, nil, "", pipeline.RunInfo{}, @@ -423,6 +430,7 @@ func TestETHTxTask(t *testing.T) { FeeLimit: gasLimit, Meta: txMeta, Strategy: txmgrcommon.NewSendEveryStrategy(), + SignalCallback: true, }).Return(txmgr.Tx{}, errors.New("uh oh")) }, nil, pipeline.ErrTaskRunFailed, "while creating transaction", pipeline.RunInfo{IsRetryable: true}, diff --git a/core/store/migrate/migrations/0209_add_resume_pipeline_task_flags_to_evm_txes.sql b/core/store/migrate/migrations/0209_add_resume_pipeline_task_flags_to_evm_txes.sql new file mode 100644 index 00000000000..dbe7e91b9f6 --- /dev/null +++ b/core/store/migrate/migrations/0209_add_resume_pipeline_task_flags_to_evm_txes.sql @@ -0,0 +1,15 @@ +-- +goose Up +ALTER TABLE evm.txes ADD COLUMN "signal_callback" BOOL DEFAULT FALSE; +ALTER TABLE evm.txes ADD COLUMN "callback_completed" BOOL DEFAULT FALSE; + +UPDATE evm.txes +SET signal_callback = TRUE AND callback_completed = FALSE +WHERE evm.txes.pipeline_task_run_id IN ( + SELECT pipeline_task_runs.id FROM pipeline_task_runs + INNER JOIN pipeline_runs ON pipeline_runs.id = pipeline_task_runs.pipeline_run_id + WHERE pipeline_runs.state = 'suspended' +); + +-- +goose Down +ALTER TABLE evm.txes DROP COLUMN "signal_callback"; +ALTER TABLE evm.txes DROP COLUMN "callback_completed";