Skip to content

Commit

Permalink
Merge branch 'develop' into TT-653-Migrate-CTF-Packages-To-CTF
Browse files Browse the repository at this point in the history
  • Loading branch information
tateexon authored Nov 13, 2023
2 parents 8baaf97 + 1206283 commit 2369eae
Show file tree
Hide file tree
Showing 17 changed files with 144 additions and 138 deletions.
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ require (
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/smartcontractkit/caigo v0.0.0-20230621050857-b29a4ca8c704 // indirect
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20231109141932-cb1ea9020255 // indirect
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231108205920-694ce17a4a78 // indirect
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231108215906-8bbaf383b742 // indirect
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20231023133638-72f4e799ab05 // indirect
github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20231024133459-1ef3a11319eb // indirect
github.com/smartcontractkit/tdh2/go/ocr2/decryptionplugin v0.0.0-20230906073235-9e478e5e19f1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1464,8 +1464,8 @@ github.com/smartcontractkit/caigo v0.0.0-20230621050857-b29a4ca8c704 h1:T3lFWumv
github.com/smartcontractkit/caigo v0.0.0-20230621050857-b29a4ca8c704/go.mod h1:2QuJdEouTWjh5BDy5o/vgGXQtR4Gz8yH1IYB5eT7u4M=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20231109141932-cb1ea9020255 h1:Pt6c7bJU9wIN6PQQnmN8UmYYH6lpfiQ6U/B8yEC2s5s=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20231109141932-cb1ea9020255/go.mod h1:EHppaccd/LTlTMI2o4dmBHe4BknEgEFFDjDGMNuGb3k=
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231108205920-694ce17a4a78 h1:ZBsxdB/5iIpl/tWhXe/RHrOwBG7pbKOMeppy5Zt2BVc=
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231108205920-694ce17a4a78/go.mod h1:M9U1JV7IQi8Sfj4JR1qSi1tIh6omgW78W/8SHN/8BUQ=
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231108215906-8bbaf383b742 h1:28XkPE6YfJ4uabTX9/7sueRV6IKtY4hcm1nIt1e6b20=
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231108215906-8bbaf383b742/go.mod h1:M9U1JV7IQi8Sfj4JR1qSi1tIh6omgW78W/8SHN/8BUQ=
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20231023133638-72f4e799ab05 h1:DaPSVnxe7oz1QJ+AVIhQWs1W3ubQvwvGo9NbHpMs1OQ=
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20231023133638-72f4e799ab05/go.mod h1:o0Pn1pbaUluboaK6/yhf8xf7TiFCkyFl6WUOdwqamuU=
github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20231024133459-1ef3a11319eb h1:HiluOfEVGOQTM6BTDImOqYdMZZ7qq7fkZ3TJdmItNr8=
Expand Down
6 changes: 1 addition & 5 deletions core/services/ocr2/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -653,10 +653,6 @@ func (d *Delegate) newServicesMercury(
if err != nil {
return nil, ErrRelayNotEnabled{Err: err, Relay: spec.Relay, PluginName: "mercury"}
}
chain, err := d.legacyChains.Get(rid.ChainID)
if err != nil {
return nil, fmt.Errorf("mercury services: failed to get chain %s: %w", rid.ChainID, err)
}

provider, err2 := relayer.NewPluginProvider(ctx,
types.RelayArgs{
Expand Down Expand Up @@ -695,7 +691,7 @@ func (d *Delegate) newServicesMercury(

chEnhancedTelem := make(chan ocrcommon.EnhancedTelemetryMercuryData, 100)

mercuryServices, err2 := mercury.NewServices(jb, mercuryProvider, d.pipelineRunner, runResults, lggr, oracleArgsNoPlugin, d.cfg.JobPipeline(), chEnhancedTelem, chain, d.mercuryORM, (mercuryutils.FeedID)(*spec.FeedID))
mercuryServices, err2 := mercury.NewServices(jb, mercuryProvider, d.pipelineRunner, runResults, lggr, oracleArgsNoPlugin, d.cfg.JobPipeline(), chEnhancedTelem, d.mercuryORM, (mercuryutils.FeedID)(*spec.FeedID))

if ocrcommon.ShouldCollectEnhancedTelemetryMercury(jb) {
enhancedTelemService := ocrcommon.NewEnhancedTelemetryService(&jb, chEnhancedTelem, make(chan struct{}), d.monitoringEndpointGen.GenMonitoringEndpoint(rid.Network, rid.ChainID, spec.FeedID.String(), synchronization.EnhancedEAMercury), lggr.Named("EnhancedTelemetryMercury"))
Expand Down
3 changes: 1 addition & 2 deletions core/services/ocr2/plugins/mercury/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ func NewServices(
argsNoPlugin libocr2.MercuryOracleArgs,
cfg Config,
chEnhancedTelem chan ocrcommon.EnhancedTelemetryMercuryData,
chainHeadTracker types.ChainHeadTracker,
orm types.DataSourceORM,
feedID utils.FeedID,
) ([]job.ServiceCtx, error) {
Expand Down Expand Up @@ -66,7 +65,7 @@ func NewServices(
lggr,
runResults,
chEnhancedTelem,
chainHeadTracker,
ocr2Provider.ChainReader(),
ocr2Provider.MercuryServerFetcher(),
pluginConfig.InitialBlockNumber.Ptr(),
feedID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ func (r *EvmRegistry) CheckUpkeeps(ctx context.Context, keys ...ocr2keepers.Upke
}

chResult := make(chan checkResult, 1)
go r.doCheck(ctx, keys, chResult)

r.threadCtrl.Go(func(ctx context.Context) {
r.doCheck(ctx, keys, chResult)
})

select {
case rs := <-chResult:
Expand Down
16 changes: 13 additions & 3 deletions core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,15 @@ func (r *EvmRegistry) streamsLookup(ctx context.Context, checkResults []ocr2keep
}

var wg sync.WaitGroup

for i, lookup := range lookups {
i := i
wg.Add(1)
go r.doLookup(ctx, &wg, lookup, i, checkResults, lggr)
r.threadCtrl.Go(func(ctx context.Context) {
r.doLookup(ctx, &wg, lookup, i, checkResults, lggr)
})
}

wg.Wait()

// don't surface error to plugin bc StreamsLookup process should be self-contained.
Expand Down Expand Up @@ -289,14 +294,19 @@ func (r *EvmRegistry) doMercuryRequest(ctx context.Context, sl *StreamsLookup, p
if sl.FeedParamKey == feedIdHex && sl.TimeParamKey == blockNumber {
// only mercury v0.2
for i := range sl.Feeds {
go r.singleFeedRequest(ctx, ch, i, sl, lggr)
i := i
r.threadCtrl.Go(func(ctx context.Context) {
r.singleFeedRequest(ctx, ch, i, sl, lggr)
})
}
} else if sl.FeedParamKey == feedIDs {
// only mercury v0.3
resultLen = 1
isMercuryV03 = true
ch = make(chan MercuryData, resultLen)
go r.multiFeedsRequest(ctx, ch, sl, lggr)
r.threadCtrl.Go(func(ctx context.Context) {
r.multiFeedsRequest(ctx, ch, sl, lggr)
})
} else {
return encoding.NoPipelineError, encoding.UpkeepFailureReasonInvalidRevertDataInput, [][]byte{}, false, 0 * time.Second, fmt.Errorf("invalid revert data input: feed param key %s, time param key %s, feeds %s", sl.FeedParamKey, sl.TimeParamKey, sl.Feeds)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evm21/encoding"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evm21/mocks"
"github.com/smartcontractkit/chainlink/v2/core/utils"

evmClientMocks "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client/mocks"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
Expand Down Expand Up @@ -70,7 +71,8 @@ func setupEVMRegistry(t *testing.T) *EvmRegistry {
allowListCache: cache.New(defaultAllowListExpiration, cleanupInterval),
pluginRetryCache: cache.New(defaultPluginRetryExpiration, cleanupInterval),
},
hc: mockHttpClient,
hc: mockHttpClient,
threadCtrl: utils.NewThreadControl(),
}
return r
}
Expand Down Expand Up @@ -220,6 +222,7 @@ func TestEvmRegistry_StreamsLookup(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := setupEVMRegistry(t)
defer r.Close()
client := new(evmClientMocks.Client)
r.client = client

Expand Down Expand Up @@ -362,6 +365,7 @@ func TestEvmRegistry_AllowedToUseMercury(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := setupEVMRegistry(t)
defer r.Close()

client := new(evmClientMocks.Client)
r.client = client
Expand Down Expand Up @@ -576,9 +580,12 @@ func TestEvmRegistry_DoMercuryRequestV02(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := setupEVMRegistry(t)
defer r.Close()

if tt.pluginRetries != 0 {
r.mercury.pluginRetryCache.Set(tt.pluginRetryKey, tt.pluginRetries, cache.DefaultExpiration)
}

hc := mocks.NewHttpClient(t)

for _, blob := range tt.mockChainlinkBlobs {
Expand Down Expand Up @@ -812,6 +819,8 @@ func TestEvmRegistry_SingleFeedRequest(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := setupEVMRegistry(t)
defer r.Close()

hc := mocks.NewHttpClient(t)

mr := MercuryV02Response{ChainlinkBlob: tt.blob}
Expand Down Expand Up @@ -1157,6 +1166,8 @@ func TestEvmRegistry_MultiFeedRequest(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := setupEVMRegistry(t)
defer r.Close()

if tt.pluginRetries != 0 {
r.mercury.pluginRetryCache.Set(tt.pluginRetryKey, tt.pluginRetries, cache.DefaultExpiration)
}
Expand Down Expand Up @@ -1319,6 +1330,8 @@ func TestEvmRegistry_CheckCallback(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
client := new(evmClientMocks.Client)
r := setupEVMRegistry(t)
defer r.Close()

payload, err := r.abi.Pack("checkCallback", tt.lookup.upkeepId, values, tt.lookup.ExtraData)
require.Nil(t, err)
args := map[string]interface{}{
Expand Down
5 changes: 3 additions & 2 deletions core/services/relay/evm/evm.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import (

"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/common"
"github.com/jmoiron/sqlx"
pkgerrors "github.com/pkg/errors"
"go.uber.org/multierr"

"github.com/jmoiron/sqlx"
"github.com/smartcontractkit/libocr/gethwrappers2/ocr2aggregator"
"github.com/smartcontractkit/libocr/offchainreporting2/reportingplugin/median"
"github.com/smartcontractkit/libocr/offchainreporting2/reportingplugin/median/evmreportcodec"
Expand Down Expand Up @@ -189,7 +189,8 @@ func (r *Relayer) NewMercuryProvider(rargs relaytypes.RelayArgs, pargs relaytype
}
transmitter := mercury.NewTransmitter(lggr, cw.ContractConfigTracker(), client, privKey.PublicKey, rargs.JobID, *relayConfig.FeedID, r.db, r.pgCfg, transmitterCodec)

return NewMercuryProvider(cw, transmitter, reportCodecV1, reportCodecV2, reportCodecV3, lggr), nil
chainReader := NewChainReader(r.chain.HeadTracker())
return NewMercuryProvider(cw, transmitter, reportCodecV1, reportCodecV2, reportCodecV3, chainReader, lggr), nil
}

func (r *Relayer) NewFunctionsProvider(rargs relaytypes.RelayArgs, pargs relaytypes.PluginArgs) (relaytypes.FunctionsProvider, error) {
Expand Down
47 changes: 0 additions & 47 deletions core/services/relay/evm/mercury/mocks/chain_head_tracker.go

This file was deleted.

6 changes: 0 additions & 6 deletions core/services/relay/evm/mercury/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,9 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types"

httypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/headtracker/types"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
)

//go:generate mockery --quiet --name ChainHeadTracker --output ../mocks/ --case=underscore
type ChainHeadTracker interface {
HeadTracker() httypes.HeadTracker
}

type DataSourceORM interface {
LatestReport(ctx context.Context, feedID [32]byte, qopts ...pg.QOpt) (report []byte, err error)
}
Expand Down
53 changes: 26 additions & 27 deletions core/services/relay/evm/mercury/v1/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
relaymercury "github.com/smartcontractkit/chainlink-relay/pkg/reportingplugins/mercury"
relaymercuryv1 "github.com/smartcontractkit/chainlink-relay/pkg/reportingplugins/mercury/v1"

evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/ocrcommon"
Expand Down Expand Up @@ -67,7 +66,7 @@ type datasource struct {
mu sync.RWMutex

chEnhancedTelem chan<- ocrcommon.EnhancedTelemetryMercuryData
chainHeadTracker types.ChainHeadTracker
chainReader relaymercury.ChainReader
fetcher Fetcher
initialBlockNumber *int64

Expand All @@ -77,8 +76,8 @@ type datasource struct {

var _ relaymercuryv1.DataSource = &datasource{}

func NewDataSource(orm types.DataSourceORM, pr pipeline.Runner, jb job.Job, spec pipeline.Spec, lggr logger.Logger, rr chan *pipeline.Run, enhancedTelemChan chan ocrcommon.EnhancedTelemetryMercuryData, chainHeadTracker types.ChainHeadTracker, fetcher Fetcher, initialBlockNumber *int64, feedID mercuryutils.FeedID) *datasource {
return &datasource{pr, jb, spec, lggr, rr, orm, reportcodec.ReportCodec{}, feedID, sync.RWMutex{}, enhancedTelemChan, chainHeadTracker, fetcher, initialBlockNumber, insufficientBlocksCount.WithLabelValues(feedID.String()), zeroBlocksCount.WithLabelValues(feedID.String())}
func NewDataSource(orm types.DataSourceORM, pr pipeline.Runner, jb job.Job, spec pipeline.Spec, lggr logger.Logger, rr chan *pipeline.Run, enhancedTelemChan chan ocrcommon.EnhancedTelemetryMercuryData, chainReader relaymercury.ChainReader, fetcher Fetcher, initialBlockNumber *int64, feedID mercuryutils.FeedID) *datasource {
return &datasource{pr, jb, spec, lggr, rr, orm, reportcodec.ReportCodec{}, feedID, sync.RWMutex{}, enhancedTelemChan, chainReader, fetcher, initialBlockNumber, insufficientBlocksCount.WithLabelValues(feedID.String()), zeroBlocksCount.WithLabelValues(feedID.String())}
}

type ErrEmptyLatestReport struct {
Expand All @@ -94,7 +93,11 @@ func (e ErrEmptyLatestReport) Error() string {
func (ds *datasource) Observe(ctx context.Context, repts ocrtypes.ReportTimestamp, fetchMaxFinalizedBlockNum bool) (obs relaymercuryv1.Observation, pipelineExecutionErr error) {
// setLatestBlocks must come chronologically before observations, along
// with observationTimestamp, to avoid front-running
ds.setLatestBlocks(ctx, &obs)

// Errors are not expected when reading from the underlying ChainReader
if err := ds.setLatestBlocks(ctx, &obs); err != nil {
return obs, err
}

var wg sync.WaitGroup
if fetchMaxFinalizedBlockNum {
Expand Down Expand Up @@ -290,40 +293,36 @@ func (ds *datasource) executeRun(ctx context.Context) (*pipeline.Run, pipeline.T
return run, trrs, err
}

func (ds *datasource) setLatestBlocks(ctx context.Context, obs *relaymercuryv1.Observation) {
latestBlocks := ds.getLatestBlocks(ctx, nBlocksObservation)
func (ds *datasource) setLatestBlocks(ctx context.Context, obs *relaymercuryv1.Observation) error {
latestBlocks, err := ds.chainReader.LatestHeads(ctx, nBlocksObservation)
if err != nil {
ds.lggr.Errorw("failed to read latest blocks", "error", err)
return err
}

if len(latestBlocks) < nBlocksObservation {
ds.insufficientBlocksCounter.Inc()
ds.lggr.Warnw("Insufficient blocks", "latestBlocks", latestBlocks, "lenLatestBlocks", len(latestBlocks), "nBlocksObservation", nBlocksObservation)
}

// TODO: remove with https://smartcontract-it.atlassian.net/browse/BCF-2209
if len(latestBlocks) == 0 {
obsErr := fmt.Errorf("no blocks available")
ds.zeroBlocksCounter.Inc()
err := errors.New("no blocks available")
obs.CurrentBlockNum.Err = err
obs.CurrentBlockHash.Err = err
obs.CurrentBlockTimestamp.Err = err
obs.CurrentBlockNum.Err = obsErr
obs.CurrentBlockHash.Err = obsErr
obs.CurrentBlockTimestamp.Err = obsErr
} else {
obs.CurrentBlockNum.Val = latestBlocks[0].Number
obs.CurrentBlockHash.Val = latestBlocks[0].Hash.Bytes()
if latestBlocks[0].Timestamp.IsZero() {
obs.CurrentBlockTimestamp.Val = 0
} else {
obs.CurrentBlockTimestamp.Val = uint64(latestBlocks[0].Timestamp.Unix())
}
obs.CurrentBlockNum.Val = int64(latestBlocks[0].Number)
obs.CurrentBlockHash.Val = latestBlocks[0].Hash
obs.CurrentBlockTimestamp.Val = latestBlocks[0].Timestamp
}

for _, block := range latestBlocks {
obs.LatestBlocks = append(obs.LatestBlocks, relaymercuryv1.NewBlock(block.Number, block.Hash.Bytes(), uint64(block.Timestamp.Unix())))
obs.LatestBlocks = append(
obs.LatestBlocks,
relaymercuryv1.NewBlock(int64(block.Number), block.Hash, block.Timestamp))
}
}

func (ds *datasource) getLatestBlocks(ctx context.Context, k int) (blocks []*evmtypes.Head) {
// Use the headtracker's view of the chain, this is very fast since
// it doesn't make any external network requests, and it is the
// headtracker's job to ensure it has an up-to-date view of the chain based
// on responses from all available RPC nodes
latestHead := ds.chainHeadTracker.HeadTracker().LatestChain()
return latestHead.AsSlice(k)
return nil
}
Loading

0 comments on commit 2369eae

Please sign in to comment.