Skip to content

Commit

Permalink
[collector] POST /oc3/feed/daemon/ping with cluster nodes and objects
Browse files Browse the repository at this point in the history
Oc3 use it to know alive objects and instances
  • Loading branch information
cgalibern committed Aug 26, 2024
1 parent 3275386 commit da31663
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 14 deletions.
28 changes: 20 additions & 8 deletions daemon/collector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/opensvc/om3/core/cluster"
"github.com/opensvc/om3/core/clusternode"
"github.com/opensvc/om3/core/collector"
"github.com/opensvc/om3/core/instance"
"github.com/opensvc/om3/core/naming"
Expand Down Expand Up @@ -110,6 +111,12 @@ type (

// version is the data version
version string

// clusterObject is a map of cluster objects
clusterObject map[string]struct{}

// clusterNode is a map of cluster nodenames
clusterNode map[string]struct{}
}

requester interface {
Expand All @@ -134,14 +141,6 @@ type (
InstanceStatusDeletes []msgbus.InstanceStatusDeleted `json:"instance_status_delete"`
}

statusPost struct {
PreviousUpdatedAt time.Time `json:"previous_updated_at"`
UpdatedAt time.Time `json:"updated_at"`
Data *cluster.Data `json:"data"`
Changes []string `json:"changes"`
Version string `json:"version"`
}

// End action:
// {
// "level":"error",
Expand Down Expand Up @@ -294,6 +293,9 @@ func (t *T) Stop() error {
func (t *T) startSubscriptions() *pubsub.Subscription {
sub := t.bus.Sub("daemon.collector", t.subQS)
labelLocalhost := pubsub.Label{"node", t.localhost}

sub.AddFilter(&msgbus.ClusterConfigUpdated{}, labelLocalhost)

sub.AddFilter(&msgbus.InstanceConfigUpdated{})
sub.AddFilter(&msgbus.InstanceConfigDeleted{})
sub.AddFilter(&msgbus.InstanceStatusDeleted{})
Expand All @@ -306,6 +308,7 @@ func (t *T) startSubscriptions() *pubsub.Subscription {
sub.AddFilter(&msgbus.NodeStatusUpdated{})
sub.AddFilter(&msgbus.ObjectStatusUpdated{})
sub.AddFilter(&msgbus.ObjectStatusDeleted{})

sub.Start()
return sub
}
Expand Down Expand Up @@ -334,6 +337,8 @@ func (t *T) loop() {
select {
case ev := <-sub.C:
switch c := ev.(type) {
case *msgbus.ClusterConfigUpdated:
t.onClusterConfigUpdated(c)
case *msgbus.InstanceConfigDeleted:
t.onInstanceConfigDeleted(c)
case *msgbus.InstanceConfigUpdated:
Expand Down Expand Up @@ -377,12 +382,15 @@ func (t *T) initChanges() {
instanceStatusDeletes: make(map[string]*msgbus.InstanceStatusDeleted),
}
t.daemonStatusChange = make(map[string]struct{})
t.clusterObject = make(map[string]struct{})
t.clusterNode = make(map[string]struct{})
t.nodeFrozenAt = map[string]time.Time{}
t.objectConfigToSend = make(map[naming.Path]*msgbus.InstanceConfigUpdated)
t.objectConfigSent = make(map[naming.Path]objectConfigSent)

for _, v := range object.StatusData.GetAll() {
t.daemonStatusChange[v.Path.String()] = struct{}{}
t.clusterObject[v.Path.String()] = struct{}{}
}

for _, v := range instance.StatusData.GetAll() {
Expand All @@ -409,6 +417,10 @@ func (t *T) initChanges() {
Value: *v.Value,
})
}

for _, nodename := range clusternode.Get() {
t.clusterNode[nodename] = struct{}{}
}
}

func (t *T) dropChanges() {
Expand Down
11 changes: 11 additions & 0 deletions daemon/collector/on_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,15 @@ func (t *T) onRefreshTicker() {
}
}

func (t *T) onClusterConfigUpdated(c *msgbus.ClusterConfigUpdated) {
for _, nodename := range c.NodesAdded {
t.clusterNode[nodename] = struct{}{}
}
for _, nodename := range c.NodesRemoved {
delete(t.clusterNode, nodename)
}
}

func (t *T) onConfigUpdated() {
t.log.Debugf("reconfigure")
if collector.Alive.Load() {
Expand Down Expand Up @@ -156,8 +165,10 @@ func (t *T) onNodeStatusUpdated(c *msgbus.NodeStatusUpdated) {

func (t *T) onObjectStatusDeleted(c *msgbus.ObjectStatusDeleted) {
t.daemonStatusChange[c.Path.String()] = struct{}{}
delete(t.clusterObject, c.Path.String())
}

func (t *T) onObjectStatusUpdated(c *msgbus.ObjectStatusUpdated) {
t.daemonStatusChange[c.Path.String()] = struct{}{}
t.clusterObject[c.Path.String()] = struct{}{}
}
47 changes: 41 additions & 6 deletions daemon/collector/post_feed_daemon_status_and_ping.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,29 @@ import (
"net/http"
"time"

"github.com/opensvc/om3/core/cluster"
"github.com/opensvc/om3/core/naming"
"github.com/opensvc/om3/daemon/msgbus"
"github.com/opensvc/om3/util/xmap"
)

type (
// postFeedDaemonPing describes the POST feed daemon ping payload
postFeedDaemonPing struct {
Nodes []string `json:"nodes"`
objects []string `json:"objects"`
Version string `json:"version"`
}

// postFeedDaemonStatus describes the POST feed daemon status payload
postFeedDaemonStatus struct {
PreviousUpdatedAt time.Time `json:"previous_updated_at"`
UpdatedAt time.Time `json:"updated_at"`
Data *cluster.Data `json:"data"`
Changes []string `json:"changes"`
Version string `json:"version"`
}

// ObjectWithoutConfig used to decode response of post feed daemon status and ping
ObjectWithoutConfig struct {
ObjectWithoutConfig *[]string `json:"object_without_config"`
Expand Down Expand Up @@ -79,17 +96,35 @@ func (t *T) postPing() error {

method = http.MethodPost
path = "/oc3/feed/daemon/ping"

ioReader io.Reader
)
instances := make([]string, 0, len(t.instances))
for k := range t.instances {
instances = append(instances, k)

objects := make([]string, 0, len(t.clusterObject))
for k := range t.clusterObject {
objects = append(objects, k)
}
nodes := make([]string, 0, len(t.clusterNode))
for k := range t.clusterNode {
nodes = append(nodes, k)
}
body := postFeedDaemonPing{
Nodes: nodes,
objects: objects,
Version: t.version,
}
if b, err := json.Marshal(body); err != nil {
return fmt.Errorf("post daemon status body: %s", err)
} else {
ioReader = bytes.NewReader(b)
}
t.log.Debugf("postFeedDaemonPing: %v", body)
now := time.Now()

ctx, cancel := context.WithTimeout(t.ctx, defaultPostMaxDuration)
defer cancel()

req, err = t.client.NewRequestWithContext(ctx, method, path, nil)
req, err = t.client.NewRequestWithContext(ctx, method, path, ioReader)
if err != nil {
return fmt.Errorf("%s %s create request: %w", method, path, err)
}
Expand Down Expand Up @@ -199,7 +234,7 @@ func (t *T) postStatus() error {
path = "/oc3/feed/daemon/status"
)
now := time.Now()
body := statusPost{
body := postFeedDaemonStatus{
PreviousUpdatedAt: t.previousUpdatedAt,
UpdatedAt: now,
Data: t.clusterData.ClusterData(),
Expand All @@ -212,7 +247,7 @@ func (t *T) postStatus() error {
if b, err := json.Marshal(body); err != nil {
return fmt.Errorf("post daemon status body: %s", err)
} else {
ioReader = bytes.NewBuffer(b)
ioReader = bytes.NewReader(b)
}

ctx, cancel := context.WithTimeout(t.ctx, defaultPostMaxDuration)
Expand Down

0 comments on commit da31663

Please sign in to comment.