Skip to content

Commit

Permalink
Observability - instrument P2P component
Browse files Browse the repository at this point in the history
  • Loading branch information
oleg-ssvlabs committed Dec 2, 2024
1 parent 8b1eebe commit b9c7af3
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 162 deletions.
2 changes: 1 addition & 1 deletion cli/operator/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -670,7 +670,7 @@ func setupP2P(logger *zap.Logger, db basedb.Database, mr metricsreporter.Metrics
}
cfg.P2pNetworkConfig.NetworkPrivateKey = netPrivKey

n, err := p2pv1.New(logger, &cfg.P2pNetworkConfig, mr)
n, err := p2pv1.New(logger, &cfg.P2pNetworkConfig)
if err != nil {
logger.Fatal("failed to setup p2p network", zap.Error(err))
}
Expand Down
147 changes: 0 additions & 147 deletions network/p2p/metrics.go

This file was deleted.

79 changes: 79 additions & 0 deletions network/p2p/observability.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package p2pv1

import (
"context"
"fmt"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/ssvlabs/ssv/network/peers"
"github.com/ssvlabs/ssv/network/topics"
"github.com/ssvlabs/ssv/observability"
)

const (
observabilityName = "github.com/ssvlabs/ssv/network/p2p"
observabilityNamespace = "ssv.p2p.peer"
)

var (
meter = otel.Meter(observabilityName)

peersConnectedGauge = observability.NewMetric(
meter.Int64Gauge(
metricName("connected"),
metric.WithUnit("{peer}"),
metric.WithDescription("number of connected peers")))

peersByTopicCounter = observability.NewMetric(
meter.Int64Gauge(
metricName("by_topic"),
metric.WithUnit("{peer}"),
metric.WithDescription("number of connected peers per topic")))

peerIdentityGauge = observability.NewMetric(
meter.Int64Gauge(
metricName("identity"),
metric.WithUnit("{peer}"),
metric.WithDescription("describes identities of connected peers")))
)

func metricName(name string) string {
return fmt.Sprintf("%s.%s", observabilityNamespace, name)
}

func recordPeerCount(ctx context.Context, peers []peer.ID) func() {
return func() {
peersConnectedGauge.Record(ctx, int64(len(peers)))
}
}

func recordPeerCountPerTopic(ctx context.Context, ctrl topics.Controller) func() {
return func() {
for _, topicName := range ctrl.Topics() {
peers, err := ctrl.Peers(topicName)
if err != nil {
return
}
peersByTopicCounter.Record(ctx, int64(len(peers)), metric.WithAttributes(attribute.String("ssv.p2p.topic.name", topicName)))
}
}
}

func recordPeerIdentities(ctx context.Context, peers []peer.ID, index peers.Index) func() {
return func() {
for _, pid := range peers {
nodeVersion := "unknown"
ni := index.NodeInfo(pid)
if ni != nil {
if ni.Metadata != nil {
nodeVersion = ni.Metadata.NodeVersion
}
}
peerIdentityGauge.Record(ctx, 1, metric.WithAttributes(attribute.String("ssv.node.version", nodeVersion)))
}
}
}
12 changes: 5 additions & 7 deletions network/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ type p2pNetwork struct {
connHandler connections.ConnHandler
connGater connmgr.ConnectionGater
trustedPeers []*peer.AddrInfo
metrics Metrics

state int32

Expand All @@ -107,7 +106,7 @@ type p2pNetwork struct {
}

// New creates a new p2p network
func New(logger *zap.Logger, cfg *Config, mr Metrics) (*p2pNetwork, error) {
func New(logger *zap.Logger, cfg *Config) (*p2pNetwork, error) {
ctx, cancel := context.WithCancel(cfg.Ctx)

logger = logger.Named(logging.NameP2PNetwork)
Expand All @@ -127,7 +126,6 @@ func New(logger *zap.Logger, cfg *Config, mr Metrics) (*p2pNetwork, error) {
operatorPKHashToPKCache: hashmap.New[string, []byte](),
operatorSigner: cfg.OperatorSigner,
operatorDataStore: cfg.OperatorDataStore,
metrics: mr,
}
if err := n.parseTrustedPeers(); err != nil {
return nil, err
Expand Down Expand Up @@ -259,13 +257,14 @@ func (n *p2pNetwork) Start(logger *zap.Logger) error {
go n.startDiscovery(logger, connector)

async.Interval(n.ctx, connManagerBalancingInterval, n.peersBalancing(logger))

// don't report metrics in tests
if n.cfg.Metrics != nil {
async.Interval(n.ctx, peersReportingInterval, n.reportAllPeers(logger))
async.Interval(n.ctx, peersReportingInterval, recordPeerCount(n.ctx, n.host.Network().Peers()))

async.Interval(n.ctx, peerIdentitiesReportingInterval, n.reportPeerIdentities(logger))
async.Interval(n.ctx, peerIdentitiesReportingInterval, recordPeerIdentities(n.ctx, n.host.Network().Peers(), n.idx))

async.Interval(n.ctx, topicsReportingInterval, n.reportTopics(logger))
async.Interval(n.ctx, topicsReportingInterval, recordPeerCountPerTopic(n.ctx, n.topicsCtrl))
}

if err := n.subscribeToSubnets(logger); err != nil {
Expand All @@ -291,7 +290,6 @@ func (n *p2pNetwork) peersBalancing(logger *zap.Logger) func() {
// Check if it has the maximum number of connections
currentCount := len(allPeers)
if currentCount < n.cfg.MaxPeers {
_ = n.idx.GetSubnetsStats() // trigger metrics update
return
}

Expand Down
4 changes: 0 additions & 4 deletions network/p2p/p2p_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@ import (
"github.com/ssvlabs/ssv/network"
"github.com/ssvlabs/ssv/network/commons"
"github.com/ssvlabs/ssv/network/records"
genesismessage "github.com/ssvlabs/ssv/protocol/genesis/message"
"github.com/ssvlabs/ssv/protocol/genesis/ssv/genesisqueue"
"github.com/ssvlabs/ssv/protocol/v2/message"
p2pprotocol "github.com/ssvlabs/ssv/protocol/v2/p2p"
"github.com/ssvlabs/ssv/protocol/v2/ssv/queue"
)
Expand Down Expand Up @@ -231,10 +229,8 @@ func (n *p2pNetwork) handlePubsubMessages(logger *zap.Logger) func(ctx context.C
switch m := msg.ValidatorData.(type) {
case *queue.SSVMessage:
decodedMsg = m
metricsRouterIncoming.WithLabelValues(message.MsgTypeToString(m.MsgType)).Inc()
case *genesisqueue.GenesisSSVMessage:
decodedMsg = m
metricsRouterIncoming.WithLabelValues(genesismessage.MsgTypeToString(m.MsgType)).Inc()
case nil:
return errors.New("message was not decoded")
default:
Expand Down
4 changes: 1 addition & 3 deletions network/p2p/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"golang.org/x/sync/errgroup"

"github.com/ssvlabs/ssv/message/validation"
"github.com/ssvlabs/ssv/monitoring/metricsreporter"
"github.com/ssvlabs/ssv/network"
"github.com/ssvlabs/ssv/network/commons"
p2pcommons "github.com/ssvlabs/ssv/network/commons"
Expand Down Expand Up @@ -224,8 +223,7 @@ func (ln *LocalNet) NewTestP2pNetwork(ctx context.Context, nodeIndex uint64, key

cfg.OperatorDataStore = operatordatastore.New(&registrystorage.OperatorData{ID: nodeIndex + 1})

mr := metricsreporter.New()
p, err := New(logger, cfg, mr)
p, err := New(logger, cfg)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit b9c7af3

Please sign in to comment.