Skip to content

Commit

Permalink
Add two new metrics for monitoring LLO transmitter health (also, impl…
Browse files Browse the repository at this point in the history
…ement app-scoped prometheus registerer) (#15362)

* Add two new metrics for monitoring LLO transmitter health

(Thread utilization)

* Clean up and organize LLO metrics

* Implement an application registry

* Fix unregister/close order

* Fix changeset
  • Loading branch information
samsondav authored Nov 25, 2024
1 parent b5b1e00 commit 6ea4588
Show file tree
Hide file tree
Showing 13 changed files with 209 additions and 104 deletions.
11 changes: 11 additions & 0 deletions .changeset/kind-parents-jump.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
"chainlink": patch
---

Add two new metrics for monitoring LLO transmitter health #added

`llo_mercurytransmitter_concurrent_transmit_gauge`
Gauge that measures the number of transmit threads currently waiting on a remote transmit call. You may wish to alert if this exceeds some number for a given period of time, or if it ever reaches its max.

`llo_mercurytransmitter_concurrent_delete_gauge`
Gauge that measures the number of delete threads currently waiting on a delete call to the DB. You may wish to alert if this exceeds some number for a given period of time, or if it ever reaches its max.
2 changes: 2 additions & 0 deletions core/cmd/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"slices"

"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/urfave/cli"

"github.com/smartcontractkit/chainlink/v2/core/build"
Expand Down Expand Up @@ -85,6 +86,7 @@ func NewApp(s *Shell) *cli.App {
}

s.Logger = lggr
s.Registerer = prometheus.DefaultRegisterer // use the global DefaultRegisterer, should be safe since we only ever run one instance of the app per shell
s.CloseLogger = closeFn
s.Config = cfg

Expand Down
13 changes: 8 additions & 5 deletions core/cmd/shell.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/gin-gonic/gin"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/urfave/cli"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
Expand Down Expand Up @@ -62,7 +63,7 @@ import (

var (
initGlobalsOnce sync.Once
prometheus *ginprom.Prometheus
ginPrometheus *ginprom.Prometheus
grpcOpts loop.GRPCOpts
)

Expand All @@ -71,7 +72,7 @@ func initGlobals(cfgProm config.Prometheus, cfgTracing config.Tracing, cfgTeleme
var err error
initGlobalsOnce.Do(func() {
err = func() error {
prometheus = ginprom.New(ginprom.Namespace("service"), ginprom.Token(cfgProm.AuthToken()))
ginPrometheus = ginprom.New(ginprom.Namespace("service"), ginprom.Token(cfgProm.AuthToken()))
grpcOpts = loop.NewGRPCOpts(nil) // default prometheus.Registerer

otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) {
Expand Down Expand Up @@ -139,6 +140,7 @@ type Shell struct {
Renderer
Config chainlink.GeneralConfig // initialized in Before
Logger logger.Logger // initialized in Before
Registerer prometheus.Registerer // initialized in Before
CloseLogger func() error // called in After
AppFactory AppFactory
KeyStoreAuthenticator TerminalKeyStoreAuthenticator
Expand Down Expand Up @@ -178,14 +180,14 @@ func (s *Shell) configExitErr(validateFn func() error) cli.ExitCoder {

// AppFactory implements the NewApplication method.
type AppFactory interface {
NewApplication(ctx context.Context, cfg chainlink.GeneralConfig, appLggr logger.Logger, db *sqlx.DB, keyStoreAuthenticator TerminalKeyStoreAuthenticator) (chainlink.Application, error)
NewApplication(ctx context.Context, cfg chainlink.GeneralConfig, appLggr logger.Logger, appRegisterer prometheus.Registerer, db *sqlx.DB, keyStoreAuthenticator TerminalKeyStoreAuthenticator) (chainlink.Application, error)
}

// ChainlinkAppFactory is used to create a new Application.
type ChainlinkAppFactory struct{}

// NewApplication returns a new instance of the node with the given config.
func (n ChainlinkAppFactory) NewApplication(ctx context.Context, cfg chainlink.GeneralConfig, appLggr logger.Logger, db *sqlx.DB, keyStoreAuthenticator TerminalKeyStoreAuthenticator) (app chainlink.Application, err error) {
func (n ChainlinkAppFactory) NewApplication(ctx context.Context, cfg chainlink.GeneralConfig, appLggr logger.Logger, appRegisterer prometheus.Registerer, db *sqlx.DB, keyStoreAuthenticator TerminalKeyStoreAuthenticator) (app chainlink.Application, err error) {
err = migrate.SetMigrationENVVars(cfg)
if err != nil {
return nil, err
Expand Down Expand Up @@ -237,6 +239,7 @@ func (n ChainlinkAppFactory) NewApplication(ctx context.Context, cfg chainlink.G
// create the relayer-chain interoperators from application configuration
relayerFactory := chainlink.RelayerFactory{
Logger: appLggr,
Registerer: appRegisterer,
LoopRegistry: loopRegistry,
GRPCOpts: grpcOpts,
MercuryPool: mercuryPool,
Expand Down Expand Up @@ -425,7 +428,7 @@ func (n ChainlinkRunner) Run(ctx context.Context, app chainlink.Application) err
return errors.New("You must specify at least one port to listen on")
}

handler, err := web.NewRouter(app, prometheus)
handler, err := web.NewRouter(app, ginPrometheus)
if err != nil {
return errors.Wrap(err, "failed to create web router")
}
Expand Down
6 changes: 3 additions & 3 deletions core/cmd/shell_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ func (s *Shell) runNode(c *cli.Context) error {
// From now on, DB locks and DB connection will be released on every return.
// Keep watching on logger.Fatal* calls and os.Exit(), because defer will not be executed.

app, err := s.AppFactory.NewApplication(rootCtx, s.Config, s.Logger, ldb.DB(), s.KeyStoreAuthenticator)
app, err := s.AppFactory.NewApplication(rootCtx, s.Config, s.Logger, s.Registerer, ldb.DB(), s.KeyStoreAuthenticator)
if err != nil {
return s.errorOut(errors.Wrap(err, "fatal error instantiating application"))
}
Expand Down Expand Up @@ -629,7 +629,7 @@ func (s *Shell) RebroadcastTransactions(c *cli.Context) (err error) {
}
defer lggr.ErrorIfFn(db.Close, "Error closing db")

app, err := s.AppFactory.NewApplication(ctx, s.Config, lggr, db, s.KeyStoreAuthenticator)
app, err := s.AppFactory.NewApplication(ctx, s.Config, lggr, s.Registerer, db, s.KeyStoreAuthenticator)
if err != nil {
return s.errorOut(errors.Wrap(err, "fatal error instantiating application"))
}
Expand Down Expand Up @@ -1275,7 +1275,7 @@ func (s *Shell) RemoveBlocks(c *cli.Context) error {
// From now on, DB locks and DB connection will be released on every return.
// Keep watching on logger.Fatal* calls and os.Exit(), because defer will not be executed.

app, err := s.AppFactory.NewApplication(ctx, s.Config, s.Logger, ldb.DB(), s.KeyStoreAuthenticator)
app, err := s.AppFactory.NewApplication(ctx, s.Config, s.Logger, s.Registerer, ldb.DB(), s.KeyStoreAuthenticator)
if err != nil {
return s.errorOut(errors.Wrap(err, "fatal error instantiating application"))
}
Expand Down
2 changes: 2 additions & 0 deletions core/internal/cltest/cltest.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/jmoiron/sqlx"
"github.com/manyminds/api2go/jsonapi"
"github.com/onsi/gomega"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -406,6 +407,7 @@ func NewApplicationWithConfig(t testing.TB, cfg chainlink.GeneralConfig, flagsAn
Logger: lggr,
LoopRegistry: loopRegistry,
GRPCOpts: loop.GRPCOpts{},
Registerer: prometheus.NewRegistry(), // Don't use global registry here since otherwise multiple apps can create name conflicts. Could also potentially give a mock registry to test prometheus.
MercuryPool: mercuryPool,
CapabilitiesRegistry: capabilitiesRegistry,
HTTPClient: c,
Expand Down
7 changes: 4 additions & 3 deletions core/internal/cltest/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/jmoiron/sqlx"
"github.com/prometheus/client_golang/prometheus"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
Expand Down Expand Up @@ -87,7 +88,7 @@ type InstanceAppFactoryWithKeystoreMock struct {
}

// NewApplication creates a new application with specified config and calls the authenticate function of the keystore
func (f InstanceAppFactoryWithKeystoreMock) NewApplication(ctx context.Context, cfg chainlink.GeneralConfig, lggr logger.Logger, db *sqlx.DB, ks cmd.TerminalKeyStoreAuthenticator) (chainlink.Application, error) {
func (f InstanceAppFactoryWithKeystoreMock) NewApplication(ctx context.Context, cfg chainlink.GeneralConfig, lggr logger.Logger, registerer prometheus.Registerer, db *sqlx.DB, ks cmd.TerminalKeyStoreAuthenticator) (chainlink.Application, error) {
keyStore := f.App.GetKeyStore()
err := ks.Authenticate(ctx, keyStore, cfg.Password())
if err != nil {
Expand All @@ -102,15 +103,15 @@ type InstanceAppFactory struct {
}

// NewApplication creates a new application with specified config
func (f InstanceAppFactory) NewApplication(context.Context, chainlink.GeneralConfig, logger.Logger, *sqlx.DB, cmd.TerminalKeyStoreAuthenticator) (chainlink.Application, error) {
func (f InstanceAppFactory) NewApplication(context.Context, chainlink.GeneralConfig, logger.Logger, prometheus.Registerer, *sqlx.DB, cmd.TerminalKeyStoreAuthenticator) (chainlink.Application, error) {
return f.App, nil
}

type seededAppFactory struct {
Application chainlink.Application
}

func (s seededAppFactory) NewApplication(context.Context, chainlink.GeneralConfig, logger.Logger, *sqlx.DB, cmd.TerminalKeyStoreAuthenticator) (chainlink.Application, error) {
func (s seededAppFactory) NewApplication(context.Context, chainlink.GeneralConfig, logger.Logger, prometheus.Registerer, *sqlx.DB, cmd.TerminalKeyStoreAuthenticator) (chainlink.Application, error) {
return noopStopApplication{s.Application}, nil
}

Expand Down
3 changes: 3 additions & 0 deletions core/services/chainlink/relayer_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net/http"

"github.com/pelletier/go-toml/v2"
"github.com/prometheus/client_golang/prometheus"

"github.com/smartcontractkit/chainlink-common/pkg/loop"
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
Expand Down Expand Up @@ -37,6 +38,7 @@ type RelayerFactory struct {
logger.Logger
*plugins.LoopRegistry
loop.GRPCOpts
Registerer prometheus.Registerer
MercuryPool wsrpc.Pool
CapabilitiesRegistry coretypes.CapabilitiesRegistry
HTTPClient *http.Client
Expand Down Expand Up @@ -81,6 +83,7 @@ func (r *RelayerFactory) NewEVM(ctx context.Context, config EVMFactoryConfig) (m

relayerOpts := evmrelay.RelayerOpts{
DS: ccOpts.DS,
Registerer: r.Registerer,
CSAETHKeystore: config.CSAETHKeystore,
MercuryPool: r.MercuryPool,
MercuryConfig: config.MercuryConfig,
Expand Down
10 changes: 6 additions & 4 deletions core/services/llo/bm/dummy_transmitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ import (
// A dummy transmitter useful for benchmarking and testing

var (
transmitSuccessCount = promauto.NewCounter(prometheus.CounterOpts{
Name: "llo_transmit_success_count",
Help: "Running count of successful transmits",
promTransmitSuccessCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: "llo",
Subsystem: "dummytransmitter",
Name: "transmit_success_count",
Help: "Running count of successful transmits",
})
)

Expand Down Expand Up @@ -101,7 +103,7 @@ func (t *transmitter) Transmit(
lggr.Debugw(fmt.Sprintf("Failed to decode report with type %s", report.Info.ReportFormat), "err", err)
}
}
transmitSuccessCount.Inc()
promTransmitSuccessCount.Inc()
lggr.Infow("Transmit (dummy)", "digest", digest, "seqNr", seqNr, "report.Report", report.Report, "report.Info", report.Info, "sigs", sigs)
return nil
}
Expand Down
12 changes: 8 additions & 4 deletions core/services/llo/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,18 @@ import (

var (
promMissingStreamCount = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "llo_stream_missing_count",
Help: "Number of times we tried to observe a stream, but it was missing",
Namespace: "llo",
Subsystem: "datasource",
Name: "stream_missing_count",
Help: "Number of times we tried to observe a stream, but it was missing",
},
[]string{"streamID"},
)
promObservationErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "llo_stream_observation_error_count",
Help: "Number of times we tried to observe a stream, but it failed with an error",
Namespace: "llo",
Subsystem: "datasource",
Name: "stream_observation_error_count",
Help: "Number of times we tried to observe a stream, but it failed with an error",
},
[]string{"streamID"},
)
Expand Down
11 changes: 7 additions & 4 deletions core/services/llo/mercurytransmitter/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"strconv"
"sync"
"time"

Expand All @@ -22,9 +23,11 @@ type asyncDeleter interface {

var _ services.Service = (*transmitQueue)(nil)

var transmitQueueLoad = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "llo_transmit_queue_load",
Help: "Current count of items in the transmit queue",
var promTransmitQueueLoad = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "llo",
Subsystem: "mercurytransmitter",
Name: "transmit_queue_load",
Help: "Current count of items in the transmit queue",
},
[]string{"donID", "serverURL", "capacity"},
)
Expand Down Expand Up @@ -75,7 +78,7 @@ func NewTransmitQueue(lggr logger.Logger, serverURL string, maxlen int, asyncDel
maxlen,
false,
nil,
transmitQueueLoad.WithLabelValues(fmt.Sprintf("%d", asyncDeleter.DonID()), serverURL, fmt.Sprintf("%d", maxlen)),
promTransmitQueueLoad.WithLabelValues(strconv.FormatUint(uint64(asyncDeleter.DonID()), 10), serverURL, strconv.FormatInt(int64(maxlen), 10)),
}
}

Expand Down
Loading

0 comments on commit 6ea4588

Please sign in to comment.