Skip to content

Commit

Permalink
adjustments, track disconnects
Browse files Browse the repository at this point in the history
  • Loading branch information
iurii-ssv committed Dec 2, 2024
1 parent 8ee34ca commit ff8a911
Show file tree
Hide file tree
Showing 5 changed files with 184 additions and 47 deletions.
55 changes: 35 additions & 20 deletions network/discovery/dv5_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ import (
"github.com/ssvlabs/ssv/networkconfig"
)

// metrics to keep track of discovered/connected/disconnected peers (per subnet)
var (
Discovered1stSubnets = hashmap.New[int, int64]()
DiscoveredSubnets = hashmap.New[int, []peer.ID]()
Connected1stSubnets = hashmap.New[int, int64]()
ConnectedSubnets = hashmap.New[int, []peer.ID]()
Discovered1stTimeSubnets = hashmap.New[int, int64]()
Connected1stTimeSubnets = hashmap.New[int, int64]()
DiscoveredSubnets = hashmap.New[int, []peer.ID]()
ConnectedSubnets = hashmap.New[int, []peer.ID]()
)

var (
Expand Down Expand Up @@ -197,33 +198,47 @@ func (dvs *DiscV5Service) checkPeer(logger *zap.Logger, e PeerEvent) error {
return errors.New("zero subnets")
}

// TODO - maybe get subnetsBefore and only look at the diff
subnetsBefore := dvs.subnetsIdx.GetPeerSubnets(e.AddrInfo.ID)
dvs.subnetsIdx.UpdatePeerSubnets(e.AddrInfo.ID, nodeSubnets)
subnetsAfter := dvs.subnetsIdx.GetPeerSubnets(e.AddrInfo.ID)
for subnet := range subnetsAfter {
_, ok := Discovered1stSubnets.Get(subnet)
for subnet, subnetActive := range subnetsAfter {
if subnetActive != 1 {
continue // not interested in subnet that's not active (or no longer active)
}
subnetActiveBefore := subnetsBefore[subnet]
if subnetActiveBefore == 1 {
continue // not interested in subnet that we've discovered & recorded as such for this peer previously
}

_, ok := Discovered1stTimeSubnets.Get(subnet)
if !ok {
logger.Debug(
"discovered subnet 1st time!",
zap.Int("subnet_id", subnet),
zap.String("peer_id", string(e.AddrInfo.ID)),
)
Discovered1stSubnets.Set(subnet, 1)
Discovered1stTimeSubnets.Set(subnet, 1)
}
peerIDs, ok := DiscoveredSubnets.Get(subnet)
if ok {
for _, pID := range peerIDs {
if pID == e.AddrInfo.ID {
logger.Debug(
"already discovered this subnet through this peer",
zap.Int("subnet_id", subnet),
zap.String("peer_id", string(pID)),
)
break
}
peerIDs, _ := DiscoveredSubnets.Get(subnet)
peerAlreadyDiscoveredForSubnet := false
for _, peerID := range peerIDs {
if peerID == e.AddrInfo.ID {
// TODO - it's fine to see this log message occasionally, I guess? Not sure what
// it would mean though ... a peer who has advertised that he has a subnet, but then
// we discovered that he doesn't, and then the peer says (still/again) that he does
// work with that subnet ...
logger.Debug(
"already discovered this subnet through this peer",
zap.Int("subnet_id", subnet),
zap.String("peer_id", string(peerID)),
)
peerAlreadyDiscoveredForSubnet = true
break
}
}
DiscoveredSubnets.Set(subnet, append(peerIDs, e.AddrInfo.ID))
if !peerAlreadyDiscoveredForSubnet {
DiscoveredSubnets.Set(subnet, append(peerIDs, e.AddrInfo.ID))
}
}

// Filters
Expand Down
6 changes: 3 additions & 3 deletions network/p2p/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,10 @@ func (n *p2pNetwork) reportPeerIdentities(logger *zap.Logger) func() {

func (n *p2pNetwork) reportTopics(logger *zap.Logger) func() {
return func() {
topics := n.topicsCtrl.Topics()
nTopics := len(topics)
subscribedTopics := n.topicsCtrl.Topics()
nTopics := len(subscribedTopics)
logger.Debug("connected topics", fields.Count(nTopics))
for _, name := range topics {
for _, name := range subscribedTopics {
n.reportTopicPeers(logger, name)
}
}
Expand Down
40 changes: 36 additions & 4 deletions network/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,10 +258,42 @@ func (n *p2pNetwork) Start(logger *zap.Logger) error {
go n.startDiscovery(logger, connector)

async.Interval(n.ctx, 1*time.Minute, func() {
logger.Info("discovered subnets 1st", zap.Int("length", discovery.Discovered1stSubnets.SlowLen()))
logger.Info("discovered subnets", zap.Int("length", discovery.DiscoveredSubnets.SlowLen()))
logger.Info("connected subnets 1st", zap.Int("length", discovery.Connected1stSubnets.SlowLen()))
logger.Info("connected subnets", zap.Int("length", discovery.ConnectedSubnets.SlowLen()))
logger.Debug("discovered subnets 1st", zap.Int("total", discovery.Discovered1stTimeSubnets.SlowLen()))
logger.Debug("connected subnets 1st", zap.Int("total", discovery.Connected1stTimeSubnets.SlowLen()))

// check how peer-discovery is doing
discovery.DiscoveredSubnets.Range(func(subnet int, peerIDs []peer.ID) bool {
const warningThresholdTooManyDiscoveredPeers = 10
if len(peerIDs) >= warningThresholdTooManyDiscoveredPeers {
// this means we are discovering peers, but not actually connecting to them,
// it's fine for this value to grow over time - but it shouldn't grow too fast,
// for now just log it at debug level
logger.Debug(
fmt.Sprintf("got >= %d discovered peers for subnet", warningThresholdTooManyDiscoveredPeers),
zap.Int("subnet", subnet),
zap.Int("discovered_peers_total", len(peerIDs)),
)
}
return true
})

// check how peer-connecting is doing
deadSubnetsCnt := 0
soloSubnetsCnt := 0
discovery.ConnectedSubnets.Range(func(subnet int, peerIDs []peer.ID) bool {
if len(peerIDs) == 1 {
soloSubnetsCnt++
}
if len(peerIDs) == 0 {
deadSubnetsCnt++
}
return true
})
logger.Debug(
"dead/solo subnets report",
zap.Int("solo_subnets_total", soloSubnetsCnt),
zap.Int("dead_subnets_total", deadSubnetsCnt),
)
})

async.Interval(n.ctx, connManagerBalancingInterval, n.peersBalancing(logger))
Expand Down
127 changes: 108 additions & 19 deletions network/peers/connections/conn_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,10 @@ func (ch *connHandler) Handle(logger *zap.Logger) *libp2pnetwork.NotifyBundle {
ch.peerInfos.SetState(conn.RemotePeer(), peers.StateConnected)
logger.Debug("peer connected")

// see if this peer is already connected (we probably shouldn't encounter this often
// if at all)
// see if this peer is already connected; we probably shouldn't encounter this often,
// if at all, because it means we are creating duplicate (unnecessary) peer connections
// and effectively reducing overall peer diversity (because we can't exceed the
// pre-configured max peer limit)
discovery.ConnectedSubnets.Range(func(subnet int, ids []peer.ID) bool {
for _, id := range ids {
if id == conn.RemotePeer() {
Expand All @@ -210,35 +212,72 @@ func (ch *connHandler) Handle(logger *zap.Logger) *libp2pnetwork.NotifyBundle {
return true
})
// see if we can upgrade any `discovered` subnets to `connected` through this peer
discovery.DiscoveredSubnets.Range(func(subnet int, ids []peer.ID) bool {
otherPeers := make([]peer.ID, 0, len(ids))
for _, id := range ids {
if id == conn.RemotePeer() {
discovery.Connected1stSubnets.Get(subnet)
_, ok := discovery.Connected1stSubnets.Get(subnet)
//
// unexpectedPeer helps us track whether the peer connection we are making is due
// to us discovering this peer previously (due to looking for peers with subnets
// we are interested in), or whether we are connecting to some "unexpected" peer
unexpectedPeer := true
discovery.DiscoveredSubnets.Range(func(subnet int, peerIDs []peer.ID) bool {
otherPeers := make([]peer.ID, 0, len(peerIDs))
for _, peerID := range peerIDs {
if peerID == conn.RemotePeer() {
discovery.Connected1stTimeSubnets.Get(subnet)
_, ok := discovery.Connected1stTimeSubnets.Get(subnet)
if !ok {
logger.Debug(
"connected subnet 1st time!",
zap.Int("subnet_id", subnet),
zap.String("peer_id", string(id)),
zap.String("peer_id", string(peerID)),
)
discovery.Connected1stSubnets.Set(subnet, 1)
discovery.Connected1stTimeSubnets.Set(subnet, 1)
}

connectedPeers, _ := discovery.ConnectedSubnets.Get(subnet)
// peerAlreadyContributesToSubnet helps us track and not double-count peer
// contributions to subnets (discovery.ConnectedSubnets contains unique
// list of peers per subnet)
peerAlreadyContributesToSubnet := false
for _, connectedPeer := range connectedPeers {
if connectedPeer == peerID {
peerAlreadyContributesToSubnet = true
break
}
}
if !peerAlreadyContributesToSubnet {
logger.Debug(
"connecting subnet peer (since he has a subnet we are interested in - as we discovered previously)",
zap.Int("subnet_id", subnet),
zap.String("peer_id", string(peerID)),
)
connectedPeers = append(connectedPeers, peerID)
discovery.ConnectedSubnets.Set(subnet, connectedPeers)
}
logger.Debug(
"connecting peer through a subnet discovered previously",
zap.Int("subnet_id", subnet),
zap.String("peer_id", string(id)),
)
continue
}
otherPeers = append(otherPeers, id)
otherPeers = append(otherPeers, peerID)
}
// exclude this peer from discovered since we've just connected to him, this
// limits DiscoveredSubnets map to only those peers whom we didn't/couldn't
// connect to

if len(otherPeers) == len(peerIDs) {
return true // this discovered subnet is not related to connected peer
}

unexpectedPeer = false // this peer connection is happening due to our subnet discovery process

// exclude this peer from discovered list since we've just connected to him, this
// limits DiscoveredSubnets map to only those discovered peers whom we didn't/couldn't
// connect to yet - this means DiscoveredSubnets map shouldn't grow big for ANY of the
// subnets it contains because that would mean (for such a subnet) we are discovering
// peers but don't connect to them for some reason.
discovery.DiscoveredSubnets.Set(subnet, otherPeers)

return true
})
if unexpectedPeer {
logger.Debug(
"connected peer that doesn't belong to ANY of recently discovered subnets",
zap.String("peer_id", string(conn.RemotePeer())),
)
}
}()
},
DisconnectedF: func(net libp2pnetwork.Network, conn libp2pnetwork.Conn) {
Expand All @@ -257,6 +296,56 @@ func (ch *connHandler) Handle(logger *zap.Logger) *libp2pnetwork.NotifyBundle {

logger := connLogger(conn)
logger.Debug("peer disconnected")

// unexpectedPeer helps us track whether the peer we've disconnected is a peer
// that's been connected previously through the process of subnet-discovery,
// it's just a sanity check
unexpectedPeer := true
discovery.ConnectedSubnets.Range(func(subnet int, peerIDs []peer.ID) bool {
otherPeers := make([]peer.ID, 0, len(peerIDs))
for _, peerID := range peerIDs {
if peerID == conn.RemotePeer() {
continue
}
otherPeers = append(otherPeers, peerID)
}

if len(otherPeers) == len(peerIDs) {
return true // this subnet was not affected by disconnected peer
}

unexpectedPeer = false // this peer disconnect is happening due to our subnet discovery process

// exclude this peer from the connected list since we've just disconnected from him; this
// limits the ConnectedSubnets map to only those peers with whom we still have an active
// connection - this means the peer list in ConnectedSubnets shouldn't shrink too much for
// ANY of the subnets it contains, because that would mean (for such a subnet) we are
// getting close to killing a previously alive subnet.
discovery.ConnectedSubnets.Set(subnet, otherPeers)

if len(otherPeers) == 1 {
logger.Debug(
"disconnecting peer resulted in Solo subnet",
zap.Int("subnet_id", subnet),
zap.String("peer_id", string(conn.RemotePeer())),
)
}
if len(otherPeers) == 0 {
logger.Debug(
"disconnecting peer resulted in Dead subnet",
zap.Int("subnet_id", subnet),
zap.String("peer_id", string(conn.RemotePeer())),
)
}

return true
})
if unexpectedPeer {
logger.Debug(
"disconnected peer that doesn't belong to ANY of connected subnets",
zap.String("peer_id", string(conn.RemotePeer())),
)
}
},
}
}
Expand Down
3 changes: 2 additions & 1 deletion network/peers/subnets.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package peers

import (
"github.com/ssvlabs/ssv/network/commons"
"sync"

"github.com/libp2p/go-libp2p/core/peer"
Expand Down Expand Up @@ -110,7 +111,7 @@ func (si *subnetsIndex) GetPeerSubnets(id peer.ID) records.Subnets {

subnets, ok := si.peerSubnets[id]
if !ok {
return nil
return make(records.Subnets, commons.SubnetsCount)
}
cp := make(records.Subnets, len(subnets))
copy(cp, subnets)
Expand Down

0 comments on commit ff8a911

Please sign in to comment.