Skip to content

Commit

Permalink
Adding missing prom reporter
Browse files Browse the repository at this point in the history
  • Loading branch information
mateusz-sekara committed Sep 2, 2024
1 parent feda4f4 commit af42df8
Show file tree
Hide file tree
Showing 6 changed files with 675 additions and 31 deletions.
5 changes: 5 additions & 0 deletions .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,11 @@ packages:
ORM:
Runner:
PipelineParamUnmarshaler:
github.com/smartcontractkit/chainlink/v2/core/services/promreporter:
config:
dir: core/internal/mocks
interfaces:
PrometheusBackend:
github.com/smartcontractkit/chainlink/v2/core/services/headreporter:
config:
dir: "{{ .InterfaceDir }}"
Expand Down
2 changes: 2 additions & 0 deletions core/internal/cltest/cltest.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,8 @@ func NewApplicationWithConfig(t testing.TB, cfg chainlink.GeneralConfig, flagsAn
LoopRegistry: plugins.NewLoopRegistry(lggr, nil),
MercuryPool: mercuryPool,
CapabilitiesRegistry: capabilitiesRegistry,
CapabilitiesDispatcher: dispatcher,
CapabilitiesPeerWrapper: peerWrapper,
})

require.NoError(t, err)
Expand Down
204 changes: 204 additions & 0 deletions core/internal/mocks/prometheus_backend.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

60 changes: 29 additions & 31 deletions core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,13 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox"

"github.com/smartcontractkit/chainlink/v2/core/capabilities"
remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
"github.com/smartcontractkit/chainlink/v2/core/services/standardcapabilities"
"github.com/smartcontractkit/chainlink/v2/core/static"

"github.com/smartcontractkit/chainlink/v2/core/services/promreporter"

"github.com/smartcontractkit/chainlink/v2/core/bridges"
"github.com/smartcontractkit/chainlink/v2/core/build"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote"
Expand All @@ -46,7 +49,6 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/feeds"
"github.com/smartcontractkit/chainlink/v2/core/services/fluxmonitorv2"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway"
"github.com/smartcontractkit/chainlink/v2/core/services/headreporter"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/keeper"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore"
Expand Down Expand Up @@ -180,6 +182,8 @@ type ApplicationOpts struct {
GRPCOpts loop.GRPCOpts
MercuryPool wsrpc.Pool
CapabilitiesRegistry *capabilities.Registry
CapabilitiesDispatcher remotetypes.Dispatcher
CapabilitiesPeerWrapper p2ptypes.PeerWrapper
}

// NewApplication initializes a new store if one is not already
Expand All @@ -199,20 +203,36 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
restrictedHTTPClient := opts.RestrictedHTTPClient
unrestrictedHTTPClient := opts.UnrestrictedHTTPClient

if opts.CapabilitiesRegistry == nil { // for tests only, in prod Registry is always set at this point
if opts.CapabilitiesRegistry == nil {
// for tests only, in prod Registry should always be set at this point
opts.CapabilitiesRegistry = capabilities.NewRegistry(globalLogger)
}

var externalPeerWrapper p2ptypes.PeerWrapper
var capabilityRegistrySyncer registrysyncer.Syncer
if cfg.Capabilities().Peering().Enabled() {
var dispatcher remotetypes.Dispatcher
if opts.CapabilitiesDispatcher == nil {
externalPeer := externalp2p.NewExternalPeerWrapper(keyStore.P2P(), cfg.Capabilities().Peering(), opts.DS, globalLogger)
signer := externalPeer
externalPeerWrapper = externalPeer
remoteDispatcher := remote.NewDispatcher(externalPeerWrapper, signer, opts.CapabilitiesRegistry, globalLogger)
srvcs = append(srvcs, remoteDispatcher)

dispatcher = remoteDispatcher
} else {
dispatcher = opts.CapabilitiesDispatcher
externalPeerWrapper = opts.CapabilitiesPeerWrapper
}

srvcs = append(srvcs, externalPeerWrapper)

if cfg.Capabilities().ExternalRegistry().Address() != "" {
rid := cfg.Capabilities().ExternalRegistry().RelayID()
registryAddress := cfg.Capabilities().ExternalRegistry().Address()
relayer, err := relayerChainInterops.Get(rid)
if err != nil {
return nil, fmt.Errorf("could not fetch relayer %s configured for capabilities registry: %w", rid, err)
}

registrySyncer, err := registrysyncer.New(
globalLogger,
externalPeerWrapper,
Expand All @@ -223,31 +243,15 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
return nil, fmt.Errorf("could not configure syncer: %w", err)
}

capabilityRegistrySyncer = registrySyncer
srvcs = append(srvcs, capabilityRegistrySyncer)
}

if cfg.Capabilities().Peering().Enabled() {
if capabilityRegistrySyncer == nil {
return nil, errors.Errorf("peering enabled but no capability registry found")
}
externalPeer := externalp2p.NewExternalPeerWrapper(keyStore.P2P(), cfg.Capabilities().Peering(), opts.DS, globalLogger)
signer := externalPeer
externalPeerWrapper = externalPeer

srvcs = append(srvcs, externalPeerWrapper)

dispatcher := remote.NewDispatcher(externalPeerWrapper, signer, opts.CapabilitiesRegistry, globalLogger)

wfLauncher := capabilities.NewLauncher(
globalLogger,
externalPeerWrapper,
dispatcher,
opts.CapabilitiesRegistry,
)
registrySyncer.AddLauncher(wfLauncher)

capabilityRegistrySyncer.AddLauncher(wfLauncher)
srvcs = append(srvcs, dispatcher, wfLauncher)
srvcs = append(srvcs, dispatcher, wfLauncher, registrySyncer)
}

// LOOPs can be created as options, in the case of LOOP relayers, or
Expand Down Expand Up @@ -320,6 +324,8 @@ func NewApplication(opts ApplicationOpts) (Application, error) {

srvcs = append(srvcs, mailMon)
srvcs = append(srvcs, relayerChainInterops.Services()...)
promReporter := promreporter.NewPromReporter(opts.DS, legacyEVMChains, globalLogger)
srvcs = append(srvcs, promReporter)

// Initialize Local Users ORM and Authentication Provider specified in config
// BasicAdminUsersORM is initialized and required regardless of separate Authentication Provider
Expand Down Expand Up @@ -359,16 +365,8 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
workflowORM = workflowstore.NewDBStore(opts.DS, globalLogger, clockwork.NewRealClock())
)

promReporter := headreporter.NewPrometheusReporter(opts.DS, legacyEVMChains)
chainIDs := make([]*big.Int, legacyEVMChains.Len())
for i, chain := range legacyEVMChains.Slice() {
chainIDs[i] = chain.ID()
}
telemReporter := headreporter.NewTelemetryReporter(telemetryManager, globalLogger, chainIDs...)
headReporter := headreporter.NewHeadReporterService(opts.DS, globalLogger, promReporter, telemReporter)
srvcs = append(srvcs, headReporter)
for _, chain := range legacyEVMChains.Slice() {
chain.HeadBroadcaster().Subscribe(headReporter)
chain.HeadBroadcaster().Subscribe(promReporter)
chain.TxManager().RegisterResumeCallback(pipelineRunner.ResumeRun)
}

Expand Down
Loading

0 comments on commit af42df8

Please sign in to comment.