diff --git a/network/p2p/p2p_setup.go b/network/p2p/p2p_setup.go index bc9a9d0abe..ec96735b99 100644 --- a/network/p2p/p2p_setup.go +++ b/network/p2p/p2p_setup.go @@ -117,54 +117,6 @@ func (n *p2pNetwork) IsBadPeer(logger *zap.Logger, peerID peer.ID) bool { return n.idx.IsBad(logger, peerID) } -func (n *p2pNetwork) isBadInbound(lg *zap.Logger, peerID peer.ID) bool { - if n.idx == nil { - return false - } - - maxPeers := n.cfg.MaxPeers - inBoundLimit := int(float64(maxPeers) * inboundRatio) - - // should never happen - if maxPeers < inBoundLimit { - return true - } - - in, _ := n.connectionStats() - - if in >= inBoundLimit { - n.interfaceLogger.Debug("preventing inbound connections due to inbound limit", zap.Int("inbound", in), zap.Int("inbound_limit", inBoundLimit)) - // todo: should we disconnect to make sure its ==? - return true - } - - return n.idx.IsBad(lg, peerID) -} - -func (n *p2pNetwork) connectionStats() (inbound, outbound int) { - cns := n.host.Network().Conns() - - for _, cn := range cns { - - if n.host.Network().Connectedness(cn.RemotePeer()) != network.Connected { - continue - } - - dir := cn.Stat().Direction - if dir == network.DirUnknown { - continue // TODO: how can it happen? - } - - if dir == network.DirOutbound { - outbound++ - } else { - inbound++ - } - } - - return inbound, outbound -} - // SetupHost configures a libp2p host and backoff connector utility func (n *p2pNetwork) SetupHost(logger *zap.Logger) error { opts, err := n.cfg.Libp2pOptions(logger) @@ -178,7 +130,7 @@ func (n *p2pNetwork) SetupHost(logger *zap.Logger) error { if err != nil { return errors.Wrap(err, "could not create resource manager") } - n.connGater = connections.NewConnectionGater(logger, n.cfg.DisableIPRateLimit, n.connectionsAtLimit, n.IsBadPeer, n.isBadInbound) + n.connGater = connections.NewConnectionGater(logger, n.cfg.DisableIPRateLimit, n.connectionsAtLimit, n.IsBadPeer, n.inboundLimit) opts = append(opts, libp2p.ResourceManager(rmgr), libp2p.ConnectionGater(n.connGater)) host, err := libp2p.New(opts...) if err != nil { @@ -382,3 +334,51 @@ func (n *p2pNetwork) connectionsAtLimit() bool { } return n.idx.AtLimit(network.DirOutbound) } + +func (n *p2pNetwork) inboundLimit() bool { + if n.idx == nil { + return false + } + + maxPeers := n.cfg.MaxPeers + inBoundLimit := int(float64(maxPeers) * inboundRatio) + + // should never happen + if maxPeers < inBoundLimit { + return true + } + + in, _ := n.connectionStats() + + if in >= inBoundLimit { + n.interfaceLogger.Debug("preventing inbound connections due to inbound limit", zap.Int("inbound", in), zap.Int("inbound_limit", inBoundLimit)) + // todo: should we disconnect to stay at limit? + return true + } + + return false +} + +func (n *p2pNetwork) connectionStats() (inbound, outbound int) { + cns := n.host.Network().Conns() + + for _, cn := range cns { + + if n.host.Network().Connectedness(cn.RemotePeer()) != network.Connected { + continue + } + + dir := cn.Stat().Direction + if dir == network.DirUnknown { + continue // TODO: how can it happen? + } + + if dir == network.DirOutbound { + outbound++ + } else { + inbound++ + } + } + + return inbound, outbound +} diff --git a/network/peers/connections/conn_gater.go b/network/peers/connections/conn_gater.go index f1cee10d9c..fc03dc8575 100644 --- a/network/peers/connections/conn_gater.go +++ b/network/peers/connections/conn_gater.go @@ -26,28 +26,29 @@ const ( // ) -type IsBadF func(logger *zap.Logger, peerID peer.ID) bool +type IsBadPeerF func(logger *zap.Logger, peerID peer.ID) bool +type InboundLimitF func() bool // todo: consider being more flex at the limit for specific peers, e.g: peers that share multiple subnets // connGater implements ConnectionGater interface: // https://github.com/libp2p/go-libp2p/core/blob/master/connmgr/gater.go type connGater struct { - logger *zap.Logger // struct logger to implement connmgr.ConnectionGater - disable bool - atLimit func() bool - ipLimiter *leakybucket.Collector - isBadOutbound IsBadF - isBadInbound IsBadF + logger *zap.Logger // struct logger to implement connmgr.ConnectionGater + disable bool + atLimit func() bool + ipLimiter *leakybucket.Collector + isBadPeer IsBadPeerF + inboundLimit InboundLimitF } // NewConnectionGater creates a new instance of ConnectionGater -func NewConnectionGater(logger *zap.Logger, disable bool, atLimit func() bool, canConnectOutbound, canConnectInbound IsBadF) connmgr.ConnectionGater { +func NewConnectionGater(logger *zap.Logger, disable bool, atLimit func() bool, isBadPeer IsBadPeerF, inboundLimit InboundLimitF) connmgr.ConnectionGater { return &connGater{ - logger: logger, - disable: disable, - atLimit: atLimit, - ipLimiter: leakybucket.NewCollector(ipLimitRate, ipLimitBurst, ipLimitPeriod, true), - isBadInbound: canConnectInbound, - isBadOutbound: canConnectOutbound, + logger: logger, + disable: disable, + atLimit: atLimit, + ipLimiter: leakybucket.NewCollector(ipLimitRate, ipLimitBurst, ipLimitPeriod, true), + isBadPeer: isBadPeer, + inboundLimit: inboundLimit, } } @@ -62,7 +63,7 @@ func (n *connGater) InterceptPeerDial(id peer.ID) bool { // particular address. Blocking connections at this stage is typical for // address filtering. func (n *connGater) InterceptAddrDial(id peer.ID, multiaddr ma.Multiaddr) bool { - if n.isBadOutbound(n.logger, id) { + if n.isBadPeer(n.logger, id) { n.logger.Debug("preventing outbound connection due to bad peer", fields.PeerID(id)) return false } @@ -77,6 +78,11 @@ func (n *connGater) InterceptAccept(multiaddrs libp2pnetwork.ConnMultiaddrs) boo if n.disable { return true } + + if n.inboundLimit() { // inbound limit + return false + } + remoteAddr := multiaddrs.RemoteMultiaddr() if !n.validateDial(remoteAddr) { // Yield this goroutine to allow others to run in-between connection attempts. @@ -85,21 +91,17 @@ func (n *connGater) InterceptAccept(multiaddrs libp2pnetwork.ConnMultiaddrs) boo n.logger.Debug("connection rejected due to IP rate limit", zap.String("remote_addr", remoteAddr.String())) return false } - return !n.atLimit() + return !n.atLimit() // maxpeers limit } // InterceptSecured is called for both inbound and outbound connections, // after a security handshake has taken place and we've authenticated the peer. func (n *connGater) InterceptSecured(direction libp2pnetwork.Direction, id peer.ID, multiaddrs libp2pnetwork.ConnMultiaddrs) bool { - if direction == libp2pnetwork.DirUnknown { + if n.isBadPeer(n.logger, id) { + n.logger.Debug("rejecting inbound connection due to bad peer", fields.PeerID(id)) return false } - - if direction == libp2pnetwork.DirOutbound { - return n.isBadOutbound(n.logger, id) - } else { - return n.isBadInbound(n.logger, id) - } + return true } // InterceptUpgraded is called for inbound and outbound connections, after