Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(p2p): limit inbound connections #1919

Open
wants to merge 4 commits into
base: stage
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 51 additions & 1 deletion network/p2p/p2p_setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ const (
connectTimeout = time.Minute
// connectorQueueSize is the buffer size of the channel used by the connector
connectorQueueSize = 256
// inboundRatio is the ratio of inbound connections to outbound connections
inboundRatio = float64(0.8)
Comment on lines +50 to +51
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's probably inboundShare or inboundFraction and not inboundRatio (which would imply totalInboud = 0.8 * totalOutbound or something similar)

)

// Setup is used to setup the network
Expand Down Expand Up @@ -115,6 +117,54 @@ func (n *p2pNetwork) IsBadPeer(logger *zap.Logger, peerID peer.ID) bool {
return n.idx.IsBad(logger, peerID)
}

func (n *p2pNetwork) isBadInbound(lg *zap.Logger, peerID peer.ID) bool {
if n.idx == nil {
return false
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could maybe just use n.IsBadPeer (instead of this and n.idx.IsBad(lg, peerID) below)


maxPeers := n.cfg.MaxPeers
inBoundLimit := int(float64(maxPeers) * inboundRatio)

// should never happen
if maxPeers < inBoundLimit {
return true
}

in, _ := n.connectionStats()

if in >= inBoundLimit {
n.interfaceLogger.Debug("preventing inbound connections due to inbound limit", zap.Int("inbound", in), zap.Int("inbound_limit", inBoundLimit))
// todo: should we disconnect to make sure its ==?
return true
}

return n.idx.IsBad(lg, peerID)
}

func (n *p2pNetwork) connectionStats() (inbound, outbound int) {
cns := n.host.Network().Conns()

for _, cn := range cns {

if n.host.Network().Connectedness(cn.RemotePeer()) != network.Connected {
continue
}

dir := cn.Stat().Direction
if dir == network.DirUnknown {
continue // TODO: how can it happen?
}

if dir == network.DirOutbound {
outbound++
} else {
inbound++
}
}

return inbound, outbound
}

// SetupHost configures a libp2p host and backoff connector utility
func (n *p2pNetwork) SetupHost(logger *zap.Logger) error {
opts, err := n.cfg.Libp2pOptions(logger)
Expand All @@ -128,7 +178,7 @@ func (n *p2pNetwork) SetupHost(logger *zap.Logger) error {
if err != nil {
return errors.Wrap(err, "could not create resource manager")
}
n.connGater = connections.NewConnectionGater(logger, n.cfg.DisableIPRateLimit, n.connectionsAtLimit, n.IsBadPeer)
n.connGater = connections.NewConnectionGater(logger, n.cfg.DisableIPRateLimit, n.connectionsAtLimit, n.IsBadPeer, n.isBadInbound)
opts = append(opts, libp2p.ResourceManager(rmgr), libp2p.ConnectionGater(n.connGater))
host, err := libp2p.New(opts...)
if err != nil {
Expand Down
38 changes: 22 additions & 16 deletions network/peers/connections/conn_gater.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,26 +26,28 @@ const (
//
)

type BadPeerF func(logger *zap.Logger, peerID peer.ID) bool
type IsBadF func(logger *zap.Logger, peerID peer.ID) bool

// connGater implements ConnectionGater interface:
// https://github.com/libp2p/go-libp2p/core/blob/master/connmgr/gater.go
type connGater struct {
logger *zap.Logger // struct logger to implement connmgr.ConnectionGater
disable bool
atLimit func() bool
ipLimiter *leakybucket.Collector
isBadPeer BadPeerF
logger *zap.Logger // struct logger to implement connmgr.ConnectionGater
disable bool
atLimit func() bool
ipLimiter *leakybucket.Collector
isBadOutbound IsBadF
isBadInbound IsBadF
}

// NewConnectionGater creates a new instance of ConnectionGater
func NewConnectionGater(logger *zap.Logger, disable bool, atLimit func() bool, isBadPeerF BadPeerF) connmgr.ConnectionGater {
func NewConnectionGater(logger *zap.Logger, disable bool, atLimit func() bool, canConnectOutbound, canConnectInbound IsBadF) connmgr.ConnectionGater {
return &connGater{
logger: logger,
disable: disable,
atLimit: atLimit,
ipLimiter: leakybucket.NewCollector(ipLimitRate, ipLimitBurst, ipLimitPeriod, true),
isBadPeer: isBadPeerF,
logger: logger,
disable: disable,
atLimit: atLimit,
ipLimiter: leakybucket.NewCollector(ipLimitRate, ipLimitBurst, ipLimitPeriod, true),
isBadInbound: canConnectInbound,
isBadOutbound: canConnectOutbound,
}
}

Expand All @@ -60,7 +62,7 @@ func (n *connGater) InterceptPeerDial(id peer.ID) bool {
// particular address. Blocking connections at this stage is typical for
// address filtering.
func (n *connGater) InterceptAddrDial(id peer.ID, multiaddr ma.Multiaddr) bool {
if n.isBadPeer(n.logger, id) {
if n.isBadOutbound(n.logger, id) {
n.logger.Debug("preventing outbound connection due to bad peer", fields.PeerID(id))
return false
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't InterceptAddrDial seem a better place to check for this ?

Expand Down Expand Up @@ -89,11 +91,15 @@ func (n *connGater) InterceptAccept(multiaddrs libp2pnetwork.ConnMultiaddrs) boo
// InterceptSecured is called for both inbound and outbound connections,
// after a security handshake has taken place and we've authenticated the peer.
func (n *connGater) InterceptSecured(direction libp2pnetwork.Direction, id peer.ID, multiaddrs libp2pnetwork.ConnMultiaddrs) bool {
if n.isBadPeer(n.logger, id) {
n.logger.Debug("rejecting inbound connection due to bad peer", fields.PeerID(id))
if direction == libp2pnetwork.DirUnknown {
return false
}
return true

if direction == libp2pnetwork.DirOutbound {
return n.isBadOutbound(n.logger, id)
} else {
return n.isBadInbound(n.logger, id)
}
Copy link
Contributor

@iurii-ssv iurii-ssv Dec 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And here we essentially do isBadOutbound check 2nd time (because the first time, I think, we did it in InterceptAddrDial), which maybe means - instead of all this added code to InterceptSecured we maybe should just check isBadInbound in InterceptAccept ?

Although in InterceptAccept we don't have peerID looks like ...

}

// InterceptUpgraded is called for inbound and outbound connections, after
Expand Down
Loading