Skip to content

Commit

Permalink
Observability - instrument Node metrics, cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
oleg-ssvlabs committed Dec 2, 2024
1 parent 8b1eebe commit 52a93fa
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 97 deletions.
10 changes: 3 additions & 7 deletions cli/operator/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,6 @@ var StartNodeCmd = &cobra.Command{
networkConfig,
genesisvalidation.WithNodeStorage(nodeStorage),
genesisvalidation.WithLogger(logger),
genesisvalidation.WithMetrics(metricsReporter),
genesisvalidation.WithDutyStore(dutyStore),
),
}
Expand Down Expand Up @@ -344,7 +343,6 @@ var StartNodeCmd = &cobra.Command{
cfg.SSVOptions.ValidatorOptions.Graffiti = []byte(cfg.Graffiti)
cfg.SSVOptions.ValidatorOptions.ValidatorStore = nodeStorage.ValidatorStore()
cfg.SSVOptions.ValidatorOptions.OperatorSigner = types.NewSsvOperatorSigner(operatorPrivKey, operatorDataStore.GetOperatorID)
cfg.SSVOptions.Metrics = metricsReporter

cfg.SSVOptions.ValidatorOptions.GenesisControllerOptions.StorageMap = genesisStorageMap
cfg.SSVOptions.ValidatorOptions.GenesisControllerOptions.Network = &genesisP2pNetwork
Expand All @@ -356,7 +354,7 @@ var StartNodeCmd = &cobra.Command{
operatorNode = operator.New(logger, cfg.SSVOptions, slotTickerProvider, storageMap)

if cfg.MetricsAPIPort > 0 {
go startMetricsHandler(cmd.Context(), logger, db, metricsReporter, cfg.MetricsAPIPort, cfg.EnableProfile)
go startMetricsHandler(logger, db, cfg.MetricsAPIPort, cfg.EnableProfile)
}

nodeProber := nodeprobe.NewProber(
Expand All @@ -378,8 +376,6 @@ var StartNodeCmd = &cobra.Command{
nodeProber.Wait()
logger.Info("ethereum node(s) are healthy")

metricsReporter.SSVNodeHealthy()

eventSyncer := setupEventHandling(
cmd.Context(),
logger,
Expand Down Expand Up @@ -812,10 +808,10 @@ func setupEventHandling(
return eventSyncer
}

func startMetricsHandler(ctx context.Context, logger *zap.Logger, db basedb.Database, metricsReporter metricsreporter.MetricsReporter, port int, enableProf bool) {
func startMetricsHandler(logger *zap.Logger, db basedb.Database, port int, enableProf bool) {
logger = logger.Named(logging.NameMetricsHandler)
// init and start HTTP handler
metricsHandler := metrics.NewMetricsHandler(ctx, db, metricsReporter, enableProf, operatorNode.(metrics.HealthChecker))
metricsHandler := metrics.NewHandler(db, enableProf, operatorNode.(metrics.HealthChecker))
addr := fmt.Sprintf(":%d", port)
if err := metricsHandler.Start(logger, http.NewServeMux(), addr); err != nil {
logger.Panic("failed to serve metrics", zap.Error(err))
Expand Down
48 changes: 17 additions & 31 deletions monitoring/metrics/handler.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package metrics

import (
"context"
"encoding/hex"
"encoding/json"
"fmt"
Expand All @@ -19,40 +18,27 @@ import (
"github.com/ssvlabs/ssv/storage/basedb"
)

// Handler handles incoming metrics requests
type Handler interface {
// Start starts an http server, listening to /metrics requests
Start(logger *zap.Logger, mux *http.ServeMux, addr string) error
}

type metricsHandler struct {
ctx context.Context
type Handler struct {
db basedb.Database
reporter nodeMetrics
enableProf bool
healthChecker HealthChecker
}

// NewMetricsHandler returns a new metrics handler.
func NewMetricsHandler(ctx context.Context, db basedb.Database, reporter nodeMetrics, enableProf bool, healthChecker HealthChecker) Handler {
if reporter == nil {
reporter = nopMetrics{}
}
mh := metricsHandler{
ctx: ctx,
// NewHandler returns a new metrics handler.
func NewHandler(db basedb.Database, enableProf bool, healthChecker HealthChecker) *Handler {
mh := Handler{
db: db,
reporter: reporter,
enableProf: enableProf,
healthChecker: healthChecker,
}
return &mh
}

func (mh *metricsHandler) Start(logger *zap.Logger, mux *http.ServeMux, addr string) error {
logger.Info("setup collection", fields.Address(addr), zap.Bool("enableProf", mh.enableProf))
func (h *Handler) Start(logger *zap.Logger, mux *http.ServeMux, addr string) error {
logger.Info("setup collection", fields.Address(addr), zap.Bool("enableProf", h.enableProf))

if mh.enableProf {
mh.configureProfiling()
if h.enableProf {
h.configureProfiling()
// adding pprof routes manually on an own HTTPMux to avoid lint issue:
// `G108: Profiling endpoint is automatically exposed on /debug/pprof (gosec)`
mux.HandleFunc("/debug/pprof/", http_pprof.Index)
Expand All @@ -69,8 +55,8 @@ func (mh *metricsHandler) Start(logger *zap.Logger, mux *http.ServeMux, addr str
EnableOpenMetrics: true,
},
))
mux.HandleFunc("/database/count-by-collection", mh.handleCountByCollection)
mux.HandleFunc("/health", mh.handleHealth)
mux.HandleFunc("/database/count-by-collection", h.handleCountByCollection)
mux.HandleFunc("/health", h.handleHealth)

// Set a high timeout to allow for long-running pprof requests.
const timeout = 600 * time.Second
Expand All @@ -92,7 +78,7 @@ func (mh *metricsHandler) Start(logger *zap.Logger, mux *http.ServeMux, addr str
// handleCountByCollection responds with the number of keys in the database by collection.
// Prefix can be a string or a 0x-prefixed hex string.
// Empty prefix returns the total number of keys in the database.
func (mh *metricsHandler) handleCountByCollection(w http.ResponseWriter, r *http.Request) {
func (h *Handler) handleCountByCollection(w http.ResponseWriter, r *http.Request) {
var response struct {
Count int64 `json:"count"`
}
Expand All @@ -113,7 +99,7 @@ func (mh *metricsHandler) handleCountByCollection(w http.ResponseWriter, r *http
}
}

n, err := mh.db.CountPrefix(prefix)
n, err := h.db.CountPrefix(prefix)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
Expand All @@ -126,9 +112,9 @@ func (mh *metricsHandler) handleCountByCollection(w http.ResponseWriter, r *http
}
}

func (mh *metricsHandler) handleHealth(res http.ResponseWriter, req *http.Request) {
if err := mh.healthChecker.HealthCheck(); err != nil {
mh.reporter.SSVNodeNotHealthy()
func (h *Handler) handleHealth(res http.ResponseWriter, req *http.Request) {
if err := h.healthChecker.HealthCheck(); err != nil {
recordSSVNodeStatus(req.Context(), statusUnhealthy)
result := map[string][]string{
"errors": {err.Error()},
}
Expand All @@ -138,14 +124,14 @@ func (mh *metricsHandler) handleHealth(res http.ResponseWriter, req *http.Reques
http.Error(res, string(raw), http.StatusInternalServerError)
}
} else {
mh.reporter.SSVNodeHealthy()
recordSSVNodeStatus(req.Context(), statusHealthy)
if _, err := fmt.Fprintln(res, ""); err != nil {
http.Error(res, err.Error(), http.StatusInternalServerError)
}
}
}

func (mh *metricsHandler) configureProfiling() {
// configureProfiling turns on the runtime's block and mutex profilers so that
// the corresponding pprof profiles contain data. Start calls it only when
// profiling is enabled.
func (h *Handler) configureProfiling() {
// Per the runtime docs: aim to sample, on average, one blocking event per
// 10000 nanoseconds spent blocked.
runtime.SetBlockProfileRate(10000)
// Per the runtime docs: report, on average, 1 out of every 5 mutex
// contention events.
runtime.SetMutexProfileFraction(5)
}
56 changes: 56 additions & 0 deletions monitoring/metrics/observability.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package metrics

import (
"context"
"fmt"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

"github.com/ssvlabs/ssv/observability"
)

const (
observabilityName = "github.com/ssvlabs/ssv/monitoring/metrics"
observabilityNamespace = "ssv.node"
)

type ssvNodeStatus string

const (
statusHealthy ssvNodeStatus = "healthy"
statusUnhealthy ssvNodeStatus = "unhealthy"
)

var (
meter = otel.Meter(observabilityName)

clientStatusGauge = observability.NewMetric(
meter.Int64Gauge(
metricName("status"),
metric.WithDescription("ssv node health status")))
)

// metricName returns the fully qualified metric name: the package's
// observability namespace, a dot, and then the supplied short name.
func metricName(name string) string {
	// All operands are strings, so fmt.Sprint joins them without inserting spaces.
	qualified := fmt.Sprint(observabilityNamespace, ".", name)
	return qualified
}

// statusAttribute wraps the given node status in a string attribute whose key
// is the observability namespace followed by ".status".
func statusAttribute(value ssvNodeStatus) attribute.KeyValue {
	key := attribute.Key(fmt.Sprintf("%s.status", observabilityNamespace))
	return key.String(string(value))
}

// recordSSVNodeStatus publishes the node's current health status on the status
// gauge. It first records 0 for every known status and then records 1 for the
// status passed in, so the series carrying 1 identifies the current status.
func recordSSVNodeStatus(ctx context.Context, status ssvNodeStatus) {
	// Clear all status series before marking the active one.
	resetSSVNodeStatusGauge(ctx)

	current := metric.WithAttributes(statusAttribute(status))
	clientStatusGauge.Record(ctx, 1, current)
}

// resetSSVNodeStatusGauge records 0 on the status gauge for each known node
// status (unhealthy first, then healthy).
func resetSSVNodeStatusGauge(ctx context.Context) {
	statuses := [...]ssvNodeStatus{statusUnhealthy, statusHealthy}
	for i := range statuses {
		attrs := metric.WithAttributes(statusAttribute(statuses[i]))
		clientStatusGauge.Record(ctx, 0, attrs)
	}
}
11 changes: 0 additions & 11 deletions monitoring/metrics/reporter.go

This file was deleted.

19 changes: 0 additions & 19 deletions monitoring/metricsreporter/metrics_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,7 @@ import (
ssvmessage "github.com/ssvlabs/ssv/protocol/v2/message"
)

const (
ssvNodeNotHealthy = float64(0)
ssvNodeHealthy = float64(1)
)

var (
ssvNodeStatus = promauto.NewGauge(prometheus.GaugeOpts{
Name: "ssv_node_status",
Help: "Status of the operator node",
})
operatorIndex = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "ssv:exporter:operator_index",
Help: "operator footprint",
Expand Down Expand Up @@ -78,8 +69,6 @@ var (
)

type MetricsReporter interface {
SSVNodeHealthy()
SSVNodeNotHealthy()
OperatorPublicKey(operatorID spectypes.OperatorID, publicKey []byte)
MessageValidationRSAVerifications()
SignatureValidationDuration(duration time.Duration, labels ...string)
Expand Down Expand Up @@ -112,14 +101,6 @@ func New(opts ...Option) MetricsReporter {
return &metricsReporter{}
}

func (m *metricsReporter) SSVNodeHealthy() {
ssvNodeStatus.Set(ssvNodeHealthy)
}

func (m *metricsReporter) SSVNodeNotHealthy() {
ssvNodeStatus.Set(ssvNodeNotHealthy)
}

func (m *metricsReporter) OperatorPublicKey(operatorID spectypes.OperatorID, publicKey []byte) {
pkHash := fmt.Sprintf("%x", sha256.Sum256(publicKey))
operatorIndex.WithLabelValues(pkHash, strconv.FormatUint(operatorID, 10)).Set(float64(operatorID))
Expand Down
2 changes: 0 additions & 2 deletions monitoring/metricsreporter/nop_metrics_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ func NewNop() MetricsReporter {
return &nopMetrics{}
}

func (n *nopMetrics) SSVNodeHealthy() {}
func (n *nopMetrics) SSVNodeNotHealthy() {}
func (n *nopMetrics) OperatorPublicKey(operatorID spectypes.OperatorID, publicKey []byte) {}
func (n *nopMetrics) MessageValidationRSAVerifications() {}
func (n *nopMetrics) GenesisMessageAccepted(role genesisspectypes.BeaconRole, round genesisspecqbft.Round) {
Expand Down
17 changes: 0 additions & 17 deletions operator/metrics.go

This file was deleted.

10 changes: 0 additions & 10 deletions operator/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ type Options struct {
DutyStore *dutystore.Store
WS api.WebSocketServer
WsAPIPort int
Metrics nodeMetrics
}

// operatorNode implements Node interface
Expand All @@ -65,8 +64,6 @@ type operatorNode struct {

ws api.WebSocketServer
wsAPIPort int

metrics nodeMetrics
}

// New is the constructor of operatorNode
Expand Down Expand Up @@ -107,12 +104,6 @@ func New(logger *zap.Logger, opts Options, slotTickerProvider slotticker.Provide

ws: opts.WS,
wsAPIPort: opts.WsAPIPort,

metrics: opts.Metrics,
}

if node.metrics == nil {
node.metrics = nopMetrics{}
}

return node
Expand Down Expand Up @@ -210,7 +201,6 @@ func (n *operatorNode) reportOperators(logger *zap.Logger) {
}
logger.Debug("reporting operators", zap.Int("count", len(operators)))
for i := range operators {
n.metrics.OperatorPublicKey(operators[i].ID, operators[i].PublicKey)
logger.Debug("report operator public key",
fields.OperatorID(operators[i].ID),
fields.PubKey(operators[i].PublicKey))
Expand Down

0 comments on commit 52a93fa

Please sign in to comment.