diff --git a/daemon/collector/main.go b/daemon/collector/main.go index 0fa6f8134..f3a35626d 100644 --- a/daemon/collector/main.go +++ b/daemon/collector/main.go @@ -11,6 +11,7 @@ import ( "time" "github.com/opensvc/om3/core/cluster" + "github.com/opensvc/om3/core/clusternode" "github.com/opensvc/om3/core/collector" "github.com/opensvc/om3/core/instance" "github.com/opensvc/om3/core/naming" @@ -110,6 +111,12 @@ type ( // version is the data version version string + + // clusterObject is a map of cluster objects + clusterObject map[string]struct{} + + // clusterNode is a map of cluster nodenames + clusterNode map[string]struct{} } requester interface { @@ -134,14 +141,6 @@ type ( InstanceStatusDeletes []msgbus.InstanceStatusDeleted `json:"instance_status_delete"` } - statusPost struct { - PreviousUpdatedAt time.Time `json:"previous_updated_at"` - UpdatedAt time.Time `json:"updated_at"` - Data *cluster.Data `json:"data"` - Changes []string `json:"changes"` - Version string `json:"version"` - } - // End action: // { // "level":"error", @@ -294,6 +293,9 @@ func (t *T) Stop() error { func (t *T) startSubscriptions() *pubsub.Subscription { sub := t.bus.Sub("daemon.collector", t.subQS) labelLocalhost := pubsub.Label{"node", t.localhost} + + sub.AddFilter(&msgbus.ClusterConfigUpdated{}, labelLocalhost) + sub.AddFilter(&msgbus.InstanceConfigUpdated{}) sub.AddFilter(&msgbus.InstanceConfigDeleted{}) sub.AddFilter(&msgbus.InstanceStatusDeleted{}) @@ -306,6 +308,7 @@ func (t *T) startSubscriptions() *pubsub.Subscription { sub.AddFilter(&msgbus.NodeStatusUpdated{}) sub.AddFilter(&msgbus.ObjectStatusUpdated{}) sub.AddFilter(&msgbus.ObjectStatusDeleted{}) + sub.Start() return sub } @@ -334,6 +337,8 @@ func (t *T) loop() { select { case ev := <-sub.C: switch c := ev.(type) { + case *msgbus.ClusterConfigUpdated: + t.onClusterConfigUpdated(c) case *msgbus.InstanceConfigDeleted: t.onInstanceConfigDeleted(c) case *msgbus.InstanceConfigUpdated: @@ -377,12 +382,15 @@ func (t *T) initChanges() { instanceStatusDeletes: make(map[string]*msgbus.InstanceStatusDeleted), } t.daemonStatusChange = make(map[string]struct{}) + t.clusterObject = make(map[string]struct{}) + t.clusterNode = make(map[string]struct{}) t.nodeFrozenAt = map[string]time.Time{} t.objectConfigToSend = make(map[naming.Path]*msgbus.InstanceConfigUpdated) t.objectConfigSent = make(map[naming.Path]objectConfigSent) for _, v := range object.StatusData.GetAll() { t.daemonStatusChange[v.Path.String()] = struct{}{} + t.clusterObject[v.Path.String()] = struct{}{} } for _, v := range instance.StatusData.GetAll() { @@ -409,6 +417,10 @@ func (t *T) initChanges() { Value: *v.Value, }) } + + for _, nodename := range clusternode.Get() { + t.clusterNode[nodename] = struct{}{} + } } func (t *T) dropChanges() { diff --git a/daemon/collector/on_events.go b/daemon/collector/on_events.go index fedd93afe..8bcb925d2 100644 --- a/daemon/collector/on_events.go +++ b/daemon/collector/on_events.go @@ -32,6 +32,15 @@ func (t *T) onRefreshTicker() { } } +func (t *T) onClusterConfigUpdated(c *msgbus.ClusterConfigUpdated) { + for _, nodename := range c.NodesAdded { + t.clusterNode[nodename] = struct{}{} + } + for _, nodename := range c.NodesRemoved { + delete(t.clusterNode, nodename) + } +} + func (t *T) onConfigUpdated() { t.log.Debugf("reconfigure") if collector.Alive.Load() { @@ -156,8 +165,10 @@ func (t *T) onNodeStatusUpdated(c *msgbus.NodeStatusUpdated) { func (t *T) onObjectStatusDeleted(c *msgbus.ObjectStatusDeleted) { t.daemonStatusChange[c.Path.String()] = struct{}{} + delete(t.clusterObject, c.Path.String()) } func (t *T) onObjectStatusUpdated(c *msgbus.ObjectStatusUpdated) { t.daemonStatusChange[c.Path.String()] = struct{}{} + t.clusterObject[c.Path.String()] = struct{}{} } diff --git a/daemon/collector/post_feed_daemon_status_and_ping.go b/daemon/collector/post_feed_daemon_status_and_ping.go index 73a27a805..2e39cb7a8 100644 --- a/daemon/collector/post_feed_daemon_status_and_ping.go +++ b/daemon/collector/post_feed_daemon_status_and_ping.go @@ -11,12 +11,29 @@ import ( "net/http" "time" + "github.com/opensvc/om3/core/cluster" "github.com/opensvc/om3/core/naming" "github.com/opensvc/om3/daemon/msgbus" "github.com/opensvc/om3/util/xmap" ) type ( + // postFeedDaemonPing describes the POST feed daemon ping payload + postFeedDaemonPing struct { + Nodes []string `json:"nodes"` + objects []string `json:"objects"` + Version string `json:"version"` + } + + // postFeedDaemonStatus describes the POST feed daemon status payload + postFeedDaemonStatus struct { + PreviousUpdatedAt time.Time `json:"previous_updated_at"` + UpdatedAt time.Time `json:"updated_at"` + Data *cluster.Data `json:"data"` + Changes []string `json:"changes"` + Version string `json:"version"` + } + // ObjectWithoutConfig used to decode response of post feed daemon status and ping ObjectWithoutConfig struct { ObjectWithoutConfig *[]string `json:"object_without_config"` @@ -79,17 +96,23 @@ func (t *T) postPing() error { method = http.MethodPost path = "/oc3/feed/daemon/ping" + + ioReader io.Reader ) - instances := make([]string, 0, len(t.instances)) - for k := range t.instances { - instances = append(instances, k) + + body := *t.postPingBody() + if b, err := json.Marshal(body); err != nil { + return fmt.Errorf("post daemon status body: %s", err) + } else { + ioReader = bytes.NewReader(b) } + t.log.Debugf("postFeedDaemonPing: %v", body) now := time.Now() ctx, cancel := context.WithTimeout(t.ctx, defaultPostMaxDuration) defer cancel() - req, err = t.client.NewRequestWithContext(ctx, method, path, nil) + req, err = t.client.NewRequestWithContext(ctx, method, path, ioReader) if err != nil { return fmt.Errorf("%s %s create request: %w", method, path, err) } @@ -126,6 +149,22 @@ func (t *T) postPing() error { } } +func (t *T) postPingBody() *postFeedDaemonPing { + objects := make([]string, 0, len(t.clusterObject)) + for k := range t.clusterObject { + objects = append(objects, k) + } + nodes := make([]string, 0, len(t.clusterNode)) + for k := range t.clusterNode { + nodes = append(nodes, k) + } + return &postFeedDaemonPing{ + Nodes: nodes, + objects: objects, + Version: t.version, + } +} + func (t *T) postChanges() error { if t.client == nil { t.previousUpdatedAt = time.Time{} @@ -199,7 +238,7 @@ func (t *T) postStatus() error { path = "/oc3/feed/daemon/status" ) now := time.Now() - body := statusPost{ + body := postFeedDaemonStatus{ PreviousUpdatedAt: t.previousUpdatedAt, UpdatedAt: now, Data: t.clusterData.ClusterData(), @@ -212,7 +251,7 @@ func (t *T) postStatus() error { if b, err := json.Marshal(body); err != nil { return fmt.Errorf("post daemon status body: %s", err) } else { - ioReader = bytes.NewBuffer(b) + ioReader = bytes.NewReader(b) } ctx, cancel := context.WithTimeout(t.ctx, defaultPostMaxDuration)