Skip to content

Commit

Permalink
Restrict Dials From Discovery (#14052)
Browse files Browse the repository at this point in the history
* Fix Excessive Subnet Dials

* Handle backoff in Iterator

* Slow Down Lookups

* Add Flag To Configure Dials

* Preston's Review

* Update cmd/beacon-chain/flags/base.go

Co-authored-by: Preston Van Loon <[email protected]>

* Reduce polling period

* Manu's Review

---------

Co-authored-by: Preston Van Loon <[email protected]>
  • Loading branch information
nisdas and prestonvanloon authored May 30, 2024
1 parent 82f0ea5 commit 7a4ecb6
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 20 deletions.
63 changes: 44 additions & 19 deletions beacon-chain/p2p/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"crypto/ecdsa"
"net"
"sync"
"time"

"github.com/ethereum/go-ethereum/p2p/discover"
Expand All @@ -15,6 +16,7 @@ import (
"github.com/pkg/errors"
"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/cache"
"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/v5/config/features"
"github.com/prysmaticlabs/prysm/v5/config/params"
ecdsaprysm "github.com/prysmaticlabs/prysm/v5/crypto/ecdsa"
Expand Down Expand Up @@ -106,7 +108,7 @@ func (s *Service) RefreshENR() {

// listen for new nodes watches for new nodes in the network and adds them to the peerstore.
func (s *Service) listenForNewNodes() {
iterator := enode.Filter(s.dv5Listener.RandomNodes(), s.filterPeer)
iterator := filterNodes(s.ctx, s.dv5Listener.RandomNodes(), s.filterPeer)
defer iterator.Close()

for {
Expand All @@ -122,29 +124,41 @@ func (s *Service) listenForNewNodes() {
time.Sleep(pollingPeriod)
continue
}

if exists := iterator.Next(); !exists {
break
}

node := iterator.Node()
peerInfo, _, err := convertToAddrInfo(node)
if err != nil {
log.WithError(err).Error("Could not convert to peer info")
wantedCount := s.wantedPeerDials()
if wantedCount == 0 {
log.Trace("Not looking for peers, at peer limit")
time.Sleep(pollingPeriod)
continue
}

if peerInfo == nil {
continue
// Restrict dials if limit is applied.
if flags.MaxDialIsActive() {
wantedCount = min(wantedCount, flags.Get().MaxConcurrentDials)
}
wantedNodes := enode.ReadNodes(iterator, wantedCount)
wg := new(sync.WaitGroup)
for i := 0; i < len(wantedNodes); i++ {
node := wantedNodes[i]
peerInfo, _, err := convertToAddrInfo(node)
if err != nil {
log.WithError(err).Error("Could not convert to peer info")
continue
}

// Make sure that peer is not dialed too often, for each connection attempt there's a backoff period.
s.Peers().RandomizeBackOff(peerInfo.ID)
go func(info *peer.AddrInfo) {
if err := s.connectWithPeer(s.ctx, *info); err != nil {
log.WithError(err).Tracef("Could not connect with peer %s", info.String())
if peerInfo == nil {
continue
}
}(peerInfo)

// Make sure that peer is not dialed too often, for each connection attempt there's a backoff period.
s.Peers().RandomizeBackOff(peerInfo.ID)
wg.Add(1)
go func(info *peer.AddrInfo) {
if err := s.connectWithPeer(s.ctx, *info); err != nil {
log.WithError(err).Tracef("Could not connect with peer %s", info.String())
}
wg.Done()
}(peerInfo)
}
wg.Wait()
}
}

Expand Down Expand Up @@ -384,6 +398,17 @@ func (s *Service) isPeerAtLimit(inbound bool) bool {
return activePeers >= maxPeers || numOfConns >= maxPeers
}

func (s *Service) wantedPeerDials() int {
maxPeers := int(s.cfg.MaxPeers)

activePeers := len(s.Peers().Active())
wantedCount := 0
if maxPeers > activePeers {
wantedCount = maxPeers - activePeers
}
return wantedCount
}

// PeersFromStringAddrs converts peer raw ENRs into multiaddrs for p2p.
func PeersFromStringAddrs(addrs []string) ([]ma.Multiaddr, error) {
var allAddrs []ma.Multiaddr
Expand Down
12 changes: 12 additions & 0 deletions beacon-chain/p2p/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@ package p2p

import (
"context"
"runtime"
"time"

"github.com/ethereum/go-ethereum/p2p/enode"
)

const backOffCounter = 50

// filterNodes wraps an iterator such that Next only returns nodes for which
// the 'check' function returns true. This custom implementation also
// checks for context deadlines so that in the event the parent context has
Expand All @@ -24,13 +28,21 @@ type filterIter struct {
// Next looks up for the next valid node according to our
// filter criteria.
func (f *filterIter) Next() bool {
lookupCounter := 0
for f.Iterator.Next() {
// Do not excessively perform lookups if we constantly receive non-viable peers.
if lookupCounter > backOffCounter {
lookupCounter = 0
runtime.Gosched()
time.Sleep(pollingPeriod)
}
if f.Context.Err() != nil {
return false
}
if f.check(f.Node()) {
return true
}
lookupCounter++
}
return false
}
7 changes: 6 additions & 1 deletion beacon-chain/p2p/subnets.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,12 @@ func (s *Service) FindPeersWithSubnet(ctx context.Context, topic string,
return false, errors.Errorf("unable to find requisite number of peers for topic %s - "+
"only %d out of %d peers were able to be found", topic, currNum, threshold)
}
nodes := enode.ReadNodes(iterator, int(params.BeaconNetworkConfig().MinimumPeersInSubnetSearch))
nodeCount := int(params.BeaconNetworkConfig().MinimumPeersInSubnetSearch)
// Restrict dials if limit is applied.
if flags.MaxDialIsActive() {
nodeCount = min(nodeCount, flags.Get().MaxConcurrentDials)
}
nodes := enode.ReadNodes(iterator, nodeCount)
for _, node := range nodes {
info, _, err := convertToAddrInfo(node)
if err != nil {
Expand Down
6 changes: 6 additions & 0 deletions cmd/beacon-chain/flags/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,12 @@ var (
Usage: "Sets the minimum number of peers that a node will attempt to peer with that are subscribed to a subnet.",
Value: 6,
}
// MaxConcurrentDials defines a flag to set the maximum number of peers that a node will attempt to dial with from discovery.
MaxConcurrentDials = &cli.Uint64Flag{
Name: "max-concurrent-dials",
Usage: "Sets the maximum number of peers that a node will attempt to dial with from discovery. By default we will dials as " +
"many peers as possible.",
}
// SuggestedFeeRecipient specifies the fee recipient for the transaction fees.
SuggestedFeeRecipient = &cli.StringFlag{
Name: "suggested-fee-recipient",
Expand Down
7 changes: 7 additions & 0 deletions cmd/beacon-chain/flags/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type GlobalFlags struct {
SubscribeToAllSubnets bool
MinimumSyncPeers int
MinimumPeersPerSubnet int
MaxConcurrentDials int
BlockBatchLimit int
BlockBatchLimitBurstFactor int
BlobBatchLimit int
Expand Down Expand Up @@ -45,11 +46,17 @@ func ConfigureGlobalFlags(ctx *cli.Context) {
cfg.BlobBatchLimit = ctx.Int(BlobBatchLimit.Name)
cfg.BlobBatchLimitBurstFactor = ctx.Int(BlobBatchLimitBurstFactor.Name)
cfg.MinimumPeersPerSubnet = ctx.Int(MinPeersPerSubnet.Name)
cfg.MaxConcurrentDials = ctx.Int(MaxConcurrentDials.Name)
configureMinimumPeers(ctx, cfg)

Init(cfg)
}

// MaxDialIsActive checks if the user has enabled the max dial flag.
func MaxDialIsActive() bool {
return Get().MaxConcurrentDials > 0
}

func configureMinimumPeers(ctx *cli.Context, cfg *GlobalFlags) {
cfg.MinimumSyncPeers = ctx.Int(MinSyncPeers.Name)
maxPeers := ctx.Int(cmd.P2PMaxPeers.Name)
Expand Down
1 change: 1 addition & 0 deletions cmd/beacon-chain/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ var appFlags = []cli.Flag{
flags.WeakSubjectivityCheckpoint,
flags.Eth1HeaderReqLimit,
flags.MinPeersPerSubnet,
flags.MaxConcurrentDials,
flags.SuggestedFeeRecipient,
flags.TerminalTotalDifficultyOverride,
flags.TerminalBlockHashOverride,
Expand Down
1 change: 1 addition & 0 deletions cmd/beacon-chain/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ var appHelpFlagGroups = []flagGroup{
flags.WeakSubjectivityCheckpoint,
flags.Eth1HeaderReqLimit,
flags.MinPeersPerSubnet,
flags.MaxConcurrentDials,
flags.MevRelayEndpoint,
flags.MaxBuilderEpochMissedSlots,
flags.MaxBuilderConsecutiveMissedSlots,
Expand Down

0 comments on commit 7a4ecb6

Please sign in to comment.