Skip to content

Commit

Permalink
Re-design TestStartDiscV5_DiscoverPeersWithSubnets test (#13766)
Browse files Browse the repository at this point in the history
* `Test_AttSubnets`: Factorize.

* `filterPeerForAttSubnet`: `O(n)` ==> `O(1)`

* `FindPeersWithSubnet`: Optimize.

* `TestStartDiscV5_DiscoverPeersWithSubnets`: Complete re-design.

* `broadcastAttestation`: User `log.WithFields`.

* `filterPeer`: Refactor comments.

* Make deepsource happy.

* `TestStartDiscV5_FindPeersWithSubnet`: Add context cancellation.

Add some notes on `FindPeersWithSubnet` about
this limitation as well.
  • Loading branch information
nalepae authored Mar 20, 2024
1 parent b692722 commit fca1adb
Show file tree
Hide file tree
Showing 4 changed files with 212 additions and 190 deletions.
18 changes: 11 additions & 7 deletions beacon-chain/p2p/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/monitoring/tracing"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/time/slots"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
"google.golang.org/protobuf/proto"
)
Expand Down Expand Up @@ -68,7 +69,7 @@ func (s *Service) BroadcastAttestation(ctx context.Context, subnet uint64, att *
}

// Non-blocking broadcast, with attempts to discover a subnet peer if none available.
go s.broadcastAttestation(ctx, subnet, att, forkDigest)
go s.internalBroadcastAttestation(ctx, subnet, att, forkDigest)

return nil
}
Expand All @@ -94,8 +95,8 @@ func (s *Service) BroadcastSyncCommitteeMessage(ctx context.Context, subnet uint
return nil
}

func (s *Service) broadcastAttestation(ctx context.Context, subnet uint64, att *ethpb.Attestation, forkDigest [4]byte) {
_, span := trace.StartSpan(ctx, "p2p.broadcastAttestation")
func (s *Service) internalBroadcastAttestation(ctx context.Context, subnet uint64, att *ethpb.Attestation, forkDigest [4]byte) {
_, span := trace.StartSpan(ctx, "p2p.internalBroadcastAttestation")
defer span.End()
ctx = trace.NewContext(context.Background(), span) // clear parent context / deadline.

Expand Down Expand Up @@ -137,7 +138,10 @@ func (s *Service) broadcastAttestation(ctx context.Context, subnet uint64, att *
// acceptable threshold, we exit early and do not broadcast it.
currSlot := slots.CurrentSlot(uint64(s.genesisTime.Unix()))
if att.Data.Slot+params.BeaconConfig().SlotsPerEpoch < currSlot {
log.Warnf("Attestation is too old to broadcast, discarding it. Current Slot: %d , Attestation Slot: %d", currSlot, att.Data.Slot)
log.WithFields(logrus.Fields{
"attestationSlot": att.Data.Slot,
"currentSlot": currSlot,
}).Warning("Attestation is too old to broadcast, discarding it")
return
}

Expand Down Expand Up @@ -218,13 +222,13 @@ func (s *Service) BroadcastBlob(ctx context.Context, subnet uint64, blob *ethpb.
}

// Non-blocking broadcast, with attempts to discover a subnet peer if none available.
go s.broadcastBlob(ctx, subnet, blob, forkDigest)
go s.internalBroadcastBlob(ctx, subnet, blob, forkDigest)

return nil
}

func (s *Service) broadcastBlob(ctx context.Context, subnet uint64, blobSidecar *ethpb.BlobSidecar, forkDigest [4]byte) {
_, span := trace.StartSpan(ctx, "p2p.broadcastBlob")
func (s *Service) internalBroadcastBlob(ctx context.Context, subnet uint64, blobSidecar *ethpb.BlobSidecar, forkDigest [4]byte) {
_, span := trace.StartSpan(ctx, "p2p.internalBroadcastBlob")
defer span.End()
ctx = trace.NewContext(context.Background(), span) // clear parent context / deadline.

Expand Down
33 changes: 22 additions & 11 deletions beacon-chain/p2p/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,58 +277,69 @@ func (s *Service) startDiscoveryV5(
// filterPeer validates each node that we retrieve from our dht. We
// try to ascertain that the peer can be a valid protocol peer.
// Validity Conditions:
// 1. The local node is still actively looking for peers to
// connect to.
// 2. Peer has a valid IP and TCP port set in their enr.
// 3. Peer hasn't been marked as 'bad'
// 4. Peer is not currently active or connected.
// 5. Peer is ready to receive incoming connections.
// 6. Peer's fork digest in their ENR matches that of
// 1. Peer has a valid IP and TCP port set in their enr.
// 2. Peer hasn't been marked as 'bad'.
// 3. Peer is not currently active or connected.
// 4. Peer is ready to receive incoming connections.
// 5. Peer's fork digest in their ENR matches that of
// our localnodes.
func (s *Service) filterPeer(node *enode.Node) bool {
// Ignore nil node entries passed in.
if node == nil {
return false
}
// ignore nodes with no ip address stored.

// Ignore nodes with no IP address stored.
if node.IP() == nil {
return false
}
// do not dial nodes with their tcp ports not set

// Ignore nodes with their TCP ports not set.
if err := node.Record().Load(enr.WithEntry("tcp", new(enr.TCP))); err != nil {
if !enr.IsNotFound(err) {
log.WithError(err).Debug("Could not retrieve tcp port")
}
return false
}

peerData, multiAddr, err := convertToAddrInfo(node)
if err != nil {
log.WithError(err).Debug("Could not convert to peer data")
return false
}

// Ignore bad nodes.
if s.peers.IsBad(peerData.ID) {
return false
}

// Ignore nodes that are already active.
if s.peers.IsActive(peerData.ID) {
return false
}

// Ignore nodes that are already connected.
if s.host.Network().Connectedness(peerData.ID) == network.Connected {
return false
}

// Ignore nodes that are not ready to receive incoming connections.
if !s.peers.IsReadyToDial(peerData.ID) {
return false
}

// Ignore nodes that don't match our fork digest.
nodeENR := node.Record()
// Decide whether or not to connect to peer that does not
// match the proper fork ENR data with our local node.
if s.genesisValidatorsRoot != nil {
if err := s.compareForkENR(nodeENR); err != nil {
log.WithError(err).Trace("Fork ENR mismatches between peer and local node")
return false
}
}

// Add peer to peer handler.
s.peers.Add(nodeENR, peerData.ID, multiAddr, network.DirUnknown)

return true
}

Expand Down
39 changes: 20 additions & 19 deletions beacon-chain/p2p/subnets.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,13 @@ const syncLockerVal = 100
const blobSubnetLockerVal = 110

// FindPeersWithSubnet performs a network search for peers
// subscribed to a particular subnet. Then we try to connect
// with those peers. This method will block until the required amount of
// peers are found, the method only exits in the event of context timeouts.
// subscribed to a particular subnet. Then it tries to connect
// with those peers. This method will block until either:
// - the required amount of peers are found, or
// - the context is terminated.
// On some edge cases, this method may hang indefinitely while peers
// are actually found. In such a case, the user should cancel the context
// and re-run the method again.
func (s *Service) FindPeersWithSubnet(ctx context.Context, topic string,
index uint64, threshold int) (bool, error) {
ctx, span := trace.StartSpan(ctx, "p2p.FindPeersWithSubnet")
Expand All @@ -73,9 +77,9 @@ func (s *Service) FindPeersWithSubnet(ctx context.Context, topic string,
return false, errors.New("no subnet exists for provided topic")
}

currNum := len(s.pubsub.ListPeers(topic))
wg := new(sync.WaitGroup)
for {
currNum := len(s.pubsub.ListPeers(topic))
if currNum >= threshold {
break
}
Expand All @@ -99,7 +103,6 @@ func (s *Service) FindPeersWithSubnet(ctx context.Context, topic string,
}
// Wait for all dials to be completed.
wg.Wait()
currNum = len(s.pubsub.ListPeers(topic))
}
return true, nil
}
Expand All @@ -110,18 +113,13 @@ func (s *Service) filterPeerForAttSubnet(index uint64) func(node *enode.Node) bo
if !s.filterPeer(node) {
return false
}

subnets, err := attSubnets(node.Record())
if err != nil {
return false
}
indExists := false
for _, comIdx := range subnets {
if comIdx == index {
indExists = true
break
}
}
return indExists

return subnets[index]
}
}

Expand Down Expand Up @@ -205,8 +203,10 @@ func initializePersistentSubnets(id enode.ID, epoch primitives.Epoch) error {
//
// return [compute_subscribed_subnet(node_id, epoch, index) for index in range(SUBNETS_PER_NODE)]
func computeSubscribedSubnets(nodeID enode.ID, epoch primitives.Epoch) ([]uint64, error) {
subs := []uint64{}
for i := uint64(0); i < params.BeaconConfig().SubnetsPerNode; i++ {
subnetsPerNode := params.BeaconConfig().SubnetsPerNode
subs := make([]uint64, 0, subnetsPerNode)

for i := uint64(0); i < subnetsPerNode; i++ {
sub, err := computeSubscribedSubnet(nodeID, epoch, i)
if err != nil {
return nil, err
Expand Down Expand Up @@ -281,19 +281,20 @@ func initializeSyncCommSubnets(node *enode.LocalNode) *enode.LocalNode {

// Reads the attestation subnets entry from a node's ENR and determines
// the committee indices of the attestation subnets the node is subscribed to.
func attSubnets(record *enr.Record) ([]uint64, error) {
func attSubnets(record *enr.Record) (map[uint64]bool, error) {
bitV, err := attBitvector(record)
if err != nil {
return nil, err
}
committeeIdxs := make(map[uint64]bool)
// lint:ignore uintcast -- subnet count can be safely cast to int.
if len(bitV) != byteCount(int(attestationSubnetCount)) {
return []uint64{}, errors.Errorf("invalid bitvector provided, it has a size of %d", len(bitV))
return committeeIdxs, errors.Errorf("invalid bitvector provided, it has a size of %d", len(bitV))
}
var committeeIdxs []uint64

for i := uint64(0); i < attestationSubnetCount; i++ {
if bitV.BitAt(i) {
committeeIdxs = append(committeeIdxs, i)
committeeIdxs[i] = true
}
}
return committeeIdxs, nil
Expand Down
Loading

0 comments on commit fca1adb

Please sign in to comment.