diff --git a/cli/operator/node.go b/cli/operator/node.go
index 39513ce0fe..ed37573a5f 100644
--- a/cli/operator/node.go
+++ b/cli/operator/node.go
@@ -43,7 +43,9 @@ import (
 	"github.com/ssvlabs/ssv/monitoring/metrics"
 	"github.com/ssvlabs/ssv/monitoring/metricsreporter"
 	"github.com/ssvlabs/ssv/network"
+	networkcommons "github.com/ssvlabs/ssv/network/commons"
 	p2pv1 "github.com/ssvlabs/ssv/network/p2p"
+	"github.com/ssvlabs/ssv/network/records"
 	"github.com/ssvlabs/ssv/networkconfig"
 	"github.com/ssvlabs/ssv/nodeprobe"
 	"github.com/ssvlabs/ssv/operator"
@@ -221,11 +223,9 @@ var StartNodeCmd = &cobra.Command{
 
 		signatureVerifier := signatureverifier.NewSignatureVerifier(nodeStorage)
 
-		validatorStore := nodeStorage.ValidatorStore()
-
 		messageValidator := validation.New(
 			networkConfig,
-			validatorStore,
+			nodeStorage.ValidatorStore(),
 			dutyStore,
 			signatureVerifier,
 			validation.WithLogger(logger),
@@ -293,7 +293,7 @@ var StartNodeCmd = &cobra.Command{
 
 		validatorCtrl := validator.NewController(logger, cfg.SSVOptions.ValidatorOptions)
 		cfg.SSVOptions.ValidatorController = validatorCtrl
-		cfg.SSVOptions.ValidatorStore = validatorStore
+		cfg.SSVOptions.ValidatorStore = nodeStorage.ValidatorStore()
 
 		operatorNode = operator.New(logger, cfg.SSVOptions, slotTickerProvider, storageMap)
 
@@ -338,14 +338,47 @@ var StartNodeCmd = &cobra.Command{
 			nodeProber.AddNode("event syncer", eventSyncer)
 		}
 
-		cfg.P2pNetworkConfig.GetValidatorStats = func() (uint64, uint64, uint64, error) {
-			return validatorCtrl.GetValidatorStats()
-		}
-		if err := p2pNetwork.Setup(logger); err != nil {
-			logger.Fatal("failed to setup network", zap.Error(err))
-		}
-		if err := p2pNetwork.Start(logger); err != nil {
-			logger.Fatal("failed to start network", zap.Error(err))
+		// Increase MaxPeers if the operator is subscribed to many subnets.
+		// TODO: use OperatorCommittees when it's fixed.
+		if cfg.P2pNetworkConfig.DynamicMaxPeers {
+			var (
+				baseMaxPeers        = 60
+				maxPeersLimit       = cfg.P2pNetworkConfig.DynamicMaxPeersLimit
+				idealPeersPerSubnet = 3
+			)
+			start := time.Now()
+			myValidators := nodeStorage.ValidatorStore().OperatorValidators(operatorData.ID)
+			mySubnets := make(records.Subnets, networkcommons.SubnetsCount)
+			myActiveSubnets := 0
+			for _, v := range myValidators {
+				subnet := networkcommons.CommitteeSubnet(v.CommitteeID())
+				// Count each subnet once, however many validators map to it.
+				if mySubnets[subnet] == 0 {
+					mySubnets[subnet] = 1
+					myActiveSubnets++
+				}
+			}
+			idealMaxPeers := min(baseMaxPeers+idealPeersPerSubnet*myActiveSubnets, maxPeersLimit)
+			if cfg.P2pNetworkConfig.MaxPeers < idealMaxPeers {
+				logger.Warn("increasing MaxPeers to match the operator's subscribed subnets",
+					zap.Int("old_max_peers", cfg.P2pNetworkConfig.MaxPeers),
+					zap.Int("new_max_peers", idealMaxPeers),
+					zap.Int("subscribed_subnets", myActiveSubnets),
+					zap.Duration("took", time.Since(start)),
+				)
+				cfg.P2pNetworkConfig.MaxPeers = idealMaxPeers
+			}
+		}
+
+		// Set up and start the network whether or not DynamicMaxPeers is enabled.
+		cfg.P2pNetworkConfig.GetValidatorStats = func() (uint64, uint64, uint64, error) {
+			return validatorCtrl.GetValidatorStats()
+		}
+		if err := p2pNetwork.Setup(logger); err != nil {
+			logger.Fatal("failed to setup network", zap.Error(err))
+		}
+		if err := p2pNetwork.Start(logger); err != nil {
+			logger.Fatal("failed to start network", zap.Error(err))
 		}
 
 		if cfg.SSVAPIPort > 0 {
diff --git a/network/p2p/config.go b/network/p2p/config.go
index c659b610ea..37a2710c17 100644
--- a/network/p2p/config.go
+++ b/network/p2p/config.go
@@ -46,8 +46,11 @@ type Config struct {
 
 	RequestTimeout   time.Duration `yaml:"RequestTimeout" env:"P2P_REQUEST_TIMEOUT" env-default:"10s"`
 	MaxBatchResponse uint64        `yaml:"MaxBatchResponse" env:"P2P_MAX_BATCH_RESPONSE" env-default:"25" env-description:"Maximum number of returned objects in a batch"`
-	MaxPeers         int           `yaml:"MaxPeers" env:"P2P_MAX_PEERS" env-default:"60" env-description:"Connected peers limit for connections"`
-	TopicMaxPeers    int           `yaml:"TopicMaxPeers" env:"P2P_TOPIC_MAX_PEERS" env-default:"10" env-description:"Connected peers limit per pubsub topic"`
+
+	MaxPeers             int  `yaml:"MaxPeers" env:"P2P_MAX_PEERS" env-default:"60" env-description:"Connected peers limit. At the time being, this may be increased by DynamicMaxPeers until that is phased out."`
+	DynamicMaxPeers      bool `yaml:"DynamicMaxPeers" env:"P2P_DYNAMIC_MAX_PEERS" env-default:"true" env-description:"If true, MaxPeers will grow with the operator's number of committees."`
+	DynamicMaxPeersLimit int  `yaml:"DynamicMaxPeersLimit" env:"P2P_DYNAMIC_MAX_PEERS_LIMIT" env-default:"150" env-description:"Limit for MaxPeers when DynamicMaxPeers is enabled."`
+	TopicMaxPeers        int  `yaml:"TopicMaxPeers" env:"P2P_TOPIC_MAX_PEERS" env-default:"10" env-description:"Connected peers limit per pubsub topic"`
 
 	// Subnets is a static bit list of subnets that this node will register upon start.
 	Subnets string `yaml:"Subnets" env:"SUBNETS" env-description:"Hex string that represents the subnets that this node will join upon start"`
diff --git a/network/p2p/metrics.go b/network/p2p/metrics.go
index d73e36262e..f8cfea2cbb 100644
--- a/network/p2p/metrics.go
+++ b/network/p2p/metrics.go
@@ -1,6 +1,7 @@
 package p2pv1
 
 import (
+	"sort"
 	"strconv"
 
 	"github.com/ssvlabs/ssv/logging/fields"
@@ -80,22 +81,54 @@ func (n *p2pNetwork) reportPeerIdentities(logger *zap.Logger) func() {
 func (n *p2pNetwork) reportTopics(logger *zap.Logger) func() {
 	return func() {
 		topics := n.topicsCtrl.Topics()
-		nTopics := len(topics)
-		logger.Debug("connected topics", fields.Count(nTopics))
+		logger.Debug("connected topics", fields.Count(len(topics)))
+		if len(topics) == 0 {
+			// No topics means no peer counts; indexing the empty slice below would panic.
+			return
+		}
+
+		subnetPeerCounts := make([]int, 0, len(topics))
+		deadSubnets := 0
+		unhealthySubnets := 0
 		for _, name := range topics {
-			n.reportTopicPeers(logger, name)
+			count := n.reportTopicPeers(logger, name)
+			subnetPeerCounts = append(subnetPeerCounts, count)
+
+			// A subnet with no peers is dead; one with at most two peers is unhealthy.
+			if count == 0 {
+				deadSubnets++
+			} else if count <= 2 {
+				unhealthySubnets++
+			}
 		}
+
+		// Sort the counts so that min, median and max can be read by position.
+		sort.Ints(subnetPeerCounts)
+		minPeers := subnetPeerCounts[0]
+		medianPeers := subnetPeerCounts[len(subnetPeerCounts)/2]
+		maxPeers := subnetPeerCounts[len(subnetPeerCounts)-1]
+
+		logger.Debug("topic peers distribution",
+			zap.Int("min", minPeers),
+			zap.Int("median", medianPeers),
+			zap.Int("max", maxPeers),
+			zap.Int("dead_subnets", deadSubnets),
+			zap.Int("unhealthy_subnets", unhealthySubnets),
+		)
 	}
 }
 
-func (n *p2pNetwork) reportTopicPeers(logger *zap.Logger, name string) {
+// reportTopicPeers logs the peers of the given topic, records their number in
+// MetricsConnectedPeers and returns it. It returns 0 if the peers cannot be fetched.
+func (n *p2pNetwork) reportTopicPeers(logger *zap.Logger, name string) (peerCount int) {
 	peers, err := n.topicsCtrl.Peers(name)
 	if err != nil {
 		logger.Warn("could not get topic peers", fields.Topic(name), zap.Error(err))
-		return
+		return 0
 	}
 	logger.Debug("topic peers status", fields.Topic(name), fields.Count(len(peers)), zap.Any("peers", peers))
 	MetricsConnectedPeers.WithLabelValues(name).Set(float64(len(peers)))
+	return len(peers)
 }
 
 func (n *p2pNetwork) reportPeerIdentity(logger *zap.Logger, pid peer.ID) {