Skip to content

Commit

Permalink
Add resource manager support. Follow Kubo.
Browse files Browse the repository at this point in the history
This adds libp2p resource manager support. It follows Kubo's resource manager
initialization (copy-pasted mostly), as it seems dealing with a default setup
has many pitfalls.

Fixes #10
  • Loading branch information
hsanjuan committed Oct 16, 2023
1 parent 858df29 commit 7714145
Show file tree
Hide file tree
Showing 6 changed files with 201 additions and 0 deletions.
7 changes: 7 additions & 0 deletions internal/fd/sys_not_unix.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
//go:build !linux && !darwin && !windows

package fd

func GetNumFDs() int {
return 0
}
17 changes: 17 additions & 0 deletions internal/fd/sys_unix.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
//go:build linux || darwin
// +build linux darwin

// Package fd provides filesystem descriptor count for different architectures.
package fd

import (
"golang.org/x/sys/unix"
)

func GetNumFDs() int {
var l unix.Rlimit
if err := unix.Getrlimit(unix.RLIMIT_NOFILE, &l); err != nil {
return 0
}
return int(l.Cur)

Check warning on line 16 in internal/fd/sys_unix.go

View check run for this annotation

Codecov / codecov/patch

internal/fd/sys_unix.go#L11-L16

Added lines #L11 - L16 were not covered by tests
}
11 changes: 11 additions & 0 deletions internal/fd/sys_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
//go:build windows

package fd

import (
"math"
)

func GetNumFDs() int {
return math.MaxInt

Check warning on line 10 in internal/fd/sys_windows.go

View check run for this annotation

Codecov / codecov/patch

internal/fd/sys_windows.go#L9-L10

Added lines #L9 - L10 were not covered by tests
}
12 changes: 12 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,16 @@ func main() {
Value: 1 << 30,
Usage: "Size of the in-memory block cache. 0 to disable (disables compression too)",
},
&cli.Uint64Flag{
Name: "max-memory",
Value: 0,
Usage: "Libp2p resource manager max memory. Defaults to system's memory * 0.85",
},
&cli.Uint64Flag{
Name: "max-fd",
Value: 0,
Usage: "Libp2p resource manager file description limit. Defaults to the process' fd-limit/2",
},

Check warning on line 73 in main.go

View check run for this annotation

Codecov / codecov/patch

main.go#L64-L73

Added lines #L64 - L73 were not covered by tests
&cli.DurationFlag{
Name: "connmgr-grace",
Value: time.Minute,
Expand Down Expand Up @@ -90,6 +100,8 @@ func main() {
ConnMgrLow: cctx.Int("connmgr-low"),
ConnMgrHi: cctx.Int("connmgr-hi"),
ConnMgrGrace: cctx.Duration("connmgr-grace"),
MaxMemory: cctx.Uint64("max-memory"),
MaxFD: cctx.Int("max-fd"),

Check warning on line 104 in main.go

View check run for this annotation

Codecov / codecov/patch

main.go#L103-L104

Added lines #L103 - L104 were not covered by tests
InMemBlockCache: cctx.Int64("inmem-block-cache"),
Libp2pKeyFile: filepath.Join(ddir, "libp2p.key"),
RoutingV1: cctx.String("routing"),
Expand Down
144 changes: 144 additions & 0 deletions rcmgr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package main

import (
"log"

"github.com/dustin/go-humanize"
"github.com/ipfs/rainbow/internal/fd"
"github.com/libp2p/go-libp2p"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
"github.com/pbnjay/memory"
)

// Note: this comes from kubo/core/node/libp2p/rcmgr_defaults.go with minimal
// adaptations.

var infiniteResourceLimits = rcmgr.InfiniteLimits.ToPartialLimitConfig().System

func makeResourceManagerConfig(maxMemory uint64, maxFD int, connMgrHighWater int) (limitConfig rcmgr.ConcreteLimitConfig) {
if maxMemory == 0 {
maxMemory = uint64((float64(memory.TotalMemory()) * 0.85))
}
if maxFD == 0 {
maxFD = fd.GetNumFDs() / 2
}

Check warning on line 24 in rcmgr.go

View check run for this annotation

Codecov / codecov/patch

rcmgr.go#L18-L24

Added lines #L18 - L24 were not covered by tests

maxMemoryMB := maxMemory / (1024 * 1024)

// At least as of 2023-01-25, it's possible to open a connection that
// doesn't ask for any memory usage with the libp2p Resource Manager/Accountant
// (see https://github.com/libp2p/go-libp2p/issues/2010#issuecomment-1404280736).
// As a result, we can't currently rely on Memory limits to full protect us.
// Until https://github.com/libp2p/go-libp2p/issues/2010 is addressed,
// we take a proxy now of restricting to 1 inbound connection per MB.
// Note: this is more generous than go-libp2p's default autoscaled limits which do
// 64 connections per 1GB
// (see https://github.com/libp2p/go-libp2p/blob/master/p2p/host/resource-manager/limit_defaults.go#L357 ).
systemConnsInbound := int(1 * maxMemoryMB)

partialLimits := rcmgr.PartialLimitConfig{
System: rcmgr.ResourceLimits{
Memory: rcmgr.LimitVal64(maxMemory),
FD: rcmgr.LimitVal(maxFD),

Conns: rcmgr.Unlimited,
ConnsInbound: rcmgr.LimitVal(systemConnsInbound),
ConnsOutbound: rcmgr.Unlimited,

Streams: rcmgr.Unlimited,
StreamsOutbound: rcmgr.Unlimited,
StreamsInbound: rcmgr.Unlimited,
},

// Transient connections won't cause any memory to be accounted for by the resource manager/accountant.
// Only established connections do.
// As a result, we can't rely on System.Memory to protect us from a bunch of transient connection being opened.
// We limit the same values as the System scope, but only allow the Transient scope to take 25% of what is allowed for the System scope.
Transient: rcmgr.ResourceLimits{
Memory: rcmgr.LimitVal64(maxMemory / 4),
FD: rcmgr.LimitVal(maxFD / 4),

Conns: rcmgr.Unlimited,
ConnsInbound: rcmgr.LimitVal(systemConnsInbound / 4),
ConnsOutbound: rcmgr.Unlimited,

Streams: rcmgr.Unlimited,
StreamsInbound: rcmgr.Unlimited,
StreamsOutbound: rcmgr.Unlimited,
},

// Lets get out of the way of the allow list functionality.
// If someone specified "Swarm.ResourceMgr.Allowlist" we should let it go through.
AllowlistedSystem: infiniteResourceLimits,

AllowlistedTransient: infiniteResourceLimits,

// Keep it simple by not having Service, ServicePeer, Protocol, ProtocolPeer, Conn, or Stream limits.
ServiceDefault: infiniteResourceLimits,

ServicePeerDefault: infiniteResourceLimits,

ProtocolDefault: infiniteResourceLimits,

ProtocolPeerDefault: infiniteResourceLimits,

Conn: infiniteResourceLimits,

Stream: infiniteResourceLimits,

// Limit the resources consumed by a peer.
// This doesn't protect us against intentional DoS attacks since an attacker can easily spin up multiple peers.
// We specify this limit against unintentional DoS attacks (e.g., a peer has a bug and is sending too much traffic intentionally).
// In that case we want to keep that peer's resource consumption contained.
// To keep this simple, we only constrain inbound connections and streams.
PeerDefault: rcmgr.ResourceLimits{
Memory: rcmgr.Unlimited64,
FD: rcmgr.Unlimited,
Conns: rcmgr.Unlimited,
ConnsInbound: rcmgr.DefaultLimit,
ConnsOutbound: rcmgr.Unlimited,
Streams: rcmgr.Unlimited,
StreamsInbound: rcmgr.DefaultLimit,
StreamsOutbound: rcmgr.Unlimited,
},
}

scalingLimitConfig := rcmgr.DefaultLimits
libp2p.SetDefaultServiceLimits(&scalingLimitConfig)

// Anything set above in partialLimits that had a value of rcmgr.DefaultLimit will be overridden.
// Anything in scalingLimitConfig that wasn't defined in partialLimits above will be added (e.g., libp2p's default service limits).
partialLimits = partialLimits.Build(scalingLimitConfig.Scale(int64(maxMemory), maxFD)).ToPartialLimitConfig()

// Simple checks to override autoscaling ensuring limits make sense versus the connmgr values.
// There are ways to break this, but this should catch most problems already.
// We might improve this in the future.
// See: https://github.com/ipfs/kubo/issues/9545
if partialLimits.System.ConnsInbound > rcmgr.DefaultLimit {
maxInboundConns := int(partialLimits.System.ConnsInbound)
if connmgrHighWaterTimesTwo := connMgrHighWater * 2; maxInboundConns < connmgrHighWaterTimesTwo {
maxInboundConns = connmgrHighWaterTimesTwo
}

Check warning on line 121 in rcmgr.go

View check run for this annotation

Codecov / codecov/patch

rcmgr.go#L26-L121

Added lines #L26 - L121 were not covered by tests

if maxInboundConns < 800 {
maxInboundConns = 800
}

Check warning on line 125 in rcmgr.go

View check run for this annotation

Codecov / codecov/patch

rcmgr.go#L123-L125

Added lines #L123 - L125 were not covered by tests

// Scale System.StreamsInbound as well, but use the existing ratio of StreamsInbound to ConnsInbound
if partialLimits.System.StreamsInbound > rcmgr.DefaultLimit {
partialLimits.System.StreamsInbound = rcmgr.LimitVal(int64(maxInboundConns) * int64(partialLimits.System.StreamsInbound) / int64(partialLimits.System.ConnsInbound))
}
partialLimits.System.ConnsInbound = rcmgr.LimitVal(maxInboundConns)

Check warning on line 131 in rcmgr.go

View check run for this annotation

Codecov / codecov/patch

rcmgr.go#L128-L131

Added lines #L128 - L131 were not covered by tests
}

log.Printf(`
go-libp2p Resource Manager limits based on:
- '--max-memory': %q
- '--max-fd': %d
`, humanize.Bytes(maxMemory), maxFD)

// We already have a complete value thus pass in an empty ConcreteLimitConfig.
return partialLimits.Build(rcmgr.ConcreteLimitConfig{})

Check warning on line 143 in rcmgr.go

View check run for this annotation

Codecov / codecov/patch

rcmgr.go#L134-L143

Added lines #L134 - L143 were not covered by tests
}
10 changes: 10 additions & 0 deletions setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/libp2p/go-libp2p/core/metrics"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/routing"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
"github.com/libp2p/go-libp2p/p2p/net/connmgr"
"github.com/multiformats/go-multiaddr"
"go.opencensus.io/stats/view"
Expand Down Expand Up @@ -82,6 +83,8 @@ type Config struct {
ConnMgrGrace time.Duration

InMemBlockCache int64
MaxMemory uint64
MaxFD int

RoutingV1 string
KuboRPCURLs []string
Expand All @@ -107,6 +110,12 @@ func Setup(ctx context.Context, cfg Config) (*Node, error) {
return nil, err
}

limiter := rcmgr.NewFixedLimiter(makeResourceManagerConfig(cfg.MaxMemory, cfg.MaxFD, cfg.ConnMgrHi))
mgr, err := rcmgr.NewResourceManager(limiter)
if err != nil {
return nil, err
}

Check warning on line 117 in setup.go

View check run for this annotation

Codecov / codecov/patch

setup.go#L113-L117

Added lines #L113 - L117 were not covered by tests

opts := []libp2p.Option{
libp2p.ListenAddrStrings(cfg.ListenAddrs...),
libp2p.NATPortMap(),
Expand All @@ -115,6 +124,7 @@ func Setup(ctx context.Context, cfg Config) (*Node, error) {
libp2p.BandwidthReporter(bwc),
libp2p.DefaultTransports,
libp2p.DefaultMuxers,
libp2p.ResourceManager(mgr),

Check warning on line 127 in setup.go

View check run for this annotation

Codecov / codecov/patch

setup.go#L127

Added line #L127 was not covered by tests
}

if len(cfg.AnnounceAddrs) > 0 {
Expand Down

0 comments on commit 7714145

Please sign in to comment.