Skip to content

Commit

Permalink
mercury: Add ChainReader
Browse files Browse the repository at this point in the history
  • Loading branch information
brunotm committed Nov 9, 2023
1 parent 2552efa commit 25ee62d
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 64 deletions.
5 changes: 3 additions & 2 deletions core/services/relay/evm/evm.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ import (
pkgerrors "github.com/pkg/errors"
"go.uber.org/multierr"

"github.com/jmoiron/sqlx"
"github.com/smartcontractkit/libocr/gethwrappers2/ocr2aggregator"
"github.com/smartcontractkit/libocr/offchainreporting2/reportingplugin/median"
"github.com/smartcontractkit/libocr/offchainreporting2/reportingplugin/median/evmreportcodec"
"github.com/smartcontractkit/libocr/offchainreporting2plus/chains/evmutil"
ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types"
"github.com/smartcontractkit/sqlx"

"github.com/smartcontractkit/chainlink-relay/pkg/services"
relaytypes "github.com/smartcontractkit/chainlink-relay/pkg/types"
Expand Down Expand Up @@ -189,7 +189,8 @@ func (r *Relayer) NewMercuryProvider(rargs relaytypes.RelayArgs, pargs relaytype
}
transmitter := mercury.NewTransmitter(lggr, cw.ContractConfigTracker(), client, privKey.PublicKey, rargs.JobID, *relayConfig.FeedID, r.db, r.pgCfg, transmitterCodec)

return NewMercuryProvider(cw, transmitter, reportCodecV1, reportCodecV2, reportCodecV3, lggr), nil
chainReader := NewChainReader(r.chain.HeadTracker())
return NewMercuryProvider(cw, transmitter, reportCodecV1, reportCodecV2, reportCodecV3, chainReader, lggr), nil
}

func (r *Relayer) NewFunctionsProvider(rargs relaytypes.RelayArgs, pargs relaytypes.PluginArgs) (relaytypes.FunctionsProvider, error) {
Expand Down
6 changes: 0 additions & 6 deletions core/services/relay/evm/mercury/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,9 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types"

httypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/headtracker/types"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
)

//go:generate mockery --quiet --name ChainHeadTracker --output ../mocks/ --case=underscore
type ChainHeadTracker interface {
HeadTracker() httypes.HeadTracker
}

type DataSourceORM interface {
LatestReport(ctx context.Context, feedID [32]byte, qopts ...pg.QOpt) (report []byte, err error)
}
Expand Down
33 changes: 10 additions & 23 deletions core/services/relay/evm/mercury/v1/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
relaymercury "github.com/smartcontractkit/chainlink-relay/pkg/reportingplugins/mercury"
relaymercuryv1 "github.com/smartcontractkit/chainlink-relay/pkg/reportingplugins/mercury/v1"

evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/ocrcommon"
Expand Down Expand Up @@ -67,7 +66,7 @@ type datasource struct {
mu sync.RWMutex

chEnhancedTelem chan<- ocrcommon.EnhancedTelemetryMercuryData
chainHeadTracker types.ChainHeadTracker
chainReader relaymercury.ChainReader
fetcher Fetcher
initialBlockNumber *int64

Expand All @@ -77,8 +76,8 @@ type datasource struct {

var _ relaymercuryv1.DataSource = &datasource{}

func NewDataSource(orm types.DataSourceORM, pr pipeline.Runner, jb job.Job, spec pipeline.Spec, lggr logger.Logger, rr chan *pipeline.Run, enhancedTelemChan chan ocrcommon.EnhancedTelemetryMercuryData, chainHeadTracker types.ChainHeadTracker, fetcher Fetcher, initialBlockNumber *int64, feedID mercuryutils.FeedID) *datasource {
return &datasource{pr, jb, spec, lggr, rr, orm, reportcodec.ReportCodec{}, feedID, sync.RWMutex{}, enhancedTelemChan, chainHeadTracker, fetcher, initialBlockNumber, insufficientBlocksCount.WithLabelValues(feedID.String()), zeroBlocksCount.WithLabelValues(feedID.String())}
func NewDataSource(orm types.DataSourceORM, pr pipeline.Runner, jb job.Job, spec pipeline.Spec, lggr logger.Logger, rr chan *pipeline.Run, enhancedTelemChan chan ocrcommon.EnhancedTelemetryMercuryData, chainReader relaymercury.ChainReader, fetcher Fetcher, initialBlockNumber *int64, feedID mercuryutils.FeedID) *datasource {
return &datasource{pr, jb, spec, lggr, rr, orm, reportcodec.ReportCodec{}, feedID, sync.RWMutex{}, enhancedTelemChan, chainReader, fetcher, initialBlockNumber, insufficientBlocksCount.WithLabelValues(feedID.String()), zeroBlocksCount.WithLabelValues(feedID.String())}
}

type ErrEmptyLatestReport struct {
Expand Down Expand Up @@ -291,7 +290,7 @@ func (ds *datasource) executeRun(ctx context.Context) (*pipeline.Run, pipeline.T
}

func (ds *datasource) setLatestBlocks(ctx context.Context, obs *relaymercuryv1.Observation) {
latestBlocks := ds.getLatestBlocks(ctx, nBlocksObservation)
latestBlocks, err := ds.chainReader.LatestBlocks(ctx, nBlocksObservation)
if len(latestBlocks) < nBlocksObservation {
ds.insufficientBlocksCounter.Inc()
ds.lggr.Warnw("Insufficient blocks", "latestBlocks", latestBlocks, "lenLatestBlocks", len(latestBlocks), "nBlocksObservation", nBlocksObservation)
Expand All @@ -300,30 +299,18 @@ func (ds *datasource) setLatestBlocks(ctx context.Context, obs *relaymercuryv1.O
// TODO: remove with https://smartcontract-it.atlassian.net/browse/BCF-2209
if len(latestBlocks) == 0 {
ds.zeroBlocksCounter.Inc()
err := errors.New("no blocks available")
obs.CurrentBlockNum.Err = err
obs.CurrentBlockHash.Err = err
obs.CurrentBlockTimestamp.Err = err
} else {
obs.CurrentBlockNum.Val = latestBlocks[0].Number
obs.CurrentBlockHash.Val = latestBlocks[0].Hash.Bytes()
if latestBlocks[0].Timestamp.IsZero() {
obs.CurrentBlockTimestamp.Val = 0
} else {
obs.CurrentBlockTimestamp.Val = uint64(latestBlocks[0].Timestamp.Unix())
}
obs.CurrentBlockNum.Val = int64(latestBlocks[0].Number)
obs.CurrentBlockHash.Val = latestBlocks[0].Hash
obs.CurrentBlockTimestamp.Val = latestBlocks[0].Timestamp
}

for _, block := range latestBlocks {
obs.LatestBlocks = append(obs.LatestBlocks, relaymercuryv1.NewBlock(block.Number, block.Hash.Bytes(), uint64(block.Timestamp.Unix())))
obs.LatestBlocks = append(
obs.LatestBlocks,
relaymercuryv1.NewBlock(int64(block.Number), block.Hash, block.Timestamp))
}
}

func (ds *datasource) getLatestBlocks(ctx context.Context, k int) (blocks []*evmtypes.Head) {
// Use the headtracker's view of the chain, this is very fast since
// it doesn't make any external network requests, and it is the
// headtracker's job to ensure it has an up-to-date view of the chain based
// on responses from all available RPC nodes
latestHead := ds.chainHeadTracker.HeadTracker().LatestChain()
return latestHead.AsSlice(k)
}
42 changes: 9 additions & 33 deletions core/services/relay/evm/mercury/v1/data_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,15 @@ import (
relaymercuryv1 "github.com/smartcontractkit/chainlink-relay/pkg/reportingplugins/mercury/v1"
commonmocks "github.com/smartcontractkit/chainlink/v2/common/mocks"
"github.com/smartcontractkit/chainlink/v2/core/assets"
httypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/headtracker/types"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"

"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm"
mercurymocks "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/mocks"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/types"
mercuryutils "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/utils"
reportcodecv1 "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/v1/reportcodec"
"github.com/smartcontractkit/chainlink/v2/core/utils"
Expand All @@ -52,14 +51,6 @@ func (m *mockFetcher) LatestTimestamp(context.Context) (int64, error) {
return 0, nil
}

var _ types.ChainHeadTracker = &mockHeadTracker{}

type mockHeadTracker struct {
h httypes.HeadTracker
}

func (m *mockHeadTracker) HeadTracker() httypes.HeadTracker { return m.h }

type mockORM struct {
report []byte
err error
Expand Down Expand Up @@ -106,10 +97,7 @@ func TestMercury_Observe(t *testing.T) {
ds.spec = spec

h := commonmocks.NewHeadTracker[*evmtypes.Head, common.Hash](t)
ht := &mockHeadTracker{
h: h,
}
ds.chainHeadTracker = ht
ds.chainReader = evm.NewChainReader(h)

head := &evmtypes.Head{
Number: int64(rand.Int31()),
Expand Down Expand Up @@ -201,7 +189,7 @@ func TestMercury_Observe(t *testing.T) {
t.Run("if no current block available", func(t *testing.T) {
h2 := commonmocks.NewHeadTracker[*evmtypes.Head, common.Hash](t)
h2.On("LatestChain").Return((*evmtypes.Head)(nil))
ht.h = h2
ds.chainReader = evm.NewChainReader(h2)

obs, err := ds.Observe(ctx, repts, true)
assert.NoError(t, err)
Expand All @@ -212,7 +200,7 @@ func TestMercury_Observe(t *testing.T) {
})
})

ht.h = h
ds.chainReader = evm.NewChainReader(h)

t.Run("when fetchMaxFinalizedBlockNum=false", func(t *testing.T) {
t.Run("when run execution fails, returns error", func(t *testing.T) {
Expand Down Expand Up @@ -320,7 +308,7 @@ func TestMercury_Observe(t *testing.T) {
t.Run("when chain length is zero", func(t *testing.T) {
ht2 := commonmocks.NewHeadTracker[*evmtypes.Head, common.Hash](t)
ht2.On("LatestChain").Return((*evmtypes.Head)(nil))
ht.h = ht2
ds.chainReader = evm.NewChainReader(ht2)

obs, err := ds.Observe(ctx, repts, true)
assert.NoError(t, err)
Expand All @@ -345,7 +333,7 @@ func TestMercury_Observe(t *testing.T) {

ht2 := commonmocks.NewHeadTracker[*evmtypes.Head, common.Hash](t)
ht2.On("LatestChain").Return(h6)
ht.h = ht2
ds.chainReader = evm.NewChainReader(ht2)

obs, err := ds.Observe(ctx, repts, true)
assert.NoError(t, err)
Expand Down Expand Up @@ -384,7 +372,7 @@ func TestMercury_Observe(t *testing.T) {

ht2 := commonmocks.NewHeadTracker[*evmtypes.Head, common.Hash](t)
ht2.On("LatestChain").Return(h6)
ht.h = ht2
ds.chainReader = evm.NewChainReader(ht2)

obs, err := ds.Observe(ctx, repts, true)
assert.NoError(t, err)
Expand Down Expand Up @@ -418,12 +406,8 @@ func TestMercury_SetLatestBlocks(t *testing.T) {

t.Run("returns head from headtracker if present", func(t *testing.T) {
headTracker := commonmocks.NewHeadTracker[*evmtypes.Head, common.Hash](t)
chainHeadTracker := mercurymocks.NewChainHeadTracker(t)

chainHeadTracker.On("HeadTracker").Return(headTracker)
headTracker.On("LatestChain").Return(&h, nil)

ds.chainHeadTracker = chainHeadTracker
ds.chainReader = evm.NewChainReader(headTracker)

obs := relaymercuryv1.Observation{}
ds.setLatestBlocks(context.Background(), &obs)
Expand All @@ -433,21 +417,15 @@ func TestMercury_SetLatestBlocks(t *testing.T) {
assert.Equal(t, uint64(h.Timestamp.Unix()), obs.CurrentBlockTimestamp.Val)

assert.Len(t, obs.LatestBlocks, 1)

chainHeadTracker.AssertExpectations(t)
headTracker.AssertExpectations(t)
})

t.Run("if headtracker returns nil head", func(t *testing.T) {
headTracker := commonmocks.NewHeadTracker[*evmtypes.Head, common.Hash](t)
chainHeadTracker := mercurymocks.NewChainHeadTracker(t)

chainHeadTracker.On("HeadTracker").Return(headTracker)
// This can happen in some cases e.g. RPC node is offline
headTracker.On("LatestChain").Return((*evmtypes.Head)(nil))

ds.chainHeadTracker = chainHeadTracker

ds.chainReader = evm.NewChainReader(headTracker)
obs := relaymercuryv1.Observation{}
ds.setLatestBlocks(context.Background(), &obs)

Expand All @@ -459,8 +437,6 @@ func TestMercury_SetLatestBlocks(t *testing.T) {
assert.EqualError(t, obs.CurrentBlockTimestamp.Err, "no blocks available")

assert.Len(t, obs.LatestBlocks, 0)

chainHeadTracker.AssertExpectations(t)
headTracker.AssertExpectations(t)
})
}
Expand Down
39 changes: 39 additions & 0 deletions core/services/relay/evm/mercury_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package evm
import (
"context"
"errors"
"fmt"

ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types"

Expand All @@ -12,6 +13,7 @@ import (
relaymercuryv3 "github.com/smartcontractkit/chainlink-relay/pkg/reportingplugins/mercury/v3"
"github.com/smartcontractkit/chainlink-relay/pkg/services"
relaytypes "github.com/smartcontractkit/chainlink-relay/pkg/types"
httypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/headtracker/types"

"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury"
Expand All @@ -25,6 +27,7 @@ type mercuryProvider struct {
reportCodecV1 relaymercuryv1.ReportCodec
reportCodecV2 relaymercuryv2.ReportCodec
reportCodecV3 relaymercuryv3.ReportCodec
chainReader relaymercury.ChainReader
logger logger.Logger

ms services.MultiStart
Expand All @@ -36,6 +39,7 @@ func NewMercuryProvider(
reportCodecV1 relaymercuryv1.ReportCodec,
reportCodecV2 relaymercuryv2.ReportCodec,
reportCodecV3 relaymercuryv3.ReportCodec,
chainReader relaymercury.ChainReader,
lggr logger.Logger,
) *mercuryProvider {
return &mercuryProvider{
Expand All @@ -44,6 +48,7 @@ func NewMercuryProvider(
reportCodecV1,
reportCodecV2,
reportCodecV3,
chainReader,
lggr,
services.MultiStart{},
}
Expand Down Expand Up @@ -103,3 +108,37 @@ func (p *mercuryProvider) ContractTransmitter() ocrtypes.ContractTransmitter {
func (p *mercuryProvider) MercuryServerFetcher() relaymercury.MercuryServerFetcher {
return p.transmitter
}

func (p *mercuryProvider) ChainReader() relaymercury.ChainReader {
return p.chainReader
}

var _ relaymercury.ChainReader = (*chainReader)(nil)

type chainReader struct {
tracker httypes.HeadTracker
}

func NewChainReader(h httypes.HeadTracker) relaymercury.ChainReader {
return &chainReader{
tracker: h,
}
}

func (r *chainReader) LatestBlocks(ctx context.Context, k int) ([]relaymercury.Block, error) {
evmBlocks := r.tracker.LatestChain().AsSlice(k)
if len(evmBlocks) == 0 {
return nil, fmt.Errorf("no blocks available")
}

blocks := make([]relaymercury.Block, len(evmBlocks))
for x := 0; x < len(evmBlocks); x++ {
blocks[x] = relaymercury.Block{
Number: uint64(evmBlocks[x].BlockNumber()),
Hash: evmBlocks[x].Hash[:],
Timestamp: uint64(evmBlocks[x].Timestamp.Unix()),
}
}

return blocks, nil
}

0 comments on commit 25ee62d

Please sign in to comment.