Skip to content

Commit

Permalink
Observability - instrument P2P component
Browse files Browse the repository at this point in the history
  • Loading branch information
oleg-ssvlabs committed Dec 2, 2024
1 parent 8b1eebe commit a1c4e5f
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 171 deletions.
3 changes: 1 addition & 2 deletions cli/operator/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,6 @@ var StartNodeCmd = &cobra.Command{
}
}

cfg.P2pNetworkConfig.Metrics = metricsReporter
cfg.P2pNetworkConfig.MessageValidator = messageValidator
cfg.SSVOptions.ValidatorOptions.MessageValidator = messageValidator

Expand Down Expand Up @@ -670,7 +669,7 @@ func setupP2P(logger *zap.Logger, db basedb.Database, mr metricsreporter.Metrics
}
cfg.P2pNetworkConfig.NetworkPrivateKey = netPrivKey

n, err := p2pv1.New(logger, &cfg.P2pNetworkConfig, mr)
n, err := p2pv1.New(logger, &cfg.P2pNetworkConfig)
if err != nil {
logger.Fatal("failed to setup p2p network", zap.Error(err))
}
Expand Down
3 changes: 0 additions & 3 deletions network/p2p/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"go.uber.org/zap"

"github.com/ssvlabs/ssv/message/validation"
"github.com/ssvlabs/ssv/monitoring/metricsreporter"
"github.com/ssvlabs/ssv/network"
"github.com/ssvlabs/ssv/network/commons"
"github.com/ssvlabs/ssv/networkconfig"
Expand Down Expand Up @@ -75,8 +74,6 @@ type Config struct {
Network networkconfig.NetworkConfig
// MessageValidator validates incoming messages.
MessageValidator validation.MessageValidator
// Metrics report metrics.
Metrics metricsreporter.MetricsReporter

PubsubMsgCacheTTL time.Duration `yaml:"PubsubMsgCacheTTL" env:"PUBSUB_MSG_CACHE_TTL" env-description:"How long a message ID will be remembered as seen"`
PubsubOutQueueSize int `yaml:"PubsubOutQueueSize" env:"PUBSUB_OUT_Q_SIZE" env-description:"The size that we assign to the outbound pubsub message queue"`
Expand Down
147 changes: 0 additions & 147 deletions network/p2p/metrics.go

This file was deleted.

84 changes: 84 additions & 0 deletions network/p2p/observability.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package p2pv1

import (
"context"
"fmt"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

"github.com/libp2p/go-libp2p/core/host"
"github.com/ssvlabs/ssv/network/peers"
"github.com/ssvlabs/ssv/network/topics"
"github.com/ssvlabs/ssv/observability"
)

const (
observabilityName = "github.com/ssvlabs/ssv/network/p2p"
observabilityNamespace = "ssv.p2p.peer"
)

var (
meter = otel.Meter(observabilityName)

peersConnectedGauge = observability.NewMetric(
meter.Int64Gauge(
metricName("connected"),
metric.WithUnit("{peer}"),
metric.WithDescription("number of connected peers")))

peersByTopicCounter = observability.NewMetric(
meter.Int64Gauge(
metricName("by_topic"),
metric.WithUnit("{peer}"),
metric.WithDescription("number of connected peers per topic")))

peerIdentityGauge = observability.NewMetric(
meter.Int64Gauge(
metricName("identity"),
metric.WithUnit("{peer}"),
metric.WithDescription("describes identities of connected peers")))
)

func metricName(name string) string {
return fmt.Sprintf("%s.%s", observabilityNamespace, name)
}

func recordPeerCount(ctx context.Context, host host.Host) func() {
return func() {
peers := host.Network().Peers()
peersConnectedGauge.Record(ctx, int64(len(peers)))
}
}

func recordPeerCountPerTopic(ctx context.Context, ctrl topics.Controller) func() {
return func() {
for _, topicName := range ctrl.Topics() {
peers, err := ctrl.Peers(topicName)
if err != nil {
return
}
peersByTopicCounter.Record(ctx, int64(len(peers)), metric.WithAttributes(attribute.String("ssv.p2p.topic.name", topicName)))
}
}
}

func recordPeerIdentities(ctx context.Context, host host.Host, index peers.Index) func() {
return func() {
peersByVersion := make(map[string]int64)
for _, pid := range host.Network().Peers() {
nodeVersion := "unknown"
ni := index.NodeInfo(pid)
if ni != nil {
if ni.Metadata != nil {
nodeVersion = ni.Metadata.NodeVersion
}
}
peersByVersion[nodeVersion]++
}
for version, peerCount := range peersByVersion {
peerIdentityGauge.Record(ctx, peerCount, metric.WithAttributes(attribute.String("ssv.node.version", version)))
}
}
}
17 changes: 6 additions & 11 deletions network/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ const (
connManagerBalancingInterval = 3 * time.Minute
connManagerBalancingTimeout = time.Minute
peersReportingInterval = 60 * time.Second
peerIdentitiesReportingInterval = 5 * time.Minute
peerIdentitiesReportingInterval = time.Minute
topicsReportingInterval = 180 * time.Second
maximumIrrelevantPeersToDisconnect = 3
)
Expand Down Expand Up @@ -86,7 +86,6 @@ type p2pNetwork struct {
connHandler connections.ConnHandler
connGater connmgr.ConnectionGater
trustedPeers []*peer.AddrInfo
metrics Metrics

state int32

Expand All @@ -107,7 +106,7 @@ type p2pNetwork struct {
}

// New creates a new p2p network
func New(logger *zap.Logger, cfg *Config, mr Metrics) (*p2pNetwork, error) {
func New(logger *zap.Logger, cfg *Config) (*p2pNetwork, error) {
ctx, cancel := context.WithCancel(cfg.Ctx)

logger = logger.Named(logging.NameP2PNetwork)
Expand All @@ -127,7 +126,6 @@ func New(logger *zap.Logger, cfg *Config, mr Metrics) (*p2pNetwork, error) {
operatorPKHashToPKCache: hashmap.New[string, []byte](),
operatorSigner: cfg.OperatorSigner,
operatorDataStore: cfg.OperatorDataStore,
metrics: mr,
}
if err := n.parseTrustedPeers(); err != nil {
return nil, err
Expand Down Expand Up @@ -259,14 +257,12 @@ func (n *p2pNetwork) Start(logger *zap.Logger) error {
go n.startDiscovery(logger, connector)

async.Interval(n.ctx, connManagerBalancingInterval, n.peersBalancing(logger))
// don't report metrics in tests
if n.cfg.Metrics != nil {
async.Interval(n.ctx, peersReportingInterval, n.reportAllPeers(logger))

async.Interval(n.ctx, peerIdentitiesReportingInterval, n.reportPeerIdentities(logger))
async.Interval(n.ctx, peersReportingInterval, recordPeerCount(n.ctx, n.host))

async.Interval(n.ctx, topicsReportingInterval, n.reportTopics(logger))
}
async.Interval(n.ctx, peerIdentitiesReportingInterval, recordPeerIdentities(n.ctx, n.host, n.idx))

async.Interval(n.ctx, topicsReportingInterval, recordPeerCountPerTopic(n.ctx, n.topicsCtrl))

if err := n.subscribeToSubnets(logger); err != nil {
return err
Expand All @@ -291,7 +287,6 @@ func (n *p2pNetwork) peersBalancing(logger *zap.Logger) func() {
// Check if it has the maximum number of connections
currentCount := len(allPeers)
if currentCount < n.cfg.MaxPeers {
_ = n.idx.GetSubnetsStats() // trigger metrics update
return
}

Expand Down
4 changes: 0 additions & 4 deletions network/p2p/p2p_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@ import (
"github.com/ssvlabs/ssv/network"
"github.com/ssvlabs/ssv/network/commons"
"github.com/ssvlabs/ssv/network/records"
genesismessage "github.com/ssvlabs/ssv/protocol/genesis/message"
"github.com/ssvlabs/ssv/protocol/genesis/ssv/genesisqueue"
"github.com/ssvlabs/ssv/protocol/v2/message"
p2pprotocol "github.com/ssvlabs/ssv/protocol/v2/p2p"
"github.com/ssvlabs/ssv/protocol/v2/ssv/queue"
)
Expand Down Expand Up @@ -231,10 +229,8 @@ func (n *p2pNetwork) handlePubsubMessages(logger *zap.Logger) func(ctx context.C
switch m := msg.ValidatorData.(type) {
case *queue.SSVMessage:
decodedMsg = m
metricsRouterIncoming.WithLabelValues(message.MsgTypeToString(m.MsgType)).Inc()
case *genesisqueue.GenesisSSVMessage:
decodedMsg = m
metricsRouterIncoming.WithLabelValues(genesismessage.MsgTypeToString(m.MsgType)).Inc()
case nil:
return errors.New("message was not decoded")
default:
Expand Down
5 changes: 1 addition & 4 deletions network/p2p/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"golang.org/x/sync/errgroup"

"github.com/ssvlabs/ssv/message/validation"
"github.com/ssvlabs/ssv/monitoring/metricsreporter"
"github.com/ssvlabs/ssv/network"
"github.com/ssvlabs/ssv/network/commons"
p2pcommons "github.com/ssvlabs/ssv/network/commons"
Expand Down Expand Up @@ -180,7 +179,6 @@ func (ln *LocalNet) NewTestP2pNetwork(ctx context.Context, nodeIndex uint64, key
cfg.Ctx = ctx
cfg.Subnets = "00000000000000000100000400000400" // calculated for topics 64, 90, 114; PAY ATTENTION for future test scenarios which use more than one eth-validator we need to make this field dynamically changing
cfg.NodeStorage = nodeStorage
cfg.Metrics = nil
cfg.MessageValidator = validation.New(
networkconfig.TestNetwork,
nodeStorage.ValidatorStore(),
Expand Down Expand Up @@ -224,8 +222,7 @@ func (ln *LocalNet) NewTestP2pNetwork(ctx context.Context, nodeIndex uint64, key

cfg.OperatorDataStore = operatordatastore.New(&registrystorage.OperatorData{ID: nodeIndex + 1})

mr := metricsreporter.New()
p, err := New(logger, cfg, mr)
p, err := New(logger, cfg)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit a1c4e5f

Please sign in to comment.