Skip to content

Commit

Permalink
TODO
Browse files Browse the repository at this point in the history
  • Loading branch information
Jorropo committed Mar 1, 2023
1 parent 1957fda commit 1d8f705
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 85 deletions.
22 changes: 15 additions & 7 deletions config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,12 +420,20 @@ type swarmLimits struct{}
var _ json.Unmarshaler = swarmLimits{}

func (swarmLimits) UnmarshalJSON(b []byte) error {
switch _, err := json.NewDecoder(bytes.NewReader(b)).Token(); err {
case io.EOF:
return nil
case nil:
return fmt.Errorf("field Swarm.ResourceMgr.Limits has been removed in 0.19 and must be empty or not present")
default:
return err
d := json.NewDecoder(bytes.NewReader(b))
for {
switch tok, err := d.Token(); err {
case io.EOF:
return nil
case nil:
switch tok {
case json.Delim('{'), json.Delim('}'):
// accept empty objects
continue
}
return fmt.Errorf("field Swarm.ResourceMgr.Limits has been removed in 0.19 and must be empty or not present")
default:
return err
}
}
}
3 changes: 1 addition & 2 deletions core/commands/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,11 +348,10 @@ The output of this command is JSON.
}

stats := rapi.Stat()
result := libp2p.StatToLimitConfig(stats).ToPartialLimitConfig()

b := new(bytes.Buffer)
enc := json.NewEncoder(b)
err = enc.Encode(result)
err = enc.Encode(stats)
if err != nil {
return err
}
Expand Down
12 changes: 5 additions & 7 deletions core/node/libp2p/rcmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func ResourceManager(cfg config.SwarmConfig, userOverrides rcmgr.PartialLimitCon
// We want to see this message on startup, that's why we are using fmt instead of log.
fmt.Print(msg)

if err := ensureConnMgrMakeSenseVsResourceMgr(limitConfig, cfg.ConnMgr); err != nil {
if err := ensureConnMgrMakeSenseVsResourceMgr(limitConfig, cfg); err != nil {
return nil, opts, err
}

Expand Down Expand Up @@ -138,8 +138,6 @@ func LimitConfig(cfg config.SwarmConfig, userOverrides rcmgr.PartialLimitConfig)
return limitConfig, msg, nil
}

// StatToLimitConfig converts a stats object into a LimitConfig one. This is useful when you want to generate
// JSONs with the same structure for current stats and limits.
func StatToLimitConfig(stats rcmgr.ResourceManagerStat) rcmgr.ConcreteLimitConfig {
result := rcmgr.PartialLimitConfig{}

Expand Down Expand Up @@ -323,14 +321,14 @@ func scopeStatToBaseLimit(ss network.ScopeStat) rcmgr.BaseLimit {
}
}

func ensureConnMgrMakeSenseVsResourceMgr(concreteLimits rcmgr.ConcreteLimitConfig, cmgr config.ConnMgr) error {
if cmgr.Type.WithDefault(config.DefaultConnMgrType) == "none" {
return nil // none connmgr, no checks to do
func ensureConnMgrMakeSenseVsResourceMgr(concreteLimits rcmgr.ConcreteLimitConfig, cfg config.SwarmConfig) error {
if cfg.ConnMgr.Type.WithDefault(config.DefaultConnMgrType) == "none" || len(cfg.ResourceMgr.Allowlist) != 0 {
return nil // none connmgr, or setup with an allow list, no checks to do
}

rcm := concreteLimits.ToPartialLimitConfig()

highWater := cmgr.HighWater.WithDefault(config.DefaultConnMgrHighWater)
highWater := cfg.ConnMgr.HighWater.WithDefault(config.DefaultConnMgrHighWater)
if rcm.System.Conns != rcmgr.Unlimited && int64(rcm.System.Conns) <= highWater {
// nolint
return fmt.Errorf(`
Expand Down
16 changes: 4 additions & 12 deletions repo/fsrepo/fsrepo.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package fsrepo

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -447,18 +446,11 @@ func (r *FSRepo) openConfig() error {
// openUserRessourceOverrides will remove all overrides if the file is not present.
// It will error if the decoding fails.
func (r *FSRepo) openUserRessourceOverrides() error {
f, err := os.Open(filepath.Join(r.path, "limits.json"))
switch {
case err == nil:
case os.IsNotExist(err):
r.userRessourceOverrides = rcmgr.PartialLimitConfig{}
return nil
default:
return err
err := serialize.ReadConfigFile(filepath.Join(r.path, "limits.json"), &r.userRessourceOverrides)
if err == serialize.ErrNotInitialized {
err = nil
}
defer f.Close()

return json.NewDecoder(f).Decode(&r.userRessourceOverrides)
return err
}

func (r *FSRepo) openKeystore() error {
Expand Down
33 changes: 33 additions & 0 deletions test/cli/harness/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/ipfs/kubo/config"
serial "github.com/ipfs/kubo/config/serialize"
"github.com/libp2p/go-libp2p/core/peer"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
"github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
)
Expand Down Expand Up @@ -96,6 +97,38 @@ func (n *Node) UpdateConfig(f func(cfg *config.Config)) {
n.WriteConfig(cfg)
}

func (n *Node) ReadUserSuppliedRessourceOverrides() *rcmgr.PartialLimitConfig {
var r rcmgr.PartialLimitConfig
err := serial.ReadConfigFile(filepath.Join(n.Dir, "limits.json"), &r)
switch err {
case nil, serial.ErrNotInitialized:
return &r
default:
panic(err)
}
}

func (n *Node) WriteUserSuppliedRessourceOverrides(c *rcmgr.PartialLimitConfig) {
err := serial.WriteConfigFile(filepath.Join(n.Dir, "limits.json"), c)
if err != nil {
panic(err)
}
}

func (n *Node) UpdateUserSuppliedRessourceManagerOverrides(f func(overrides *rcmgr.PartialLimitConfig)) {
overrides := n.ReadUserSuppliedRessourceOverrides()
f(overrides)
n.WriteUserSuppliedRessourceOverrides(overrides)
}

func (n *Node) UpdateConfigAndUserSuppliedRessourceManagerOverrides(f func(cfg *config.Config, overrides *rcmgr.PartialLimitConfig)) {
overrides := n.ReadUserSuppliedRessourceOverrides()
cfg := n.ReadConfig()
f(cfg, overrides)
n.WriteConfig(cfg)
n.WriteUserSuppliedRessourceOverrides(overrides)
}

func (n *Node) IPFS(args ...string) RunResult {
res := n.RunIPFS(args...)
n.Runner.AssertNoError(res)
Expand Down
104 changes: 47 additions & 57 deletions test/cli/rcmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/ipfs/kubo/config"
"github.com/ipfs/kubo/test/cli/harness"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -47,12 +48,12 @@ func TestRcmgr(t *testing.T) {
t.Run("swarm limit should fail", func(t *testing.T) {
res := node.RunIPFS("swarm", "limit")
assert.Equal(t, 1, res.ExitCode())
assert.Contains(t, res.Stderr.Lines()[0], "missing ResourceMgr")
assert.Contains(t, res.Stderr.String(), "missing ResourceMgr")
})
t.Run("swarm stats should fail", func(t *testing.T) {
res := node.RunIPFS("swarm", "stats")
assert.Equal(t, 1, res.ExitCode())
assert.Contains(t, res.Stderr.Lines()[0], "missing ResourceMgr")
assert.Contains(t, res.Stderr.String(), "missing ResourceMgr")
})
})

Expand Down Expand Up @@ -131,14 +132,8 @@ func TestRcmgr(t *testing.T) {
assert.Equal(t, resetRes.Stdout.Bytes(), limitRes.Stdout.Bytes())
})

t.Run("scope is required using reset flags", func(t *testing.T) {
res := node.RunIPFS("swarm", "limit", "--reset")
assert.Equal(t, 1, res.ExitCode())
assert.Contains(t, res.Stderr.Lines()[0], `Error: argument "scope" is required`)
})

t.Run("swarm stats works", func(t *testing.T) {
res := node.RunIPFS("swarm", "stats", "all", "--enc=json")
res := node.RunIPFS("swarm", "stats")
require.Equal(t, 0, res.ExitCode())

stats := rcmgr.PartialLimitConfig{}
Expand Down Expand Up @@ -178,46 +173,36 @@ func TestRcmgr(t *testing.T) {
})
})

t.Run("set the system memory limit while the daemon is running", func(t *testing.T) {
node := harness.NewT(t).NewNode().Init().StartDaemon()
updateLimitsWithFile(t, node, "system", func(limits *rcmgr.ResourceLimits) {
limits.Memory = 99998
})

assert.Equal(t, rcmgr.LimitVal64(99998), node.ReadConfig().Swarm.ResourceMgr.Limits.System.Memory)

res := node.RunIPFS("swarm", "limit")
limits := unmarshalLimits(t, res.Stdout.Bytes())
assert.Equal(t, rcmgr.LimitVal64(99998), limits.System.Memory)
})

t.Run("smoke test transient scope", func(t *testing.T) {
node := harness.NewT(t).NewNode().Init().StartDaemon()
updateLimitsWithFile(t, node, "transient", func(limits *rcmgr.ResourceLimits) {
limits.Memory = 88888
node := harness.NewT(t).NewNode().Init()
node.UpdateUserSuppliedRessourceManagerOverrides(func(overrides *rcmgr.PartialLimitConfig) {
overrides.Transient.Memory = 88888
})
node.StartDaemon()

res := node.RunIPFS("swarm", "limit")
limits := unmarshalLimits(t, res.Stdout.Bytes())
assert.Equal(t, rcmgr.LimitVal64(88888), limits.Transient.Memory)
})

t.Run("smoke test service scope", func(t *testing.T) {
node := harness.NewT(t).NewNode().Init().StartDaemon()
updateLimitsWithFile(t, node, "svc:foo", func(limits *rcmgr.ResourceLimits) {
limits.Memory = 77777
node := harness.NewT(t).NewNode().Init()
node.UpdateUserSuppliedRessourceManagerOverrides(func(overrides *rcmgr.PartialLimitConfig) {
overrides.Service = map[string]rcmgr.ResourceLimits{"foo": {Memory: 77777}}
})
node.StartDaemon()

res := node.RunIPFS("swarm", "limit")
limits := unmarshalLimits(t, res.Stdout.Bytes())
assert.Equal(t, rcmgr.LimitVal64(77777), limits.Service["foo"].Memory)
})

t.Run("smoke test protocol scope", func(t *testing.T) {
node := harness.NewT(t).NewNode().Init().StartDaemon()
updateLimitsWithFile(t, node, "proto:foo", func(limits *rcmgr.ResourceLimits) {
limits.Memory = 66666
node := harness.NewT(t).NewNode().Init()
node.UpdateUserSuppliedRessourceManagerOverrides(func(overrides *rcmgr.PartialLimitConfig) {
overrides.Protocol = map[protocol.ID]rcmgr.ResourceLimits{"foo": {Memory: 66666}}
})
node.StartDaemon()

res := node.RunIPFS("swarm", "limit")
limits := unmarshalLimits(t, res.Stdout.Bytes())
Expand All @@ -227,19 +212,20 @@ func TestRcmgr(t *testing.T) {
t.Run("smoke test peer scope", func(t *testing.T) {
validPeerID, err := peer.Decode("QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN")
assert.NoError(t, err)
node := harness.NewT(t).NewNode().Init().StartDaemon()
updateLimitsWithFile(t, node, "peer:"+validPeerID.Pretty(), func(limits *rcmgr.ResourceLimits) {
limits.Memory = 66666
node := harness.NewT(t).NewNode().Init()
node.UpdateUserSuppliedRessourceManagerOverrides(func(overrides *rcmgr.PartialLimitConfig) {
overrides.Peer = map[peer.ID]rcmgr.ResourceLimits{validPeerID: {Memory: 55555}}
})
node.StartDaemon()

res := node.RunIPFS("swarm", "limit", "peer:"+validPeerID.Pretty(), "--enc=json")
res := node.RunIPFS("swarm", "limit")
limits := unmarshalLimits(t, res.Stdout.Bytes())
assert.Equal(t, rcmgr.LimitVal64(66666), limits.Peer[validPeerID].Memory)
assert.Equal(t, rcmgr.LimitVal64(55555), limits.Peer[validPeerID].Memory)

t.Parallel()

t.Run("getting limit for invalid peer ID fails", func(t *testing.T) {
res := node.RunIPFS("swarm", "limit", "peer:foo")
res := node.RunIPFS("swarm", "limit")
assert.Equal(t, 1, res.ExitCode())
assert.Contains(t, res.Stderr.String(), "invalid peer ID")
})
Expand All @@ -259,20 +245,20 @@ func TestRcmgr(t *testing.T) {
// peerID0, peerID1, peerID2 := node0.PeerID(), node1.PeerID(), node2.PeerID()
peerID1, peerID2 := node1.PeerID().String(), node2.PeerID().String()

node0.UpdateConfig(func(cfg *config.Config) {
node0.UpdateConfigAndUserSuppliedRessourceManagerOverrides(func(cfg *config.Config, overrides *rcmgr.PartialLimitConfig) {
*overrides = rcmgr.PartialLimitConfig{
System: rcmgr.ResourceLimits{
Conns: rcmgr.BlockAllLimit,
ConnsInbound: rcmgr.BlockAllLimit,
ConnsOutbound: rcmgr.BlockAllLimit,
},
}
cfg.Swarm.ResourceMgr.Enabled = config.True
cfg.Swarm.ResourceMgr.Allowlist = []string{"/ip4/0.0.0.0/ipcidr/0/p2p/" + peerID2}
})

nodes.StartDaemons()

// change system limits on node 0
updateLimitsWithFile(t, node0, "system", func(limits *rcmgr.ResourceLimits) {
limits.Conns = rcmgr.BlockAllLimit
limits.ConnsInbound = rcmgr.BlockAllLimit
limits.ConnsOutbound = rcmgr.BlockAllLimit
})

t.Parallel()
t.Run("node 0 should fail to connect to node 1", func(t *testing.T) {
res := node0.Runner.Run(harness.RunRequest{
Expand Down Expand Up @@ -307,9 +293,10 @@ func TestRcmgr(t *testing.T) {
t.Parallel()
t.Run("system conns", func(t *testing.T) {
node := harness.NewT(t).NewNode().Init()
node.UpdateConfig(func(cfg *config.Config) {
cfg.Swarm.ResourceMgr.Limits = &rcmgr.PartialLimitConfig{}
cfg.Swarm.ResourceMgr.Limits.System.Conns = 128
node.UpdateConfigAndUserSuppliedRessourceManagerOverrides(func(cfg *config.Config, overrides *rcmgr.PartialLimitConfig) {
*overrides = rcmgr.PartialLimitConfig{
System: rcmgr.ResourceLimits{Conns: 128},
}
cfg.Swarm.ConnMgr.HighWater = config.NewOptionalInteger(128)
cfg.Swarm.ConnMgr.LowWater = config.NewOptionalInteger(64)
})
Expand All @@ -319,9 +306,10 @@ func TestRcmgr(t *testing.T) {
})
t.Run("system conns inbound", func(t *testing.T) {
node := harness.NewT(t).NewNode().Init()
node.UpdateConfig(func(cfg *config.Config) {
cfg.Swarm.ResourceMgr.Limits = &rcmgr.PartialLimitConfig{}
cfg.Swarm.ResourceMgr.Limits.System.ConnsInbound = 128
node.UpdateConfigAndUserSuppliedRessourceManagerOverrides(func(cfg *config.Config, overrides *rcmgr.PartialLimitConfig) {
*overrides = rcmgr.PartialLimitConfig{
System: rcmgr.ResourceLimits{ConnsInbound: 128},
}
cfg.Swarm.ConnMgr.HighWater = config.NewOptionalInteger(128)
cfg.Swarm.ConnMgr.LowWater = config.NewOptionalInteger(64)
})
Expand All @@ -331,9 +319,10 @@ func TestRcmgr(t *testing.T) {
})
t.Run("system streams", func(t *testing.T) {
node := harness.NewT(t).NewNode().Init()
node.UpdateConfig(func(cfg *config.Config) {
cfg.Swarm.ResourceMgr.Limits = &rcmgr.PartialLimitConfig{}
cfg.Swarm.ResourceMgr.Limits.System.Streams = 128
node.UpdateConfigAndUserSuppliedRessourceManagerOverrides(func(cfg *config.Config, overrides *rcmgr.PartialLimitConfig) {
*overrides = rcmgr.PartialLimitConfig{
System: rcmgr.ResourceLimits{Streams: 128},
}
cfg.Swarm.ConnMgr.HighWater = config.NewOptionalInteger(128)
cfg.Swarm.ConnMgr.LowWater = config.NewOptionalInteger(64)
})
Expand All @@ -343,9 +332,10 @@ func TestRcmgr(t *testing.T) {
})
t.Run("system streams inbound", func(t *testing.T) {
node := harness.NewT(t).NewNode().Init()
node.UpdateConfig(func(cfg *config.Config) {
cfg.Swarm.ResourceMgr.Limits = &rcmgr.PartialLimitConfig{}
cfg.Swarm.ResourceMgr.Limits.System.StreamsInbound = 128
node.UpdateConfigAndUserSuppliedRessourceManagerOverrides(func(cfg *config.Config, overrides *rcmgr.PartialLimitConfig) {
*overrides = rcmgr.PartialLimitConfig{
System: rcmgr.ResourceLimits{StreamsInbound: 128},
}
cfg.Swarm.ConnMgr.HighWater = config.NewOptionalInteger(128)
cfg.Swarm.ConnMgr.LowWater = config.NewOptionalInteger(64)
})
Expand Down

0 comments on commit 1d8f705

Please sign in to comment.