diff --git a/network/p2p/p2p.go b/network/p2p/p2p.go index 7972bcb762..e75317f421 100644 --- a/network/p2p/p2p.go +++ b/network/p2p/p2p.go @@ -301,7 +301,7 @@ func (n *p2pNetwork) peersBalancing(logger *zap.Logger) func() { const maxPeersToDrop = 1 immunityQuota := len(connectedPeers) - maxPeersToDrop - protectedPeers := n.PeerProtection(connectedPeers, mySubnets, immunityQuota) + protectedPeers := n.PeerProtection(immunityQuota) for _, peer := range connectedPeers { if _, ok := protectedPeers[peer]; ok { n.libConnManager.Protect(peer, peers.ProtectedTag) @@ -320,36 +320,54 @@ func (n *p2pNetwork) peersBalancing(logger *zap.Logger) func() { // - Prefer peers that you have more shared subents with. // - Protect at most immunityQuota peers. func (n *p2pNetwork) PeerProtection( - myPeers []peer.ID, - mySubnets records.Subnets, immunityQuota int, ) map[peer.ID]struct{} { - commonSubnets := make(map[int]struct{}) - for _, p := range myPeers { - peerSubnets := n.idx.GetPeerSubnets(p) - for i, a := range mySubnets { - if a == 1 && peerSubnets[i] == 1 { - commonSubnets[i] = struct{}{} + protectedPeers := make(map[peer.ID]struct{}) + + tpcs := n.topicsCtrl.Topics() + peerz := make(map[string][]peer.ID, len(tpcs)) + for _, tpc := range tpcs { + var err error + peerz[tpc], err = n.topicsCtrl.Peers(tpc) + if err != nil { + n.interfaceLogger.Error("Cant get peers from topics, skipping to keep the network running", zap.Error(err)) + continue + } + } + + sharedTopics := func(peerID peer.ID) int { + shared := 0 + for _, tpc := range tpcs { + if slices.Contains(peerz[tpc], peerID) { + shared++ } } + return shared } - protectedPeers := make(map[peer.ID]struct{}) - for subnet, _ := range commonSubnets { - subnetPeers := n.idx.GetSubnetPeers(subnet) - slices.SortFunc(subnetPeers, func(a, b peer.ID) int { - x := len(records.SharedSubnets(mySubnets, n.idx.GetPeerSubnets(a), 0)) - y := len(records.SharedSubnets(mySubnets, n.idx.GetPeerSubnets(b), 0)) + // for each topic we have peers in, sort the peers by the numbers of topics they present in + // and protect the top 2 peers in each topic, if there's only one peer in that topic, always protect it. + for _, tpc := range tpcs { + peersInTopic := peerz[tpc] + if len(peersInTopic) == 0 { + continue + } + if len(peersInTopic) == 1 { + protectedPeers[peersInTopic[0]] = struct{}{} + immunityQuota-- + continue + } + + slices.SortFunc(peersInTopic, func(a, b peer.ID) int { + x := sharedTopics(a) + y := sharedTopics(b) return x - y // desc order }) - const minPeersPerSubnet = 2 - for i := 0; i < min(minPeersPerSubnet, len(subnetPeers)) && immunityQuota > 0; i++ { - peerID := subnetPeers[i] - if _, ok := protectedPeers[peerID]; ok { - continue - } - protectedPeers[peerID] = struct{}{} + + const minPeersPerTopic = 2 + for i := 0; i < min(minPeersPerTopic, len(peersInTopic)) && immunityQuota > 0; i++ { + protectedPeers[peersInTopic[i]] = struct{}{} immunityQuota-- } }