Skip to content

Commit

Permalink
Decouple job pipeline tables from the TXM DB (#11173)
Browse files Browse the repository at this point in the history
* Decoupled job pipeline tables from the TXM DB

* Updated errors to return better context

* Updated method comment
  • Loading branch information
amit-momin authored Nov 13, 2023
1 parent 023985b commit aa02231
Show file tree
Hide file tree
Showing 15 changed files with 281 additions and 93 deletions.
7 changes: 6 additions & 1 deletion common/txmgr/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -775,12 +775,17 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) save
// Now we have an errored pipeline even though the tx succeeded. This case
// is relatively benign and probably nobody will ever run into it in
// practice, but something to be aware of.
if etx.PipelineTaskRunID.Valid && eb.resumeCallback != nil {
if etx.PipelineTaskRunID.Valid && eb.resumeCallback != nil && etx.SignalCallback {
err := eb.resumeCallback(etx.PipelineTaskRunID.UUID, nil, errors.Errorf("fatal error while sending transaction: %s", etx.Error.String))
if errors.Is(err, sql.ErrNoRows) {
lgr.Debugw("callback missing or already resumed", "etxID", etx.ID)
} else if err != nil {
return errors.Wrap(err, "failed to resume pipeline")
} else {
// Mark tx as having completed callback
if err := eb.txStore.UpdateTxCallbackCompleted(ctx, etx.PipelineTaskRunID.UUID, eb.chainID); err != nil {
return err
}
}
}
return eb.txStore.UpdateTxFatalError(ctx, etx)
Expand Down
6 changes: 5 additions & 1 deletion common/txmgr/confirmer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1083,7 +1083,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) sen
// ResumePendingTaskRuns issues callbacks to task runs that are pending waiting for receipts
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ResumePendingTaskRuns(ctx context.Context, head types.Head[BLOCK_HASH]) error {

receiptsPlus, err := ec.txStore.FindReceiptsPendingConfirmation(ctx, head.BlockNumber(), ec.chainID)
receiptsPlus, err := ec.txStore.FindTxesPendingCallback(ctx, head.BlockNumber(), ec.chainID)

if err != nil {
return err
Expand All @@ -1105,6 +1105,10 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Res

ec.lggr.Debugw("Callback: resuming tx with receipt", "output", output, "taskErr", taskErr, "pipelineTaskRunID", data.ID)
if err := ec.resumeCallback(data.ID, output, taskErr); err != nil {
return fmt.Errorf("failed to resume suspended pipeline run: %w", err)
}
// Mark tx as having completed callback
if err := ec.txStore.UpdateTxCallbackCompleted(ctx, data.ID, ec.chainID); err != nil {
return err
}
}
Expand Down
66 changes: 40 additions & 26 deletions common/txmgr/types/mocks/tx_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions common/txmgr/types/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ type TxRequest[ADDR types.Hashable, TX_HASH types.Hashable] struct {

// Checker defines the check that should be run before a transaction is submitted on chain.
Checker TransmitCheckerSpec[ADDR]

// Mark tx requiring callback
SignalCallback bool
}

// TransmitCheckerSpec defines the check that should be performed before a transaction is submitted
Expand Down Expand Up @@ -217,6 +220,11 @@ type Tx[
// TransmitChecker defines the check that should be performed before a transaction is submitted on
// chain.
TransmitChecker *datatypes.JSON

// Marks tx requiring callback
SignalCallback bool
// Marks tx callback as signaled
CallbackCompleted bool
}

func (e *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) GetError() error {
Expand Down
8 changes: 6 additions & 2 deletions common/txmgr/types/tx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ type TxStore[
TxHistoryReaper[CHAIN_ID]
TransactionStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, SEQ, FEE]

// methods for saving & retreiving receipts
FindReceiptsPendingConfirmation(ctx context.Context, blockNum int64, chainID CHAIN_ID) (receiptsPlus []ReceiptPlus[R], err error)
// Find confirmed txes beyond the minConfirmations param that require callback but have not yet been signaled
FindTxesPendingCallback(ctx context.Context, blockNum int64, chainID CHAIN_ID) (receiptsPlus []ReceiptPlus[R], err error)
// Update tx to mark that its callback has been signaled
UpdateTxCallbackCompleted(ctx context.Context, pipelineTaskRunRid uuid.UUID, chainId CHAIN_ID) error
SaveFetchedReceipts(ctx context.Context, receipts []R, chainID CHAIN_ID) (err error)

// additional methods for tx store management
Expand Down Expand Up @@ -93,6 +95,8 @@ type TransactionStore[
SetBroadcastBeforeBlockNum(ctx context.Context, blockNum int64, chainID CHAIN_ID) error
UpdateBroadcastAts(ctx context.Context, now time.Time, etxIDs []int64) error
UpdateTxAttemptInProgressToBroadcast(ctx context.Context, etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], NewAttemptState TxAttemptState) error
// Update tx to mark that its callback has been signaled
UpdateTxCallbackCompleted(ctx context.Context, pipelineTaskRunRid uuid.UUID, chainId CHAIN_ID) error
UpdateTxsUnconfirmed(ctx context.Context, ids []int64) error
UpdateTxUnstartedToInProgress(ctx context.Context, etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt *TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error
UpdateTxFatalError(ctx context.Context, etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error
Expand Down
1 change: 1 addition & 0 deletions core/chains/evm/txmgr/broadcaster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1055,6 +1055,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) {
FeeLimit: gasLimit,
State: txmgrcommon.TxUnstarted,
PipelineTaskRunID: uuid.NullUUID{UUID: tr.ID, Valid: true},
SignalCallback: true,
}

t.Run("with erroring callback bails out", func(t *testing.T) {
Expand Down
48 changes: 40 additions & 8 deletions core/chains/evm/txmgr/confirmer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2933,11 +2933,12 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) {

etx := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, 1, 1, fromAddress)
cltest.MustInsertEthReceipt(t, txStore, head.Number-minConfirmations, head.Hash, etx.TxAttempts[0].Hash)
pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2 WHERE id = $3`, &tr.ID, minConfirmations, etx.ID)
// Setting both signal_callback and callback_completed to TRUE to simulate a completed pipeline task
// It would only be in a state past suspended if the resume callback was called and callback_completed was set to TRUE
pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2, signal_callback = TRUE, callback_completed = TRUE WHERE id = $3`, &tr.ID, minConfirmations, etx.ID)

err := ec.ResumePendingTaskRuns(testutils.Context(t), &head)
require.NoError(t, err)

})

t.Run("doesn't process task runs where the receipt is younger than minConfirmations", func(t *testing.T) {
Expand All @@ -2952,15 +2953,15 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) {
etx := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, 2, 1, fromAddress)
cltest.MustInsertEthReceipt(t, txStore, head.Number, head.Hash, etx.TxAttempts[0].Hash)

pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2 WHERE id = $3`, &tr.ID, minConfirmations, etx.ID)
pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2, signal_callback = TRUE WHERE id = $3`, &tr.ID, minConfirmations, etx.ID)

err := ec.ResumePendingTaskRuns(testutils.Context(t), &head)
require.NoError(t, err)

})

t.Run("processes eth_txes with receipts older than minConfirmations", func(t *testing.T) {
ch := make(chan interface{})
nonce := evmtypes.Nonce(3)
var err error
ec := cltest.NewEthConfirmer(t, txStore, ethClient, evmcfg, ethKeyStore, func(id uuid.UUID, value interface{}, thisErr error) error {
err = thisErr
Expand All @@ -2972,15 +2973,19 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) {
tr := cltest.MustInsertUnfinishedPipelineTaskRun(t, db, run.ID)
pgtest.MustExec(t, db, `UPDATE pipeline_runs SET state = 'suspended' WHERE id = $1`, run.ID)

etx := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, 3, 1, fromAddress)
etx := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, int64(nonce), 1, fromAddress)
pgtest.MustExec(t, db, `UPDATE evm.txes SET meta='{"FailOnRevert": true}'`)
receipt := cltest.MustInsertEthReceipt(t, txStore, head.Number-minConfirmations, head.Hash, etx.TxAttempts[0].Hash)

pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2 WHERE id = $3`, &tr.ID, minConfirmations, etx.ID)
pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2, signal_callback = TRUE WHERE id = $3`, &tr.ID, minConfirmations, etx.ID)

go func() {
err2 := ec.ResumePendingTaskRuns(testutils.Context(t), &head)
require.NoError(t, err2)
// Retrieve Tx to check if callback completed flag was set to true
updateTx, err3 := txStore.FindTxWithSequence(testutils.Context(t), fromAddress, nonce)
require.NoError(t, err3)
require.Equal(t, true, updateTx.CallbackCompleted)
}()

select {
Expand All @@ -3000,6 +3005,7 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) {

t.Run("processes eth_txes with receipt older than minConfirmations that reverted", func(t *testing.T) {
ch := make(chan interface{})
nonce := evmtypes.Nonce(4)
var err error
ec := cltest.NewEthConfirmer(t, txStore, ethClient, evmcfg, ethKeyStore, func(id uuid.UUID, value interface{}, thisErr error) error {
err = thisErr
Expand All @@ -3011,17 +3017,21 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) {
tr := cltest.MustInsertUnfinishedPipelineTaskRun(t, db, run.ID)
pgtest.MustExec(t, db, `UPDATE pipeline_runs SET state = 'suspended' WHERE id = $1`, run.ID)

etx := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, 4, 1, fromAddress)
etx := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, int64(nonce), 1, fromAddress)
pgtest.MustExec(t, db, `UPDATE evm.txes SET meta='{"FailOnRevert": true}'`)

// receipt is not passed through as a value since it reverted and caused an error
cltest.MustInsertRevertedEthReceipt(t, txStore, head.Number-minConfirmations, head.Hash, etx.TxAttempts[0].Hash)

pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2 WHERE id = $3`, &tr.ID, minConfirmations, etx.ID)
pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2, signal_callback = TRUE WHERE id = $3`, &tr.ID, minConfirmations, etx.ID)

go func() {
err2 := ec.ResumePendingTaskRuns(testutils.Context(t), &head)
require.NoError(t, err2)
// Retrieve Tx to check if callback completed flag was set to true
updateTx, err3 := txStore.FindTxWithSequence(testutils.Context(t), fromAddress, nonce)
require.NoError(t, err3)
require.Equal(t, true, updateTx.CallbackCompleted)
}()

select {
Expand All @@ -3036,6 +3046,28 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) {
t.Fatal("no value received")
}
})

t.Run("does not mark callback complete if callback fails", func(t *testing.T) {
nonce := evmtypes.Nonce(5)
ec := cltest.NewEthConfirmer(t, txStore, ethClient, evmcfg, ethKeyStore, func(uuid.UUID, interface{}, error) error {
return errors.New("error")
})

run := cltest.MustInsertPipelineRun(t, db)
tr := cltest.MustInsertUnfinishedPipelineTaskRun(t, db, run.ID)

etx := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, int64(nonce), 1, fromAddress)
cltest.MustInsertEthReceipt(t, txStore, head.Number-minConfirmations, head.Hash, etx.TxAttempts[0].Hash)
pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2, signal_callback = TRUE WHERE id = $3`, &tr.ID, minConfirmations, etx.ID)

err := ec.ResumePendingTaskRuns(testutils.Context(t), &head)
require.Error(t, err)

// Retrieve Tx to check if callback completed flag was left unchanged
updateTx, err := txStore.FindTxWithSequence(testutils.Context(t), fromAddress, nonce)
require.NoError(t, err)
require.Equal(t, false, updateTx.CallbackCompleted)
})
}

func ptr[T any](t T) *T { return &t }
Loading

0 comments on commit aa02231

Please sign in to comment.