Skip to content

Commit

Permalink
Reconfigure the scheduler node tasks on NodeConfigUpdated
Browse files Browse the repository at this point in the history
  • Loading branch information
cvaroqui committed Aug 14, 2023
1 parent 737def7 commit 420b81c
Showing 1 changed file with 15 additions and 10 deletions.
25 changes: 15 additions & 10 deletions daemon/scheduler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,9 +230,11 @@ func (t *T) MainStop() error {
func (t *T) startSubscriptions() *pubsub.Subscription {
bus := pubsub.BusFromContext(t.ctx)
sub := bus.Sub("scheduler")
sub.AddFilter(&msgbus.InstanceStatusDeleted{})
labelLocalhost := pubsub.Label{"node", hostname.Hostname()}
sub.AddFilter(&msgbus.InstanceStatusDeleted{}, labelLocalhost)
sub.AddFilter(&msgbus.ObjectStatusDeleted{})
sub.AddFilter(&msgbus.NodeMonitorUpdated{})
sub.AddFilter(&msgbus.NodeConfigUpdated{}, labelLocalhost)
sub.AddFilter(&msgbus.NodeMonitorUpdated{}, labelLocalhost)
sub.Start()
return sub
}
Expand All @@ -255,6 +257,8 @@ func (t *T) loop() {
t.onInstStatusDeleted(c)
case *msgbus.NodeMonitorUpdated:
t.onNodeMonitorUpdated(c)
case *msgbus.NodeConfigUpdated:
t.onNodeConfigUpdated(c)
case *msgbus.ObjectStatusUpdated:
t.onMonObjectStatusUpdated(c)
}
Expand All @@ -276,10 +280,6 @@ func (t *T) loop() {
}

func (t *T) onInstStatusDeleted(c *msgbus.InstanceStatusDeleted) {
if c.Node != hostname.Hostname() {
// discard peer node events
return
}
t.log.Info().Stringer("path", c.Path).Msgf("unschedule (instance deleted)")
t.unschedule(c.Path)
}
Expand All @@ -297,11 +297,16 @@ func (t *T) onMonObjectStatusUpdated(c *msgbus.ObjectStatusUpdated) {
}
}

func (t *T) onNodeMonitorUpdated(c *msgbus.NodeMonitorUpdated) {
if c.Node != hostname.Hostname() {
// discard peer node events
return
func (t *T) onNodeConfigUpdated(c *msgbus.NodeConfigUpdated) {
switch {
case t.enabled:
t.log.Info().Msgf("update node schedules")
t.unschedule(path.T{})
t.scheduleNode()
}
}

func (t *T) onNodeMonitorUpdated(c *msgbus.NodeMonitorUpdated) {
_, incompatible := incompatibleNodeMonitorStatus[c.Value.State]
switch {
case incompatible && t.enabled:
Expand Down

0 comments on commit 420b81c

Please sign in to comment.