Skip to content

Commit

Permalink
basic metrics draft
Browse files Browse the repository at this point in the history
  • Loading branch information
iurii-ssv committed Dec 2, 2024
1 parent ee06bc6 commit ab3b889
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 13 deletions.
48 changes: 41 additions & 7 deletions network/discovery/dv5_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"github.com/ssvlabs/ssv/utils/hashmap"
"net"
"time"

Expand All @@ -21,6 +22,13 @@ import (
"github.com/ssvlabs/ssv/networkconfig"
)

var (
Discovered1stSubnets = hashmap.New[int, int64]()
DiscoveredSubnets = hashmap.New[int, []peer.ID]()
Connected1stSubnets = hashmap.New[int, int64]()
ConnectedSubnets = hashmap.New[int, []peer.ID]()
)

var (
defaultDiscoveryInterval = time.Millisecond * 1
publishENRTimeout = time.Minute
Expand Down Expand Up @@ -70,13 +78,12 @@ type DiscV5Service struct {
func newDiscV5Service(pctx context.Context, logger *zap.Logger, discOpts *Options) (Service, error) {
ctx, cancel := context.WithCancel(pctx)
dvs := DiscV5Service{
ctx: ctx,
cancel: cancel,
conns: discOpts.ConnIndex,
subnetsIdx: discOpts.SubnetsIdx,
networkConfig: discOpts.NetworkConfig,
subnets: discOpts.DiscV5Opts.Subnets,
publishLock: make(chan struct{}, 1),
ctx: ctx,
cancel: cancel,
conns: discOpts.ConnIndex,
subnetsIdx: discOpts.SubnetsIdx,
subnets: discOpts.DiscV5Opts.Subnets,
publishLock: make(chan struct{}, 1),
}

logger.Debug("configuring discv5 discovery", zap.Any("discOpts", discOpts))
Expand Down Expand Up @@ -189,7 +196,34 @@ func (dvs *DiscV5Service) checkPeer(logger *zap.Logger, e PeerEvent) error {
return errors.New("zero subnets")
}

// TODO - maybe get subnetsBefore and only look at the diff
dvs.subnetsIdx.UpdatePeerSubnets(e.AddrInfo.ID, nodeSubnets)
subnetsAfter := dvs.subnetsIdx.GetPeerSubnets(e.AddrInfo.ID)
for subnet := range subnetsAfter {
_, ok := Discovered1stSubnets.Get(subnet)
if !ok {
logger.Debug(
"discovered subnet 1st time!",
zap.Int("subnet_id", subnet),
zap.String("peer_id", string(e.AddrInfo.ID)),
)
Discovered1stSubnets.Set(subnet, 1)
}
peerIDs, ok := DiscoveredSubnets.Get(subnet)
if ok {
for _, pID := range peerIDs {
if pID == e.AddrInfo.ID {
logger.Debug(
"already discovered this subnet through this peer",
zap.Int("subnet_id", subnet),
zap.String("peer_id", string(pID)),
)
break
}
}
}
DiscoveredSubnets.Set(subnet, append(peerIDs, e.AddrInfo.ID))
}

// Filters
if !dvs.limitNodeFilter(e.Node) {
Expand Down
13 changes: 7 additions & 6 deletions network/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,13 @@ func (n *p2pNetwork) Start(logger *zap.Logger) error {

go n.startDiscovery(logger, connector)

async.Interval(n.ctx, 1*time.Minute, func() {
logger.Info("discovered subnets 1st", zap.Int("length", discovery.Discovered1stSubnets.SlowLen()))
logger.Info("discovered subnets", zap.Int("length", discovery.DiscoveredSubnets.SlowLen()))
logger.Info("connected subnets 1st", zap.Int("length", discovery.Connected1stSubnets.SlowLen()))
logger.Info("connected subnets", zap.Int("length", discovery.ConnectedSubnets.SlowLen()))
})

async.Interval(n.ctx, connManagerBalancingInterval, n.peersBalancing(logger))
// don't report metrics in tests
if n.cfg.Metrics != nil {
Expand Down Expand Up @@ -377,12 +384,6 @@ func (n *p2pNetwork) PeerProtection(allPeers []peer.ID, mySubnets records.Subnet
// it will try to bootstrap discovery service, and inject a connect function.
// the connect function checks if we can connect to the given peer and if so passing it to the backoff connector.
func (n *p2pNetwork) startDiscovery(logger *zap.Logger, connector chan peer.AddrInfo) {
discoveredPeers := make(chan peer.AddrInfo, connectorQueueSize)
go func() {
ctx, cancel := context.WithCancel(n.ctx)
defer cancel()
n.backoffConnector.Connect(ctx, discoveredPeers)
}()
err := tasks.Retry(func() error {
return n.disc.Bootstrap(logger, func(e discovery.PeerEvent) {
if !n.idx.CanConnect(e.AddrInfo.ID) {
Expand Down
47 changes: 47 additions & 0 deletions network/peers/connections/conn_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package connections

import (
"context"
"github.com/ssvlabs/ssv/network/discovery"
"sync"
"time"

Expand Down Expand Up @@ -192,6 +193,52 @@ func (ch *connHandler) Handle(logger *zap.Logger) *libp2pnetwork.NotifyBundle {
metricsConnections.Inc()
ch.peerInfos.SetState(conn.RemotePeer(), peers.StateConnected)
logger.Debug("peer connected")

// see if this peer is already connected (we probably shouldn't encounter this often
// if at all)
discovery.ConnectedSubnets.Range(func(subnet int, ids []peer.ID) bool {
for _, id := range ids {
if id == conn.RemotePeer() {
logger.Debug(
"peer is already connected through subnet discovered previously",
zap.Int("subnet_id", subnet),
zap.String("peer_id", string(id)),
)
return false // stop iteration, found what we are looking for
}
}
return true
})
// see if we can upgrade any `discovered` subnets to `connected` through this peer
discovery.DiscoveredSubnets.Range(func(subnet int, ids []peer.ID) bool {
otherPeers := make([]peer.ID, 0, len(ids))
for _, id := range ids {
if id == conn.RemotePeer() {
discovery.Connected1stSubnets.Get(subnet)
_, ok := discovery.Connected1stSubnets.Get(subnet)
if !ok {
logger.Debug(
"connected subnet 1st time!",
zap.Int("subnet_id", subnet),
zap.String("peer_id", string(id)),
)
discovery.Connected1stSubnets.Set(subnet, 1)
}
logger.Debug(
"connecting peer through a subnet discovered previously",
zap.Int("subnet_id", subnet),
zap.String("peer_id", string(id)),
)
continue
}
otherPeers = append(otherPeers, id)
}
// exclude this peer from discovered since we've just connected to him, this
// limits DiscoveredSubnets map to only those peers whom we didn't/couldn't
// connect to
discovery.DiscoveredSubnets.Set(subnet, otherPeers)
return true
})
}()
},
DisconnectedF: func(net libp2pnetwork.Network, conn libp2pnetwork.Conn) {
Expand Down

0 comments on commit ab3b889

Please sign in to comment.