diff --git a/cli/operator/node.go b/cli/operator/node.go index 4359e20ff5..fa7f8b2326 100644 --- a/cli/operator/node.go +++ b/cli/operator/node.go @@ -268,7 +268,6 @@ var StartNodeCmd = &cobra.Command{ } } - cfg.P2pNetworkConfig.Metrics = metricsReporter cfg.P2pNetworkConfig.MessageValidator = messageValidator cfg.SSVOptions.ValidatorOptions.MessageValidator = messageValidator @@ -670,7 +669,7 @@ func setupP2P(logger *zap.Logger, db basedb.Database, mr metricsreporter.Metrics } cfg.P2pNetworkConfig.NetworkPrivateKey = netPrivKey - n, err := p2pv1.New(logger, &cfg.P2pNetworkConfig, mr) + n, err := p2pv1.New(logger, &cfg.P2pNetworkConfig) if err != nil { logger.Fatal("failed to setup p2p network", zap.Error(err)) } diff --git a/network/p2p/config.go b/network/p2p/config.go index c659b610ea..dec1102418 100644 --- a/network/p2p/config.go +++ b/network/p2p/config.go @@ -17,7 +17,6 @@ import ( "go.uber.org/zap" "github.com/ssvlabs/ssv/message/validation" - "github.com/ssvlabs/ssv/monitoring/metricsreporter" "github.com/ssvlabs/ssv/network" "github.com/ssvlabs/ssv/network/commons" "github.com/ssvlabs/ssv/networkconfig" @@ -75,8 +74,6 @@ type Config struct { Network networkconfig.NetworkConfig // MessageValidator validates incoming messages. MessageValidator validation.MessageValidator - // Metrics report metrics. - Metrics metricsreporter.MetricsReporter PubsubMsgCacheTTL time.Duration `yaml:"PubsubMsgCacheTTL" env:"PUBSUB_MSG_CACHE_TTL" env-description:"How long a message ID will be remembered as seen"` PubsubOutQueueSize int `yaml:"PubsubOutQueueSize" env:"PUBSUB_OUT_Q_SIZE" env-description:"The size that we assign to the outbound pubsub message queue"` diff --git a/network/p2p/metrics.go b/network/p2p/metrics.go deleted file mode 100644 index 88164599c4..0000000000 --- a/network/p2p/metrics.go +++ /dev/null @@ -1,147 +0,0 @@ -package p2pv1 - -import ( - "strconv" - - "github.com/ssvlabs/ssv/logging/fields" - - "github.com/libp2p/go-libp2p/core/peer" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - "go.uber.org/zap" - - "github.com/ssvlabs/ssv/utils/format" -) - -type Metrics interface { -} - -var ( - // MetricsAllConnectedPeers counts all connected peers - MetricsAllConnectedPeers = promauto.NewGauge(prometheus.GaugeOpts{ - Name: "ssv_p2p_all_connected_peers", - Help: "Count connected peers", - }) - // MetricsConnectedPeers counts connected peers for a topic - MetricsConnectedPeers = promauto.NewGaugeVec(prometheus.GaugeOpts{ - Name: "ssv_p2p_connected_peers", - Help: "Count connected peers for a validator", - }, []string{"pubKey"}) - // MetricsPeersIdentity tracks peers identity - MetricsPeersIdentity = promauto.NewGaugeVec(prometheus.GaugeOpts{ - Name: "ssv:network:peers_identity", - Help: "Peers identity", - }, []string{"pubKey", "operatorID", "v", "pid", "type"}) - metricsRouterIncoming = promauto.NewCounterVec(prometheus.CounterOpts{ - Name: "ssv:network:router:in", - Help: "Counts incoming messages", - }, []string{"mt"}) -) - -func init() { - logger := zap.L() - if err := prometheus.Register(MetricsAllConnectedPeers); err != nil { - logger.Debug("could not register prometheus collector") - } - if err := prometheus.Register(MetricsPeersIdentity); err != nil { - logger.Debug("could not register prometheus collector") - } - if err := prometheus.Register(MetricsConnectedPeers); err != nil { - logger.Debug("could not register prometheus collector") - } - if err := prometheus.Register(metricsRouterIncoming); err != nil { - logger.Debug("could not register prometheus collector") - } -} - -var unknown = "unknown" - -func (n *p2pNetwork) reportAllPeers(logger *zap.Logger) func() { - return func() { - pids := n.host.Network().Peers() - logger.Debug("connected peers status", fields.Count(len(pids))) - MetricsAllConnectedPeers.Set(float64(len(pids))) - } -} - -func (n *p2pNetwork) reportPeerIdentities(logger *zap.Logger) func() { - return func() { - pids := n.host.Network().Peers() - for _, pid := range pids { - n.reportPeerIdentity(logger, pid) - } - } -} - -func (n *p2pNetwork) reportTopics(logger *zap.Logger) func() { - return func() { - topics := n.topicsCtrl.Topics() - nTopics := len(topics) - logger.Debug("connected topics", fields.Count(nTopics)) - for _, name := range topics { - n.reportTopicPeers(logger, name) - } - } -} - -func (n *p2pNetwork) reportTopicPeers(logger *zap.Logger, name string) { - peers, err := n.topicsCtrl.Peers(name) - if err != nil { - logger.Warn("could not get topic peers", fields.Topic(name), zap.Error(err)) - return - } - logger.Debug("topic peers status", fields.Topic(name), fields.Count(len(peers)), zap.Any("peers", peers)) - MetricsConnectedPeers.WithLabelValues(name).Set(float64(len(peers))) -} - -func (n *p2pNetwork) reportPeerIdentity(logger *zap.Logger, pid peer.ID) { - opPKHash, opID, nodeVersion, nodeType := unknown, unknown, unknown, unknown - ni := n.idx.NodeInfo(pid) - if ni != nil { - if ni.Metadata != nil { - nodeVersion = ni.Metadata.NodeVersion - } - nodeType = "operator" - if len(opPKHash) == 0 && nodeVersion != unknown { - nodeType = "exporter" - } - } - - if pubKey, ok := n.operatorPKHashToPKCache.Get(opPKHash); ok { - operatorData, found, opDataErr := n.nodeStorage.GetOperatorDataByPubKey(nil, pubKey) - if opDataErr == nil && found { - opID = strconv.FormatUint(operatorData.ID, 10) - } - } else { - operators, err := n.nodeStorage.ListOperators(nil, 0, 0) - if err != nil { - logger.Warn("failed to get all operators for reporting", zap.Error(err)) - } - - for _, operator := range operators { - pubKeyHash := format.OperatorID(operator.PublicKey) - n.operatorPKHashToPKCache.Set(pubKeyHash, operator.PublicKey) - if pubKeyHash == opPKHash { - opID = strconv.FormatUint(operator.ID, 10) - } - } - } - - state := n.idx.State(pid) - logger.Debug("peer identity", - fields.PeerID(pid), - zap.String("node_version", nodeVersion), - zap.String("operator_id", opID), - zap.String("state", state.String()), - ) - MetricsPeersIdentity.WithLabelValues(opPKHash, opID, nodeVersion, pid.String(), nodeType).Set(1) -} - -// -// func reportLastMsg(pid string) { -// MetricsPeerLastMsg.WithLabelValues(pid).Set(float64(timestamp())) -//} -// -// func timestamp() int64 { -// return time.Now().UnixNano() / int64(time.Millisecond) -//} diff --git a/network/p2p/observability.go b/network/p2p/observability.go new file mode 100644 index 0000000000..16c239271a --- /dev/null +++ b/network/p2p/observability.go @@ -0,0 +1,84 @@ +package p2pv1 + +import ( + "context" + "fmt" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + + "github.com/libp2p/go-libp2p/core/host" + "github.com/ssvlabs/ssv/network/peers" + "github.com/ssvlabs/ssv/network/topics" + "github.com/ssvlabs/ssv/observability" +) + +const ( + observabilityName = "github.com/ssvlabs/ssv/network/p2p" + observabilityNamespace = "ssv.p2p.peer" +) + +var ( + meter = otel.Meter(observabilityName) + + peersConnectedGauge = observability.NewMetric( + meter.Int64Gauge( + metricName("connected"), + metric.WithUnit("{peer}"), + metric.WithDescription("number of connected peers"))) + + peersByTopicCounter = observability.NewMetric( + meter.Int64Gauge( + metricName("by_topic"), + metric.WithUnit("{peer}"), + metric.WithDescription("number of connected peers per topic"))) + + peerIdentityGauge = observability.NewMetric( + meter.Int64Gauge( + metricName("identity"), + metric.WithUnit("{peer}"), + metric.WithDescription("describes identities of connected peers"))) +) + +func metricName(name string) string { + return fmt.Sprintf("%s.%s", observabilityNamespace, name) +} + +func recordPeerCount(ctx context.Context, host host.Host) func() { + return func() { + peers := host.Network().Peers() + peersConnectedGauge.Record(ctx, int64(len(peers))) + } +} + +func recordPeerCountPerTopic(ctx context.Context, ctrl topics.Controller) func() { + return func() { + for _, topicName := range ctrl.Topics() { + peers, err := ctrl.Peers(topicName) + if err != nil { + return + } + peersByTopicCounter.Record(ctx, int64(len(peers)), metric.WithAttributes(attribute.String("ssv.p2p.topic.name", topicName))) + } + } +} + +func recordPeerIdentities(ctx context.Context, host host.Host, index peers.Index) func() { + return func() { + peersByVersion := make(map[string]int64) + for _, pid := range host.Network().Peers() { + nodeVersion := "unknown" + ni := index.NodeInfo(pid) + if ni != nil { + if ni.Metadata != nil { + nodeVersion = ni.Metadata.NodeVersion + } + } + peersByVersion[nodeVersion]++ + } + for version, peerCount := range peersByVersion { + peerIdentityGauge.Record(ctx, peerCount, metric.WithAttributes(attribute.String("ssv.node.version", version))) + } + } +} diff --git a/network/p2p/p2p.go b/network/p2p/p2p.go index 674ab65c94..c983c7d96c 100644 --- a/network/p2p/p2p.go +++ b/network/p2p/p2p.go @@ -51,7 +51,7 @@ const ( connManagerBalancingInterval = 3 * time.Minute connManagerBalancingTimeout = time.Minute peersReportingInterval = 60 * time.Second - peerIdentitiesReportingInterval = 5 * time.Minute + peerIdentitiesReportingInterval = time.Minute topicsReportingInterval = 180 * time.Second maximumIrrelevantPeersToDisconnect = 3 ) @@ -86,7 +86,6 @@ type p2pNetwork struct { connHandler connections.ConnHandler connGater connmgr.ConnectionGater trustedPeers []*peer.AddrInfo - metrics Metrics state int32 @@ -107,7 +106,7 @@ type p2pNetwork struct { } // New creates a new p2p network -func New(logger *zap.Logger, cfg *Config, mr Metrics) (*p2pNetwork, error) { +func New(logger *zap.Logger, cfg *Config) (*p2pNetwork, error) { ctx, cancel := context.WithCancel(cfg.Ctx) logger = logger.Named(logging.NameP2PNetwork) @@ -127,7 +126,6 @@ func New(logger *zap.Logger, cfg *Config, mr Metrics) (*p2pNetwork, error) { operatorPKHashToPKCache: hashmap.New[string, []byte](), operatorSigner: cfg.OperatorSigner, operatorDataStore: cfg.OperatorDataStore, - metrics: mr, } if err := n.parseTrustedPeers(); err != nil { return nil, err @@ -259,14 +257,12 @@ func (n *p2pNetwork) Start(logger *zap.Logger) error { go n.startDiscovery(logger, connector) async.Interval(n.ctx, connManagerBalancingInterval, n.peersBalancing(logger)) - // don't report metrics in tests - if n.cfg.Metrics != nil { - async.Interval(n.ctx, peersReportingInterval, n.reportAllPeers(logger)) - async.Interval(n.ctx, peerIdentitiesReportingInterval, n.reportPeerIdentities(logger)) + async.Interval(n.ctx, peersReportingInterval, recordPeerCount(n.ctx, n.host)) - async.Interval(n.ctx, topicsReportingInterval, n.reportTopics(logger)) - } + async.Interval(n.ctx, peerIdentitiesReportingInterval, recordPeerIdentities(n.ctx, n.host, n.idx)) + + async.Interval(n.ctx, topicsReportingInterval, recordPeerCountPerTopic(n.ctx, n.topicsCtrl)) if err := n.subscribeToSubnets(logger); err != nil { return err @@ -291,7 +287,6 @@ func (n *p2pNetwork) peersBalancing(logger *zap.Logger) func() { // Check if it has the maximum number of connections currentCount := len(allPeers) if currentCount < n.cfg.MaxPeers { - _ = n.idx.GetSubnetsStats() // trigger metrics update return } diff --git a/network/p2p/p2p_pubsub.go b/network/p2p/p2p_pubsub.go index 68baf5126c..7a16156e42 100644 --- a/network/p2p/p2p_pubsub.go +++ b/network/p2p/p2p_pubsub.go @@ -16,9 +16,7 @@ import ( "github.com/ssvlabs/ssv/network" "github.com/ssvlabs/ssv/network/commons" "github.com/ssvlabs/ssv/network/records" - genesismessage "github.com/ssvlabs/ssv/protocol/genesis/message" "github.com/ssvlabs/ssv/protocol/genesis/ssv/genesisqueue" - "github.com/ssvlabs/ssv/protocol/v2/message" p2pprotocol "github.com/ssvlabs/ssv/protocol/v2/p2p" "github.com/ssvlabs/ssv/protocol/v2/ssv/queue" ) @@ -231,10 +229,8 @@ func (n *p2pNetwork) handlePubsubMessages(logger *zap.Logger) func(ctx context.C switch m := msg.ValidatorData.(type) { case *queue.SSVMessage: decodedMsg = m - metricsRouterIncoming.WithLabelValues(message.MsgTypeToString(m.MsgType)).Inc() case *genesisqueue.GenesisSSVMessage: decodedMsg = m - metricsRouterIncoming.WithLabelValues(genesismessage.MsgTypeToString(m.MsgType)).Inc() case nil: return errors.New("message was not decoded") default: diff --git a/network/p2p/test_utils.go b/network/p2p/test_utils.go index 8b4b79d6e4..3153b96996 100644 --- a/network/p2p/test_utils.go +++ b/network/p2p/test_utils.go @@ -14,7 +14,6 @@ import ( "golang.org/x/sync/errgroup" "github.com/ssvlabs/ssv/message/validation" - "github.com/ssvlabs/ssv/monitoring/metricsreporter" "github.com/ssvlabs/ssv/network" "github.com/ssvlabs/ssv/network/commons" p2pcommons "github.com/ssvlabs/ssv/network/commons" @@ -180,7 +179,6 @@ func (ln *LocalNet) NewTestP2pNetwork(ctx context.Context, nodeIndex uint64, key cfg.Ctx = ctx cfg.Subnets = "00000000000000000100000400000400" // calculated for topics 64, 90, 114; PAY ATTENTION for future test scenarios which use more than one eth-validator we need to make this field dynamically changing cfg.NodeStorage = nodeStorage - cfg.Metrics = nil cfg.MessageValidator = validation.New( networkconfig.TestNetwork, nodeStorage.ValidatorStore(), @@ -224,8 +222,7 @@ func (ln *LocalNet) NewTestP2pNetwork(ctx context.Context, nodeIndex uint64, key cfg.OperatorDataStore = operatordatastore.New(®istrystorage.OperatorData{ID: nodeIndex + 1}) - mr := metricsreporter.New() - p, err := New(logger, cfg, mr) + p, err := New(logger, cfg) if err != nil { return nil, err }