Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improves daemon <-> pubsub memory footprint when many objects #400

Merged
merged 9 commits into from
Aug 24, 2023
2 changes: 1 addition & 1 deletion cmd/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func newCmdDaemonJoin() *cobra.Command {
panic(err)
}
flags.StringVar(&options.Token, "token", "", "auth token with 'join' role"+
" (created from 'om daemon auth --role json')")
" (created from 'om daemon auth --role join')")
if err := cmd.MarkFlagRequired("token"); err != nil {
panic(err)
}
Expand Down
1 change: 1 addition & 0 deletions daemon/daemon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ func (t *T) MainStart(ctx context.Context) error {
}()

bus := pubsub.NewBus("daemon")
bus.SetDefaultSubscriptionQueueSize(200)
bus.SetDrainChanDuration(3 * daemonenv.DrainChanDuration)
bus.Start(t.ctx)
t.cancelFuncs = append(t.cancelFuncs, func() {
Expand Down
5 changes: 4 additions & 1 deletion daemon/daemondata/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ var (
// subHbRefreshInterval is the minimum interval for update of: sub.hb
subHbRefreshInterval = 100 * propagationInterval

// SubscriptionQueueSize is size of "daemondata" subscription
SubscriptionQueueSize = 40000

countRoutineInterval = 1 * time.Second

ErrDrained = errors.New("drained command")
Expand Down Expand Up @@ -375,7 +378,7 @@ func gensEqual(a, b gens) bool {

// startSubscriptions subscribes to label local node messages that change the cluster data view
func (d *data) startSubscriptions() {
sub := d.bus.Sub("daemondata")
sub := d.bus.Sub("daemondata", pubsub.WithQueueSize(SubscriptionQueueSize))
sub.AddFilter(&msgbus.ClusterConfigUpdated{}, d.labelLocalNode)
sub.AddFilter(&msgbus.ClusterStatusUpdated{}, d.labelLocalNode)

Expand Down
7 changes: 6 additions & 1 deletion daemon/discover/cfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,14 @@ import (
"github.com/opensvc/om3/util/pubsub"
)

var (
// SubscriptionQueueSizeCfg is size of "discover.cfg" subscription
SubscriptionQueueSizeCfg = 30000
)

func (d *discover) startSubscriptions() *pubsub.Subscription {
bus := pubsub.BusFromContext(d.ctx)
sub := bus.Sub("discover.cfg")
sub := bus.Sub("discover.cfg", pubsub.WithQueueSize(SubscriptionQueueSizeCfg))
sub.AddFilter(&msgbus.InstanceConfigUpdated{})
sub.AddFilter(&msgbus.InstanceConfigDeleted{})
sub.AddFilter(&msgbus.ConfigFileUpdated{})
Expand Down
7 changes: 6 additions & 1 deletion daemon/discover/omon.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,16 @@ import (
"github.com/opensvc/om3/util/pubsub"
)

var (
// SubscriptionQueueSizeOmon is size of "discover.omon" subscription
SubscriptionQueueSizeOmon = 16000
)

func (d *discover) omon(started chan<- bool) {
log := d.log.With().Str("func", "omon").Logger()
log.Info().Msg("started")
bus := pubsub.BusFromContext(d.ctx)
sub := bus.Sub("omon-from-cfg-create")
sub := bus.Sub("discover.omon", pubsub.WithQueueSize(SubscriptionQueueSizeOmon))
sub.AddFilter(&msgbus.InstanceConfigUpdated{})
sub.Start()
started <- true
Expand Down
5 changes: 4 additions & 1 deletion daemon/dns/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ type (

var (
cmdC chan any

// SubscriptionQueueSize is size of "dns" subscription
SubscriptionQueueSize = 1000
)

func init() {
Expand Down Expand Up @@ -121,7 +124,7 @@ func Start(parent context.Context, drainDuration time.Duration) error {
}

func (t *dns) startSubscriptions() {
sub := t.bus.Sub("dns")
sub := t.bus.Sub("dns", pubsub.WithQueueSize(SubscriptionQueueSize))
sub.AddFilter(&msgbus.InstanceStatusUpdated{})
sub.AddFilter(&msgbus.InstanceStatusDeleted{})
sub.AddFilter(&msgbus.ClusterConfigUpdated{})
Expand Down
7 changes: 6 additions & 1 deletion daemon/istat/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ type (
}
)

var (
// SubscriptionQueueSize is size of "istats" subscription
SubscriptionQueueSize = 1000
)

func Start(ctx context.Context) error {
localhost := hostname.Hostname()
t := T{
Expand All @@ -60,7 +65,7 @@ func Start(ctx context.Context) error {
labelLocalhost: pubsub.Label{"node", localhost},
}

sub := t.bus.Sub("istats")
sub := t.bus.Sub("istats", pubsub.WithQueueSize(SubscriptionQueueSize))
sub.AddFilter(&msgbus.InstanceConfigDeleted{}, t.labelLocalhost)
sub.AddFilter(&msgbus.InstanceFrozenFileRemoved{}, t.labelLocalhost)
sub.AddFilter(&msgbus.InstanceFrozenFileUpdated{}, t.labelLocalhost)
Expand Down
7 changes: 5 additions & 2 deletions daemon/scheduler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ var (
node.MonitorStateShutting: nil,
node.MonitorStateMaintenance: nil,
}

// SubscriptionQueueSize is size of "scheduler" subscription
SubscriptionQueueSize = 16000
)

func New(opts ...funcopt.O) *T {
Expand Down Expand Up @@ -232,7 +235,7 @@ func (t *T) MainStop() error {

func (t *T) startSubscriptions() *pubsub.Subscription {
t.pubsub = pubsub.BusFromContext(t.ctx)
sub := t.pubsub.Sub("scheduler")
sub := t.pubsub.Sub("scheduler", pubsub.WithQueueSize(SubscriptionQueueSize))
labelLocalhost := pubsub.Label{"node", t.localhost}
sub.AddFilter(&msgbus.InstanceConfigUpdated{}, labelLocalhost)
sub.AddFilter(&msgbus.InstanceStatusDeleted{}, labelLocalhost)
Expand Down Expand Up @@ -379,7 +382,7 @@ func (t *T) scheduleObject(p path.T) {
t.log.Debug().Msgf("schedule object %s: provisioned state has not been discovered yet", p)
return
} else if !isProvisioned {
t.log.Error().Msgf("schedule object %s: not provisioned", p)
t.log.Info().Msgf("schedule object %s: not provisioned", p)
return
}
i, err := object.New(p, object.WithVolatile(true))
Expand Down
74 changes: 58 additions & 16 deletions util/pubsub/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,10 @@ type (
// drainChanDuration is the max duration during draining channels
drainChanDuration time.Duration

queuedMin uint64
queuedMax uint64
queued atomic.Uint64
queuedMin uint64
queuedMax uint64
queuedSize uint64
queued atomic.Uint64
}

cmdPub struct {
Expand Down Expand Up @@ -150,6 +151,9 @@ type (
// drainChanDuration is the max duration during draining private and exposed
// channel
drainChanDuration time.Duration

// default queue size for subscriptions
subQueueSize uint64
}

stringer interface {
Expand Down Expand Up @@ -204,6 +208,9 @@ func (p *Msg) AddLabels(l ...Label) {
}

var (
// defaultSubscriptionQueueSize is default size of internal subscription queue
defaultSubscriptionQueueSize uint64 = 4000

cmdDurationWarn = time.Second
notifyDurationWarn = 5 * time.Second

Expand Down Expand Up @@ -310,6 +317,7 @@ func NewBus(name string) *Bus {
b.endNotify = make(chan uuid.UUID)
b.log = log.Logger.With().Str("bus", name).Logger()
b.drainChanDuration = defaultDrainChanDuration
b.subQueueSize = defaultSubscriptionQueueSize
return b
}

Expand Down Expand Up @@ -379,6 +387,16 @@ func (b *Bus) SetDrainChanDuration(duration time.Duration) {
b.drainChanDuration = duration
}

// SetDefaultSubscriptionQueueSize overrides the default queue size of subscribers for not yet started bus.
//
// It panics if called on started bus.
func (b *Bus) SetDefaultSubscriptionQueueSize(i uint64) {
if b.started {
panic("can't set default subscription queue size on started bus")
}
b.subQueueSize = i
}

func (b *Bus) onSubCmd(c cmdSub) {
id := uuid.New()
sub := &Subscription{
Expand All @@ -392,6 +410,7 @@ func (b *Bus) onSubCmd(c cmdSub) {
drainChanDuration: b.drainChanDuration,
queuedMax: c.queueSize / 32,
queuedMin: c.queueSize / 32,
queuedSize: c.queueSize,
}
b.subs[id] = sub
c.resp <- sub
Expand Down Expand Up @@ -433,13 +452,36 @@ func (b *Bus) onPubCmd(c cmdPub) {
sub.q <- c.data
publicationPushedTotal.With(prometheus.Labels{"filterkey": toFilterKey}).Inc()
if queueLen > sub.queuedMax {
sub.queuedMax *= 2
go sub.bus.Pub(&SubscriptionQueueThreshold{Name: sub.name, Id: sub.id, Value: queueLen, Next: sub.queuedMax}, Label{"counter", ""})
b.log.Debug().Msgf("subscription %s has reached %d queued increase next threshold %d", sub.name, queueLen, sub.queuedMax)
inc := sub.queuedSize / 4
previous := sub.queuedMax
sub.queuedMax += inc
left := sub.queuedSize - sub.queuedMax
level := "debug"
if left < inc {
// 3/4 full
level = "warn"
b.log.Error().Msgf("subscription %s has reached high %d queued pending message, increase threshold %d -> %d of limit %d", sub.name, queueLen, previous, sub.queuedMax, sub.queuedSize)
} else if left < sub.queuedSize/2 {
// 1/2 full
level = "info"
b.log.Warn().Msgf("subscription %s has reached high %d queued pending message, increase threshold %d -> %d of limit %d", sub.name, queueLen, previous, sub.queuedMax, sub.queuedSize)
} else {
b.log.Debug().Msgf("subscription %s has reached high %d queued pending message, increase threshold %d -> %d of limit %d", sub.name, queueLen, previous, sub.queuedMax, sub.queuedSize)
}
go sub.bus.Pub(&SubscriptionQueueThreshold{Name: sub.name, Id: sub.id, Count: queueLen, From: previous, To: sub.queuedMax, Limit: sub.queuedSize}, Label{"counter", ""}, Label{"level", level})
} else if queueLen > sub.queuedMin && queueLen < sub.queuedMax/4 {
sub.queuedMax /= 2
b.log.Debug().Msgf("subscription %s has reached %d queued decrease next threshold %d", sub.name, queueLen, sub.queuedMax)
go sub.bus.Pub(&SubscriptionQueueThreshold{Name: sub.name, Id: sub.id, Value: queueLen, Next: sub.queuedMax}, Label{"counter", ""})
previous := sub.queuedMax
sub.queuedMax /= 8
left := sub.queuedSize - sub.queuedMax
level := "debug"
if left < sub.queuedSize/2 {
// 1/2 full
level = "info"
b.log.Info().Msgf("subscription %s has reached low %d queued pending message, decrease threshold %d -> %d of limit %d", sub.name, queueLen, previous, sub.queuedMax, sub.queuedSize)
} else {
b.log.Debug().Msgf("subscription %s has reached low %d queued pending message, decrease threshold %d -> %d of limit %d", sub.name, queueLen, previous, sub.queuedMax, sub.queuedSize)
}
go sub.bus.Pub(&SubscriptionQueueThreshold{Name: sub.name, Id: sub.id, Count: queueLen, From: previous, To: sub.queuedMax, Limit: sub.queuedSize}, Label{"counter", ""}, Label{"level", level})
}
}
}
Expand Down Expand Up @@ -529,12 +571,12 @@ type (
)

type (
QueueSize uint64
Timeout time.Duration
WithQueueSize uint64
Timeout time.Duration
)

// queueSize implements QueueSizer for QueueSize
func (t QueueSize) queueSize() uint64 {
// queueSize implements QueueSizer for WithQueueSize
func (t WithQueueSize) queueSize() uint64 {
return uint64(t)
}

Expand All @@ -553,13 +595,13 @@ func (t Timeout) timout() time.Duration {
// defaults is no timeout
//
// when QueueSizer, it sets the subscriber queue size.
// default is 4000
// default value is bus dependent (see SetDefaultSubscriptionQueueSize)
func (b *Bus) Sub(name string, options ...interface{}) *Subscription {
respC := make(chan *Subscription)
op := cmdSub{
name: name,
resp: respC,
queueSize: 4000,
queueSize: b.subQueueSize,
}

for _, opt := range options {
Expand Down Expand Up @@ -821,7 +863,7 @@ func (sub *Subscription) Start() {
if err := sub.push(i); err != nil {
// the subscription got push error, cancel it and ask for unsubscribe
sub.bus.log.Warn().Msgf("%s error: %s. stop subscription", sub, err)
go sub.bus.Pub(&SubscriptionError{Name: sub.name, Id: sub.id, Error: err})
go sub.bus.Pub(&SubscriptionError{Name: sub.name, Id: sub.id, ErrS: err.Error()})
sub.cancel()
go func() {
if err := sub.Stop(); err != nil {
Expand Down
3 changes: 2 additions & 1 deletion util/pubsub/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

func newRun(name string) *Bus {
bus := NewBus(name)
bus.SetDefaultSubscriptionQueueSize(200)
bus.Start(context.Background())
return bus
}
Expand Down Expand Up @@ -187,7 +188,7 @@ func TestDropSlowSubscription(t *testing.T) {
assert.NoError(t, subAlert.Stop(), "%s stop error", subAlert)
}()

queueSize := QueueSize(2)
queueSize := WithQueueSize(2)
t.Log("subscribe with a short timeout, and small queue size")
slowSub := bus.Sub("listen with short timeout", Timeout(timeout), queueSize)
slowSub.AddFilter(&msgT{})
Expand Down
30 changes: 20 additions & 10 deletions util/pubsub/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,30 @@ type (
// SubscriptionError is an emitted publication made when a subscriber notification
// exceeds its timeout
SubscriptionError struct {
Msg `yaml:",inline"`
Id uuid.UUID
Name string
Error error
Msg `yaml:",inline"`
Id uuid.UUID `json:"id" yaml:"id"`
Name string `json:"name" yaml:"name"`
ErrS string `json:"error" yaml:"error"`
}

// SubscriptionQueueThreshold is an emitted publication made when a subscriber queue
// reach/leave its current max value
// reach/leave its current high threshold value
SubscriptionQueueThreshold struct {
Msg `yaml:",inline"`
Id uuid.UUID
Name string
Value uint64
Next uint64
Msg `yaml:",inline"`
Id uuid.UUID
Name string `json:"name" yaml:"name"`

// Count is the current used slots in internal subscriber queue
Count uint64 `json:"count" yaml:"count"`

// From is the previous high threshold value
From uint64 `json:"from" yaml:"from"`

// To is the new high threshold value
To uint64 `json:"to" yaml:"to"`

// Limit is the maximum queue size
Limit uint64 `json:"limit" yaml:"limit"`
}
)

Expand Down