Skip to content

Commit

Permalink
Scheduler enhancements
Browse files Browse the repository at this point in the history
* Set localhost label on all subscriptions, to be delivered less
messages. And remove the localhost testing from the event
handlers.

* Listen on InstanceConfigUpdated, and reschedule jobs in case
the schedule definitions changed.
  • Loading branch information
cvaroqui committed Aug 14, 2023
1 parent 420b81c commit a64bf30
Showing 1 changed file with 15 additions and 2 deletions.
17 changes: 15 additions & 2 deletions daemon/scheduler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,10 @@ func (t *T) startSubscriptions() *pubsub.Subscription {
bus := pubsub.BusFromContext(t.ctx)
sub := bus.Sub("scheduler")
labelLocalhost := pubsub.Label{"node", hostname.Hostname()}
sub.AddFilter(&msgbus.InstanceConfigUpdated{}, labelLocalhost)
sub.AddFilter(&msgbus.InstanceStatusDeleted{}, labelLocalhost)
sub.AddFilter(&msgbus.ObjectStatusDeleted{})
sub.AddFilter(&msgbus.ObjectStatusDeleted{}, labelLocalhost)
sub.AddFilter(&msgbus.ObjectStatusUpdated{}, labelLocalhost)
sub.AddFilter(&msgbus.NodeConfigUpdated{}, labelLocalhost)
sub.AddFilter(&msgbus.NodeMonitorUpdated{}, labelLocalhost)
sub.Start()
Expand All @@ -253,6 +255,8 @@ func (t *T) loop() {
select {
case ev := <-sub.C:
switch c := ev.(type) {
case *msgbus.InstanceConfigUpdated:
t.onInstConfigUpdated(c)
case *msgbus.InstanceStatusDeleted:
t.onInstStatusDeleted(c)
case *msgbus.NodeMonitorUpdated:
Expand Down Expand Up @@ -297,10 +301,19 @@ func (t *T) onMonObjectStatusUpdated(c *msgbus.ObjectStatusUpdated) {
}
}

func (t *T) onInstConfigUpdated(c *msgbus.InstanceConfigUpdated) {
switch {
case t.enabled:
t.log.Info().Stringer("path", c.Path).Msg("update instance schedules")
t.unschedule(c.Path)
t.scheduleObject(c.Path)
}
}

func (t *T) onNodeConfigUpdated(c *msgbus.NodeConfigUpdated) {
switch {
case t.enabled:
t.log.Info().Msgf("update node schedules")
t.log.Info().Msg("update node schedules")
t.unschedule(path.T{})
t.scheduleNode()
}
Expand Down

0 comments on commit a64bf30

Please sign in to comment.