diff --git a/config/swarm.go b/config/swarm.go index abff9142207..d7e40e27cb6 100644 --- a/config/swarm.go +++ b/config/swarm.go @@ -1,7 +1,5 @@ package config -import rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager" - type SwarmConfig struct { // AddrFilters specifies a set libp2p addresses that we should never // dial or receive connections from. @@ -141,8 +139,8 @@ type ConnMgr struct { // type ResourceMgr struct { // Enables the Network Resource Manager feature, default to on. - Enabled Flag `json:",omitempty"` - Limits *rcmgr.PartialLimitConfig `json:",omitempty"` + Enabled Flag `json:",omitempty"` + Limits swarmLimits `json:",omitempty"` MaxMemory *OptionalString `json:",omitempty"` MaxFileDescriptors *OptionalInteger `json:",omitempty"` diff --git a/config/types.go b/config/types.go index f753827ab1d..ba05f03639f 100644 --- a/config/types.go +++ b/config/types.go @@ -1,8 +1,10 @@ package config import ( + "bytes" "encoding/json" "fmt" + "io" "strings" "time" ) @@ -412,3 +414,26 @@ func (p OptionalString) String() string { var _ json.Unmarshaler = (*OptionalInteger)(nil) var _ json.Marshaler = (*OptionalInteger)(nil) + +type swarmLimits struct{} + +var _ json.Unmarshaler = swarmLimits{} + +func (swarmLimits) UnmarshalJSON(b []byte) error { + d := json.NewDecoder(bytes.NewReader(b)) + for { + switch tok, err := d.Token(); err { + case io.EOF: + return nil + case nil: + switch tok { + case json.Delim('{'), json.Delim('}'): + // accept empty objects + continue + } + return fmt.Errorf("field Swarm.ResourceMgr.Limits has been removed in 0.19 and must be empty or not present") + default: + return err + } + } +} diff --git a/core/commands/commands_test.go b/core/commands/commands_test.go index 8f94e5d6413..48cafef11e2 100644 --- a/core/commands/commands_test.go +++ b/core/commands/commands_test.go @@ -255,6 +255,7 @@ func TestCommands(t *testing.T) { "/swarm/peering/ls", "/swarm/peering/rm", "/swarm/stats", + "/swarm/resources", "/tar", "/tar/add", "/tar/cat", diff --git a/core/commands/swarm.go b/core/commands/swarm.go index a4bb705f983..94e1b37cdf9 100644 --- a/core/commands/swarm.go +++ b/core/commands/swarm.go @@ -10,9 +10,9 @@ import ( "path" "sort" "sync" + "text/tabwriter" "time" - "github.com/ipfs/go-libipfs/files" "github.com/ipfs/kubo/commands" "github.com/ipfs/kubo/config" "github.com/ipfs/kubo/core/commands/cmdenv" @@ -21,6 +21,7 @@ import ( "github.com/ipfs/kubo/repo/fsrepo" cmds "github.com/ipfs/go-ipfs-cmds" + "github.com/libp2p/go-libp2p/core/network" inet "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager" @@ -57,8 +58,10 @@ ipfs peers in the internet. "filters": swarmFiltersCmd, "peers": swarmPeersCmd, "peering": swarmPeeringCmd, - "stats": swarmStatsCmd, // libp2p Network Resource Manager - "limit": swarmLimitCmd, // libp2p Network Resource Manager + "stats": swarmStatsCmd, // libp2p Network Resource Manager + "limit": swarmLimitCmd, // libp2p Network Resource Manager + "resources": swarmResourcesCmd, // libp2p Network Resource Manager + }, } @@ -326,27 +329,10 @@ var swarmPeersCmd = &cmds.Command{ var swarmStatsCmd = &cmds.Command{ Status: cmds.Experimental, Helptext: cmds.HelpText{ - Tagline: "Report resource usage for a scope.", - LongDescription: `Report resource usage for a scope. -The scope can be one of the following: -- system -- reports the system aggregate resource usage. -- transient -- reports the transient resource usage. 
-- svc: -- reports the resource usage of a specific service. -- proto: -- reports the resource usage of a specific protocol. -- peer: -- reports the resource usage of a specific peer. -- all -- reports the resource usage for all currently active scopes. - + Tagline: "Report resource usage reported by libp2p Resource Manager.", + LongDescription: `Report resource usage reported by libp2p Resource Manager. The output of this command is JSON. - -To see all resources that are close to hitting their respective limit, one can do something like: - ipfs swarm stats --min-used-limit-perc=90 all `}, - Arguments: []cmds.Argument{ - cmds.StringArg("scope", true, false, "scope of the stat report"), - }, - Options: []cmds.Option{ - cmds.IntOption(swarmUsedResourcesPercentageName, "Only display resources that are using above the specified percentage of their respective limit"), - }, Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { node, err := cmdenv.GetNode(env) if err != nil { @@ -357,25 +343,16 @@ To see all resources that are close to hitting their respective limit, one can d return libp2p.ErrNoResourceMgr } - if len(req.Arguments) != 1 { - return fmt.Errorf("must specify exactly one scope") - } - - percentage, _ := req.Options[swarmUsedResourcesPercentageName].(int) - scope := req.Arguments[0] - - if percentage != 0 && scope != "all" { - return fmt.Errorf("%q can only be used when scope is %q", swarmUsedResourcesPercentageName, "all") + rapi, ok := node.ResourceManager.(rcmgr.ResourceManagerState) + if !ok { // NullResourceManager + return libp2p.ErrNoResourceMgr } - result, err := libp2p.NetStat(node.ResourceManager, scope, percentage) - if err != nil { - return err - } + stats := rapi.Stat() b := new(bytes.Buffer) enc := json.NewEncoder(b) - err = enc.Encode(result) + err = enc.Encode(stats) if err != nil { return err } @@ -389,88 +366,38 @@ To see all resources that are close to hitting their respective limit, one can d var swarmLimitCmd = &cmds.Command{ Status: cmds.Experimental, Helptext: cmds.HelpText{ - Tagline: "Get or set resource limits for a scope.", - LongDescription: `Get or set resource limits for a scope. -The scope can be one of the following: -- all -- all limits actually being applied. -- system -- limits for the system aggregate resource usage. -- transient -- limits for the transient resource usage. -- svc: -- limits for the resource usage of a specific service. -- proto: -- limits for the resource usage of a specific protocol. -- peer: -- limits for the resource usage of a specific peer. - + Tagline: "Get actual limits reported by libp2p Resource Manager.", + LongDescription: `Get actual limits reported by libp2p Resource Manager. The output of this command is JSON. - -It is possible to use this command to inspect and tweak limits at runtime: - - $ ipfs swarm limit system > limit.json - $ vi limit.json - $ ipfs swarm limit system limit.json - -Changes made via command line are persisted in the Swarm.ResourceMgr.Limits field of the $IPFS_PATH/config file. 
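// A self-contained sketch (package main and the sample inputs are mine; the
// decoder body is quoted from the config/types.go hunk above): the swarmLimits
// sentinel accepts a missing or empty Swarm.ResourceMgr.Limits object and
// rejects anything else, steering users toward the new limits.json file.
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
)

type swarmLimits struct{}

func (swarmLimits) UnmarshalJSON(b []byte) error {
	d := json.NewDecoder(bytes.NewReader(b))
	for {
		switch tok, err := d.Token(); err {
		case io.EOF:
			return nil // no content tokens seen: accept
		case nil:
			switch tok {
			case json.Delim('{'), json.Delim('}'):
				continue // accept empty objects
			}
			return fmt.Errorf("field Swarm.ResourceMgr.Limits has been removed in 0.19 and must be empty or not present")
		default:
			return err
		}
	}
}

func main() {
	var l swarmLimits
	fmt.Println(json.Unmarshal([]byte(`{}`), &l))                      // <nil>: empty object accepted
	fmt.Println(json.Unmarshal([]byte(`{"System":{"Conns":64}}`), &l)) // error: legacy limits rejected
}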
`}, - Arguments: []cmds.Argument{ - cmds.StringArg("scope", true, false, "scope of the limit"), - cmds.FileArg("limit.json", false, false, "limits to be set").EnableStdin(), - }, - Options: []cmds.Option{ - cmds.BoolOption(swarmResetLimitsOptionName, "reset limit to default"), - }, Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { node, err := cmdenv.GetNode(env) if err != nil { return err } - if node.ResourceManager == nil { + if _, ok := node.ResourceManager.(*network.NullResourceManager); ok { return libp2p.ErrNoResourceMgr } - scope := req.Arguments[0] - - // set scope limit to new values (when limit.json is passed as a second arg) - if req.Files != nil { - var newLimit rcmgr.ResourceLimits - it := req.Files.Entries() - if it.Next() { - file := files.FileFromEntry(it) - if file == nil { - return errors.New("expected a JSON file") - } - - r := io.LimitReader(file, 32*1024*1024) // 32MiB - - if err := json.NewDecoder(r).Decode(&newLimit); err != nil { - return fmt.Errorf("decoding JSON as ResourceMgrScopeConfig: %w", err) - } - return libp2p.NetSetLimit(node.ResourceManager, node.Repo, scope, newLimit) - } - if err := it.Err(); err != nil { - return fmt.Errorf("error opening limit JSON file: %w", err) - } + cfg, err := node.Repo.Config() + if err != nil { + return err } - var result interface{} - switch _, reset := req.Options[swarmResetLimitsOptionName]; { - case reset: - result, err = libp2p.NetResetLimit(node.ResourceManager, node.Repo, scope) - case scope == "all": - result, err = libp2p.NetLimitAll(node.ResourceManager) - default: - // get scope limit - result, err = libp2p.NetLimit(node.ResourceManager, scope) - } + userRessourceOverrides, err := node.Repo.UserRessourceOverrides() if err != nil { return err } - if base, ok := result.(rcmgr.BaseLimit); ok { - result = base.ToResourceLimits() + result, _, err := libp2p.LimitConfig(cfg.Swarm, userRessourceOverrides) + if err != nil { + return err } b := new(bytes.Buffer) enc := json.NewEncoder(b) - err = enc.Encode(result) + err = enc.Encode(result.ToPartialLimitConfig()) if err != nil { return err } @@ -481,6 +408,70 @@ Changes made via command line are persisted in the Swarm.ResourceMgr.Limits fiel }, } +var swarmResourcesCmd = &cmds.Command{ + Status: cmds.Experimental, + Helptext: cmds.HelpText{ + Tagline: "Get a summary about all resources in use by libp2p Resource Manager.", + LongDescription: `Get a summary about all resources in use by libp2p Resource Manager. +The output of this command is JSON. 
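// A consumer-side sketch (the helper program is mine, not part of this diff):
// since the read-only `ipfs swarm limit` above encodes
// result.ToPartialLimitConfig() as JSON, its output can be decoded straight
// back into rcmgr.PartialLimitConfig, assuming a running daemon with the
// Resource Manager enabled.
package main

import (
	"encoding/json"
	"fmt"
	"os/exec"

	rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
)

func main() {
	out, err := exec.Command("ipfs", "swarm", "limit").Output()
	if err != nil {
		panic(err)
	}
	var limits rcmgr.PartialLimitConfig
	if err := json.Unmarshal(out, &limits); err != nil {
		panic(err)
	}
	fmt.Println("System.ConnsInbound limit:", limits.System.ConnsInbound)
}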
+`}, + Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { + node, err := cmdenv.GetNode(env) + if err != nil { + return err + } + + if node.ResourceManager == nil { + return libp2p.ErrNoResourceMgr + } + + cfg, err := node.Repo.Config() + if err != nil { + return err + } + + userRessourceOverrides, err := node.Repo.UserRessourceOverrides() + if err != nil { + return err + } + + defaultConfig, _, err := libp2p.LimitConfig(cfg.Swarm, userRessourceOverrides) + if err != nil { + return err + } + + rapi, ok := node.ResourceManager.(rcmgr.ResourceManagerState) + if !ok { // NullResourceManager + return libp2p.ErrNoResourceMgr + } + + stats := rapi.Stat() + statsConfig := libp2p.StatToLimitConfig(stats) + + return cmds.EmitOnce(res, libp2p.LimitConfigsToInfo(defaultConfig, statsConfig)) + }, + Encoders: cmds.EncoderMap{ + cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, ris libp2p.ResourceInfos) error { + tw := tabwriter.NewWriter(w, 30, 8, 0, '\t', 0) + defer tw.Flush() + + fmt.Fprintf(tw, "%s\t%s\t%s\t%s\t%s\t\n", "scope name", "limit name", "limit", "usage", "percent usage") + for _, ri := range ris { + fmt.Fprintf(tw, "%s\t%s\t%d\t%d\t%.1f%%\t\n", + ri.ScopeName, + ri.LimitName, + ri.Limit, + ri.CurrentUsage, + (float32(ri.CurrentUsage)/float32(ri.Limit))*100, + ) + } + + return nil + }), + }, + Type: libp2p.ResourceInfos{}, +} + type streamInfo struct { Protocol string } diff --git a/core/node/groups.go b/core/node/groups.go index e640feff1ab..31040bdb33c 100644 --- a/core/node/groups.go +++ b/core/node/groups.go @@ -6,21 +6,19 @@ import ( "fmt" "time" + "github.com/dustin/go-humanize" blockstore "github.com/ipfs/go-ipfs-blockstore" + offline "github.com/ipfs/go-ipfs-exchange-offline" util "github.com/ipfs/go-ipfs-util" "github.com/ipfs/go-log" + uio "github.com/ipfs/go-unixfs/io" "github.com/ipfs/kubo/config" + "github.com/ipfs/kubo/core/node/libp2p" + "github.com/ipfs/kubo/p2p" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p-pubsub/timecache" "github.com/libp2p/go-libp2p/core/peer" - - "github.com/ipfs/kubo/core/node/libp2p" - "github.com/ipfs/kubo/p2p" - - offline "github.com/ipfs/go-ipfs-exchange-offline" - uio "github.com/ipfs/go-unixfs/io" - - "github.com/dustin/go-humanize" + rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager" "go.uber.org/fx" ) @@ -37,7 +35,7 @@ var BaseLibP2P = fx.Options( fx.Invoke(libp2p.PNetChecker), ) -func LibP2P(bcfg *BuildCfg, cfg *config.Config) fx.Option { +func LibP2P(bcfg *BuildCfg, cfg *config.Config, userRessourceOverrides rcmgr.PartialLimitConfig) fx.Option { var connmgr fx.Option // set connmgr based on Swarm.ConnMgr.Type @@ -150,7 +148,7 @@ func LibP2P(bcfg *BuildCfg, cfg *config.Config) fx.Option { fx.Provide(libp2p.UserAgent()), // Services (resource management) - fx.Provide(libp2p.ResourceManager(cfg.Swarm)), + fx.Provide(libp2p.ResourceManager(cfg.Swarm, userRessourceOverrides)), fx.Provide(libp2p.AddrFilters(cfg.Swarm.AddrFilters)), fx.Provide(libp2p.AddrsFactory(cfg.Addresses.Announce, cfg.Addresses.AppendAnnounce, cfg.Addresses.NoAnnounce)), fx.Provide(libp2p.SmuxTransport(cfg.Swarm.Transports)), @@ -249,7 +247,7 @@ var IPNS = fx.Options( ) // Online groups online-only units -func Online(bcfg *BuildCfg, cfg *config.Config) fx.Option { +func Online(bcfg *BuildCfg, cfg *config.Config, userRessourceOverrides rcmgr.PartialLimitConfig) fx.Option { // Namesys params @@ -303,7 +301,7 @@ func Online(bcfg *BuildCfg, cfg *config.Config) fx.Option { 
fx.Provide(p2p.New), - LibP2P(bcfg, cfg), + LibP2P(bcfg, cfg, userRessourceOverrides), OnlineProviders( cfg.Experimental.StrategicProviding, cfg.Experimental.AcceleratedDHTClient, @@ -340,9 +338,9 @@ var Core = fx.Options( fx.Provide(Files), ) -func Networked(bcfg *BuildCfg, cfg *config.Config) fx.Option { +func Networked(bcfg *BuildCfg, cfg *config.Config, userRessourceOverrides rcmgr.PartialLimitConfig) fx.Option { if bcfg.Online { - return Online(bcfg, cfg) + return Online(bcfg, cfg, userRessourceOverrides) } return Offline(cfg) } @@ -358,6 +356,11 @@ func IPFS(ctx context.Context, bcfg *BuildCfg) fx.Option { return bcfgOpts // error } + userRessourceOverrides, err := bcfg.Repo.UserRessourceOverrides() + if err != nil { + return fx.Error(err) + } + // Auto-sharding settings shardSizeString := cfg.Internal.UnixFSShardingSizeThreshold.WithDefault("256kiB") shardSizeInt, err := humanize.ParseBytes(shardSizeString) @@ -381,7 +384,7 @@ func IPFS(ctx context.Context, bcfg *BuildCfg) fx.Option { Storage(bcfg, cfg), Identity(cfg), IPNS, - Networked(bcfg, cfg), + Networked(bcfg, cfg, userRessourceOverrides), Core, ) diff --git a/core/node/libp2p/rcmgr.go b/core/node/libp2p/rcmgr.go index 0d7ac761c17..42e3ed7052a 100644 --- a/core/node/libp2p/rcmgr.go +++ b/core/node/libp2p/rcmgr.go @@ -5,7 +5,6 @@ import ( "fmt" "os" "path/filepath" - "strings" "github.com/benbjohnson/clock" logging "github.com/ipfs/go-log/v2" @@ -17,21 +16,18 @@ import ( rcmgrObs "github.com/libp2p/go-libp2p/p2p/host/resource-manager/obs" "github.com/multiformats/go-multiaddr" "go.uber.org/fx" - "golang.org/x/exp/constraints" "github.com/ipfs/kubo/config" "github.com/ipfs/kubo/core/node/helpers" "github.com/ipfs/kubo/repo" ) -// FIXME(@Jorropo): for go-libp2p v0.26.0 use .MustConcrete and .MustBaseLimit instead of .Build(rcmgr.BaseLimit{}). - -const NetLimitDefaultFilename = "limit.json" const NetLimitTraceFilename = "rcmgr.json.gz" var ErrNoResourceMgr = fmt.Errorf("missing ResourceMgr: make sure the daemon is running with Swarm.ResourceMgr.Enabled") -func ResourceManager(cfg config.SwarmConfig) interface{} { +// limitsFile may be nil, if it is it will be ignored +func ResourceManager(cfg config.SwarmConfig, userOverrides rcmgr.PartialLimitConfig) interface{} { return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, repo repo.Repo) (network.ResourceManager, Libp2pOpts, error) { var manager network.ResourceManager var opts Libp2pOpts @@ -54,32 +50,18 @@ func ResourceManager(cfg config.SwarmConfig) interface{} { return nil, opts, fmt.Errorf("opening IPFS_PATH: %w", err) } - var limitConfig rcmgr.ConcreteLimitConfig - defaultComputedLimitConfig, err := createDefaultLimitConfig(cfg) + limitConfig, msg, err := LimitConfig(cfg, userOverrides) if err != nil { - return nil, opts, err + return nil, opts, fmt.Errorf("creating final Resource Manager config: %w", err) } - // The logic for defaults and overriding with specified SwarmConfig.ResourceMgr.Limits - // is documented in docs/config.md. - // Any changes here should be reflected there. - if cfg.ResourceMgr.Limits != nil { - userSuppliedOverrideLimitConfig := *cfg.ResourceMgr.Limits - // This effectively overrides the computed default LimitConfig with any non-zero values from cfg.ResourceMgr.Limits. - // Because of how how Apply works, any 0 value for a user supplied override - // will be overriden with a computed default value. - // There currently isn't a way for a user to supply a 0-value override. 
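// A toy demonstration (values are illustrative; the API is go-libp2p v0.25+
// rcmgr) of the Build semantics this hunk migrates to: every zero field of
// the partial override is filled in from the supplied defaults, so "no limit"
// must be spelled rcmgr.Unlimited rather than 0.
package main

import (
	"fmt"

	rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
)

func main() {
	defaults := rcmgr.DefaultLimits.AutoScale() // computed defaults
	overrides := rcmgr.PartialLimitConfig{
		System: rcmgr.ResourceLimits{
			ConnsInbound: 64,              // explicit override wins
			Conns:        rcmgr.Unlimited, // explicit "no limit"
			// Memory left at its zero value: falls back to the default
		},
	}
	final := overrides.Build(defaults) // rcmgr.ConcreteLimitConfig
	fmt.Printf("%+v\n", final.ToPartialLimitConfig().System)
}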
-		limitConfig = userSuppliedOverrideLimitConfig.Build(defaultComputedLimitConfig)
-	} else {
-		limitConfig = defaultComputedLimitConfig
-	}
+		// We want to see this message on startup, that's why we are using fmt instead of log.
+		fmt.Print(msg)

-		if err := ensureConnMgrMakeSenseVsResourceMgr(limitConfig, cfg.ConnMgr); err != nil {
+		if err := ensureConnMgrMakeSenseVsResourceMgr(limitConfig, cfg); err != nil {
 			return nil, opts, err
 		}

-		limiter := rcmgr.NewFixedLimiter(limitConfig)
-
 		str, err := rcmgrObs.NewStatsTraceReporter()
 		if err != nil {
 			return nil, opts, err
 		}
@@ -106,6 +88,8 @@ func ResourceManager(cfg config.SwarmConfig) interface{} {
 			ropts = append(ropts, rcmgr.WithTrace(traceFilePath))
 		}

+		limiter := rcmgr.NewFixedLimiter(limitConfig)
+
 		manager, err = rcmgr.NewResourceManager(limiter, ropts...)
 		if err != nil {
 			return nil, opts, fmt.Errorf("creating libp2p resource manager: %w", err)
@@ -133,540 +117,245 @@ func ResourceManager(cfg config.SwarmConfig) interface{} {
 	}
 }

-type notOmitEmptyResourceLimit struct {
-	Streams         rcmgr.LimitVal
-	StreamsInbound  rcmgr.LimitVal
-	StreamsOutbound rcmgr.LimitVal
-	Conns           rcmgr.LimitVal
-	ConnsInbound    rcmgr.LimitVal
-	ConnsOutbound   rcmgr.LimitVal
-	FD              rcmgr.LimitVal
-	Memory          rcmgr.LimitVal64
-}
-
-func resourceLimitsToNotOmitEmpty(r rcmgr.ResourceLimits) notOmitEmptyResourceLimit {
-	return notOmitEmptyResourceLimit{
-		Streams:         r.Streams,
-		StreamsInbound:  r.StreamsInbound,
-		StreamsOutbound: r.StreamsOutbound,
-		Conns:           r.Conns,
-		ConnsInbound:    r.ConnsInbound,
-		ConnsOutbound:   r.ConnsOutbound,
-		FD:              r.FD,
-		Memory:          r.Memory,
+// LimitConfig returns the actual computed limits depending on the configuration.
+// userOverrides may be empty; any zero field in it falls back to the computed default.
+func LimitConfig(cfg config.SwarmConfig, userOverrides rcmgr.PartialLimitConfig) (rcmgr.ConcreteLimitConfig, string, error) {
+	var limitConfig rcmgr.ConcreteLimitConfig
+	limitConfig, msg, err := createDefaultLimitConfig(cfg)
+	if err != nil {
+		return rcmgr.ConcreteLimitConfig{}, msg, err
 	}
-}
-
-type NetStatOut struct {
-	System    *notOmitEmptyResourceLimit           `json:",omitempty"`
-	Transient *notOmitEmptyResourceLimit           `json:",omitempty"`
-	Services  map[string]notOmitEmptyResourceLimit `json:",omitempty"`
-	Protocols map[string]notOmitEmptyResourceLimit `json:",omitempty"`
-	Peers     map[string]notOmitEmptyResourceLimit `json:",omitempty"`
-}

-func NetStat(mgr network.ResourceManager, scope string, percentage int) (NetStatOut, error) {
-	var err error
-	var result NetStatOut
-	switch {
-	case scope == "all":
-		rapi, ok := mgr.(rcmgr.ResourceManagerState)
-		if !ok { // NullResourceManager
-			return result, ErrNoResourceMgr
-		}
+	// The logic for defaults and overriding with the user-supplied limits.json overrides
+	// is documented in docs/config.md.
+	// Any changes here should be reflected there.

-		limits, err := NetLimitAll(mgr)
-		if err != nil {
-			return result, err
-		}
+	// This effectively overrides the computed default LimitConfig with any non-zero values from the limits file.
+	// Because of how Build works, any 0 value for a user-supplied override
+	// will be overridden with a computed default value.
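	// For instance (illustrative values): with a computed default of
	// System.ConnsInbound = 2048, a limits.json containing
	// {"System": {"ConnsInbound": 64}} yields 64, while omitting the field
	// leaves the in-memory zero (rcmgr.DefaultLimit) and keeps 2048. Note
	// that an explicit 0 in the JSON is decoded by go-libp2p's LimitVal as
	// rcmgr.BlockAllLimit, not as "use the default".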
+ limitConfig = userOverrides.Build(limitConfig) - stat := rapi.Stat() - if s := scopeToLimit(stat.System); compareLimits(s, *limits.System, percentage) { - result.System = &s - } - if s := scopeToLimit(stat.Transient); compareLimits(s, *limits.Transient, percentage) { - result.Transient = &s - } - if len(stat.Services) > 0 { - result.Services = make(map[string]notOmitEmptyResourceLimit, len(stat.Services)) - for srv, s := range stat.Services { - ls := limits.Services[srv] - if stat := scopeToLimit(s); compareLimits(stat, ls, percentage) { - result.Services[srv] = stat - } - } - } - if len(stat.Protocols) > 0 { - result.Protocols = make(map[string]notOmitEmptyResourceLimit, len(stat.Protocols)) - for proto, s := range stat.Protocols { - ls := limits.Protocols[string(proto)] - if stat := scopeToLimit(s); compareLimits(stat, ls, percentage) { - result.Protocols[string(proto)] = stat - } - } - } - if len(stat.Peers) > 0 { - result.Peers = make(map[string]notOmitEmptyResourceLimit, len(stat.Peers)) - for p, s := range stat.Peers { - ls := limits.Peers[p.Pretty()] - if stat := scopeToLimit(s); compareLimits(stat, ls, percentage) { - result.Peers[p.Pretty()] = stat - } - } - } - - return result, nil - - case scope == config.ResourceMgrSystemScope: - err = mgr.ViewSystem(func(s network.ResourceScope) error { - stat := scopeToLimit(s.Stat()) - result.System = &stat - return nil - }) - return result, err - - case scope == config.ResourceMgrTransientScope: - err = mgr.ViewTransient(func(s network.ResourceScope) error { - stat := scopeToLimit(s.Stat()) - result.Transient = &stat - return nil - }) - return result, err - - case strings.HasPrefix(scope, config.ResourceMgrServiceScopePrefix): - svc := strings.TrimPrefix(scope, config.ResourceMgrServiceScopePrefix) - err = mgr.ViewService(svc, func(s network.ServiceScope) error { - result.Services = map[string]notOmitEmptyResourceLimit{ - svc: scopeToLimit(s.Stat()), - } - return nil - }) - return result, err - - case strings.HasPrefix(scope, config.ResourceMgrProtocolScopePrefix): - proto := strings.TrimPrefix(scope, config.ResourceMgrProtocolScopePrefix) - err = mgr.ViewProtocol(protocol.ID(proto), func(s network.ProtocolScope) error { - result.Protocols = map[string]notOmitEmptyResourceLimit{ - proto: scopeToLimit(s.Stat()), - } - return nil - }) - return result, err - - case strings.HasPrefix(scope, config.ResourceMgrPeerScopePrefix): - p := strings.TrimPrefix(scope, config.ResourceMgrPeerScopePrefix) - pid, err := peer.Decode(p) - if err != nil { - return result, fmt.Errorf("invalid peer ID: %q: %w", p, err) - } - err = mgr.ViewPeer(pid, func(s network.PeerScope) error { - result.Peers = map[string]notOmitEmptyResourceLimit{ - p: scopeToLimit(s.Stat()), - } - return nil - }) - return result, err - - default: - return result, fmt.Errorf("invalid scope %q", scope) - } + return limitConfig, msg, nil } -var scopes = []string{ - config.ResourceMgrSystemScope, - config.ResourceMgrTransientScope, - config.ResourceMgrServiceScopePrefix, - config.ResourceMgrProtocolScopePrefix, - config.ResourceMgrPeerScopePrefix, -} +func StatToLimitConfig(stats rcmgr.ResourceManagerStat) rcmgr.ConcreteLimitConfig { + result := rcmgr.PartialLimitConfig{} -func scopeToLimit(s network.ScopeStat) notOmitEmptyResourceLimit { - return notOmitEmptyResourceLimit{ - Streams: rcmgr.LimitVal(s.NumStreamsInbound + s.NumStreamsOutbound), - StreamsInbound: rcmgr.LimitVal(s.NumStreamsInbound), - StreamsOutbound: rcmgr.LimitVal(s.NumStreamsOutbound), - Conns: 
rcmgr.LimitVal(s.NumConnsInbound + s.NumConnsOutbound),
-		ConnsInbound:    rcmgr.LimitVal(s.NumConnsInbound),
-		ConnsOutbound:   rcmgr.LimitVal(s.NumConnsOutbound),
-		FD:              rcmgr.LimitVal(s.NumFD),
-		Memory:          rcmgr.LimitVal64(s.Memory),
+	result.Peer = make(map[peer.ID]rcmgr.ResourceLimits)
+	for i, p := range stats.Peers {
+		result.Peer[i] = scopeStatToBaseLimit(p).ToResourceLimits()
 	}
-}

-// compareLimits compares stat and limit.
-// If any of the stats value are equals or above the specified percentage,
-// it returns true.
-func compareLimits(stat, limit notOmitEmptyResourceLimit, percentage int) bool {
-	if abovePercentage(int(stat.Memory), int(limit.Memory), percentage) {
-		return true
-	}
-	if abovePercentage(stat.ConnsInbound, limit.ConnsInbound, percentage) {
-		return true
-	}
-	if abovePercentage(stat.ConnsOutbound, limit.ConnsOutbound, percentage) {
-		return true
-	}
-	if abovePercentage(stat.Conns, limit.Conns, percentage) {
-		return true
-	}
-	if abovePercentage(stat.FD, limit.FD, percentage) {
-		return true
-	}
-	if abovePercentage(stat.StreamsInbound, limit.StreamsInbound, percentage) {
-		return true
+	result.Protocol = make(map[protocol.ID]rcmgr.ResourceLimits)
+	for i, p := range stats.Protocols {
+		result.Protocol[i] = scopeStatToBaseLimit(p).ToResourceLimits()
 	}
-	if abovePercentage(stat.StreamsOutbound, limit.StreamsOutbound, percentage) {
-		return true
-	}
-	if abovePercentage(stat.Streams, limit.Streams, percentage) {
-		return true
-	}
-
-	return false
-}

-func abovePercentage[T constraints.Integer | constraints.Float](v1, v2 T, percentage int) bool {
-	if percentage == 0 {
-		return true
+	result.Service = make(map[string]rcmgr.ResourceLimits)
+	for i, s := range stats.Services {
+		result.Service[i] = scopeStatToBaseLimit(s).ToResourceLimits()
 	}
-	if v2 == 0 {
-		return false
-	}
+	result.System = scopeStatToBaseLimit(stats.System).ToResourceLimits()
+	result.Transient = scopeStatToBaseLimit(stats.Transient).ToResourceLimits()

-	return int((float64(v1)/float64(v2))*100) >= percentage
+	return result.Build(rcmgr.ConcreteLimitConfig{}) // fill zeros with zeros
 }

-func NetLimitAll(mgr network.ResourceManager) (*NetStatOut, error) {
-	var result = &NetStatOut{}
-	lister, ok := mgr.(rcmgr.ResourceManagerState)
-	if !ok { // NullResourceManager
-		return result, ErrNoResourceMgr
-	}
-
-	for _, s := range scopes {
-		switch s {
-		case config.ResourceMgrSystemScope:
-			s, err := NetLimit(mgr, config.ResourceMgrSystemScope)
-			if err != nil {
-				return nil, err
-			}
-			result.System = &s
-		case config.ResourceMgrTransientScope:
-			s, err := NetLimit(mgr, config.ResourceMgrSystemScope)
-			if err != nil {
-				return nil, err
-			}
-			result.Transient = &s
-		case config.ResourceMgrServiceScopePrefix:
-			result.Services = make(map[string]notOmitEmptyResourceLimit)
-			for _, serv := range lister.ListServices() {
-				s, err := NetLimit(mgr, config.ResourceMgrServiceScopePrefix+serv)
-				if err != nil {
-					return nil, err
-				}
-				result.Services[serv] = s
-			}
-		case config.ResourceMgrProtocolScopePrefix:
-			result.Protocols = make(map[string]notOmitEmptyResourceLimit)
-			for _, prot := range lister.ListProtocols() {
-				ps := string(prot)
-				s, err := NetLimit(mgr, config.ResourceMgrProtocolScopePrefix+ps)
-				if err != nil {
-					return nil, err
-				}
-				result.Protocols[ps] = s
-			}
-		case config.ResourceMgrPeerScopePrefix:
-			result.Peers = make(map[string]notOmitEmptyResourceLimit)
-			for _, peer := range lister.ListPeers() {
-				ps := peer.Pretty()
-				s, err := NetLimit(mgr, config.ResourceMgrPeerScopePrefix+ps)
-				if err != nil {
-					
return nil, err - } - result.Peers[ps] = s - } - } - } +type ResourceInfos []*ResourceInfo - return result, nil +type ResourceInfo struct { + ScopeName string + LimitName string + Limit int64 + CurrentUsage int64 } -func NetLimit(mgr network.ResourceManager, scope string) (notOmitEmptyResourceLimit, error) { - var result rcmgr.ResourceLimits - getLimit := func(s network.ResourceScope) error { - limiter, ok := s.(rcmgr.ResourceScopeLimiter) - if !ok { // NullResourceManager - return ErrNoResourceMgr - } - - switch limit := limiter.Limit(); l := limit.(type) { - case *rcmgr.BaseLimit: - result = l.ToResourceLimits() - case rcmgr.BaseLimit: - result = l.ToResourceLimits() - default: - return fmt.Errorf("unknown limit type %T", limit) - } +// LimitConfigsToInfo gets limits and stats and generates a list of scopes and limits to be printed. +func LimitConfigsToInfo(l, s rcmgr.ConcreteLimitConfig) ResourceInfos { + result := ResourceInfos{} - return nil - } + limits := l.ToPartialLimitConfig() + stats := s.ToPartialLimitConfig() - var err error - switch { - case scope == config.ResourceMgrSystemScope: - err = mgr.ViewSystem(func(s network.ResourceScope) error { return getLimit(s) }) - case scope == config.ResourceMgrTransientScope: - err = mgr.ViewTransient(func(s network.ResourceScope) error { return getLimit(s) }) - case strings.HasPrefix(scope, config.ResourceMgrServiceScopePrefix): - svc := strings.TrimPrefix(scope, config.ResourceMgrServiceScopePrefix) - err = mgr.ViewService(svc, func(s network.ServiceScope) error { return getLimit(s) }) - case strings.HasPrefix(scope, config.ResourceMgrProtocolScopePrefix): - proto := strings.TrimPrefix(scope, config.ResourceMgrProtocolScopePrefix) - err = mgr.ViewProtocol(protocol.ID(proto), func(s network.ProtocolScope) error { return getLimit(s) }) - case strings.HasPrefix(scope, config.ResourceMgrPeerScopePrefix): - p := strings.TrimPrefix(scope, config.ResourceMgrPeerScopePrefix) - var pid peer.ID - pid, err = peer.Decode(p) - if err != nil { - return notOmitEmptyResourceLimit{}, fmt.Errorf("invalid peer ID: %q: %w", p, err) - } - err = mgr.ViewPeer(pid, func(s network.PeerScope) error { return getLimit(s) }) - default: - err = fmt.Errorf("invalid scope %q", scope) - } - return resourceLimitsToNotOmitEmpty(result), err -} + result = append(result, ressourceLimitToRessourceInfo(config.ResourceMgrSystemScope, limits.System, stats.System)...) + result = append(result, ressourceLimitToRessourceInfo(config.ResourceMgrTransientScope, limits.Transient, stats.Transient)...) -// NetSetLimit sets new ResourceManager limits for the given scope. The limits take effect immediately, and are also persisted to the repo config. -func NetSetLimit(mgr network.ResourceManager, repo repo.Repo, scope string, limit rcmgr.ResourceLimits) error { - setLimit := func(s network.ResourceScope) error { - limiter, ok := s.(rcmgr.ResourceScopeLimiter) - if !ok { // NullResourceManager - return ErrNoResourceMgr + for i, p := range stats.Peer { + // check if we have specific limits for this peer + var bl rcmgr.ResourceLimits + lp, ok := limits.Peer[i] + if !ok { + bl = limits.PeerDefault + } else { + bl = lp } - l := rcmgr.InfiniteLimits.ToPartialLimitConfig().System - limiter.SetLimit(limit.Build(l.Build(rcmgr.BaseLimit{}))) - return nil + result = append(result, ressourceLimitToRessourceInfo( + config.ResourceMgrPeerScopePrefix+i.Pretty(), + bl, + p, + )...) 
} - cfg, err := repo.Config() - if err != nil { - return fmt.Errorf("reading config to set limit: %w", err) - } - - if cfg.Swarm.ResourceMgr.Limits == nil { - cfg.Swarm.ResourceMgr.Limits = &rcmgr.PartialLimitConfig{} - } - configLimits := cfg.Swarm.ResourceMgr.Limits - - var setConfigFunc func() - switch { - case scope == config.ResourceMgrSystemScope: - err = mgr.ViewSystem(func(s network.ResourceScope) error { return setLimit(s) }) - setConfigFunc = func() { configLimits.System = limit } - case scope == config.ResourceMgrTransientScope: - err = mgr.ViewTransient(func(s network.ResourceScope) error { return setLimit(s) }) - setConfigFunc = func() { configLimits.Transient = limit } - case strings.HasPrefix(scope, config.ResourceMgrServiceScopePrefix): - svc := strings.TrimPrefix(scope, config.ResourceMgrServiceScopePrefix) - err = mgr.ViewService(svc, func(s network.ServiceScope) error { return setLimit(s) }) - setConfigFunc = func() { - if configLimits.Service == nil { - configLimits.Service = map[string]rcmgr.ResourceLimits{} - } - configLimits.Service[svc] = limit - } - case strings.HasPrefix(scope, config.ResourceMgrProtocolScopePrefix): - proto := strings.TrimPrefix(scope, config.ResourceMgrProtocolScopePrefix) - err = mgr.ViewProtocol(protocol.ID(proto), func(s network.ProtocolScope) error { return setLimit(s) }) - setConfigFunc = func() { - if configLimits.Protocol == nil { - configLimits.Protocol = map[protocol.ID]rcmgr.ResourceLimits{} - } - configLimits.Protocol[protocol.ID(proto)] = limit - } - case strings.HasPrefix(scope, config.ResourceMgrPeerScopePrefix): - p := strings.TrimPrefix(scope, config.ResourceMgrPeerScopePrefix) - var pid peer.ID - pid, err = peer.Decode(p) - if err != nil { - return fmt.Errorf("invalid peer ID: %q: %w", p, err) - } - err = mgr.ViewPeer(pid, func(s network.PeerScope) error { return setLimit(s) }) - setConfigFunc = func() { - if configLimits.Peer == nil { - configLimits.Peer = map[peer.ID]rcmgr.ResourceLimits{} - } - configLimits.Peer[pid] = limit + for i, p := range stats.Protocol { + // check if we have specific limits for this protocol + var bl rcmgr.ResourceLimits + lp, ok := limits.Protocol[i] + if !ok { + bl = limits.ProtocolDefault + } else { + bl = lp } - default: - return fmt.Errorf("invalid scope %q", scope) - } - if err != nil { - return fmt.Errorf("setting new limits on resource manager: %w", err) + result = append(result, ressourceLimitToRessourceInfo( + config.ResourceMgrProtocolScopePrefix+string(i), + bl, + p, + )...) } - if cfg.Swarm.ResourceMgr.Limits == nil { - cfg.Swarm.ResourceMgr.Limits = &rcmgr.PartialLimitConfig{} - } - setConfigFunc() - - if err := repo.SetConfig(cfg); err != nil { - return fmt.Errorf("writing new limits to repo config: %w", err) - } - - return nil -} - -// NetResetLimit resets ResourceManager limits to defaults. The limits take effect immediately, and are also persisted to the repo config. 
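// A hedged sketch (the helper name percentUsage is mine, not from this diff):
// how a consumer of `ipfs swarm resources` output could derive the
// percent-usage column while guarding the sentinel values, since the text
// encoder above divides by ri.Limit directly and rcmgr.Unlimited is -1.
package main

import (
	"fmt"

	rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
)

// percentUsage returns usage/limit as a percentage, or -1 when the limit is
// unlimited (or zero) and a ratio is meaningless.
func percentUsage(usage, limit int64) float64 {
	if limit <= 0 {
		return -1
	}
	return float64(usage) / float64(limit) * 100
}

func main() {
	fmt.Printf("%.1f%%\n", percentUsage(250, 1000))        // 25.0%
	fmt.Println(percentUsage(250, int64(rcmgr.Unlimited))) // -1
}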
-func NetResetLimit(mgr network.ResourceManager, repo repo.Repo, scope string) (rcmgr.BaseLimit, error) { - var result rcmgr.BaseLimit - - setLimit := func(s network.ResourceScope, l rcmgr.Limit) error { - limiter, ok := s.(rcmgr.ResourceScopeLimiter) + for i, s := range stats.Service { + // check if we have specific limits for this service + var bl rcmgr.ResourceLimits + lp, ok := limits.Service[i] if !ok { - return ErrNoResourceMgr + bl = limits.ServiceDefault + } else { + bl = lp } - limiter.SetLimit(l) - return nil - } - - cfg, err := repo.Config() - if err != nil { - return rcmgr.BaseLimit{}, fmt.Errorf("reading config to reset limit: %w", err) + result = append(result, ressourceLimitToRessourceInfo( + config.ResourceMgrServiceScopePrefix+i, + bl, + s, + )...) } - defaultsOrig, err := createDefaultLimitConfig(cfg.Swarm) - if err != nil { - return rcmgr.BaseLimit{}, fmt.Errorf("creating default limit config: %w", err) - } - defaults := defaultsOrig.ToPartialLimitConfig() + return result +} - // INVESTIGATE(@Jorropo): Why do we save scaled configs in the repo ? +const ( + limitNameStreams = "Streams" + limitNameStreamsInbound = "StreamsInbound" + limitNameStreamsOutbound = "StreamsOutbound" + limitNameConns = "Conns" + limitNameConnsInbound = "ConnsInbound" + limitNameConnsOutbound = "ConnsOutbound" + limitNameFD = "FD" + limitNameMemory = "Memory" +) - if cfg.Swarm.ResourceMgr.Limits == nil { - cfg.Swarm.ResourceMgr.Limits = &rcmgr.PartialLimitConfig{} - } - configLimits := cfg.Swarm.ResourceMgr.Limits - - var setConfigFunc func() rcmgr.BaseLimit - switch { - case scope == config.ResourceMgrSystemScope: - err = mgr.ViewSystem(func(s network.ResourceScope) error { return setLimit(s, defaults.System.Build(rcmgr.BaseLimit{})) }) - setConfigFunc = func() rcmgr.BaseLimit { - configLimits.System = defaults.System - return defaults.System.Build(rcmgr.BaseLimit{}) - } - case scope == config.ResourceMgrTransientScope: - err = mgr.ViewTransient(func(s network.ResourceScope) error { return setLimit(s, defaults.Transient.Build(rcmgr.BaseLimit{})) }) - setConfigFunc = func() rcmgr.BaseLimit { - configLimits.Transient = defaults.Transient - return defaults.Transient.Build(rcmgr.BaseLimit{}) - } - case strings.HasPrefix(scope, config.ResourceMgrServiceScopePrefix): - svc := strings.TrimPrefix(scope, config.ResourceMgrServiceScopePrefix) - - err = mgr.ViewService(svc, func(s network.ServiceScope) error { - return setLimit(s, defaults.ServiceDefault.Build(rcmgr.BaseLimit{})) - }) - setConfigFunc = func() rcmgr.BaseLimit { - if configLimits.Service == nil { - configLimits.Service = map[string]rcmgr.ResourceLimits{} - } - configLimits.Service[svc] = defaults.ServiceDefault - return defaults.ServiceDefault.Build(rcmgr.BaseLimit{}) - } - case strings.HasPrefix(scope, config.ResourceMgrProtocolScopePrefix): - proto := strings.TrimPrefix(scope, config.ResourceMgrProtocolScopePrefix) - - err = mgr.ViewProtocol(protocol.ID(proto), func(s network.ProtocolScope) error { - return setLimit(s, defaults.ProtocolDefault.Build(rcmgr.BaseLimit{})) - }) - setConfigFunc = func() rcmgr.BaseLimit { - if configLimits.Protocol == nil { - configLimits.Protocol = map[protocol.ID]rcmgr.ResourceLimits{} - } - configLimits.Protocol[protocol.ID(proto)] = defaults.ProtocolDefault +var limits = []string{ + limitNameStreams, + limitNameStreamsInbound, + limitNameStreamsOutbound, + limitNameConns, + limitNameConnsInbound, + limitNameConnsOutbound, + limitNameFD, + limitNameMemory, +} - return 
defaults.ProtocolDefault.Build(rcmgr.BaseLimit{}) +func ressourceLimitToRessourceInfo(scopeName string, limit, stat rcmgr.ResourceLimits) ResourceInfos { + result := ResourceInfos{} + for _, l := range limits { + ri := &ResourceInfo{ + ScopeName: scopeName, } - case strings.HasPrefix(scope, config.ResourceMgrPeerScopePrefix): - p := strings.TrimPrefix(scope, config.ResourceMgrPeerScopePrefix) - - var pid peer.ID - pid, err = peer.Decode(p) - if err != nil { - return result, fmt.Errorf("invalid peer ID: %q: %w", p, err) + switch l { + case limitNameStreams: + ri.LimitName = limitNameStreams + ri.Limit = int64(limit.Streams) + ri.CurrentUsage = int64(stat.Streams) + case limitNameStreamsInbound: + ri.LimitName = limitNameStreamsInbound + ri.Limit = int64(limit.StreamsInbound) + ri.CurrentUsage = int64(stat.StreamsInbound) + case limitNameStreamsOutbound: + ri.LimitName = limitNameStreamsOutbound + ri.Limit = int64(limit.StreamsOutbound) + ri.CurrentUsage = int64(stat.StreamsOutbound) + case limitNameConns: + ri.LimitName = limitNameConns + ri.Limit = int64(limit.Conns) + ri.CurrentUsage = int64(stat.Conns) + case limitNameConnsInbound: + ri.LimitName = limitNameConnsInbound + ri.Limit = int64(limit.ConnsInbound) + ri.CurrentUsage = int64(stat.ConnsInbound) + case limitNameConnsOutbound: + ri.LimitName = limitNameConnsOutbound + ri.Limit = int64(limit.ConnsOutbound) + ri.CurrentUsage = int64(stat.ConnsOutbound) + case limitNameFD: + ri.LimitName = limitNameFD + ri.Limit = int64(limit.FD) + ri.CurrentUsage = int64(stat.FD) + case limitNameMemory: + ri.LimitName = limitNameMemory + ri.Limit = int64(limit.Memory) + ri.CurrentUsage = int64(stat.Memory) } - err = mgr.ViewPeer(pid, func(s network.PeerScope) error { return setLimit(s, defaults.PeerDefault.Build(rcmgr.BaseLimit{})) }) - setConfigFunc = func() rcmgr.BaseLimit { - if configLimits.Peer == nil { - configLimits.Peer = map[peer.ID]rcmgr.ResourceLimits{} - } - configLimits.Peer[pid] = defaults.PeerDefault - - return defaults.PeerDefault.Build(rcmgr.BaseLimit{}) + if ri.Limit == 0 { + continue } - default: - return result, fmt.Errorf("invalid scope %q", scope) - } - if err != nil { - return result, fmt.Errorf("resetting new limits on resource manager: %w", err) + result = append(result, ri) } - result = setConfigFunc() + return result +} - if err := repo.SetConfig(cfg); err != nil { - return result, fmt.Errorf("writing new limits to repo config: %w", err) +func scopeStatToBaseLimit(ss network.ScopeStat) rcmgr.BaseLimit { + return rcmgr.BaseLimit{ + Streams: ss.NumStreamsInbound + ss.NumStreamsOutbound, + StreamsInbound: ss.NumStreamsInbound, + StreamsOutbound: ss.NumStreamsOutbound, + Conns: ss.NumConnsInbound + ss.NumConnsOutbound, + ConnsInbound: ss.NumConnsInbound, + ConnsOutbound: ss.NumConnsOutbound, + FD: ss.NumFD, + Memory: ss.Memory, } - - return result, nil } -func ensureConnMgrMakeSenseVsResourceMgr(orig rcmgr.ConcreteLimitConfig, cmgr config.ConnMgr) error { - if cmgr.Type.WithDefault(config.DefaultConnMgrType) == "none" { - return nil // none connmgr, no checks to do +func ensureConnMgrMakeSenseVsResourceMgr(concreteLimits rcmgr.ConcreteLimitConfig, cfg config.SwarmConfig) error { + if cfg.ConnMgr.Type.WithDefault(config.DefaultConnMgrType) == "none" || len(cfg.ResourceMgr.Allowlist) != 0 { + return nil // none connmgr, or setup with an allow list, no checks to do } - rcm := orig.ToPartialLimitConfig() + rcm := concreteLimits.ToPartialLimitConfig() - highWater := cmgr.HighWater.WithDefault(config.DefaultConnMgrHighWater) - if 
rcm.System.ConnsInbound <= rcm.System.Conns { - if int64(rcm.System.ConnsInbound) <= highWater { - // nolint - return fmt.Errorf(` -Unable to initialize libp2p due to conflicting limit configuration: -ResourceMgr.Limits.System.ConnsInbound (%d) must be bigger than ConnMgr.HighWater (%d) -`, rcm.System.ConnsInbound, highWater) - } - } else if int64(rcm.System.Conns) <= highWater { + highWater := cfg.ConnMgr.HighWater.WithDefault(config.DefaultConnMgrHighWater) + if rcm.System.Conns != rcmgr.Unlimited && int64(rcm.System.Conns) <= highWater { // nolint return fmt.Errorf(` Unable to initialize libp2p due to conflicting limit configuration: ResourceMgr.Limits.System.Conns (%d) must be bigger than ConnMgr.HighWater (%d) `, rcm.System.Conns, highWater) } - if rcm.System.StreamsInbound <= rcm.System.Streams { - if int64(rcm.System.StreamsInbound) <= highWater { - // nolint - return fmt.Errorf(` + if rcm.System.ConnsInbound != rcmgr.Unlimited && int64(rcm.System.ConnsInbound) <= highWater { + // nolint + return fmt.Errorf(` Unable to initialize libp2p due to conflicting limit configuration: -ResourceMgr.Limits.System.StreamsInbound (%d) must be bigger than ConnMgr.HighWater (%d) -`, rcm.System.StreamsInbound, highWater) - } - } else if int64(rcm.System.Streams) <= highWater { +ResourceMgr.Limits.System.ConnsInbound (%d) must be bigger than ConnMgr.HighWater (%d) +`, rcm.System.ConnsInbound, highWater) + } + if rcm.System.Streams != rcmgr.Unlimited && int64(rcm.System.Streams) <= highWater { // nolint return fmt.Errorf(` Unable to initialize libp2p due to conflicting limit configuration: ResourceMgr.Limits.System.Streams (%d) must be bigger than ConnMgr.HighWater (%d) `, rcm.System.Streams, highWater) + } + if rcm.System.StreamsInbound != rcmgr.Unlimited && int64(rcm.System.StreamsInbound) <= highWater { + // nolint + return fmt.Errorf(` +Unable to initialize libp2p due to conflicting limit configuration: +ResourceMgr.Limits.System.StreamsInbound (%d) must be bigger than ConnMgr.HighWater (%d) +`, rcm.System.StreamsInbound, highWater) } return nil } diff --git a/core/node/libp2p/rcmgr_defaults.go b/core/node/libp2p/rcmgr_defaults.go index 05dda7745a8..a29764d7669 100644 --- a/core/node/libp2p/rcmgr_defaults.go +++ b/core/node/libp2p/rcmgr_defaults.go @@ -4,7 +4,6 @@ import ( "fmt" "github.com/dustin/go-humanize" - "github.com/libp2p/go-libp2p" rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager" "github.com/pbnjay/memory" @@ -12,53 +11,23 @@ import ( "github.com/ipfs/kubo/core/node/libp2p/fd" ) -// We are doing some magic when parsing config files (we are using a map[string]interface{} to compare config files). -// When you don't have a type the JSON Parse function cast numbers to float64 by default, -// losing precision when writing the final number. So if we use math.MaxInt as our infinite number, -// after writing the config file we will have 9223372036854776000 instead of 9223372036854775807, -// making the parsing process fail. Setting 1e9 (1000000000) as "no limit" value. It also avoids to overflow on 32 bit architectures. 
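// The comment being removed below describes a real round-trip hazard; a
// minimal standard-library demonstration (the program is mine, not part of
// this diff):
package main

import (
	"encoding/json"
	"fmt"
	"math"
)

func main() {
	b, _ := json.Marshal(int64(math.MaxInt64))
	var v interface{}
	_ = json.Unmarshal(b, &v) // untyped decode lands in float64, losing precision
	out, _ := json.Marshal(v)
	fmt.Println(string(b))   // 9223372036854775807
	fmt.Println(string(out)) // 9223372036854776000
}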
-const bigEnough = 1e9 - -var infiniteBaseLimit = rcmgr.BaseLimit{ - Streams: bigEnough, - StreamsInbound: bigEnough, - StreamsOutbound: bigEnough, - Conns: bigEnough, - ConnsInbound: bigEnough, - ConnsOutbound: bigEnough, - FD: bigEnough, - Memory: bigEnough, -} - -var noLimitIncrease = rcmgr.BaseLimitIncrease{ - ConnsInbound: 0, - ConnsOutbound: 0, - Conns: 0, - StreamsInbound: 0, - StreamsOutbound: 0, - Streams: 0, - Memory: 0, - FDFraction: 0, -} - // This file defines implicit limit defaults used when Swarm.ResourceMgr.Enabled // createDefaultLimitConfig creates LimitConfig to pass to libp2p's resource manager. // The defaults follow the documentation in docs/libp2p-resource-management.md. // Any changes in the logic here should be reflected there. -func createDefaultLimitConfig(cfg config.SwarmConfig) (rcmgr.ConcreteLimitConfig, error) { +func createDefaultLimitConfig(cfg config.SwarmConfig) (rcmgr.ConcreteLimitConfig, string, error) { maxMemoryDefaultString := humanize.Bytes(uint64(memory.TotalMemory()) / 2) maxMemoryString := cfg.ResourceMgr.MaxMemory.WithDefault(maxMemoryDefaultString) maxMemory, err := humanize.ParseBytes(maxMemoryString) if err != nil { - return rcmgr.ConcreteLimitConfig{}, err + return rcmgr.ConcreteLimitConfig{}, "", err } maxMemoryMB := maxMemory / (1024 * 1024) maxFD := int(cfg.ResourceMgr.MaxFileDescriptors.WithDefault(int64(fd.GetNumFDs()) / 2)) - // We want to see this message on startup, that's why we are using fmt instead of log. - fmt.Printf(` + msg := fmt.Sprintf(` Computing default go-libp2p Resource Manager limits based on: - 'Swarm.ResourceMgr.MaxMemory': %q - 'Swarm.ResourceMgr.MaxFileDescriptors': %d @@ -79,109 +48,79 @@ Run 'ipfs swarm limit all' to see the resulting limits. // (see https://github.com/libp2p/go-libp2p/blob/master/p2p/host/resource-manager/limit_defaults.go#L357 ). systemConnsInbound := int(1 * maxMemoryMB) - scalingLimitConfig := rcmgr.ScalingLimitConfig{ - SystemBaseLimit: rcmgr.BaseLimit{ - Memory: int64(maxMemory), - FD: maxFD, + partialLimits := rcmgr.PartialLimitConfig{ + System: rcmgr.ResourceLimits{ + Memory: rcmgr.LimitVal64(maxMemory), + FD: rcmgr.LimitVal(maxFD), - // By default, we just limit connections on the inbound side. - Conns: bigEnough, - ConnsInbound: systemConnsInbound, - ConnsOutbound: bigEnough, + Conns: rcmgr.Unlimited, + ConnsInbound: rcmgr.LimitVal(systemConnsInbound), + ConnsOutbound: rcmgr.Unlimited, - Streams: bigEnough, - StreamsInbound: bigEnough, - StreamsOutbound: bigEnough, + Streams: rcmgr.Unlimited, + StreamsOutbound: rcmgr.Unlimited, + StreamsInbound: rcmgr.Unlimited, }, - SystemLimitIncrease: noLimitIncrease, // Transient connections won't cause any memory to accounted for by the resource manager. // Only established connections do. // As a result, we can't rely on System.Memory to protect us from a bunch of transient connection being opened. // We limit the same values as the System scope, but only allow the Transient scope to take 25% of what is allowed for the System scope. 
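	// Worked example (illustrative machine, not from this diff): with 8 GiB
	// of RAM and 8192 file descriptors available, the defaults above become
	// MaxMemory = 4 GiB, maxMemoryMB = 4096, maxFD = 4096, hence
	// System.ConnsInbound = 1 * 4096 = 4096; the Transient scope below gets
	// a quarter of each: Memory = 1 GiB, FD = 1024, ConnsInbound = 1024.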
- TransientBaseLimit: rcmgr.BaseLimit{ - Memory: int64(maxMemory / 4), - FD: maxFD / 4, + Transient: rcmgr.ResourceLimits{ + Memory: rcmgr.LimitVal64(maxMemory / 4), + FD: rcmgr.LimitVal(maxFD / 4), - Conns: bigEnough, - ConnsInbound: systemConnsInbound / 4, - ConnsOutbound: bigEnough, + Conns: rcmgr.Unlimited, + ConnsInbound: rcmgr.LimitVal(systemConnsInbound / 4), + ConnsOutbound: rcmgr.Unlimited, - Streams: bigEnough, - StreamsInbound: bigEnough, - StreamsOutbound: bigEnough, + Streams: rcmgr.Unlimited, + StreamsInbound: rcmgr.Unlimited, + StreamsOutbound: rcmgr.Unlimited, }, - TransientLimitIncrease: noLimitIncrease, - // Lets get out of the way of the allow list functionality. // If someone specified "Swarm.ResourceMgr.Allowlist" we should let it go through. - AllowlistedSystemBaseLimit: infiniteBaseLimit, - AllowlistedSystemLimitIncrease: noLimitIncrease, + AllowlistedSystem: rcmgr.InfiniteLimits.ToPartialLimitConfig().System, - AllowlistedTransientBaseLimit: infiniteBaseLimit, - AllowlistedTransientLimitIncrease: noLimitIncrease, + AllowlistedTransient: rcmgr.InfiniteLimits.ToPartialLimitConfig().System, // Keep it simple by not having Service, ServicePeer, Protocol, ProtocolPeer, Conn, or Stream limits. - ServiceBaseLimit: infiniteBaseLimit, - ServiceLimitIncrease: noLimitIncrease, + ServiceDefault: rcmgr.InfiniteLimits.ToPartialLimitConfig().System, - ServicePeerBaseLimit: infiniteBaseLimit, - ServicePeerLimitIncrease: noLimitIncrease, + ServicePeerDefault: rcmgr.InfiniteLimits.ToPartialLimitConfig().System, - ProtocolBaseLimit: infiniteBaseLimit, - ProtocolLimitIncrease: noLimitIncrease, + ProtocolDefault: rcmgr.InfiniteLimits.ToPartialLimitConfig().System, - ProtocolPeerBaseLimit: infiniteBaseLimit, - ProtocolPeerLimitIncrease: noLimitIncrease, + ProtocolPeerDefault: rcmgr.InfiniteLimits.ToPartialLimitConfig().System, - ConnBaseLimit: infiniteBaseLimit, - ConnLimitIncrease: noLimitIncrease, + Conn: rcmgr.InfiniteLimits.ToPartialLimitConfig().System, - StreamBaseLimit: infiniteBaseLimit, - StreamLimitIncrease: noLimitIncrease, + Stream: rcmgr.InfiniteLimits.ToPartialLimitConfig().System, // Limit the resources consumed by a peer. // This doesn't protect us against intentional DoS attacks since an attacker can easily spin up multiple peers. // We specify this limit against unintentional DoS attacks (e.g., a peer has a bug and is sending too much traffic intentionally). // In that case we want to keep that peer's resource consumption contained. // To keep this simple, we only constrain inbound connections and streams. - PeerBaseLimit: rcmgr.BaseLimit{ - Memory: bigEnough, - FD: bigEnough, - Conns: bigEnough, - ConnsInbound: rcmgr.DefaultLimits.PeerBaseLimit.ConnsInbound, - ConnsOutbound: bigEnough, - Streams: bigEnough, - StreamsInbound: rcmgr.DefaultLimits.PeerBaseLimit.StreamsInbound, - StreamsOutbound: bigEnough, - }, - // Most limits don't see an increase because they're already infinite/bigEnough. - // The values that should scale based on the amount of memory allocated to libp2p need to increase accordingly. 
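	// Note the sentinel distinction relied on below: rcmgr.DefaultLimit (the
	// zero value) asks the final Build call to fill the field from
	// go-libp2p's scaled defaults (preserving the removed
	// rcmgr.DefaultLimits.PeerBaseLimit values), while rcmgr.Unlimited opts
	// the field out of limiting entirely.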
- PeerLimitIncrease: rcmgr.BaseLimitIncrease{ - Memory: 0, - FDFraction: 0, - Conns: 0, - ConnsInbound: rcmgr.DefaultLimits.PeerLimitIncrease.ConnsInbound, - ConnsOutbound: 0, - Streams: 0, - StreamsInbound: rcmgr.DefaultLimits.PeerLimitIncrease.StreamsInbound, - StreamsOutbound: 0, + PeerDefault: rcmgr.ResourceLimits{ + Memory: rcmgr.Unlimited64, + FD: rcmgr.Unlimited, + Conns: rcmgr.Unlimited, + ConnsInbound: rcmgr.DefaultLimit, + ConnsOutbound: rcmgr.Unlimited, + Streams: rcmgr.Unlimited, + StreamsInbound: rcmgr.DefaultLimit, + StreamsOutbound: rcmgr.Unlimited, }, } - // Whatever limits libp2p has specifically tuned for its protocols/services we'll apply. - libp2p.SetDefaultServiceLimits(&scalingLimitConfig) - - orig := scalingLimitConfig.Scale(int64(maxMemory), maxFD) - defaultLimitConfig := orig.ToPartialLimitConfig() - // Simple checks to overide autoscaling ensuring limits make sense versus the connmgr values. // There are ways to break this, but this should catch most problems already. // We might improve this in the future. // See: https://github.com/ipfs/kubo/issues/9545 if cfg.ConnMgr.Type.WithDefault(config.DefaultConnMgrType) != "none" { - maxInboundConns := int64(defaultLimitConfig.System.ConnsInbound) + maxInboundConns := int64(partialLimits.System.ConnsInbound) if connmgrHighWaterTimesTwo := cfg.ConnMgr.HighWater.WithDefault(config.DefaultConnMgrHighWater) * 2; maxInboundConns < connmgrHighWaterTimesTwo { maxInboundConns = connmgrHighWaterTimesTwo } @@ -191,9 +130,9 @@ Run 'ipfs swarm limit all' to see the resulting limits. } // Scale System.StreamsInbound as well, but use the existing ratio of StreamsInbound to ConnsInbound - defaultLimitConfig.System.StreamsInbound = rcmgr.LimitVal(maxInboundConns * int64(defaultLimitConfig.System.StreamsInbound) / int64(defaultLimitConfig.System.ConnsInbound)) - defaultLimitConfig.System.ConnsInbound = rcmgr.LimitVal(maxInboundConns) + partialLimits.System.StreamsInbound = rcmgr.LimitVal(maxInboundConns * int64(partialLimits.System.StreamsInbound) / int64(partialLimits.System.ConnsInbound)) + partialLimits.System.ConnsInbound = rcmgr.LimitVal(maxInboundConns) } - return defaultLimitConfig.Build(orig), nil + return partialLimits.Build(rcmgr.DefaultLimits.Scale(int64(maxMemory), maxFD)), msg, nil } diff --git a/core/node/libp2p/rcmgr_test.go b/core/node/libp2p/rcmgr_test.go deleted file mode 100644 index e273ff756aa..00000000000 --- a/core/node/libp2p/rcmgr_test.go +++ /dev/null @@ -1,12 +0,0 @@ -package libp2p - -import ( - "testing" - - "github.com/stretchr/testify/require" -) - -func TestPercentage(t *testing.T) { - require.True(t, abovePercentage(10, 100, 10)) - require.True(t, abovePercentage(100, 100, 99)) -} diff --git a/core/node/storage.go b/core/node/storage.go index 2e831fae22e..d109210587d 100644 --- a/core/node/storage.go +++ b/core/node/storage.go @@ -14,7 +14,8 @@ import ( // RepoConfig loads configuration from the repo func RepoConfig(repo repo.Repo) (*config.Config, error) { - return repo.Config() + cfg, err := repo.Config() + return cfg, err } // Datastore provides the datastore diff --git a/go.mod b/go.mod index 762a3add547..c2f47d35fd2 100644 --- a/go.mod +++ b/go.mod @@ -106,7 +106,6 @@ require ( go.uber.org/fx v1.18.2 go.uber.org/zap v1.24.0 golang.org/x/crypto v0.5.0 - golang.org/x/exp v0.0.0-20230129154200-a960b3787bd2 golang.org/x/mod v0.7.0 golang.org/x/sync v0.1.0 golang.org/x/sys v0.4.0 @@ -228,6 +227,7 @@ require ( go.uber.org/atomic v1.10.0 // indirect go.uber.org/multierr v1.9.0 // indirect go4.org 
v0.0.0-20200411211856-f5505b9728dd // indirect + golang.org/x/exp v0.0.0-20230129154200-a960b3787bd2 // indirect golang.org/x/net v0.5.0 // indirect golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b // indirect golang.org/x/term v0.4.0 // indirect diff --git a/repo/fsrepo/fsrepo.go b/repo/fsrepo/fsrepo.go index 99104b08318..765ac3bf63c 100644 --- a/repo/fsrepo/fsrepo.go +++ b/repo/fsrepo/fsrepo.go @@ -16,6 +16,7 @@ import ( repo "github.com/ipfs/kubo/repo" "github.com/ipfs/kubo/repo/common" dir "github.com/ipfs/kubo/thirdparty/dir" + rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager" ds "github.com/ipfs/go-datastore" measure "github.com/ipfs/go-ds-measure" @@ -102,11 +103,12 @@ type FSRepo struct { configFilePath string // lockfile is the file system lock to prevent others from opening // the same fsrepo path concurrently - lockfile io.Closer - config *config.Config - ds repo.Datastore - keystore keystore.Keystore - filemgr *filestore.FileManager + lockfile io.Closer + config *config.Config + userRessourceOverrides rcmgr.PartialLimitConfig + ds repo.Datastore + keystore keystore.Keystore + filemgr *filestore.FileManager } var _ repo.Repo = (*FSRepo)(nil) @@ -180,6 +182,10 @@ func open(repoPath string, userConfigFilePath string) (repo.Repo, error) { return nil, err } + if err := r.openUserRessourceOverrides(); err != nil { + return nil, err + } + if err := r.openDatastore(); err != nil { return nil, err } @@ -437,6 +443,16 @@ func (r *FSRepo) openConfig() error { return nil } +// openUserRessourceOverrides will remove all overrides if the file is not present. +// It will error if the decoding fails. +func (r *FSRepo) openUserRessourceOverrides() error { + err := serialize.ReadConfigFile(filepath.Join(r.path, "limits.json"), &r.userRessourceOverrides) + if err == serialize.ErrNotInitialized { + err = nil + } + return err +} + func (r *FSRepo) openKeystore() error { ksp := filepath.Join(r.path, "keystore") ks, err := keystore.NewFSKeystore(ksp) @@ -554,6 +570,21 @@ func (r *FSRepo) Config() (*config.Config, error) { return r.config, nil } +func (r *FSRepo) UserRessourceOverrides() (rcmgr.PartialLimitConfig, error) { + // It is not necessary to hold the package lock since the repo is in an + // opened state. The package lock is _not_ meant to ensure that the repo is + // thread-safe. The package lock is only meant to guard against removal and + // coordinate the lockfile. However, we provide thread-safety to keep + // things simple. 
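// A standalone sketch (main() and the plain-JSON read are mine; the file name
// and the missing-file behavior mirror openUserRessourceOverrides above):
// loading $IPFS_PATH/limits.json as a rcmgr.PartialLimitConfig, with a
// missing file meaning "no overrides".
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"os"
	"path/filepath"

	rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
)

func readOverrides(repoPath string) (rcmgr.PartialLimitConfig, error) {
	var overrides rcmgr.PartialLimitConfig
	data, err := os.ReadFile(filepath.Join(repoPath, "limits.json"))
	if errors.Is(err, os.ErrNotExist) {
		return overrides, nil // no file: run on computed defaults only
	}
	if err != nil {
		return overrides, err
	}
	return overrides, json.Unmarshal(data, &overrides)
}

func main() {
	overrides, err := readOverrides(os.Getenv("IPFS_PATH"))
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", overrides.System)
}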
+ packageLock.Lock() + defer packageLock.Unlock() + + if r.closed { + return rcmgr.PartialLimitConfig{}, errors.New("cannot access config, repo not open") + } + return r.userRessourceOverrides, nil +} + func (r *FSRepo) FileManager() *filestore.FileManager { return r.filemgr } diff --git a/repo/mock.go b/repo/mock.go index a50d448ed02..901a5979ed7 100644 --- a/repo/mock.go +++ b/repo/mock.go @@ -7,6 +7,7 @@ import ( filestore "github.com/ipfs/go-filestore" keystore "github.com/ipfs/go-ipfs-keystore" + rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager" config "github.com/ipfs/kubo/config" ma "github.com/multiformats/go-multiaddr" @@ -26,6 +27,10 @@ func (m *Mock) Config() (*config.Config, error) { return &m.C, nil // FIXME threadsafety } +func (m *Mock) UserRessourceOverrides() (rcmgr.PartialLimitConfig, error) { + return rcmgr.PartialLimitConfig{}, nil +} + func (m *Mock) SetConfig(updated *config.Config) error { m.C = *updated // FIXME threadsafety return nil diff --git a/repo/repo.go b/repo/repo.go index bec02049d18..d65b4c07dd9 100644 --- a/repo/repo.go +++ b/repo/repo.go @@ -8,6 +8,7 @@ import ( filestore "github.com/ipfs/go-filestore" keystore "github.com/ipfs/go-ipfs-keystore" + rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager" ds "github.com/ipfs/go-datastore" config "github.com/ipfs/kubo/config" @@ -24,6 +25,10 @@ type Repo interface { // to the returned config are not automatically persisted. Config() (*config.Config, error) + // UserRessourceOverrides returns optional user ressource overrides for the + // libp2p ressource manager. + UserRessourceOverrides() (rcmgr.PartialLimitConfig, error) + // BackupConfig creates a backup of the current configuration file using // the given prefix for naming. BackupConfig(prefix string) (string, error) diff --git a/test/cli/basic_commands_test.go b/test/cli/basic_commands_test.go index 30c1f1f9a9a..220ef285452 100644 --- a/test/cli/basic_commands_test.go +++ b/test/cli/basic_commands_test.go @@ -88,6 +88,7 @@ func TestAllSubcommandsAcceptHelp(t *testing.T) { t.Parallel() node := harness.NewT(t).NewNode() for _, cmd := range node.IPFSCommands() { + cmd := cmd t.Run(fmt.Sprintf("command %q accepts help", cmd), func(t *testing.T) { t.Parallel() splitCmd := strings.Split(cmd, " ")[1:] diff --git a/test/cli/harness/node.go b/test/cli/harness/node.go index e486771cad9..640f56df799 100644 --- a/test/cli/harness/node.go +++ b/test/cli/harness/node.go @@ -20,6 +20,7 @@ import ( "github.com/ipfs/kubo/config" serial "github.com/ipfs/kubo/config/serialize" "github.com/libp2p/go-libp2p/core/peer" + rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager" "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr/net" ) @@ -96,6 +97,38 @@ func (n *Node) UpdateConfig(f func(cfg *config.Config)) { n.WriteConfig(cfg) } +func (n *Node) ReadUserSuppliedRessourceOverrides() *rcmgr.PartialLimitConfig { + var r rcmgr.PartialLimitConfig + err := serial.ReadConfigFile(filepath.Join(n.Dir, "limits.json"), &r) + switch err { + case nil, serial.ErrNotInitialized: + return &r + default: + panic(err) + } +} + +func (n *Node) WriteUserSuppliedRessourceOverrides(c *rcmgr.PartialLimitConfig) { + err := serial.WriteConfigFile(filepath.Join(n.Dir, "limits.json"), c) + if err != nil { + panic(err) + } +} + +func (n *Node) UpdateUserSuppliedRessourceManagerOverrides(f func(overrides *rcmgr.PartialLimitConfig)) { + overrides := n.ReadUserSuppliedRessourceOverrides() + f(overrides) + 
+	n.WriteUserSuppliedRessourceOverrides(overrides)
+}
+
+func (n *Node) UpdateConfigAndUserSuppliedRessourceManagerOverrides(f func(cfg *config.Config, overrides *rcmgr.PartialLimitConfig)) {
+	overrides := n.ReadUserSuppliedRessourceOverrides()
+	cfg := n.ReadConfig()
+	f(cfg, overrides)
+	n.WriteConfig(cfg)
+	n.WriteUserSuppliedRessourceOverrides(overrides)
+}
+
 func (n *Node) IPFS(args ...string) RunResult {
 	res := n.RunIPFS(args...)
 	n.Runner.AssertNoError(res)
diff --git a/test/cli/rcmgr_test.go b/test/cli/rcmgr_test.go
index 23e123655f7..3870e22bbb7 100644
--- a/test/cli/rcmgr_test.go
+++ b/test/cli/rcmgr_test.go
@@ -5,8 +5,9 @@ import (
 	"testing"
 
 	"github.com/ipfs/kubo/config"
-	"github.com/ipfs/kubo/core/node/libp2p"
 	"github.com/ipfs/kubo/test/cli/harness"
+	"github.com/libp2p/go-libp2p/core/peer"
+	"github.com/libp2p/go-libp2p/core/protocol"
 	rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -25,18 +26,18 @@ func TestRcmgr(t *testing.T) {
 		node.StartDaemon()
 
 		t.Run("swarm limit should fail", func(t *testing.T) {
-			res := node.RunIPFS("swarm", "limit", "system")
+			res := node.RunIPFS("swarm", "limit")
 			assert.Equal(t, 1, res.ExitCode())
-			assert.Contains(t, res.Stderr.Lines()[0], "missing ResourceMgr")
+			assert.Contains(t, res.Stderr.String(), "missing ResourceMgr")
 		})
 		t.Run("swarm stats should fail", func(t *testing.T) {
-			res := node.RunIPFS("swarm", "stats", "all")
+			res := node.RunIPFS("swarm", "stats")
 			assert.Equal(t, 1, res.ExitCode())
-			assert.Contains(t, res.Stderr.Lines()[0], "missing ResourceMgr")
+			assert.Contains(t, res.Stderr.String(), "missing ResourceMgr")
 		})
 	})
 
-	t.Run("Node in offline mode", func(t *testing.T) {
+	t.Run("Node with resource manager disabled", func(t *testing.T) {
 		t.Parallel()
 		node := harness.NewT(t).NewNode().Init()
 		node.UpdateConfig(func(cfg *config.Config) {
@@ -45,14 +46,14 @@ func TestRcmgr(t *testing.T) {
 		node.StartDaemon()
 
 		t.Run("swarm limit should fail", func(t *testing.T) {
-			res := node.RunIPFS("swarm", "limit", "system")
+			res := node.RunIPFS("swarm", "limit")
 			assert.Equal(t, 1, res.ExitCode())
-			assert.Contains(t, res.Stderr.Lines()[0], "missing ResourceMgr")
+			assert.Contains(t, res.Stderr.String(), "missing ResourceMgr")
 		})
 		t.Run("swarm stats should fail", func(t *testing.T) {
-			res := node.RunIPFS("swarm", "stats", "all")
+			res := node.RunIPFS("swarm", "stats")
 			assert.Equal(t, 1, res.ExitCode())
-			assert.Contains(t, res.Stderr.Lines()[0], "missing ResourceMgr")
+			assert.Contains(t, res.Stderr.String(), "missing ResourceMgr")
 		})
 	})
 
@@ -63,12 +64,16 @@ func TestRcmgr(t *testing.T) {
 		})
 		node.StartDaemon()
 
-		res := node.RunIPFS("swarm", "limit", "system", "--enc=json")
+		res := node.RunIPFS("swarm", "limit")
 		require.Equal(t, 0, res.ExitCode())
 		limits := unmarshalLimits(t, res.Stdout.Bytes())
 
-		assert.GreaterOrEqual(t, limits.ConnsInbound, 2000)
-		assert.GreaterOrEqual(t, limits.StreamsInbound, 2000)
+		if limits.System.ConnsInbound != rcmgr.Unlimited {
+			assert.GreaterOrEqual(t, limits.System.ConnsInbound, 2000)
+		}
+		if limits.System.StreamsInbound != rcmgr.Unlimited {
+			assert.GreaterOrEqual(t, limits.System.StreamsInbound, 2000)
+		}
 	})
 
 	t.Run("default configuration", func(t *testing.T) {
@@ -80,165 +85,109 @@ func TestRcmgr(t *testing.T) {
 		node.StartDaemon()
 
 		t.Run("conns and streams are above 800 for default connmgr settings", func(t *testing.T) {
-			res := node.RunIPFS("swarm", "limit", "system", "--enc=json")
+			res := node.RunIPFS("swarm", "limit")
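+			// without a scope argument, "swarm limit" now dumps the full
+			// PartialLimitConfig for every scope as JSON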
 			require.Equal(t, 0, res.ExitCode())
 			limits := unmarshalLimits(t, res.Stdout.Bytes())
 
-			assert.GreaterOrEqual(t, limits.ConnsInbound, 800)
-			assert.GreaterOrEqual(t, limits.StreamsInbound, 800)
+			if limits.System.ConnsInbound != rcmgr.Unlimited {
+				assert.GreaterOrEqual(t, limits.System.ConnsInbound, 800)
+			}
+			if limits.System.StreamsInbound != rcmgr.Unlimited {
+				assert.GreaterOrEqual(t, limits.System.StreamsInbound, 800)
+			}
 		})
 
-		t.Run("limits|stats should succeed", func(t *testing.T) {
-			res := node.RunIPFS("swarm", "limit", "all")
+		t.Run("limits should succeed", func(t *testing.T) {
+			res := node.RunIPFS("swarm", "limit")
 			assert.Equal(t, 0, res.ExitCode())
 
-			limits := map[string]rcmgr.ResourceLimits{}
+			limits := rcmgr.PartialLimitConfig{}
 			err := json.Unmarshal(res.Stdout.Bytes(), &limits)
 			require.NoError(t, err)
 
-			assert.Greater(t, limits["System"].Memory, int64(0))
-			assert.Greater(t, limits["System"].FD, 0)
-			assert.Greater(t, limits["System"].Conns, 0)
-			assert.Greater(t, limits["System"].ConnsInbound, 0)
-			assert.Greater(t, limits["System"].ConnsOutbound, 0)
-			assert.Greater(t, limits["System"].Streams, 0)
-			assert.Greater(t, limits["System"].StreamsInbound, 0)
-			assert.Greater(t, limits["System"].StreamsOutbound, 0)
-			assert.Greater(t, limits["Transient"].Memory, int64(0))
-		})
-
-		t.Run("resetting limits should produce the same default limits", func(t *testing.T) {
-			resetRes := node.RunIPFS("swarm", "limit", "system", "--reset", "--enc=json")
-			require.Equal(t, 0, resetRes.ExitCode())
-			limitRes := node.RunIPFS("swarm", "limit", "system", "--enc=json")
-			require.Equal(t, 0, limitRes.ExitCode())
-
-			assert.Equal(t, resetRes.Stdout.Bytes(), limitRes.Stdout.Bytes())
-		})
-
-		t.Run("swarm stats system with filter should fail", func(t *testing.T) {
-			res := node.RunIPFS("swarm", "stats", "system", "--min-used-limit-perc=99")
-			assert.Equal(t, 1, res.ExitCode())
-			assert.Contains(t, res.Stderr.Lines()[0], `Error: "min-used-limit-perc" can only be used when scope is "all"`)
-		})
-
-		t.Run("swarm limit reset on map values should work", func(t *testing.T) {
-			resetRes := node.RunIPFS("swarm", "limit", "peer:12D3KooWL7i1T9VSPeF8AgQApbyM51GNKZsYPvNvL347aMDmvNzG", "--reset", "--enc=json")
-			require.Equal(t, 0, resetRes.ExitCode())
-			limitRes := node.RunIPFS("swarm", "limit", "peer:12D3KooWL7i1T9VSPeF8AgQApbyM51GNKZsYPvNvL347aMDmvNzG", "--enc=json")
-			require.Equal(t, 0, limitRes.ExitCode())
-
-			assert.Equal(t, resetRes.Stdout.Bytes(), limitRes.Stdout.Bytes())
-		})
-
-		t.Run("scope is required using reset flags", func(t *testing.T) {
-			res := node.RunIPFS("swarm", "limit", "--reset")
-			assert.Equal(t, 1, res.ExitCode())
-			assert.Contains(t, res.Stderr.Lines()[0], `Error: argument "scope" is required`)
+			assert.NotEqual(t, limits.Transient.Memory, rcmgr.BlockAllLimit64)
+			assert.NotEqual(t, limits.System.Memory, rcmgr.BlockAllLimit64)
+			assert.NotEqual(t, limits.System.FD, rcmgr.BlockAllLimit)
+			assert.NotEqual(t, limits.System.Conns, rcmgr.BlockAllLimit)
+			assert.NotEqual(t, limits.System.ConnsInbound, rcmgr.BlockAllLimit)
+			assert.NotEqual(t, limits.System.ConnsOutbound, rcmgr.BlockAllLimit)
+			assert.NotEqual(t, limits.System.Streams, rcmgr.BlockAllLimit)
+			assert.NotEqual(t, limits.System.StreamsInbound, rcmgr.BlockAllLimit)
+			assert.NotEqual(t, limits.System.StreamsOutbound, rcmgr.BlockAllLimit)
 		})
 
 		t.Run("swarm stats works", func(t *testing.T) {
-			res := node.RunIPFS("swarm", "stats", "all", "--enc=json")
+			res := node.RunIPFS("swarm", "stats")
 			require.Equal(t, 0, res.ExitCode())
 
-			stats := libp2p.NetStatOut{}
+			stats := rcmgr.ResourceManagerStat{}
 			err := json.Unmarshal(res.Stdout.Bytes(), &stats)
 			require.NoError(t, err)
 
 			// every scope has the same fields, so we only inspect system
-			assert.Equal(t, rcmgr.LimitVal64(0), stats.System.Memory)
-			assert.Equal(t, rcmgr.LimitVal(0), stats.System.FD)
-			assert.Equal(t, rcmgr.LimitVal(0), stats.System.Conns)
-			assert.Equal(t, rcmgr.LimitVal(0), stats.System.ConnsInbound)
-			assert.Equal(t, rcmgr.LimitVal(0), stats.System.ConnsOutbound)
-			assert.Equal(t, rcmgr.LimitVal(0), stats.System.Streams)
-			assert.Equal(t, rcmgr.LimitVal(0), stats.System.StreamsInbound)
-			assert.Equal(t, rcmgr.LimitVal(0), stats.System.StreamsOutbound)
-			assert.Equal(t, rcmgr.LimitVal64(0), stats.Transient.Memory)
+			assert.Zero(t, stats.System.Memory)
+			assert.Zero(t, stats.System.NumFD)
+			assert.Zero(t, stats.System.NumConnsInbound)
+			assert.Zero(t, stats.System.NumConnsOutbound)
+			assert.Zero(t, stats.System.NumStreamsInbound)
+			assert.Zero(t, stats.System.NumStreamsOutbound)
+			assert.Zero(t, stats.Transient.Memory)
 		})
 	})
 
-	t.Run("set system conns limit while daemon is not running", func(t *testing.T) {
+	t.Run("smoke test transient scope", func(t *testing.T) {
 		node := harness.NewT(t).NewNode().Init()
-		res := node.RunIPFS("config", "--json", "Swarm.ResourceMgr.Limits.System.Conns", "99999")
-		require.Equal(t, 0, res.ExitCode())
-
-		t.Run("set an invalid limit which should result in a failure", func(t *testing.T) {
-			res := node.RunIPFS("config", "--json", "Swarm.ResourceMgr.Limits.System.Conns", "asdf")
-			assert.Equal(t, 1, res.ExitCode())
-			assert.Contains(t, res.Stderr.String(), "failed to unmarshal")
+		node.UpdateUserSuppliedRessourceManagerOverrides(func(overrides *rcmgr.PartialLimitConfig) {
+			overrides.Transient.Memory = 88888
 		})
 		node.StartDaemon()
 
-		t.Run("new system conns limit is applied", func(t *testing.T) {
-			res := node.RunIPFS("swarm", "limit", "system", "--enc=json")
-			limits := unmarshalLimits(t, res.Stdout.Bytes())
-			assert.Equal(t, limits.Conns, rcmgr.LimitVal(99999))
-		})
-	})
-
-	t.Run("set the system memory limit while the daemon is running", func(t *testing.T) {
-		node := harness.NewT(t).NewNode().Init().StartDaemon()
-		updateLimitsWithFile(t, node, "system", func(limits *rcmgr.ResourceLimits) {
-			limits.Memory = 99998
-		})
-
-		assert.Equal(t, rcmgr.LimitVal64(99998), node.ReadConfig().Swarm.ResourceMgr.Limits.System.Memory)
-
-		res := node.RunIPFS("swarm", "limit", "system", "--enc=json")
+		res := node.RunIPFS("swarm", "limit")
 		limits := unmarshalLimits(t, res.Stdout.Bytes())
-		assert.Equal(t, rcmgr.LimitVal64(99998), limits.Memory)
-	})
-
-	t.Run("smoke test transient scope", func(t *testing.T) {
-		node := harness.NewT(t).NewNode().Init().StartDaemon()
-		updateLimitsWithFile(t, node, "transient", func(limits *rcmgr.ResourceLimits) {
-			limits.Memory = 88888
-		})
-
-		res := node.RunIPFS("swarm", "limit", "transient", "--enc=json")
-		limits := unmarshalLimits(t, res.Stdout.Bytes())
-		assert.Equal(t, rcmgr.LimitVal64(88888), limits.Memory)
+		assert.Equal(t, rcmgr.LimitVal64(88888), limits.Transient.Memory)
 	})
 
 	t.Run("smoke test service scope", func(t *testing.T) {
-		node := harness.NewT(t).NewNode().Init().StartDaemon()
-		updateLimitsWithFile(t, node, "svc:foo", func(limits *rcmgr.ResourceLimits) {
-			limits.Memory = 77777
+		node := harness.NewT(t).NewNode().Init()
+		node.UpdateUserSuppliedRessourceManagerOverrides(func(overrides *rcmgr.PartialLimitConfig) {
+			overrides.Service = map[string]rcmgr.ResourceLimits{"foo": {Memory: 77777}}
 		})
+		node.StartDaemon()
 
-		res := node.RunIPFS("swarm", "limit", "svc:foo", "--enc=json")
+		res := node.RunIPFS("swarm", "limit")
 		limits := unmarshalLimits(t, res.Stdout.Bytes())
-		assert.Equal(t, rcmgr.LimitVal64(77777), limits.Memory)
+		assert.Equal(t, rcmgr.LimitVal64(77777), limits.Service["foo"].Memory)
 	})
 
 	t.Run("smoke test protocol scope", func(t *testing.T) {
-		node := harness.NewT(t).NewNode().Init().StartDaemon()
-		updateLimitsWithFile(t, node, "proto:foo", func(limits *rcmgr.ResourceLimits) {
-			limits.Memory = 66666
+		node := harness.NewT(t).NewNode().Init()
+		node.UpdateUserSuppliedRessourceManagerOverrides(func(overrides *rcmgr.PartialLimitConfig) {
+			overrides.Protocol = map[protocol.ID]rcmgr.ResourceLimits{"foo": {Memory: 66666}}
 		})
+		node.StartDaemon()
 
-		res := node.RunIPFS("swarm", "limit", "proto:foo", "--enc=json")
+		res := node.RunIPFS("swarm", "limit")
 		limits := unmarshalLimits(t, res.Stdout.Bytes())
-		assert.Equal(t, rcmgr.LimitVal64(66666), limits.Memory)
+		assert.Equal(t, rcmgr.LimitVal64(66666), limits.Protocol["foo"].Memory)
 	})
 
 	t.Run("smoke test peer scope", func(t *testing.T) {
-		validPeerID := "QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN"
-		node := harness.NewT(t).NewNode().Init().StartDaemon()
-		updateLimitsWithFile(t, node, "peer:"+validPeerID, func(limits *rcmgr.ResourceLimits) {
-			limits.Memory = 66666
+		validPeerID, err := peer.Decode("QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN")
+		assert.NoError(t, err)
+		node := harness.NewT(t).NewNode().Init()
+		node.UpdateUserSuppliedRessourceManagerOverrides(func(overrides *rcmgr.PartialLimitConfig) {
+			overrides.Peer = map[peer.ID]rcmgr.ResourceLimits{validPeerID: {Memory: 55555}}
 		})
+		node.StartDaemon()
 
-		res := node.RunIPFS("swarm", "limit", "peer:"+validPeerID, "--enc=json")
+		res := node.RunIPFS("swarm", "limit")
 		limits := unmarshalLimits(t, res.Stdout.Bytes())
-		assert.Equal(t, rcmgr.LimitVal64(66666), limits.Memory)
+		assert.Equal(t, rcmgr.LimitVal64(55555), limits.Peer[validPeerID].Memory)
 
 		t.Parallel()
 
-		t.Run("getting limit for invalid peer ID fails", func(t *testing.T) {
-			res := node.RunIPFS("swarm", "limit", "peer:foo")
-			assert.Equal(t, 1, res.ExitCode())
-			assert.Contains(t, res.Stderr.String(), "invalid peer ID")
-		})
@@ -258,20 +207,20 @@ func TestRcmgr(t *testing.T) {
 	// peerID0, peerID1, peerID2 := node0.PeerID(), node1.PeerID(), node2.PeerID()
 	peerID1, peerID2 := node1.PeerID().String(), node2.PeerID().String()
 
-	node0.UpdateConfig(func(cfg *config.Config) {
+	node0.UpdateConfigAndUserSuppliedRessourceManagerOverrides(func(cfg *config.Config, overrides *rcmgr.PartialLimitConfig) {
+		*overrides = rcmgr.PartialLimitConfig{
+			System: rcmgr.ResourceLimits{
+				Conns:         rcmgr.BlockAllLimit,
+				ConnsInbound:  rcmgr.BlockAllLimit,
+				ConnsOutbound: rcmgr.BlockAllLimit,
+			},
+		}
 		cfg.Swarm.ResourceMgr.Enabled = config.True
 		cfg.Swarm.ResourceMgr.Allowlist = []string{"/ip4/0.0.0.0/ipcidr/0/p2p/" + peerID2}
 	})
 
 	nodes.StartDaemons()
 
-	// change system limits on node 0
-	updateLimitsWithFile(t, node0, "system", func(limits *rcmgr.ResourceLimits) {
-		limits.Conns = rcmgr.BlockAllLimit
-		limits.ConnsInbound = rcmgr.BlockAllLimit
-		limits.ConnsOutbound = rcmgr.BlockAllLimit
-	})
-
 	t.Parallel()
 	t.Run("node 0 should fail to connect to node 1", func(t *testing.T) {
 		res := node0.Runner.Run(harness.RunRequest{
@@ -306,9 +255,10 @@ func TestRcmgr(t *testing.T) {
 	t.Parallel()
 	t.Run("system conns", func(t *testing.T) {
 		node := harness.NewT(t).NewNode().Init()
-		node.UpdateConfig(func(cfg *config.Config) {
-			cfg.Swarm.ResourceMgr.Limits = &rcmgr.PartialLimitConfig{}
-			cfg.Swarm.ResourceMgr.Limits.System.Conns = 128
+		node.UpdateConfigAndUserSuppliedRessourceManagerOverrides(func(cfg *config.Config, overrides *rcmgr.PartialLimitConfig) {
+			*overrides = rcmgr.PartialLimitConfig{
+				System: rcmgr.ResourceLimits{Conns: 128},
+			}
 			cfg.Swarm.ConnMgr.HighWater = config.NewOptionalInteger(128)
 			cfg.Swarm.ConnMgr.LowWater = config.NewOptionalInteger(64)
 		})
@@ -318,9 +268,10 @@ func TestRcmgr(t *testing.T) {
 	})
 	t.Run("system conns inbound", func(t *testing.T) {
 		node := harness.NewT(t).NewNode().Init()
-		node.UpdateConfig(func(cfg *config.Config) {
-			cfg.Swarm.ResourceMgr.Limits = &rcmgr.PartialLimitConfig{}
-			cfg.Swarm.ResourceMgr.Limits.System.ConnsInbound = 128
+		node.UpdateConfigAndUserSuppliedRessourceManagerOverrides(func(cfg *config.Config, overrides *rcmgr.PartialLimitConfig) {
+			*overrides = rcmgr.PartialLimitConfig{
+				System: rcmgr.ResourceLimits{ConnsInbound: 128},
+			}
 			cfg.Swarm.ConnMgr.HighWater = config.NewOptionalInteger(128)
 			cfg.Swarm.ConnMgr.LowWater = config.NewOptionalInteger(64)
 		})
@@ -330,9 +281,10 @@ func TestRcmgr(t *testing.T) {
 	})
 	t.Run("system streams", func(t *testing.T) {
 		node := harness.NewT(t).NewNode().Init()
-		node.UpdateConfig(func(cfg *config.Config) {
-			cfg.Swarm.ResourceMgr.Limits = &rcmgr.PartialLimitConfig{}
-			cfg.Swarm.ResourceMgr.Limits.System.Streams = 128
+		node.UpdateConfigAndUserSuppliedRessourceManagerOverrides(func(cfg *config.Config, overrides *rcmgr.PartialLimitConfig) {
+			*overrides = rcmgr.PartialLimitConfig{
+				System: rcmgr.ResourceLimits{Streams: 128},
+			}
 			cfg.Swarm.ConnMgr.HighWater = config.NewOptionalInteger(128)
 			cfg.Swarm.ConnMgr.LowWater = config.NewOptionalInteger(64)
 		})
@@ -342,9 +294,10 @@ func TestRcmgr(t *testing.T) {
 	})
 	t.Run("system streams inbound", func(t *testing.T) {
 		node := harness.NewT(t).NewNode().Init()
-		node.UpdateConfig(func(cfg *config.Config) {
-			cfg.Swarm.ResourceMgr.Limits = &rcmgr.PartialLimitConfig{}
-			cfg.Swarm.ResourceMgr.Limits.System.StreamsInbound = 128
+		node.UpdateConfigAndUserSuppliedRessourceManagerOverrides(func(cfg *config.Config, overrides *rcmgr.PartialLimitConfig) {
+			*overrides = rcmgr.PartialLimitConfig{
+				System: rcmgr.ResourceLimits{StreamsInbound: 128},
+			}
 			cfg.Swarm.ConnMgr.HighWater = config.NewOptionalInteger(128)
 			cfg.Swarm.ConnMgr.LowWater = config.NewOptionalInteger(64)
 		})
@@ -355,7 +308,7 @@ func TestRcmgr(t *testing.T) {
 	})
 }
 
-func updateLimitsWithFile(t *testing.T, node *harness.Node, limit string, f func(*rcmgr.ResourceLimits)) {
+func updateLimitsWithFile(t *testing.T, node *harness.Node, limit string, f func(*rcmgr.PartialLimitConfig)) {
 	filename := limit + ".json"
 	res := node.RunIPFS("swarm", "limit", limit)
 	limits := unmarshalLimits(t, res.Stdout.Bytes())
@@ -369,8 +322,8 @@ func updateLimitsWithFile(t *testing.T, node *harness.Node, limit string, f func
 	assert.Equal(t, 0, res.ExitCode())
}
 
-func unmarshalLimits(t *testing.T, b []byte) *rcmgr.ResourceLimits {
-	limits := &rcmgr.ResourceLimits{}
+func unmarshalLimits(t *testing.T, b []byte) *rcmgr.PartialLimitConfig {
+	limits := &rcmgr.PartialLimitConfig{}
 	err := json.Unmarshal(b, limits)
 	require.NoError(t, err)
 	return limits
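
For context, the override flow introduced above boils down to: decode limits.json into an rcmgr.PartialLimitConfig and let go-libp2p fill every unset value with its computed defaults. Below is a minimal standalone sketch of that flow, not part of this diff; it assumes go-libp2p's PartialLimitConfig.Build, DefaultLimits.AutoScale, and ToPartialLimitConfig APIs, and a limits.json in the working directory.

package main

import (
	"encoding/json"
	"fmt"
	"os"

	rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
)

func main() {
	// User-supplied partial overrides: anything left unset falls back to
	// the defaults computed by go-libp2p.
	var overrides rcmgr.PartialLimitConfig
	if data, err := os.ReadFile("limits.json"); err == nil {
		if err := json.Unmarshal(data, &overrides); err != nil {
			fmt.Fprintln(os.Stderr, "bad limits.json:", err)
			os.Exit(1)
		}
	} // a missing file simply means "no overrides"

	// Fill the gaps with the scaled defaults, mirroring what the resource
	// manager setup does with the value returned by UserRessourceOverrides.
	limits := overrides.Build(rcmgr.DefaultLimits.AutoScale())

	// These concrete limits are what "ipfs swarm limit" now reports as JSON.
	out, err := json.MarshalIndent(limits.ToPartialLimitConfig(), "", "  ")
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Println(string(out))
}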