From de5027383ba3120554e8dc88b5b178973585159b Mon Sep 17 00:00:00 2001 From: ferglor <19188060+ferglor@users.noreply.github.com> Date: Mon, 13 Nov 2023 15:04:10 +0000 Subject: [PATCH 1/2] Always wait for doCheck to complete before returning (#10878) * Always wait for doCheck to complete before returning Use threadctrl to avail of context cancellation but still rely on a wait group to blocl until the lookups finish Update tests * Revert async streamsLookup to fix test * Rearrange thread control as a test * Copy index * WIP --- .../ocr2keeper/evm21/registry_check_pipeline.go | 5 ++++- .../plugins/ocr2keeper/evm21/streams_lookup.go | 16 +++++++++++++--- .../ocr2keeper/evm21/streams_lookup_test.go | 15 ++++++++++++++- 3 files changed, 31 insertions(+), 5 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/registry_check_pipeline.go b/core/services/ocr2/plugins/ocr2keeper/evm21/registry_check_pipeline.go index d3530994702..c9752ea14db 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/registry_check_pipeline.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/registry_check_pipeline.go @@ -41,7 +41,10 @@ func (r *EvmRegistry) CheckUpkeeps(ctx context.Context, keys ...ocr2keepers.Upke } chResult := make(chan checkResult, 1) - go r.doCheck(ctx, keys, chResult) + + r.threadCtrl.Go(func(ctx context.Context) { + r.doCheck(ctx, keys, chResult) + }) select { case rs := <-chResult: diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup.go b/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup.go index 660550afe97..fb2821a74b7 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup.go @@ -149,10 +149,15 @@ func (r *EvmRegistry) streamsLookup(ctx context.Context, checkResults []ocr2keep } var wg sync.WaitGroup + for i, lookup := range lookups { + i := i wg.Add(1) - go r.doLookup(ctx, &wg, lookup, i, checkResults, lggr) + r.threadCtrl.Go(func(ctx context.Context) { + r.doLookup(ctx, &wg, lookup, i, checkResults, lggr) + }) } + wg.Wait() // don't surface error to plugin bc StreamsLookup process should be self-contained. @@ -289,14 +294,19 @@ func (r *EvmRegistry) doMercuryRequest(ctx context.Context, sl *StreamsLookup, p if sl.FeedParamKey == feedIdHex && sl.TimeParamKey == blockNumber { // only mercury v0.2 for i := range sl.Feeds { - go r.singleFeedRequest(ctx, ch, i, sl, lggr) + i := i + r.threadCtrl.Go(func(ctx context.Context) { + r.singleFeedRequest(ctx, ch, i, sl, lggr) + }) } } else if sl.FeedParamKey == feedIDs { // only mercury v0.3 resultLen = 1 isMercuryV03 = true ch = make(chan MercuryData, resultLen) - go r.multiFeedsRequest(ctx, ch, sl, lggr) + r.threadCtrl.Go(func(ctx context.Context) { + r.multiFeedsRequest(ctx, ch, sl, lggr) + }) } else { return encoding.NoPipelineError, encoding.UpkeepFailureReasonInvalidRevertDataInput, [][]byte{}, false, 0 * time.Second, fmt.Errorf("invalid revert data input: feed param key %s, time param key %s, feeds %s", sl.FeedParamKey, sl.TimeParamKey, sl.Feeds) } diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup_test.go index 8d7c67d80ce..145d701454d 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup_test.go @@ -25,6 +25,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evm21/encoding" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evm21/mocks" + "github.com/smartcontractkit/chainlink/v2/core/utils" evmClientMocks "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client/mocks" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" @@ -70,7 +71,8 @@ func setupEVMRegistry(t *testing.T) *EvmRegistry { allowListCache: cache.New(defaultAllowListExpiration, cleanupInterval), pluginRetryCache: cache.New(defaultPluginRetryExpiration, cleanupInterval), }, - hc: mockHttpClient, + hc: mockHttpClient, + threadCtrl: utils.NewThreadControl(), } return r } @@ -220,6 +222,7 @@ func TestEvmRegistry_StreamsLookup(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { r := setupEVMRegistry(t) + defer r.Close() client := new(evmClientMocks.Client) r.client = client @@ -362,6 +365,7 @@ func TestEvmRegistry_AllowedToUseMercury(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { r := setupEVMRegistry(t) + defer r.Close() client := new(evmClientMocks.Client) r.client = client @@ -576,9 +580,12 @@ func TestEvmRegistry_DoMercuryRequestV02(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { r := setupEVMRegistry(t) + defer r.Close() + if tt.pluginRetries != 0 { r.mercury.pluginRetryCache.Set(tt.pluginRetryKey, tt.pluginRetries, cache.DefaultExpiration) } + hc := mocks.NewHttpClient(t) for _, blob := range tt.mockChainlinkBlobs { @@ -812,6 +819,8 @@ func TestEvmRegistry_SingleFeedRequest(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { r := setupEVMRegistry(t) + defer r.Close() + hc := mocks.NewHttpClient(t) mr := MercuryV02Response{ChainlinkBlob: tt.blob} @@ -1157,6 +1166,8 @@ func TestEvmRegistry_MultiFeedRequest(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { r := setupEVMRegistry(t) + defer r.Close() + if tt.pluginRetries != 0 { r.mercury.pluginRetryCache.Set(tt.pluginRetryKey, tt.pluginRetries, cache.DefaultExpiration) } @@ -1319,6 +1330,8 @@ func TestEvmRegistry_CheckCallback(t *testing.T) { t.Run(tt.name, func(t *testing.T) { client := new(evmClientMocks.Client) r := setupEVMRegistry(t) + defer r.Close() + payload, err := r.abi.Pack("checkCallback", tt.lookup.upkeepId, values, tt.lookup.ExtraData) require.Nil(t, err) args := map[string]interface{}{ From 12062831ae01b21c6ea6bc052107ecbe254da6c4 Mon Sep 17 00:00:00 2001 From: Bruno Moura Date: Mon, 13 Nov 2023 15:04:39 +0000 Subject: [PATCH 2/2] Remove Mercury plugin dependency on raw evm chain. (#11201) * mercury: remove dead code * mercury: Add ChainReader * mercury: services init * fix sqlx import * mercury: error handling * mod tidy * mercury: log error from reader * mercury: ensure a failed observation when reading from chain return an error * mercury: add test for setLatestBlocks error * make a happy linter * Update core/services/relay/evm/mercury_provider.go Co-authored-by: Sam --------- Co-authored-by: Sam --- core/scripts/go.mod | 2 +- core/scripts/go.sum | 4 +- core/services/ocr2/delegate.go | 6 +- core/services/ocr2/plugins/mercury/plugin.go | 3 +- core/services/relay/evm/evm.go | 5 +- .../evm/mercury/mocks/chain_head_tracker.go | 47 ------------- .../services/relay/evm/mercury/types/types.go | 6 -- .../relay/evm/mercury/v1/data_source.go | 53 +++++++------- .../relay/evm/mercury/v1/data_source_test.go | 70 +++++++++---------- core/services/relay/evm/mercury_provider.go | 38 ++++++++++ go.mod | 2 +- go.sum | 4 +- integration-tests/go.mod | 2 +- integration-tests/go.sum | 4 +- 14 files changed, 113 insertions(+), 133 deletions(-) delete mode 100644 core/services/relay/evm/mercury/mocks/chain_head_tracker.go diff --git a/core/scripts/go.mod b/core/scripts/go.mod index 5f881f354e6..bb68175ddfc 100644 --- a/core/scripts/go.mod +++ b/core/scripts/go.mod @@ -304,7 +304,7 @@ require ( github.com/sirupsen/logrus v1.9.3 // indirect github.com/smartcontractkit/caigo v0.0.0-20230621050857-b29a4ca8c704 // indirect github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20231109141932-cb1ea9020255 // indirect - github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231108205920-694ce17a4a78 // indirect + github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231108215906-8bbaf383b742 // indirect github.com/smartcontractkit/chainlink-solana v1.0.3-0.20231023133638-72f4e799ab05 // indirect github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20231024133459-1ef3a11319eb // indirect github.com/smartcontractkit/tdh2/go/ocr2/decryptionplugin v0.0.0-20230906073235-9e478e5e19f1 // indirect diff --git a/core/scripts/go.sum b/core/scripts/go.sum index 1d455305a94..bd3b75d37aa 100644 --- a/core/scripts/go.sum +++ b/core/scripts/go.sum @@ -1464,8 +1464,8 @@ github.com/smartcontractkit/caigo v0.0.0-20230621050857-b29a4ca8c704 h1:T3lFWumv github.com/smartcontractkit/caigo v0.0.0-20230621050857-b29a4ca8c704/go.mod h1:2QuJdEouTWjh5BDy5o/vgGXQtR4Gz8yH1IYB5eT7u4M= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20231109141932-cb1ea9020255 h1:Pt6c7bJU9wIN6PQQnmN8UmYYH6lpfiQ6U/B8yEC2s5s= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20231109141932-cb1ea9020255/go.mod h1:EHppaccd/LTlTMI2o4dmBHe4BknEgEFFDjDGMNuGb3k= -github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231108205920-694ce17a4a78 h1:ZBsxdB/5iIpl/tWhXe/RHrOwBG7pbKOMeppy5Zt2BVc= -github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231108205920-694ce17a4a78/go.mod h1:M9U1JV7IQi8Sfj4JR1qSi1tIh6omgW78W/8SHN/8BUQ= +github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231108215906-8bbaf383b742 h1:28XkPE6YfJ4uabTX9/7sueRV6IKtY4hcm1nIt1e6b20= +github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231108215906-8bbaf383b742/go.mod h1:M9U1JV7IQi8Sfj4JR1qSi1tIh6omgW78W/8SHN/8BUQ= github.com/smartcontractkit/chainlink-solana v1.0.3-0.20231023133638-72f4e799ab05 h1:DaPSVnxe7oz1QJ+AVIhQWs1W3ubQvwvGo9NbHpMs1OQ= github.com/smartcontractkit/chainlink-solana v1.0.3-0.20231023133638-72f4e799ab05/go.mod h1:o0Pn1pbaUluboaK6/yhf8xf7TiFCkyFl6WUOdwqamuU= github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20231024133459-1ef3a11319eb h1:HiluOfEVGOQTM6BTDImOqYdMZZ7qq7fkZ3TJdmItNr8= diff --git a/core/services/ocr2/delegate.go b/core/services/ocr2/delegate.go index 95ec1469156..19296c72f00 100644 --- a/core/services/ocr2/delegate.go +++ b/core/services/ocr2/delegate.go @@ -653,10 +653,6 @@ func (d *Delegate) newServicesMercury( if err != nil { return nil, ErrRelayNotEnabled{Err: err, Relay: spec.Relay, PluginName: "mercury"} } - chain, err := d.legacyChains.Get(rid.ChainID) - if err != nil { - return nil, fmt.Errorf("mercury services: failed to get chain %s: %w", rid.ChainID, err) - } provider, err2 := relayer.NewPluginProvider(ctx, types.RelayArgs{ @@ -695,7 +691,7 @@ func (d *Delegate) newServicesMercury( chEnhancedTelem := make(chan ocrcommon.EnhancedTelemetryMercuryData, 100) - mercuryServices, err2 := mercury.NewServices(jb, mercuryProvider, d.pipelineRunner, runResults, lggr, oracleArgsNoPlugin, d.cfg.JobPipeline(), chEnhancedTelem, chain, d.mercuryORM, (mercuryutils.FeedID)(*spec.FeedID)) + mercuryServices, err2 := mercury.NewServices(jb, mercuryProvider, d.pipelineRunner, runResults, lggr, oracleArgsNoPlugin, d.cfg.JobPipeline(), chEnhancedTelem, d.mercuryORM, (mercuryutils.FeedID)(*spec.FeedID)) if ocrcommon.ShouldCollectEnhancedTelemetryMercury(jb) { enhancedTelemService := ocrcommon.NewEnhancedTelemetryService(&jb, chEnhancedTelem, make(chan struct{}), d.monitoringEndpointGen.GenMonitoringEndpoint(rid.Network, rid.ChainID, spec.FeedID.String(), synchronization.EnhancedEAMercury), lggr.Named("EnhancedTelemetryMercury")) diff --git a/core/services/ocr2/plugins/mercury/plugin.go b/core/services/ocr2/plugins/mercury/plugin.go index 69a3b53c284..ddef1374a4c 100644 --- a/core/services/ocr2/plugins/mercury/plugin.go +++ b/core/services/ocr2/plugins/mercury/plugin.go @@ -37,7 +37,6 @@ func NewServices( argsNoPlugin libocr2.MercuryOracleArgs, cfg Config, chEnhancedTelem chan ocrcommon.EnhancedTelemetryMercuryData, - chainHeadTracker types.ChainHeadTracker, orm types.DataSourceORM, feedID utils.FeedID, ) ([]job.ServiceCtx, error) { @@ -66,7 +65,7 @@ func NewServices( lggr, runResults, chEnhancedTelem, - chainHeadTracker, + ocr2Provider.ChainReader(), ocr2Provider.MercuryServerFetcher(), pluginConfig.InitialBlockNumber.Ptr(), feedID, diff --git a/core/services/relay/evm/evm.go b/core/services/relay/evm/evm.go index 111e3622b19..aa1d1d774bd 100644 --- a/core/services/relay/evm/evm.go +++ b/core/services/relay/evm/evm.go @@ -10,10 +10,10 @@ import ( "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/common" + "github.com/jmoiron/sqlx" pkgerrors "github.com/pkg/errors" "go.uber.org/multierr" - "github.com/jmoiron/sqlx" "github.com/smartcontractkit/libocr/gethwrappers2/ocr2aggregator" "github.com/smartcontractkit/libocr/offchainreporting2/reportingplugin/median" "github.com/smartcontractkit/libocr/offchainreporting2/reportingplugin/median/evmreportcodec" @@ -189,7 +189,8 @@ func (r *Relayer) NewMercuryProvider(rargs relaytypes.RelayArgs, pargs relaytype } transmitter := mercury.NewTransmitter(lggr, cw.ContractConfigTracker(), client, privKey.PublicKey, rargs.JobID, *relayConfig.FeedID, r.db, r.pgCfg, transmitterCodec) - return NewMercuryProvider(cw, transmitter, reportCodecV1, reportCodecV2, reportCodecV3, lggr), nil + chainReader := NewChainReader(r.chain.HeadTracker()) + return NewMercuryProvider(cw, transmitter, reportCodecV1, reportCodecV2, reportCodecV3, chainReader, lggr), nil } func (r *Relayer) NewFunctionsProvider(rargs relaytypes.RelayArgs, pargs relaytypes.PluginArgs) (relaytypes.FunctionsProvider, error) { diff --git a/core/services/relay/evm/mercury/mocks/chain_head_tracker.go b/core/services/relay/evm/mercury/mocks/chain_head_tracker.go deleted file mode 100644 index b6f2981cf07..00000000000 --- a/core/services/relay/evm/mercury/mocks/chain_head_tracker.go +++ /dev/null @@ -1,47 +0,0 @@ -// Code generated by mockery v2.35.4. DO NOT EDIT. - -package mocks - -import ( - common "github.com/ethereum/go-ethereum/common" - evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" - - mock "github.com/stretchr/testify/mock" - - types "github.com/smartcontractkit/chainlink/v2/common/types" -) - -// ChainHeadTracker is an autogenerated mock type for the ChainHeadTracker type -type ChainHeadTracker struct { - mock.Mock -} - -// HeadTracker provides a mock function with given fields: -func (_m *ChainHeadTracker) HeadTracker() types.HeadTracker[*evmtypes.Head, common.Hash] { - ret := _m.Called() - - var r0 types.HeadTracker[*evmtypes.Head, common.Hash] - if rf, ok := ret.Get(0).(func() types.HeadTracker[*evmtypes.Head, common.Hash]); ok { - r0 = rf() - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(types.HeadTracker[*evmtypes.Head, common.Hash]) - } - } - - return r0 -} - -// NewChainHeadTracker creates a new instance of ChainHeadTracker. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. -func NewChainHeadTracker(t interface { - mock.TestingT - Cleanup(func()) -}) *ChainHeadTracker { - mock := &ChainHeadTracker{} - mock.Mock.Test(t) - - t.Cleanup(func() { mock.AssertExpectations(t) }) - - return mock -} diff --git a/core/services/relay/evm/mercury/types/types.go b/core/services/relay/evm/mercury/types/types.go index 7059689939a..49bffb6c290 100644 --- a/core/services/relay/evm/mercury/types/types.go +++ b/core/services/relay/evm/mercury/types/types.go @@ -8,15 +8,9 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types" - httypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/headtracker/types" "github.com/smartcontractkit/chainlink/v2/core/services/pg" ) -//go:generate mockery --quiet --name ChainHeadTracker --output ../mocks/ --case=underscore -type ChainHeadTracker interface { - HeadTracker() httypes.HeadTracker -} - type DataSourceORM interface { LatestReport(ctx context.Context, feedID [32]byte, qopts ...pg.QOpt) (report []byte, err error) } diff --git a/core/services/relay/evm/mercury/v1/data_source.go b/core/services/relay/evm/mercury/v1/data_source.go index 0f8f56f46e4..0bdfb67de78 100644 --- a/core/services/relay/evm/mercury/v1/data_source.go +++ b/core/services/relay/evm/mercury/v1/data_source.go @@ -16,7 +16,6 @@ import ( relaymercury "github.com/smartcontractkit/chainlink-relay/pkg/reportingplugins/mercury" relaymercuryv1 "github.com/smartcontractkit/chainlink-relay/pkg/reportingplugins/mercury/v1" - evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/services/ocrcommon" @@ -67,7 +66,7 @@ type datasource struct { mu sync.RWMutex chEnhancedTelem chan<- ocrcommon.EnhancedTelemetryMercuryData - chainHeadTracker types.ChainHeadTracker + chainReader relaymercury.ChainReader fetcher Fetcher initialBlockNumber *int64 @@ -77,8 +76,8 @@ type datasource struct { var _ relaymercuryv1.DataSource = &datasource{} -func NewDataSource(orm types.DataSourceORM, pr pipeline.Runner, jb job.Job, spec pipeline.Spec, lggr logger.Logger, rr chan *pipeline.Run, enhancedTelemChan chan ocrcommon.EnhancedTelemetryMercuryData, chainHeadTracker types.ChainHeadTracker, fetcher Fetcher, initialBlockNumber *int64, feedID mercuryutils.FeedID) *datasource { - return &datasource{pr, jb, spec, lggr, rr, orm, reportcodec.ReportCodec{}, feedID, sync.RWMutex{}, enhancedTelemChan, chainHeadTracker, fetcher, initialBlockNumber, insufficientBlocksCount.WithLabelValues(feedID.String()), zeroBlocksCount.WithLabelValues(feedID.String())} +func NewDataSource(orm types.DataSourceORM, pr pipeline.Runner, jb job.Job, spec pipeline.Spec, lggr logger.Logger, rr chan *pipeline.Run, enhancedTelemChan chan ocrcommon.EnhancedTelemetryMercuryData, chainReader relaymercury.ChainReader, fetcher Fetcher, initialBlockNumber *int64, feedID mercuryutils.FeedID) *datasource { + return &datasource{pr, jb, spec, lggr, rr, orm, reportcodec.ReportCodec{}, feedID, sync.RWMutex{}, enhancedTelemChan, chainReader, fetcher, initialBlockNumber, insufficientBlocksCount.WithLabelValues(feedID.String()), zeroBlocksCount.WithLabelValues(feedID.String())} } type ErrEmptyLatestReport struct { @@ -94,7 +93,11 @@ func (e ErrEmptyLatestReport) Error() string { func (ds *datasource) Observe(ctx context.Context, repts ocrtypes.ReportTimestamp, fetchMaxFinalizedBlockNum bool) (obs relaymercuryv1.Observation, pipelineExecutionErr error) { // setLatestBlocks must come chronologically before observations, along // with observationTimestamp, to avoid front-running - ds.setLatestBlocks(ctx, &obs) + + // Errors are not expected when reading from the underlying ChainReader + if err := ds.setLatestBlocks(ctx, &obs); err != nil { + return obs, err + } var wg sync.WaitGroup if fetchMaxFinalizedBlockNum { @@ -290,8 +293,13 @@ func (ds *datasource) executeRun(ctx context.Context) (*pipeline.Run, pipeline.T return run, trrs, err } -func (ds *datasource) setLatestBlocks(ctx context.Context, obs *relaymercuryv1.Observation) { - latestBlocks := ds.getLatestBlocks(ctx, nBlocksObservation) +func (ds *datasource) setLatestBlocks(ctx context.Context, obs *relaymercuryv1.Observation) error { + latestBlocks, err := ds.chainReader.LatestHeads(ctx, nBlocksObservation) + if err != nil { + ds.lggr.Errorw("failed to read latest blocks", "error", err) + return err + } + if len(latestBlocks) < nBlocksObservation { ds.insufficientBlocksCounter.Inc() ds.lggr.Warnw("Insufficient blocks", "latestBlocks", latestBlocks, "lenLatestBlocks", len(latestBlocks), "nBlocksObservation", nBlocksObservation) @@ -299,31 +307,22 @@ func (ds *datasource) setLatestBlocks(ctx context.Context, obs *relaymercuryv1.O // TODO: remove with https://smartcontract-it.atlassian.net/browse/BCF-2209 if len(latestBlocks) == 0 { + obsErr := fmt.Errorf("no blocks available") ds.zeroBlocksCounter.Inc() - err := errors.New("no blocks available") - obs.CurrentBlockNum.Err = err - obs.CurrentBlockHash.Err = err - obs.CurrentBlockTimestamp.Err = err + obs.CurrentBlockNum.Err = obsErr + obs.CurrentBlockHash.Err = obsErr + obs.CurrentBlockTimestamp.Err = obsErr } else { - obs.CurrentBlockNum.Val = latestBlocks[0].Number - obs.CurrentBlockHash.Val = latestBlocks[0].Hash.Bytes() - if latestBlocks[0].Timestamp.IsZero() { - obs.CurrentBlockTimestamp.Val = 0 - } else { - obs.CurrentBlockTimestamp.Val = uint64(latestBlocks[0].Timestamp.Unix()) - } + obs.CurrentBlockNum.Val = int64(latestBlocks[0].Number) + obs.CurrentBlockHash.Val = latestBlocks[0].Hash + obs.CurrentBlockTimestamp.Val = latestBlocks[0].Timestamp } for _, block := range latestBlocks { - obs.LatestBlocks = append(obs.LatestBlocks, relaymercuryv1.NewBlock(block.Number, block.Hash.Bytes(), uint64(block.Timestamp.Unix()))) + obs.LatestBlocks = append( + obs.LatestBlocks, + relaymercuryv1.NewBlock(int64(block.Number), block.Hash, block.Timestamp)) } -} -func (ds *datasource) getLatestBlocks(ctx context.Context, k int) (blocks []*evmtypes.Head) { - // Use the headtracker's view of the chain, this is very fast since - // it doesn't make any external network requests, and it is the - // headtracker's job to ensure it has an up-to-date view of the chain based - // on responses from all available RPC nodes - latestHead := ds.chainHeadTracker.HeadTracker().LatestChain() - return latestHead.AsSlice(k) + return nil } diff --git a/core/services/relay/evm/mercury/v1/data_source_test.go b/core/services/relay/evm/mercury/v1/data_source_test.go index 42983fa0022..40542c2631a 100644 --- a/core/services/relay/evm/mercury/v1/data_source_test.go +++ b/core/services/relay/evm/mercury/v1/data_source_test.go @@ -3,6 +3,7 @@ package mercury_v1 import ( "context" "fmt" + "io" "math/big" "math/rand" "testing" @@ -18,7 +19,6 @@ import ( relaymercuryv1 "github.com/smartcontractkit/chainlink-relay/pkg/reportingplugins/mercury/v1" commonmocks "github.com/smartcontractkit/chainlink/v2/common/mocks" "github.com/smartcontractkit/chainlink/v2/core/assets" - httypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/headtracker/types" evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/logger" @@ -26,8 +26,8 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/pg" "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" + "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm" mercurymocks "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/mocks" - "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/types" mercuryutils "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/utils" reportcodecv1 "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/v1/reportcodec" "github.com/smartcontractkit/chainlink/v2/core/utils" @@ -52,14 +52,6 @@ func (m *mockFetcher) LatestTimestamp(context.Context) (int64, error) { return 0, nil } -var _ types.ChainHeadTracker = &mockHeadTracker{} - -type mockHeadTracker struct { - h httypes.HeadTracker -} - -func (m *mockHeadTracker) HeadTracker() httypes.HeadTracker { return m.h } - type mockORM struct { report []byte err error @@ -69,6 +61,15 @@ func (m *mockORM) LatestReport(ctx context.Context, feedID [32]byte, qopts ...pg return m.report, m.err } +type mockChainReader struct { + err error + obs []relaymercury.Head +} + +func (m *mockChainReader) LatestHeads(context.Context, int) ([]relaymercury.Head, error) { + return m.obs, m.err +} + func TestMercury_Observe(t *testing.T) { orm := &mockORM{} lggr := logger.TestLogger(t) @@ -106,10 +107,7 @@ func TestMercury_Observe(t *testing.T) { ds.spec = spec h := commonmocks.NewHeadTracker[*evmtypes.Head, common.Hash](t) - ht := &mockHeadTracker{ - h: h, - } - ds.chainHeadTracker = ht + ds.chainReader = evm.NewChainReader(h) head := &evmtypes.Head{ Number: int64(rand.Int31()), @@ -201,7 +199,7 @@ func TestMercury_Observe(t *testing.T) { t.Run("if no current block available", func(t *testing.T) { h2 := commonmocks.NewHeadTracker[*evmtypes.Head, common.Hash](t) h2.On("LatestChain").Return((*evmtypes.Head)(nil)) - ht.h = h2 + ds.chainReader = evm.NewChainReader(h2) obs, err := ds.Observe(ctx, repts, true) assert.NoError(t, err) @@ -212,7 +210,7 @@ func TestMercury_Observe(t *testing.T) { }) }) - ht.h = h + ds.chainReader = evm.NewChainReader(h) t.Run("when fetchMaxFinalizedBlockNum=false", func(t *testing.T) { t.Run("when run execution fails, returns error", func(t *testing.T) { @@ -320,7 +318,7 @@ func TestMercury_Observe(t *testing.T) { t.Run("when chain length is zero", func(t *testing.T) { ht2 := commonmocks.NewHeadTracker[*evmtypes.Head, common.Hash](t) ht2.On("LatestChain").Return((*evmtypes.Head)(nil)) - ht.h = ht2 + ds.chainReader = evm.NewChainReader(ht2) obs, err := ds.Observe(ctx, repts, true) assert.NoError(t, err) @@ -345,7 +343,7 @@ func TestMercury_Observe(t *testing.T) { ht2 := commonmocks.NewHeadTracker[*evmtypes.Head, common.Hash](t) ht2.On("LatestChain").Return(h6) - ht.h = ht2 + ds.chainReader = evm.NewChainReader(ht2) obs, err := ds.Observe(ctx, repts, true) assert.NoError(t, err) @@ -384,7 +382,7 @@ func TestMercury_Observe(t *testing.T) { ht2 := commonmocks.NewHeadTracker[*evmtypes.Head, common.Hash](t) ht2.On("LatestChain").Return(h6) - ht.h = ht2 + ds.chainReader = evm.NewChainReader(ht2) obs, err := ds.Observe(ctx, repts, true) assert.NoError(t, err) @@ -398,6 +396,18 @@ func TestMercury_Observe(t *testing.T) { ht2.AssertExpectations(t) }) + + t.Run("when chain reader returns an error", func(t *testing.T) { + + ds.chainReader = &mockChainReader{ + err: io.EOF, + obs: nil, + } + + obs, err := ds.Observe(ctx, repts, true) + assert.Error(t, err) + assert.Equal(t, obs, relaymercuryv1.Observation{}) + }) }) } @@ -418,39 +428,31 @@ func TestMercury_SetLatestBlocks(t *testing.T) { t.Run("returns head from headtracker if present", func(t *testing.T) { headTracker := commonmocks.NewHeadTracker[*evmtypes.Head, common.Hash](t) - chainHeadTracker := mercurymocks.NewChainHeadTracker(t) - - chainHeadTracker.On("HeadTracker").Return(headTracker) headTracker.On("LatestChain").Return(&h, nil) - - ds.chainHeadTracker = chainHeadTracker + ds.chainReader = evm.NewChainReader(headTracker) obs := relaymercuryv1.Observation{} - ds.setLatestBlocks(context.Background(), &obs) + err := ds.setLatestBlocks(context.Background(), &obs) + assert.NoError(t, err) assert.Equal(t, h.Number, obs.CurrentBlockNum.Val) assert.Equal(t, h.Hash.Bytes(), obs.CurrentBlockHash.Val) assert.Equal(t, uint64(h.Timestamp.Unix()), obs.CurrentBlockTimestamp.Val) assert.Len(t, obs.LatestBlocks, 1) - - chainHeadTracker.AssertExpectations(t) headTracker.AssertExpectations(t) }) t.Run("if headtracker returns nil head", func(t *testing.T) { headTracker := commonmocks.NewHeadTracker[*evmtypes.Head, common.Hash](t) - chainHeadTracker := mercurymocks.NewChainHeadTracker(t) - - chainHeadTracker.On("HeadTracker").Return(headTracker) // This can happen in some cases e.g. RPC node is offline headTracker.On("LatestChain").Return((*evmtypes.Head)(nil)) - ds.chainHeadTracker = chainHeadTracker - + ds.chainReader = evm.NewChainReader(headTracker) obs := relaymercuryv1.Observation{} - ds.setLatestBlocks(context.Background(), &obs) + err := ds.setLatestBlocks(context.Background(), &obs) + assert.NoError(t, err) assert.Zero(t, obs.CurrentBlockNum.Val) assert.Zero(t, obs.CurrentBlockHash.Val) assert.Zero(t, obs.CurrentBlockTimestamp.Val) @@ -459,8 +461,6 @@ func TestMercury_SetLatestBlocks(t *testing.T) { assert.EqualError(t, obs.CurrentBlockTimestamp.Err, "no blocks available") assert.Len(t, obs.LatestBlocks, 0) - - chainHeadTracker.AssertExpectations(t) headTracker.AssertExpectations(t) }) } diff --git a/core/services/relay/evm/mercury_provider.go b/core/services/relay/evm/mercury_provider.go index 914401c0897..bba5e699bc6 100644 --- a/core/services/relay/evm/mercury_provider.go +++ b/core/services/relay/evm/mercury_provider.go @@ -12,6 +12,7 @@ import ( relaymercuryv3 "github.com/smartcontractkit/chainlink-relay/pkg/reportingplugins/mercury/v3" "github.com/smartcontractkit/chainlink-relay/pkg/services" relaytypes "github.com/smartcontractkit/chainlink-relay/pkg/types" + httypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/headtracker/types" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury" @@ -25,6 +26,7 @@ type mercuryProvider struct { reportCodecV1 relaymercuryv1.ReportCodec reportCodecV2 relaymercuryv2.ReportCodec reportCodecV3 relaymercuryv3.ReportCodec + chainReader relaymercury.ChainReader logger logger.Logger ms services.MultiStart @@ -36,6 +38,7 @@ func NewMercuryProvider( reportCodecV1 relaymercuryv1.ReportCodec, reportCodecV2 relaymercuryv2.ReportCodec, reportCodecV3 relaymercuryv3.ReportCodec, + chainReader relaymercury.ChainReader, lggr logger.Logger, ) *mercuryProvider { return &mercuryProvider{ @@ -44,6 +47,7 @@ func NewMercuryProvider( reportCodecV1, reportCodecV2, reportCodecV3, + chainReader, lggr, services.MultiStart{}, } @@ -103,3 +107,37 @@ func (p *mercuryProvider) ContractTransmitter() ocrtypes.ContractTransmitter { func (p *mercuryProvider) MercuryServerFetcher() relaymercury.MercuryServerFetcher { return p.transmitter } + +func (p *mercuryProvider) ChainReader() relaymercury.ChainReader { + return p.chainReader +} + +var _ relaymercury.ChainReader = (*chainReader)(nil) + +type chainReader struct { + tracker httypes.HeadTracker +} + +func NewChainReader(h httypes.HeadTracker) relaymercury.ChainReader { + return &chainReader{ + tracker: h, + } +} + +func (r *chainReader) LatestHeads(ctx context.Context, k int) ([]relaymercury.Head, error) { + evmBlocks := r.tracker.LatestChain().AsSlice(k) + if len(evmBlocks) == 0 { + return nil, nil + } + + blocks := make([]relaymercury.Head, len(evmBlocks)) + for x := 0; x < len(evmBlocks); x++ { + blocks[x] = relaymercury.Head{ + Number: uint64(evmBlocks[x].BlockNumber()), + Hash: evmBlocks[x].Hash.Bytes(), + Timestamp: uint64(evmBlocks[x].Timestamp.Unix()), + } + } + + return blocks, nil +} diff --git a/go.mod b/go.mod index f35077e9234..8a4d58469c7 100644 --- a/go.mod +++ b/go.mod @@ -66,7 +66,7 @@ require ( github.com/shopspring/decimal v1.3.1 github.com/smartcontractkit/caigo v0.0.0-20230621050857-b29a4ca8c704 github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20231109141932-cb1ea9020255 - github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231108205920-694ce17a4a78 + github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231108215906-8bbaf383b742 github.com/smartcontractkit/chainlink-solana v1.0.3-0.20231023133638-72f4e799ab05 github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20231024133459-1ef3a11319eb github.com/smartcontractkit/libocr v0.0.0-20231107151413-13e0202ae8d7 diff --git a/go.sum b/go.sum index 2c9d9e53715..1b96c936c5e 100644 --- a/go.sum +++ b/go.sum @@ -1465,8 +1465,8 @@ github.com/smartcontractkit/caigo v0.0.0-20230621050857-b29a4ca8c704 h1:T3lFWumv github.com/smartcontractkit/caigo v0.0.0-20230621050857-b29a4ca8c704/go.mod h1:2QuJdEouTWjh5BDy5o/vgGXQtR4Gz8yH1IYB5eT7u4M= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20231109141932-cb1ea9020255 h1:Pt6c7bJU9wIN6PQQnmN8UmYYH6lpfiQ6U/B8yEC2s5s= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20231109141932-cb1ea9020255/go.mod h1:EHppaccd/LTlTMI2o4dmBHe4BknEgEFFDjDGMNuGb3k= -github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231108205920-694ce17a4a78 h1:ZBsxdB/5iIpl/tWhXe/RHrOwBG7pbKOMeppy5Zt2BVc= -github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231108205920-694ce17a4a78/go.mod h1:M9U1JV7IQi8Sfj4JR1qSi1tIh6omgW78W/8SHN/8BUQ= +github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231108215906-8bbaf383b742 h1:28XkPE6YfJ4uabTX9/7sueRV6IKtY4hcm1nIt1e6b20= +github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231108215906-8bbaf383b742/go.mod h1:M9U1JV7IQi8Sfj4JR1qSi1tIh6omgW78W/8SHN/8BUQ= github.com/smartcontractkit/chainlink-solana v1.0.3-0.20231023133638-72f4e799ab05 h1:DaPSVnxe7oz1QJ+AVIhQWs1W3ubQvwvGo9NbHpMs1OQ= github.com/smartcontractkit/chainlink-solana v1.0.3-0.20231023133638-72f4e799ab05/go.mod h1:o0Pn1pbaUluboaK6/yhf8xf7TiFCkyFl6WUOdwqamuU= github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20231024133459-1ef3a11319eb h1:HiluOfEVGOQTM6BTDImOqYdMZZ7qq7fkZ3TJdmItNr8= diff --git a/integration-tests/go.mod b/integration-tests/go.mod index 6a7e7195ff5..b5455838b58 100644 --- a/integration-tests/go.mod +++ b/integration-tests/go.mod @@ -387,7 +387,7 @@ require ( github.com/sirupsen/logrus v1.9.3 // indirect github.com/smartcontractkit/caigo v0.0.0-20230621050857-b29a4ca8c704 // indirect github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20231109141932-cb1ea9020255 // indirect - github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231108205920-694ce17a4a78 // indirect + github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231108215906-8bbaf383b742 // indirect github.com/smartcontractkit/chainlink-solana v1.0.3-0.20231023133638-72f4e799ab05 // indirect github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20231024133459-1ef3a11319eb // indirect github.com/smartcontractkit/tdh2/go/ocr2/decryptionplugin v0.0.0-20230906073235-9e478e5e19f1 // indirect diff --git a/integration-tests/go.sum b/integration-tests/go.sum index 780e81c7ef1..fc486e54511 100644 --- a/integration-tests/go.sum +++ b/integration-tests/go.sum @@ -2369,8 +2369,8 @@ github.com/smartcontractkit/caigo v0.0.0-20230621050857-b29a4ca8c704 h1:T3lFWumv github.com/smartcontractkit/caigo v0.0.0-20230621050857-b29a4ca8c704/go.mod h1:2QuJdEouTWjh5BDy5o/vgGXQtR4Gz8yH1IYB5eT7u4M= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20231109141932-cb1ea9020255 h1:Pt6c7bJU9wIN6PQQnmN8UmYYH6lpfiQ6U/B8yEC2s5s= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20231109141932-cb1ea9020255/go.mod h1:EHppaccd/LTlTMI2o4dmBHe4BknEgEFFDjDGMNuGb3k= -github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231108205920-694ce17a4a78 h1:ZBsxdB/5iIpl/tWhXe/RHrOwBG7pbKOMeppy5Zt2BVc= -github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231108205920-694ce17a4a78/go.mod h1:M9U1JV7IQi8Sfj4JR1qSi1tIh6omgW78W/8SHN/8BUQ= +github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231108215906-8bbaf383b742 h1:28XkPE6YfJ4uabTX9/7sueRV6IKtY4hcm1nIt1e6b20= +github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231108215906-8bbaf383b742/go.mod h1:M9U1JV7IQi8Sfj4JR1qSi1tIh6omgW78W/8SHN/8BUQ= github.com/smartcontractkit/chainlink-solana v1.0.3-0.20231023133638-72f4e799ab05 h1:DaPSVnxe7oz1QJ+AVIhQWs1W3ubQvwvGo9NbHpMs1OQ= github.com/smartcontractkit/chainlink-solana v1.0.3-0.20231023133638-72f4e799ab05/go.mod h1:o0Pn1pbaUluboaK6/yhf8xf7TiFCkyFl6WUOdwqamuU= github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20231024133459-1ef3a11319eb h1:HiluOfEVGOQTM6BTDImOqYdMZZ7qq7fkZ3TJdmItNr8=