Skip to content

Commit

Permalink
more testing
Browse files Browse the repository at this point in the history
  • Loading branch information
iurii-ssv committed Dec 3, 2024
1 parent 3ec0d17 commit 0824ac2
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 10 deletions.
10 changes: 5 additions & 5 deletions network/p2p/p2p_setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,13 +152,13 @@ func (n *p2pNetwork) SetupServices(logger *zap.Logger) error {
if err := n.setupStreamCtrl(logger); err != nil {
return errors.Wrap(err, "could not setup stream controller")
}
if err := n.setupPeerServices(logger); err != nil {
return errors.Wrap(err, "could not setup peer services")
}
topicsController, err := n.setupPubsub(logger)
if err != nil {
return errors.Wrap(err, "could not setup topic controller")
}
if err := n.setupPeerServices(logger, topicsController); err != nil {
return errors.Wrap(err, "could not setup peer services")
}
if err := n.setupDiscovery(logger, topicsController); err != nil {
return errors.Wrap(err, "could not setup discovery service")
}
Expand All @@ -172,7 +172,7 @@ func (n *p2pNetwork) setupStreamCtrl(logger *zap.Logger) error {
return nil
}

func (n *p2pNetwork) setupPeerServices(logger *zap.Logger) error {
func (n *p2pNetwork) setupPeerServices(logger *zap.Logger, topicsController topics.Controller) error {
libPrivKey, err := p2pcommons.ECDSAPrivToInterface(n.cfg.NetworkPrivateKey)
if err != nil {
return err
Expand Down Expand Up @@ -231,7 +231,7 @@ func (n *p2pNetwork) setupPeerServices(logger *zap.Logger) error {
n.host.SetStreamHandler(peers.NodeInfoProtocol, handshaker.Handler(logger))
logger.Debug("handshaker is ready")

n.connHandler = connections.NewConnHandler(n.ctx, handshaker, subnetsProvider, n.idx, n.idx, n.idx, n.metrics)
n.connHandler = connections.NewConnHandler(n.ctx, handshaker, subnetsProvider, n.idx, n.idx, n.idx, topicsController, n.metrics)
n.host.Network().Notify(n.connHandler.Handle(logger))
logger.Debug("connection handler is ready")

Expand Down
56 changes: 56 additions & 0 deletions network/peers/connections/conn_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package connections

import (
"context"
"fmt"
"github.com/ssvlabs/ssv/network/discovery"
"github.com/ssvlabs/ssv/network/topics"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -32,6 +35,7 @@ type connHandler struct {
subnetsIndex peers.SubnetsIndex
connIdx peers.ConnectionIndex
peerInfos peers.PeerInfoIndex
topicsCtrl topics.Controller
metrics Metrics
}

Expand All @@ -43,6 +47,7 @@ func NewConnHandler(
subnetsIndex peers.SubnetsIndex,
connIdx peers.ConnectionIndex,
peerInfos peers.PeerInfoIndex,
topicsController topics.Controller,
mr Metrics,
) ConnHandler {
return &connHandler{
Expand All @@ -52,6 +57,7 @@ func NewConnHandler(
subnetsIndex: subnetsIndex,
connIdx: connIdx,
peerInfos: peerInfos,
topicsCtrl: topicsController,
metrics: mr,
}
}
Expand Down Expand Up @@ -145,6 +151,10 @@ func (ch *connHandler) Handle(logger *zap.Logger) *libp2pnetwork.NotifyBundle {
if !ch.sharesEnoughSubnets(logger, conn) {
return errors.New("peer doesn't share enough subnets")
}
if !ch.helpfulPeer(logger, conn) {
return errors.New("peer doesn't help us much (with dead/solo subnets)")
}

return nil
}

Expand Down Expand Up @@ -401,3 +411,49 @@ func (ch *connHandler) sharesEnoughSubnets(logger *zap.Logger, conn libp2pnetwor

return len(shared) == 1
}

// TODO - need a better (smart) way to do it, but for now try to connect only
// peers that help with getting rid of dead subnets
func (ch *connHandler) helpfulPeer(logger *zap.Logger, conn libp2pnetwork.Conn) bool {
pid := conn.RemotePeer()

helpfulPeer := false
peerSubnets := ch.subnetsIndex.GetPeerSubnets(pid)
if len(peerSubnets) == 0 {
// no subnets for this peer
return false
}

subscribedTopics := ch.topicsCtrl.Topics()
for _, topic := range subscribedTopics {
topicPeers, err := ch.topicsCtrl.Peers(topic)
if err != nil {
panic(fmt.Sprintf("could not get subscribed topic peers: %s", err)) // TODO
}

//if len(topicPeers) >= 1 {
// continue // this topic has enough peers - TODO (1 is not enough tho)
//}
// TODO - testing 0 to see if this even works
if len(topicPeers) >= 0 {
continue // this topic has enough peers - TODO (1 is not enough tho)
}

// we've got a dead subnet here, see if this peer can help with that
subnet, err := strconv.Atoi(topic)
if err != nil {
panic(fmt.Sprintf("could not convert topic name to subnet id: %s", err)) // TODO
}
peerSubnet := peerSubnets[subnet]
if peerSubnet != 1 {
continue // peer doesn't have this subnet either, lets check other dead subnets we have
}
helpfulPeer = true // this peer helps with at least 1 dead subnet for us
break
}
if !helpfulPeer {
return false
}

return true
}
9 changes: 4 additions & 5 deletions network/topics/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,10 @@ func NewPubSub(ctx context.Context, logger *zap.Logger, cfg *PubSubConfig, metri
pubsub.WithSubscriptionFilter(sf),
pubsub.WithGossipSubParams(params.GossipSubParams()),
pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign),
// TODO
pubsub.WithPeerFilter(func(pid peer.ID, topic string) bool {
logger.Debug("pubsubTrace: filtering peer", zap.String("id", pid.String()), zap.String("topic", topic))
return true
}),
//pubsub.WithPeerFilter(func(pid peer.ID, topic string) bool {
// logger.Debug("pubsubTrace: filtering peer", zap.String("id", pid.String()), zap.String("topic", topic))
// return true
//}),
}

if cfg.Discovery != nil {
Expand Down

0 comments on commit 0824ac2

Please sign in to comment.