From 7a4ecb60605a40fa26e47483f86a7270368485d0 Mon Sep 17 00:00:00 2001 From: Nishant Das Date: Thu, 30 May 2024 14:57:26 +0800 Subject: [PATCH] Restrict Dials From Discovery (#14052) * Fix Excessive Subnet Dials * Handle backoff in Iterator * Slow Down Lookups * Add Flag To Configure Dials * Preston's Review * Update cmd/beacon-chain/flags/base.go Co-authored-by: Preston Van Loon * Reduce polling period * Manu's Review --------- Co-authored-by: Preston Van Loon --- beacon-chain/p2p/discovery.go | 63 ++++++++++++++++++++++---------- beacon-chain/p2p/iterator.go | 12 ++++++ beacon-chain/p2p/subnets.go | 7 +++- cmd/beacon-chain/flags/base.go | 6 +++ cmd/beacon-chain/flags/config.go | 7 ++++ cmd/beacon-chain/main.go | 1 + cmd/beacon-chain/usage.go | 1 + 7 files changed, 77 insertions(+), 20 deletions(-) diff --git a/beacon-chain/p2p/discovery.go b/beacon-chain/p2p/discovery.go index 7fc63927ca88..f032038d4304 100644 --- a/beacon-chain/p2p/discovery.go +++ b/beacon-chain/p2p/discovery.go @@ -4,6 +4,7 @@ import ( "bytes" "crypto/ecdsa" "net" + "sync" "time" "github.com/ethereum/go-ethereum/p2p/discover" @@ -15,6 +16,7 @@ import ( "github.com/pkg/errors" "github.com/prysmaticlabs/go-bitfield" "github.com/prysmaticlabs/prysm/v5/beacon-chain/cache" + "github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags" "github.com/prysmaticlabs/prysm/v5/config/features" "github.com/prysmaticlabs/prysm/v5/config/params" ecdsaprysm "github.com/prysmaticlabs/prysm/v5/crypto/ecdsa" @@ -106,7 +108,7 @@ func (s *Service) RefreshENR() { // listen for new nodes watches for new nodes in the network and adds them to the peerstore. func (s *Service) listenForNewNodes() { - iterator := enode.Filter(s.dv5Listener.RandomNodes(), s.filterPeer) + iterator := filterNodes(s.ctx, s.dv5Listener.RandomNodes(), s.filterPeer) defer iterator.Close() for { @@ -122,29 +124,41 @@ func (s *Service) listenForNewNodes() { time.Sleep(pollingPeriod) continue } - - if exists := iterator.Next(); !exists { - break - } - - node := iterator.Node() - peerInfo, _, err := convertToAddrInfo(node) - if err != nil { - log.WithError(err).Error("Could not convert to peer info") + wantedCount := s.wantedPeerDials() + if wantedCount == 0 { + log.Trace("Not looking for peers, at peer limit") + time.Sleep(pollingPeriod) continue } - - if peerInfo == nil { - continue + // Restrict dials if limit is applied. + if flags.MaxDialIsActive() { + wantedCount = min(wantedCount, flags.Get().MaxConcurrentDials) } + wantedNodes := enode.ReadNodes(iterator, wantedCount) + wg := new(sync.WaitGroup) + for i := 0; i < len(wantedNodes); i++ { + node := wantedNodes[i] + peerInfo, _, err := convertToAddrInfo(node) + if err != nil { + log.WithError(err).Error("Could not convert to peer info") + continue + } - // Make sure that peer is not dialed too often, for each connection attempt there's a backoff period. - s.Peers().RandomizeBackOff(peerInfo.ID) - go func(info *peer.AddrInfo) { - if err := s.connectWithPeer(s.ctx, *info); err != nil { - log.WithError(err).Tracef("Could not connect with peer %s", info.String()) + if peerInfo == nil { + continue } - }(peerInfo) + + // Make sure that peer is not dialed too often, for each connection attempt there's a backoff period. + s.Peers().RandomizeBackOff(peerInfo.ID) + wg.Add(1) + go func(info *peer.AddrInfo) { + if err := s.connectWithPeer(s.ctx, *info); err != nil { + log.WithError(err).Tracef("Could not connect with peer %s", info.String()) + } + wg.Done() + }(peerInfo) + } + wg.Wait() } } @@ -384,6 +398,17 @@ func (s *Service) isPeerAtLimit(inbound bool) bool { return activePeers >= maxPeers || numOfConns >= maxPeers } +func (s *Service) wantedPeerDials() int { + maxPeers := int(s.cfg.MaxPeers) + + activePeers := len(s.Peers().Active()) + wantedCount := 0 + if maxPeers > activePeers { + wantedCount = maxPeers - activePeers + } + return wantedCount +} + // PeersFromStringAddrs converts peer raw ENRs into multiaddrs for p2p. func PeersFromStringAddrs(addrs []string) ([]ma.Multiaddr, error) { var allAddrs []ma.Multiaddr diff --git a/beacon-chain/p2p/iterator.go b/beacon-chain/p2p/iterator.go index cd5451ba3048..2530e46a5b8d 100644 --- a/beacon-chain/p2p/iterator.go +++ b/beacon-chain/p2p/iterator.go @@ -2,10 +2,14 @@ package p2p import ( "context" + "runtime" + "time" "github.com/ethereum/go-ethereum/p2p/enode" ) +const backOffCounter = 50 + // filterNodes wraps an iterator such that Next only returns nodes for which // the 'check' function returns true. This custom implementation also // checks for context deadlines so that in the event the parent context has @@ -24,13 +28,21 @@ type filterIter struct { // Next looks up for the next valid node according to our // filter criteria. func (f *filterIter) Next() bool { + lookupCounter := 0 for f.Iterator.Next() { + // Do not excessively perform lookups if we constantly receive non-viable peers. + if lookupCounter > backOffCounter { + lookupCounter = 0 + runtime.Gosched() + time.Sleep(pollingPeriod) + } if f.Context.Err() != nil { return false } if f.check(f.Node()) { return true } + lookupCounter++ } return false } diff --git a/beacon-chain/p2p/subnets.go b/beacon-chain/p2p/subnets.go index 2c6262232a00..1809c7db5f19 100644 --- a/beacon-chain/p2p/subnets.go +++ b/beacon-chain/p2p/subnets.go @@ -87,7 +87,12 @@ func (s *Service) FindPeersWithSubnet(ctx context.Context, topic string, return false, errors.Errorf("unable to find requisite number of peers for topic %s - "+ "only %d out of %d peers were able to be found", topic, currNum, threshold) } - nodes := enode.ReadNodes(iterator, int(params.BeaconNetworkConfig().MinimumPeersInSubnetSearch)) + nodeCount := int(params.BeaconNetworkConfig().MinimumPeersInSubnetSearch) + // Restrict dials if limit is applied. + if flags.MaxDialIsActive() { + nodeCount = min(nodeCount, flags.Get().MaxConcurrentDials) + } + nodes := enode.ReadNodes(iterator, nodeCount) for _, node := range nodes { info, _, err := convertToAddrInfo(node) if err != nil { diff --git a/cmd/beacon-chain/flags/base.go b/cmd/beacon-chain/flags/base.go index 5b26a7480315..d119c427f2fd 100644 --- a/cmd/beacon-chain/flags/base.go +++ b/cmd/beacon-chain/flags/base.go @@ -227,6 +227,12 @@ var ( Usage: "Sets the minimum number of peers that a node will attempt to peer with that are subscribed to a subnet.", Value: 6, } + // MaxConcurrentDials defines a flag to set the maximum number of peers that a node will attempt to dial with from discovery. + MaxConcurrentDials = &cli.Uint64Flag{ + Name: "max-concurrent-dials", + Usage: "Sets the maximum number of peers that a node will attempt to dial with from discovery. By default we will dials as " + + "many peers as possible.", + } // SuggestedFeeRecipient specifies the fee recipient for the transaction fees. SuggestedFeeRecipient = &cli.StringFlag{ Name: "suggested-fee-recipient", diff --git a/cmd/beacon-chain/flags/config.go b/cmd/beacon-chain/flags/config.go index bdb1b8bcb22d..48226dda8924 100644 --- a/cmd/beacon-chain/flags/config.go +++ b/cmd/beacon-chain/flags/config.go @@ -11,6 +11,7 @@ type GlobalFlags struct { SubscribeToAllSubnets bool MinimumSyncPeers int MinimumPeersPerSubnet int + MaxConcurrentDials int BlockBatchLimit int BlockBatchLimitBurstFactor int BlobBatchLimit int @@ -45,11 +46,17 @@ func ConfigureGlobalFlags(ctx *cli.Context) { cfg.BlobBatchLimit = ctx.Int(BlobBatchLimit.Name) cfg.BlobBatchLimitBurstFactor = ctx.Int(BlobBatchLimitBurstFactor.Name) cfg.MinimumPeersPerSubnet = ctx.Int(MinPeersPerSubnet.Name) + cfg.MaxConcurrentDials = ctx.Int(MaxConcurrentDials.Name) configureMinimumPeers(ctx, cfg) Init(cfg) } +// MaxDialIsActive checks if the user has enabled the max dial flag. +func MaxDialIsActive() bool { + return Get().MaxConcurrentDials > 0 +} + func configureMinimumPeers(ctx *cli.Context, cfg *GlobalFlags) { cfg.MinimumSyncPeers = ctx.Int(MinSyncPeers.Name) maxPeers := ctx.Int(cmd.P2PMaxPeers.Name) diff --git a/cmd/beacon-chain/main.go b/cmd/beacon-chain/main.go index c7dec5e72431..97092232d3de 100644 --- a/cmd/beacon-chain/main.go +++ b/cmd/beacon-chain/main.go @@ -72,6 +72,7 @@ var appFlags = []cli.Flag{ flags.WeakSubjectivityCheckpoint, flags.Eth1HeaderReqLimit, flags.MinPeersPerSubnet, + flags.MaxConcurrentDials, flags.SuggestedFeeRecipient, flags.TerminalTotalDifficultyOverride, flags.TerminalBlockHashOverride, diff --git a/cmd/beacon-chain/usage.go b/cmd/beacon-chain/usage.go index adc4c87a3085..747533c0f0d9 100644 --- a/cmd/beacon-chain/usage.go +++ b/cmd/beacon-chain/usage.go @@ -124,6 +124,7 @@ var appHelpFlagGroups = []flagGroup{ flags.WeakSubjectivityCheckpoint, flags.Eth1HeaderReqLimit, flags.MinPeersPerSubnet, + flags.MaxConcurrentDials, flags.MevRelayEndpoint, flags.MaxBuilderEpochMissedSlots, flags.MaxBuilderConsecutiveMissedSlots,