Skip to content

Commit

Permalink
Update fluxmonitor to use IdempotencyKey when creating Tx (#10589)
Browse files Browse the repository at this point in the history
* Updated fluxmonitor to use IdempotencyKey when creating Tx

* Randomly generate idempotency key for fluxmonitor tests
  • Loading branch information
amit-momin authored Sep 19, 2023
1 parent 9015419 commit c32eb83
Show file tree
Hide file tree
Showing 8 changed files with 66 additions and 61 deletions.
7 changes: 3 additions & 4 deletions core/services/fluxmonitorv2/contract_submitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/flux_aggregator_wrapper"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
)

//go:generate mockery --quiet --name ContractSubmitter --output ./mocks/ --case=underscore
Expand All @@ -17,7 +16,7 @@ var FluxAggregatorABI = evmtypes.MustGetABI(flux_aggregator_wrapper.FluxAggregat

// ContractSubmitter defines an interface to submit an eth tx.
type ContractSubmitter interface {
Submit(roundID *big.Int, submission *big.Int, qopts ...pg.QOpt) error
Submit(roundID *big.Int, submission *big.Int, idempotencyKey *string) error
}

// FluxAggregatorContractSubmitter submits the polled answer in an eth tx.
Expand Down Expand Up @@ -51,7 +50,7 @@ func NewFluxAggregatorContractSubmitter(

// Submit submits the answer by writing a EthTx for the txmgr to
// pick up
func (c *FluxAggregatorContractSubmitter) Submit(roundID *big.Int, submission *big.Int, qopts ...pg.QOpt) error {
func (c *FluxAggregatorContractSubmitter) Submit(roundID *big.Int, submission *big.Int, idempotencyKey *string) error {
fromAddress, err := c.keyStore.GetRoundRobinAddress(c.chainID)
if err != nil {
return err
Expand All @@ -63,7 +62,7 @@ func (c *FluxAggregatorContractSubmitter) Submit(roundID *big.Int, submission *b
}

return errors.Wrap(
c.orm.CreateEthTransaction(fromAddress, c.Address(), payload, c.gasLimit, qopts...),
c.orm.CreateEthTransaction(fromAddress, c.Address(), payload, c.gasLimit, idempotencyKey),
"failed to send Eth transaction",
)
}
7 changes: 5 additions & 2 deletions core/services/fluxmonitorv2/contract_submitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"math/big"
"testing"

"github.com/google/uuid"
"github.com/stretchr/testify/assert"

"github.com/smartcontractkit/chainlink/v2/core/internal/mocks"
Expand Down Expand Up @@ -32,8 +33,10 @@ func TestFluxAggregatorContractSubmitter_Submit(t *testing.T) {

keyStore.On("GetRoundRobinAddress", testutils.FixtureChainID).Return(fromAddress, nil)
fluxAggregator.On("Address").Return(toAddress)
orm.On("CreateEthTransaction", fromAddress, toAddress, payload, gasLimit).Return(nil)

err = submitter.Submit(roundID, submission)
idempotencyKey := uuid.New().String()
orm.On("CreateEthTransaction", fromAddress, toAddress, payload, gasLimit, &idempotencyKey).Return(nil)

err = submitter.Submit(roundID, submission, &idempotencyKey)
assert.NoError(t, err)
}
5 changes: 4 additions & 1 deletion core/services/fluxmonitorv2/flux_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1073,13 +1073,16 @@ func (fm *FluxMonitor) initialRoundState() flux_aggregator_wrapper.OracleRoundSt
}

func (fm *FluxMonitor) queueTransactionForTxm(tx pg.Queryer, runID int64, answer decimal.Decimal, roundID uint32, log *flux_aggregator_wrapper.FluxAggregatorNewRound) error {
// Use pipeline run ID to generate globally unique key that can correlate this run to a Tx
idempotencyKey := fmt.Sprintf("fluxmonitor-%d", runID)
// Submit the Eth Tx
err := fm.contractSubmitter.Submit(
new(big.Int).SetInt64(int64(roundID)),
answer.BigInt(),
pg.WithQueryer(tx),
&idempotencyKey,
)
if err != nil {
fm.logger.Errorw("failed to submit Tx to TXM", "err", err)
return err
}

Expand Down
60 changes: 37 additions & 23 deletions core/services/fluxmonitorv2/flux_monitor_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package fluxmonitorv2_test

import (
"fmt"
"math/big"
"strings"
"testing"
Expand Down Expand Up @@ -130,6 +131,11 @@ func setupMocks(t *testing.T) *testMocks {
return tm
}

func buildIdempotencyKey(ID int64) *string {
key := fmt.Sprintf("fluxmonitor-%d", ID)
return &key
}

type setupOptions struct {
pollTickerDisabled bool
idleTimerDisabled bool
Expand Down Expand Up @@ -449,7 +455,7 @@ func TestFluxMonitor_PollIfEligible(t *testing.T) {
},
},
), mock.Anything).
Return(pipeline.Run{}, pipeline.TaskRunResults{
Return(run, pipeline.TaskRunResults{
{
Result: pipeline.Result{
Value: decimal.NewFromInt(answers.polledAnswer),
Expand All @@ -468,7 +474,7 @@ func TestFluxMonitor_PollIfEligible(t *testing.T) {
}).
Once()
tm.contractSubmitter.
On("Submit", big.NewInt(reportableRoundID), big.NewInt(answers.polledAnswer), mock.Anything).
On("Submit", big.NewInt(reportableRoundID), big.NewInt(answers.polledAnswer), buildIdempotencyKey(run.ID)).
Return(nil).
Once()

Expand Down Expand Up @@ -578,6 +584,7 @@ func TestPollingDeviationChecker_BuffersLogs(t *testing.T) {
tm.orm.On("MostRecentFluxMonitorRoundID", contractAddress).Return(uint32(4), nil)

// Round 1
run := pipeline.Run{ID: 1}
tm.orm.
On("FindOrCreateFluxMonitorRoundStats", contractAddress, uint32(1), mock.Anything).
Return(fluxmonitorv2.FluxMonitorRoundStatsV2{
Expand All @@ -586,23 +593,23 @@ func TestPollingDeviationChecker_BuffersLogs(t *testing.T) {
}, nil)
tm.pipelineRunner.
On("ExecuteRun", mock.Anything, pipelineSpec, mock.Anything, mock.Anything).
Return(pipeline.Run{}, pipeline.TaskRunResults{
Return(run, pipeline.TaskRunResults{
{
Result: pipeline.Result{
Value: decimal.NewFromInt(fetchedValue),
Error: nil,
},
Task: &pipeline.HTTPTask{},
},
}, nil)
}, nil).Once()
tm.pipelineRunner.
On("InsertFinishedRun", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(nil).
Run(func(args mock.Arguments) {
args.Get(0).(*pipeline.Run).ID = 1
})
}).Once()
tm.contractSubmitter.
On("Submit", big.NewInt(1), big.NewInt(fetchedValue), mock.Anything).
On("Submit", big.NewInt(1), big.NewInt(fetchedValue), buildIdempotencyKey(run.ID)).
Return(nil).
Once()

Expand All @@ -617,6 +624,7 @@ func TestPollingDeviationChecker_BuffersLogs(t *testing.T) {
Return(nil).Once()

// Round 3
run = pipeline.Run{ID: 2}
tm.orm.
On("FindOrCreateFluxMonitorRoundStats", contractAddress, uint32(3), mock.Anything).
Return(fluxmonitorv2.FluxMonitorRoundStatsV2{
Expand All @@ -625,23 +633,23 @@ func TestPollingDeviationChecker_BuffersLogs(t *testing.T) {
}, nil)
tm.pipelineRunner.
On("ExecuteRun", mock.Anything, pipelineSpec, mock.Anything, mock.Anything).
Return(pipeline.Run{}, pipeline.TaskRunResults{
Return(run, pipeline.TaskRunResults{
{
Result: pipeline.Result{
Value: decimal.NewFromInt(fetchedValue),
Error: nil,
},
Task: &pipeline.HTTPTask{},
},
}, nil)
}, nil).Once()
tm.pipelineRunner.
On("InsertFinishedRun", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(nil).
Run(func(args mock.Arguments) {
args.Get(0).(*pipeline.Run).ID = 2
})
}).Once()
tm.contractSubmitter.
On("Submit", big.NewInt(3), big.NewInt(fetchedValue), mock.Anything).
On("Submit", big.NewInt(3), big.NewInt(fetchedValue), buildIdempotencyKey(run.ID)).
Return(nil).
Once()
tm.orm.
Expand All @@ -655,6 +663,7 @@ func TestPollingDeviationChecker_BuffersLogs(t *testing.T) {
Return(nil).Once()

// Round 4
run = pipeline.Run{ID: 3}
tm.orm.
On("FindOrCreateFluxMonitorRoundStats", contractAddress, uint32(4), mock.Anything).
Return(fluxmonitorv2.FluxMonitorRoundStatsV2{
Expand All @@ -663,23 +672,23 @@ func TestPollingDeviationChecker_BuffersLogs(t *testing.T) {
}, nil)
tm.pipelineRunner.
On("ExecuteRun", mock.Anything, pipelineSpec, mock.Anything, mock.Anything).
Return(pipeline.Run{}, pipeline.TaskRunResults{
Return(run, pipeline.TaskRunResults{
{
Result: pipeline.Result{
Value: decimal.NewFromInt(fetchedValue),
Error: nil,
},
Task: &pipeline.HTTPTask{},
},
}, nil)
}, nil).Once()
tm.pipelineRunner.
On("InsertFinishedRun", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(nil).
Run(func(args mock.Arguments) {
args.Get(0).(*pipeline.Run).ID = 3
})
}).Once()
tm.contractSubmitter.
On("Submit", big.NewInt(4), big.NewInt(fetchedValue), mock.Anything).
On("Submit", big.NewInt(4), big.NewInt(fetchedValue), buildIdempotencyKey(run.ID)).
Return(nil).
Once()
tm.orm.
Expand Down Expand Up @@ -1475,6 +1484,8 @@ func TestFluxMonitor_DoesNotDoubleSubmit(t *testing.T) {
answer = 100
)

run := pipeline.Run{ID: 1}

tm.keyStore.On("EnabledKeysForChain", testutils.FixtureChainID).Return([]ethkey.KeyV2{{Address: nodeAddr}}, nil).Once()
tm.logBroadcaster.On("IsConnected").Return(true).Maybe()

Expand All @@ -1488,7 +1499,7 @@ func TestFluxMonitor_DoesNotDoubleSubmit(t *testing.T) {
}, nil).Once()
tm.pipelineRunner.
On("ExecuteRun", mock.Anything, pipelineSpec, mock.Anything, mock.Anything).
Return(pipeline.Run{}, pipeline.TaskRunResults{
Return(run, pipeline.TaskRunResults{
{
Result: pipeline.Result{
Value: decimal.NewFromInt(answer),
Expand All @@ -1504,7 +1515,7 @@ func TestFluxMonitor_DoesNotDoubleSubmit(t *testing.T) {
args.Get(0).(*pipeline.Run).ID = 1
})
tm.logBroadcaster.On("MarkConsumed", mock.Anything, mock.Anything).Return(nil).Once()
tm.contractSubmitter.On("Submit", big.NewInt(roundID), big.NewInt(answer), mock.Anything).Return(nil).Once()
tm.contractSubmitter.On("Submit", big.NewInt(roundID), big.NewInt(answer), buildIdempotencyKey(run.ID)).Return(nil).Once()
tm.orm.
On("UpdateFluxMonitorRoundStats",
contractAddress,
Expand Down Expand Up @@ -1588,6 +1599,8 @@ func TestFluxMonitor_DoesNotDoubleSubmit(t *testing.T) {
roundID = 3
answer = 100
)

run := pipeline.Run{ID: 1}
tm.keyStore.On("EnabledKeysForChain", testutils.FixtureChainID).Return([]ethkey.KeyV2{{Address: nodeAddr}}, nil).Once()
tm.logBroadcaster.On("IsConnected").Return(true).Maybe()

Expand All @@ -1614,7 +1627,7 @@ func TestFluxMonitor_DoesNotDoubleSubmit(t *testing.T) {
}, nil).Once()
tm.pipelineRunner.
On("ExecuteRun", mock.Anything, pipelineSpec, mock.Anything, mock.Anything).
Return(pipeline.Run{}, pipeline.TaskRunResults{
Return(run, pipeline.TaskRunResults{
{
Result: pipeline.Result{
Value: decimal.NewFromInt(answer),
Expand All @@ -1629,7 +1642,7 @@ func TestFluxMonitor_DoesNotDoubleSubmit(t *testing.T) {
Run(func(args mock.Arguments) {
args.Get(0).(*pipeline.Run).ID = 1
})
tm.contractSubmitter.On("Submit", big.NewInt(roundID), big.NewInt(answer), mock.Anything).Return(nil).Once()
tm.contractSubmitter.On("Submit", big.NewInt(roundID), big.NewInt(answer), buildIdempotencyKey(run.ID)).Return(nil).Once()
tm.orm.
On("UpdateFluxMonitorRoundStats",
contractAddress,
Expand Down Expand Up @@ -1683,6 +1696,7 @@ func TestFluxMonitor_DoesNotDoubleSubmit(t *testing.T) {
roundID = 3
answer = 100
)
run := pipeline.Run{ID: 1}
tm.keyStore.On("EnabledKeysForChain", testutils.FixtureChainID).Return([]ethkey.KeyV2{{Address: nodeAddr}}, nil).Once()
tm.logBroadcaster.On("IsConnected").Return(true).Maybe()

Expand All @@ -1709,7 +1723,7 @@ func TestFluxMonitor_DoesNotDoubleSubmit(t *testing.T) {
}, nil).Once()
tm.pipelineRunner.
On("ExecuteRun", mock.Anything, pipelineSpec, mock.Anything, mock.Anything).
Return(pipeline.Run{}, pipeline.TaskRunResults{
Return(run, pipeline.TaskRunResults{
{
Result: pipeline.Result{
Value: decimal.NewFromInt(answer),
Expand All @@ -1724,7 +1738,7 @@ func TestFluxMonitor_DoesNotDoubleSubmit(t *testing.T) {
Run(func(args mock.Arguments) {
args.Get(0).(*pipeline.Run).ID = 1
})
tm.contractSubmitter.On("Submit", big.NewInt(roundID), big.NewInt(answer), mock.Anything).Return(nil).Once()
tm.contractSubmitter.On("Submit", big.NewInt(roundID), big.NewInt(answer), buildIdempotencyKey(run.ID)).Return(nil).Once()
tm.orm.
On("UpdateFluxMonitorRoundStats",
contractAddress,
Expand Down Expand Up @@ -1797,7 +1811,7 @@ func TestFluxMonitor_DoesNotDoubleSubmit(t *testing.T) {
Once()

// and that should result in a new submission
tm.contractSubmitter.On("Submit", big.NewInt(olderRoundID), big.NewInt(answer), mock.Anything).Return(nil).Once()
tm.contractSubmitter.On("Submit", big.NewInt(olderRoundID), big.NewInt(answer), buildIdempotencyKey(run.ID)).Return(nil).Once()

tm.orm.
On("UpdateFluxMonitorRoundStats",
Expand Down Expand Up @@ -1887,7 +1901,7 @@ func TestFluxMonitor_DrumbeatTicker(t *testing.T) {
},
},
), mock.Anything).
Return(pipeline.Run{}, pipeline.TaskRunResults{
Return(pipeline.Run{ID: runID}, pipeline.TaskRunResults{
{
Result: pipeline.Result{
Value: decimal.NewFromInt(fetchedAnswer),
Expand All @@ -1905,7 +1919,7 @@ func TestFluxMonitor_DrumbeatTicker(t *testing.T) {
}).
Once()
tm.contractSubmitter.
On("Submit", big.NewInt(int64(roundID)), answerBigInt, mock.Anything).
On("Submit", big.NewInt(int64(roundID)), answerBigInt, buildIdempotencyKey(runID)).
Return(nil).
Once()

Expand Down
19 changes: 5 additions & 14 deletions core/services/fluxmonitorv2/mocks/contract_submitter.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 5 additions & 12 deletions core/services/fluxmonitorv2/mocks/orm.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit c32eb83

Please sign in to comment.