Skip to content

Commit

Permalink
simplify and block connection at earlier stage.
Browse files Browse the repository at this point in the history
  • Loading branch information
y0sher committed Dec 8, 2024
1 parent 414df73 commit 9f7ea8a
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 72 deletions.
98 changes: 49 additions & 49 deletions network/p2p/p2p_setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,54 +117,6 @@ func (n *p2pNetwork) IsBadPeer(logger *zap.Logger, peerID peer.ID) bool {
return n.idx.IsBad(logger, peerID)
}

func (n *p2pNetwork) isBadInbound(lg *zap.Logger, peerID peer.ID) bool {
if n.idx == nil {
return false
}

maxPeers := n.cfg.MaxPeers
inBoundLimit := int(float64(maxPeers) * inboundRatio)

// should never happen
if maxPeers < inBoundLimit {
return true
}

in, _ := n.connectionStats()

if in >= inBoundLimit {
n.interfaceLogger.Debug("preventing inbound connections due to inbound limit", zap.Int("inbound", in), zap.Int("inbound_limit", inBoundLimit))
// todo: should we disconnect to make sure its ==?
return true
}

return n.idx.IsBad(lg, peerID)
}

func (n *p2pNetwork) connectionStats() (inbound, outbound int) {
cns := n.host.Network().Conns()

for _, cn := range cns {

if n.host.Network().Connectedness(cn.RemotePeer()) != network.Connected {
continue
}

dir := cn.Stat().Direction
if dir == network.DirUnknown {
continue // TODO: how can it happen?
}

if dir == network.DirOutbound {
outbound++
} else {
inbound++
}
}

return inbound, outbound
}

// SetupHost configures a libp2p host and backoff connector utility
func (n *p2pNetwork) SetupHost(logger *zap.Logger) error {
opts, err := n.cfg.Libp2pOptions(logger)
Expand All @@ -178,7 +130,7 @@ func (n *p2pNetwork) SetupHost(logger *zap.Logger) error {
if err != nil {
return errors.Wrap(err, "could not create resource manager")
}
n.connGater = connections.NewConnectionGater(logger, n.cfg.DisableIPRateLimit, n.connectionsAtLimit, n.IsBadPeer, n.isBadInbound)
n.connGater = connections.NewConnectionGater(logger, n.cfg.DisableIPRateLimit, n.connectionsAtLimit, n.IsBadPeer, n.inboundLimit)
opts = append(opts, libp2p.ResourceManager(rmgr), libp2p.ConnectionGater(n.connGater))
host, err := libp2p.New(opts...)
if err != nil {
Expand Down Expand Up @@ -382,3 +334,51 @@ func (n *p2pNetwork) connectionsAtLimit() bool {
}
return n.idx.AtLimit(network.DirOutbound)
}

func (n *p2pNetwork) inboundLimit() bool {
if n.idx == nil {
return false
}

maxPeers := n.cfg.MaxPeers
inBoundLimit := int(float64(maxPeers) * inboundRatio)

// should never happen
if maxPeers < inBoundLimit {
return true
}

in, _ := n.connectionStats()

if in >= inBoundLimit {
n.interfaceLogger.Debug("preventing inbound connections due to inbound limit", zap.Int("inbound", in), zap.Int("inbound_limit", inBoundLimit))
// todo: should we disconnect to stay at limit?
return true
}

return false
}

func (n *p2pNetwork) connectionStats() (inbound, outbound int) {
cns := n.host.Network().Conns()

for _, cn := range cns {

if n.host.Network().Connectedness(cn.RemotePeer()) != network.Connected {
continue
}

dir := cn.Stat().Direction
if dir == network.DirUnknown {
continue // TODO: how can it happen?
}

if dir == network.DirOutbound {
outbound++
} else {
inbound++
}
}

return inbound, outbound
}
48 changes: 25 additions & 23 deletions network/peers/connections/conn_gater.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,28 +26,29 @@ const (
//
)

type IsBadF func(logger *zap.Logger, peerID peer.ID) bool
type IsBadPeerF func(logger *zap.Logger, peerID peer.ID) bool
type InboundLimitF func() bool // todo: consider being more flex at the limit for specific peers, e.g: peers that share multiple subnets

// connGater implements ConnectionGater interface:
// https://github.com/libp2p/go-libp2p/core/blob/master/connmgr/gater.go
type connGater struct {
logger *zap.Logger // struct logger to implement connmgr.ConnectionGater
disable bool
atLimit func() bool
ipLimiter *leakybucket.Collector
isBadOutbound IsBadF
isBadInbound IsBadF
logger *zap.Logger // struct logger to implement connmgr.ConnectionGater
disable bool
atLimit func() bool
ipLimiter *leakybucket.Collector
isBadPeer IsBadPeerF
inboundLimit InboundLimitF
}

// NewConnectionGater creates a new instance of ConnectionGater
func NewConnectionGater(logger *zap.Logger, disable bool, atLimit func() bool, canConnectOutbound, canConnectInbound IsBadF) connmgr.ConnectionGater {
func NewConnectionGater(logger *zap.Logger, disable bool, atLimit func() bool, isBadPeer IsBadPeerF, inboundLimit InboundLimitF) connmgr.ConnectionGater {
return &connGater{
logger: logger,
disable: disable,
atLimit: atLimit,
ipLimiter: leakybucket.NewCollector(ipLimitRate, ipLimitBurst, ipLimitPeriod, true),
isBadInbound: canConnectInbound,
isBadOutbound: canConnectOutbound,
logger: logger,
disable: disable,
atLimit: atLimit,
ipLimiter: leakybucket.NewCollector(ipLimitRate, ipLimitBurst, ipLimitPeriod, true),
isBadPeer: isBadPeer,
inboundLimit: inboundLimit,
}
}

Expand All @@ -62,7 +63,7 @@ func (n *connGater) InterceptPeerDial(id peer.ID) bool {
// particular address. Blocking connections at this stage is typical for
// address filtering.
func (n *connGater) InterceptAddrDial(id peer.ID, multiaddr ma.Multiaddr) bool {
if n.isBadOutbound(n.logger, id) {
if n.isBadPeer(n.logger, id) {
n.logger.Debug("preventing outbound connection due to bad peer", fields.PeerID(id))
return false
}
Expand All @@ -77,6 +78,11 @@ func (n *connGater) InterceptAccept(multiaddrs libp2pnetwork.ConnMultiaddrs) boo
if n.disable {
return true
}

if n.inboundLimit() { // inbound limit
return false
}

remoteAddr := multiaddrs.RemoteMultiaddr()
if !n.validateDial(remoteAddr) {
// Yield this goroutine to allow others to run in-between connection attempts.
Expand All @@ -85,21 +91,17 @@ func (n *connGater) InterceptAccept(multiaddrs libp2pnetwork.ConnMultiaddrs) boo
n.logger.Debug("connection rejected due to IP rate limit", zap.String("remote_addr", remoteAddr.String()))
return false
}
return !n.atLimit()
return !n.atLimit() // maxpeers limit
}

// InterceptSecured is called for both inbound and outbound connections,
// after a security handshake has taken place and we've authenticated the peer.
func (n *connGater) InterceptSecured(direction libp2pnetwork.Direction, id peer.ID, multiaddrs libp2pnetwork.ConnMultiaddrs) bool {
if direction == libp2pnetwork.DirUnknown {
if n.isBadPeer(n.logger, id) {
n.logger.Debug("rejecting inbound connection due to bad peer", fields.PeerID(id))
return false
}

if direction == libp2pnetwork.DirOutbound {
return n.isBadOutbound(n.logger, id)
} else {
return n.isBadInbound(n.logger, id)
}
return true
}

// InterceptUpgraded is called for inbound and outbound connections, after
Expand Down

0 comments on commit 9f7ea8a

Please sign in to comment.