feat(metrics): OTeL - instrument Node component #1899

Merged · 1 commit · Dec 3, 2024
14 changes: 4 additions & 10 deletions cli/operator/node.go
@@ -99,8 +99,6 @@ var cfg config
 
 var globalArgs global_config.Args
 
-var operatorNode operator.Node
-
 // StartNodeCmd is the command to start SSV node
 var StartNodeCmd = &cobra.Command{
 	Use: "start-node",
@@ -262,7 +260,6 @@ var StartNodeCmd = &cobra.Command{
 				networkConfig,
 				genesisvalidation.WithNodeStorage(nodeStorage),
 				genesisvalidation.WithLogger(logger),
-				genesisvalidation.WithMetrics(metricsReporter),
 				genesisvalidation.WithDutyStore(dutyStore),
 			),
 		}
@@ -343,7 +340,6 @@ var StartNodeCmd = &cobra.Command{
 		cfg.SSVOptions.ValidatorOptions.Graffiti = []byte(cfg.Graffiti)
 		cfg.SSVOptions.ValidatorOptions.ValidatorStore = nodeStorage.ValidatorStore()
 		cfg.SSVOptions.ValidatorOptions.OperatorSigner = types.NewSsvOperatorSigner(operatorPrivKey, operatorDataStore.GetOperatorID)
-		cfg.SSVOptions.Metrics = metricsReporter
 
 		cfg.SSVOptions.ValidatorOptions.GenesisControllerOptions.StorageMap = genesisStorageMap
 		cfg.SSVOptions.ValidatorOptions.GenesisControllerOptions.Network = &genesisP2pNetwork
@@ -352,10 +348,10 @@ var StartNodeCmd = &cobra.Command{
 		cfg.SSVOptions.ValidatorController = validatorCtrl
 		cfg.SSVOptions.ValidatorStore = validatorStore
 
-		operatorNode = operator.New(logger, cfg.SSVOptions, slotTickerProvider, storageMap)
+		operatorNode := operator.New(logger, cfg.SSVOptions, slotTickerProvider, storageMap)
 
 		if cfg.MetricsAPIPort > 0 {
-			go startMetricsHandler(cmd.Context(), logger, db, metricsReporter, cfg.MetricsAPIPort, cfg.EnableProfile)
+			go startMetricsHandler(logger, db, cfg.MetricsAPIPort, cfg.EnableProfile, operatorNode)
 		}
 
 		nodeProber := nodeprobe.NewProber(
@@ -377,8 +373,6 @@ var StartNodeCmd = &cobra.Command{
 		nodeProber.Wait()
 		logger.Info("ethereum node(s) are healthy")
 
-		metricsReporter.SSVNodeHealthy()
-
 		eventSyncer := setupEventHandling(
 			cmd.Context(),
 			logger,
@@ -811,10 +805,10 @@ func setupEventHandling(
 	return eventSyncer
 }
 
-func startMetricsHandler(ctx context.Context, logger *zap.Logger, db basedb.Database, metricsReporter metricsreporter.MetricsReporter, port int, enableProf bool) {
+func startMetricsHandler(logger *zap.Logger, db basedb.Database, port int, enableProf bool, opNode *operator.Node) {
 	logger = logger.Named(logging.NameMetricsHandler)
 	// init and start HTTP handler
-	metricsHandler := metrics.NewMetricsHandler(ctx, db, metricsReporter, enableProf, operatorNode.(metrics.HealthChecker))
+	metricsHandler := metrics.NewHandler(db, enableProf, opNode)
 	addr := fmt.Sprintf(":%d", port)
 	if err := metricsHandler.Start(logger, http.NewServeMux(), addr); err != nil {
 		logger.Panic("failed to serve metrics", zap.Error(err))
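
Net effect of the changes in cli/operator/node.go: the package-level operatorNode variable, the metricsReporter plumbing, and the ssv_node_status gauge update are gone, so node liveness is now observed through the handler's HTTP /health endpoint rather than a Prometheus gauge. A minimal, hypothetical external probe (the port and URL are illustrative, not taken from this diff):

package main

import (
	"fmt"
	"net/http"
)

// Hypothetical liveness probe. Per handler.go below, /health answers
// 200 with an empty body when HealthCheck passes, and 500 with a JSON
// {"errors":[...]} body when it fails.
func main() {
	resp, err := http.Get("http://localhost:15000/health") // illustrative address
	if err != nil {
		fmt.Println("node unreachable:", err)
		return
	}
	defer resp.Body.Close()
	fmt.Println("healthy:", resp.StatusCode == http.StatusOK)
}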
46 changes: 15 additions & 31 deletions monitoring/metrics/handler.go
@@ -1,7 +1,6 @@
 package metrics
 
 import (
-	"context"
 	"encoding/hex"
 	"encoding/json"
 	"fmt"
@@ -19,40 +18,27 @@ import (
 	"github.com/ssvlabs/ssv/storage/basedb"
 )
 
-// Handler handles incoming metrics requests
-type Handler interface {
-	// Start starts an http server, listening to /metrics requests
-	Start(logger *zap.Logger, mux *http.ServeMux, addr string) error
-}
-
-type metricsHandler struct {
-	ctx           context.Context
+type Handler struct {
 	db            basedb.Database
-	reporter      nodeMetrics
 	enableProf    bool
 	healthChecker HealthChecker
 }
 
-// NewMetricsHandler returns a new metrics handler.
-func NewMetricsHandler(ctx context.Context, db basedb.Database, reporter nodeMetrics, enableProf bool, healthChecker HealthChecker) Handler {
-	if reporter == nil {
-		reporter = nopMetrics{}
-	}
-	mh := metricsHandler{
-		ctx:           ctx,
+// NewHandler returns a new metrics handler.
+func NewHandler(db basedb.Database, enableProf bool, healthChecker HealthChecker) *Handler {
	mh := Handler{
 		db:            db,
-		reporter:      reporter,
 		enableProf:    enableProf,
 		healthChecker: healthChecker,
 	}
 	return &mh
 }
 
-func (mh *metricsHandler) Start(logger *zap.Logger, mux *http.ServeMux, addr string) error {
-	logger.Info("setup collection", fields.Address(addr), zap.Bool("enableProf", mh.enableProf))
+func (h *Handler) Start(logger *zap.Logger, mux *http.ServeMux, addr string) error {
+	logger.Info("setup collection", fields.Address(addr), zap.Bool("enableProf", h.enableProf))
 
-	if mh.enableProf {
-		mh.configureProfiling()
+	if h.enableProf {
+		h.configureProfiling()
 		// adding pprof routes manually on an own HTTPMux to avoid lint issue:
 		// `G108: Profiling endpoint is automatically exposed on /debug/pprof (gosec)`
 		mux.HandleFunc("/debug/pprof/", http_pprof.Index)
@@ -69,8 +55,8 @@ func (mh *metricsHandler) Start(logger *zap.Logger, mux *http.ServeMux, addr str
 			EnableOpenMetrics: true,
 		},
 	))
-	mux.HandleFunc("/database/count-by-collection", mh.handleCountByCollection)
-	mux.HandleFunc("/health", mh.handleHealth)
+	mux.HandleFunc("/database/count-by-collection", h.handleCountByCollection)
+	mux.HandleFunc("/health", h.handleHealth)
 
 	// Set a high timeout to allow for long-running pprof requests.
 	const timeout = 600 * time.Second
@@ -92,7 +78,7 @@ func (mh *metricsHandler) Start(logger *zap.Logger, mux *http.ServeMux, addr str
 // handleCountByCollection responds with the number of key in the database by collection.
 // Prefix can be a string or a 0x-prefixed hex string.
 // Empty prefix returns the total number of keys in the database.
-func (mh *metricsHandler) handleCountByCollection(w http.ResponseWriter, r *http.Request) {
+func (h *Handler) handleCountByCollection(w http.ResponseWriter, r *http.Request) {
 	var response struct {
 		Count int64 `json:"count"`
 	}
@@ -113,7 +99,7 @@ func (mh *metricsHandler) handleCountByCollection(w http.ResponseWriter, r *http
 		}
 	}
 
-	n, err := mh.db.CountPrefix(prefix)
+	n, err := h.db.CountPrefix(prefix)
 	if err != nil {
 		http.Error(w, err.Error(), http.StatusInternalServerError)
 		return
@@ -126,9 +112,8 @@ func (mh *metricsHandler) handleCountByCollection(w http.ResponseWriter, r *http
 	}
 }
 
-func (mh *metricsHandler) handleHealth(res http.ResponseWriter, req *http.Request) {
-	if err := mh.healthChecker.HealthCheck(); err != nil {
-		mh.reporter.SSVNodeNotHealthy()
+func (h *Handler) handleHealth(res http.ResponseWriter, req *http.Request) {
+	if err := h.healthChecker.HealthCheck(); err != nil {
 		result := map[string][]string{
 			"errors": {err.Error()},
 		}
@@ -138,14 +123,13 @@ func (mh *metricsHandler) handleHealth(res http.ResponseWriter, req *http.Reques
 			http.Error(res, string(raw), http.StatusInternalServerError)
 		}
 	} else {
-		mh.reporter.SSVNodeHealthy()
 		if _, err := fmt.Fprintln(res, ""); err != nil {
 			http.Error(res, err.Error(), http.StatusInternalServerError)
 		}
 	}
 }
 
-func (mh *metricsHandler) configureProfiling() {
+func (h *Handler) configureProfiling() {
 	runtime.SetBlockProfileRate(10000)
 	runtime.SetMutexProfileFraction(5)
 }
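
Taken together, a sketch of how the reworked handler is wired up, assuming the import paths follow the ssvlabs/ssv layout shown in this diff (serveMetrics itself is a hypothetical helper mirroring startMetricsHandler above):

package main

import (
	"net/http"

	"go.uber.org/zap"

	"github.com/ssvlabs/ssv/monitoring/metrics"
	"github.com/ssvlabs/ssv/operator"
	"github.com/ssvlabs/ssv/storage/basedb"
)

// The concrete *operator.Node is passed directly as the HealthChecker;
// no context.Context and no MetricsReporter are needed anymore.
func serveMetrics(logger *zap.Logger, db basedb.Database, node *operator.Node) {
	h := metrics.NewHandler(db, false /* enableProf */, node)
	// Start registers /metrics, /health and /database/count-by-collection
	// (plus pprof routes when profiling is enabled), then serves on addr.
	if err := h.Start(logger, http.NewServeMux(), ":15000"); err != nil {
		logger.Panic("failed to serve metrics", zap.Error(err))
	}
}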
11 changes: 0 additions & 11 deletions monitoring/metrics/reporter.go

This file was deleted.

19 changes: 0 additions & 19 deletions monitoring/metricsreporter/metrics_reporter.go
@@ -15,16 +15,7 @@ import (
 	ssvmessage "github.com/ssvlabs/ssv/protocol/v2/message"
 )
 
-const (
-	ssvNodeNotHealthy = float64(0)
-	ssvNodeHealthy    = float64(1)
-)
-
 var (
-	ssvNodeStatus = promauto.NewGauge(prometheus.GaugeOpts{
-		Name: "ssv_node_status",
-		Help: "Status of the operator node",
-	})
 	operatorIndex = promauto.NewGaugeVec(prometheus.GaugeOpts{
 		Name: "ssv:exporter:operator_index",
 		Help: "operator footprint",
@@ -78,8 +69,6 @@ var (
 )
 
 type MetricsReporter interface {
-	SSVNodeHealthy()
-	SSVNodeNotHealthy()
 	OperatorPublicKey(operatorID spectypes.OperatorID, publicKey []byte)
 	MessageValidationRSAVerifications()
 	SignatureValidationDuration(duration time.Duration, labels ...string)
@@ -112,14 +101,6 @@ func New(opts ...Option) MetricsReporter {
 	return &metricsReporter{}
 }
 
-func (m *metricsReporter) SSVNodeHealthy() {
-	ssvNodeStatus.Set(ssvNodeHealthy)
-}
-
-func (m *metricsReporter) SSVNodeNotHealthy() {
-	ssvNodeStatus.Set(ssvNodeNotHealthy)
-}
-
 func (m *metricsReporter) OperatorPublicKey(operatorID spectypes.OperatorID, publicKey []byte) {
 	pkHash := fmt.Sprintf("%x", sha256.Sum256(publicKey))
 	operatorIndex.WithLabelValues(pkHash, strconv.FormatUint(operatorID, 10)).Set(float64(operatorID))
2 changes: 0 additions & 2 deletions monitoring/metricsreporter/nop_metrics_reporter.go
@@ -15,8 +15,6 @@ func NewNop() MetricsReporter {
 	return &nopMetrics{}
 }
 
-func (n *nopMetrics) SSVNodeHealthy()    {}
-func (n *nopMetrics) SSVNodeNotHealthy() {}
 func (n *nopMetrics) OperatorPublicKey(operatorID spectypes.OperatorID, publicKey []byte) {}
 func (n *nopMetrics) MessageValidationRSAVerifications()                                  {}
 func (n *nopMetrics) GenesisMessageAccepted(role genesisspectypes.BeaconRole, round genesisspecqbft.Round) {
17 changes: 0 additions & 17 deletions operator/metrics.go

This file was deleted.

34 changes: 9 additions & 25 deletions operator/node.go
@@ -24,11 +24,6 @@ import (
 	"github.com/ssvlabs/ssv/storage/basedb"
 )
 
-// Node represents the behavior of SSV node
-type Node interface {
-	Start(logger *zap.Logger) error
-}
-
 // Options contains options to create the node
 type Options struct {
 	// NetworkName is the network name of this node
@@ -46,11 +41,9 @@ type Options struct {
 	DutyStore *dutystore.Store
 	WS        api.WebSocketServer
 	WsAPIPort int
-	Metrics   nodeMetrics
 }
 
-// operatorNode implements Node interface
-type operatorNode struct {
+type Node struct {
 	network          networkconfig.NetworkConfig
 	context          context.Context
 	validatorsCtrl   validator.Controller
@@ -65,13 +58,11 @@ type operatorNode struct {
 
 	ws        api.WebSocketServer
 	wsAPIPort int
-
-	metrics nodeMetrics
 }
 
-// New is the constructor of operatorNode
-func New(logger *zap.Logger, opts Options, slotTickerProvider slotticker.Provider, qbftStorage *qbftstorage.QBFTStores) Node {
-	node := &operatorNode{
+// New is the constructor of Node
+func New(logger *zap.Logger, opts Options, slotTickerProvider slotticker.Provider, qbftStorage *qbftstorage.QBFTStores) *Node {
+	node := &Node{
 		context:          opts.Context,
 		validatorsCtrl:   opts.ValidatorController,
 		validatorOptions: opts.ValidatorOptions,
@@ -107,19 +98,13 @@ func New(logger *zap.Logger, opts Options, slotTickerProvider slotticker.Provide
 
 		ws:        opts.WS,
 		wsAPIPort: opts.WsAPIPort,
-
-		metrics: opts.Metrics,
 	}
 
-	if node.metrics == nil {
-		node.metrics = nopMetrics{}
-	}
-
 	return node
 }
 
 // Start starts to stream duties and run IBFT instances
-func (n *operatorNode) Start(logger *zap.Logger) error {
+func (n *Node) Start(logger *zap.Logger) error {
 	logger.Named(logging.NameOperator)
 
 	logger.Info("All required services are ready. OPERATOR SUCCESSFULLY CONFIGURED AND NOW RUNNING!")
@@ -164,15 +149,15 @@ func (n *operatorNode) Start(logger *zap.Logger) error {
 }
 
 // HealthCheck returns a list of issues regards the state of the operator node
-func (n *operatorNode) HealthCheck() error {
+func (n *Node) HealthCheck() error {
 	// TODO: previously this checked availability of consensus & execution clients.
 	// However, currently the node crashes when those clients are down,
 	// so this health check is currently a positive no-op.
 	return nil
 }
 
 // handleQueryRequests waits for incoming messages and
-func (n *operatorNode) handleQueryRequests(logger *zap.Logger, nm *api.NetworkMessage) {
+func (n *Node) handleQueryRequests(logger *zap.Logger, nm *api.NetworkMessage) {
 	if nm.Err != nil {
 		nm.Msg = api.Message{Type: api.TypeError, Data: []string{"could not parse network message"}}
 	}
@@ -188,7 +173,7 @@ func (n *operatorNode) handleQueryRequests(logger *zap.Logger, nm *api.NetworkMe
 	}
 }
 
-func (n *operatorNode) startWSServer(logger *zap.Logger) error {
+func (n *Node) startWSServer(logger *zap.Logger) error {
 	if n.ws != nil {
 		logger.Info("starting WS server")
 
@@ -202,15 +187,14 @@ func (n *operatorNode) startWSServer(logger *zap.Logger) error {
 	return nil
 }
 
-func (n *operatorNode) reportOperators(logger *zap.Logger) {
+func (n *Node) reportOperators(logger *zap.Logger) {
 	operators, err := n.storage.ListOperators(nil, 0, 1000) // TODO more than 1000?
 	if err != nil {
 		logger.Warn("failed to get all operators for reporting", zap.Error(err))
 		return
 	}
 	logger.Debug("reporting operators", zap.Int("count", len(operators)))
 	for i := range operators {
-		n.metrics.OperatorPublicKey(operators[i].ID, operators[i].PublicKey)
 		logger.Debug("report operator public key",
 			fields.OperatorID(operators[i].ID),
 			fields.PubKey(operators[i].PublicKey))
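
A closing structural note: with the operator-side Node interface deleted, the only remaining contract between the operator and metrics packages is health checking. The HealthChecker interface is not shown in this diff, but from its use in handler.go it is presumably the single-method interface sketched here (an assumption, not a quote from the repository):

// Assumed shape of metrics.HealthChecker:
type HealthChecker interface {
	HealthCheck() error
}

// Compile-time sketch: the concrete *operator.Node satisfies it via the
// HealthCheck method above, so it can be handed to metrics.NewHandler
// directly, replacing the old operatorNode.(metrics.HealthChecker)
// type assertion on a package-level variable.
var _ HealthChecker = (*operator.Node)(nil)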