From 465f2bd5bef354035efcfe9064dd7bf71b7601d0 Mon Sep 17 00:00:00 2001 From: Abhishek K <32607604+abhishek9686@users.noreply.github.com> Date: Mon, 15 Jan 2024 23:17:36 +0530 Subject: [PATCH] NET-896: Scale test bug fixes (#2764) * send peer update in async * update metrics on fallback * return http json response --- controllers/hosts.go | 6 ++++- models/host.go | 11 +++++--- models/metrics.go | 9 +++---- mq/handlers.go | 2 ++ mq/publishers.go | 9 ++++--- pro/initialize.go | 1 + pro/logic/metrics.go | 63 ++++++++++++++++++-------------------------- 7 files changed, 51 insertions(+), 50 deletions(-) diff --git a/controllers/hosts.go b/controllers/hosts.go index 588bbb929..6d10a6f1b 100644 --- a/controllers/hosts.go +++ b/controllers/hosts.go @@ -228,6 +228,7 @@ func hostUpdateFallback(w http.ResponseWriter, r *http.Request) { currentHost, err := logic.GetHost(hostid) if err != nil { slog.Error("error getting host", "id", hostid, "error", err) + logic.ReturnErrorResponse(w, r, logic.FormatError(err, "badrequest")) return } @@ -249,10 +250,13 @@ func hostUpdateFallback(w http.ResponseWriter, r *http.Request) { err := logic.UpsertHost(currentHost) if err != nil { slog.Error("failed to update host", "id", currentHost.ID, "error", err) + logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal")) return } - + case models.UpdateMetrics: + mq.UpdateMetricsFallBack(hostUpdate.Node.ID.String(), hostUpdate.NewMetrics) } + logic.ReturnSuccessResponse(w, r, "updated host data") } diff --git a/models/host.go b/models/host.go index f44182a61..a608ce54c 100644 --- a/models/host.go +++ b/models/host.go @@ -114,6 +114,8 @@ const ( UpdateKeys HostMqAction = "UPDATE_KEYS" // RequestPull - request a pull from a host RequestPull HostMqAction = "REQ_PULL" + // UpdateMetrics - updates metrics data + UpdateMetrics HostMqAction = "UPDATE_METRICS" ) // SignalAction - turn peer signal action @@ -128,10 +130,11 @@ const ( // HostUpdate - struct for host update type HostUpdate struct { - Action HostMqAction - Host Host - Node Node - Signal Signal + Action HostMqAction + Host Host + Node Node + Signal Signal + NewMetrics Metrics } // HostTurnRegister - struct for host turn registration diff --git a/models/metrics.go b/models/metrics.go index 72a77e1c8..ca041b463 100644 --- a/models/metrics.go +++ b/models/metrics.go @@ -6,11 +6,10 @@ import ( // Metrics - metrics struct type Metrics struct { - Network string `json:"network" bson:"network" yaml:"network"` - NodeID string `json:"node_id" bson:"node_id" yaml:"node_id"` - NodeName string `json:"node_name" bson:"node_name" yaml:"node_name"` - Connectivity map[string]Metric `json:"connectivity" bson:"connectivity" yaml:"connectivity"` - FailoverPeers map[string]string `json:"needsfailover" bson:"needsfailover" yaml:"needsfailover"` + Network string `json:"network" bson:"network" yaml:"network"` + NodeID string `json:"node_id" bson:"node_id" yaml:"node_id"` + NodeName string `json:"node_name" bson:"node_name" yaml:"node_name"` + Connectivity map[string]Metric `json:"connectivity" bson:"connectivity" yaml:"connectivity"` } // Metric - holds a metric for data between nodes diff --git a/mq/handlers.go b/mq/handlers.go index 1fcf725fb..469d0376a 100644 --- a/mq/handlers.go +++ b/mq/handlers.go @@ -19,6 +19,8 @@ import ( var UpdateMetrics = func(client mqtt.Client, msg mqtt.Message) { } +var UpdateMetricsFallBack = func(nodeid string, newMetrics models.Metrics) {} + // DefaultHandler default message queue handler -- NOT USED func DefaultHandler(client mqtt.Client, msg mqtt.Message) { slog.Info("mqtt default handler", "topic", msg.Topic(), "message", msg.Payload()) diff --git a/mq/publishers.go b/mq/publishers.go index e8532fcc0..273776eed 100644 --- a/mq/publishers.go +++ b/mq/publishers.go @@ -30,9 +30,12 @@ func PublishPeerUpdate(replacePeers bool) error { } for _, host := range hosts { host := host - if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil, replacePeers); err != nil { - logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error()) - } + go func(host models.Host) { + if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil, replacePeers); err != nil { + logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error()) + } + }(host) + } return err } diff --git a/pro/initialize.go b/pro/initialize.go index e073b3792..32c89857c 100644 --- a/pro/initialize.go +++ b/pro/initialize.go @@ -64,6 +64,7 @@ func InitPro() { logic.IsInternetGw = proLogic.IsInternetGw logic.SetInternetGw = proLogic.SetInternetGw mq.UpdateMetrics = proLogic.MQUpdateMetrics + mq.UpdateMetricsFallBack = proLogic.MQUpdateMetricsFallBack } func retrieveProLogo() string { diff --git a/pro/logic/metrics.go b/pro/logic/metrics.go index c2aa159de..77abdc872 100644 --- a/pro/logic/metrics.go +++ b/pro/logic/metrics.go @@ -46,6 +46,28 @@ func DeleteMetrics(nodeid string) error { return database.DeleteRecord(database.METRICS_TABLE_NAME, nodeid) } +// MQUpdateMetricsFallBack - called when mq fallback thread is triggered on client +func MQUpdateMetricsFallBack(nodeid string, newMetrics models.Metrics) { + + currentNode, err := logic.GetNodeByID(nodeid) + if err != nil { + slog.Error("error getting node", "id", nodeid, "error", err) + return + } + + updateNodeMetrics(¤tNode, &newMetrics) + if err = logic.UpdateMetrics(nodeid, &newMetrics); err != nil { + slog.Error("failed to update node metrics", "id", nodeid, "error", err) + return + } + if servercfg.IsMetricsExporter() { + if err := mq.PushMetricsToExporter(newMetrics); err != nil { + slog.Error("failed to push node metrics to exporter", "id", currentNode.ID, "error", err) + } + } + slog.Debug("updated node metrics", "id", nodeid) +} + func MQUpdateMetrics(client mqtt.Client, msg mqtt.Message) { id, err := mq.GetID(msg.Topic()) if err != nil { @@ -68,9 +90,7 @@ func MQUpdateMetrics(client mqtt.Client, msg mqtt.Message) { slog.Error("error unmarshaling payload", "error", err) return } - - shouldUpdate := updateNodeMetrics(¤tNode, &newMetrics) - + updateNodeMetrics(¤tNode, &newMetrics) if err = logic.UpdateMetrics(id, &newMetrics); err != nil { slog.Error("failed to update node metrics", "id", id, "error", err) return @@ -80,34 +100,15 @@ func MQUpdateMetrics(client mqtt.Client, msg mqtt.Message) { slog.Error("failed to push node metrics to exporter", "id", currentNode.ID, "error", err) } } - - if shouldUpdate { - slog.Info("updating peers after node detected connectivity issues", "id", currentNode.ID, "network", currentNode.Network) - host, err := logic.GetHost(currentNode.HostID.String()) - if err == nil { - nodes, err := logic.GetAllNodes() - if err != nil { - return - } - if err = mq.PublishSingleHostPeerUpdate(host, nodes, nil, nil, false); err != nil { - slog.Warn("failed to publish update after failover peer change for node", "id", currentNode.ID, "network", currentNode.Network, "error", err) - } - } - } slog.Debug("updated node metrics", "id", id) } -func updateNodeMetrics(currentNode *models.Node, newMetrics *models.Metrics) bool { - if newMetrics.FailoverPeers == nil { - newMetrics.FailoverPeers = make(map[string]string) - } +func updateNodeMetrics(currentNode *models.Node, newMetrics *models.Metrics) { + oldMetrics, err := logic.GetMetrics(currentNode.ID.String()) if err != nil { slog.Error("error finding old metrics for node", "id", currentNode.ID, "error", err) - return false - } - if oldMetrics.FailoverPeers == nil { - oldMetrics.FailoverPeers = make(map[string]string) + return } var attachedClients []models.ExtClient @@ -164,19 +165,7 @@ func updateNodeMetrics(currentNode *models.Node, newMetrics *models.Metrics) boo } - shouldUpdate := len(oldMetrics.FailoverPeers) == 0 && len(newMetrics.FailoverPeers) > 0 - for k, v := range oldMetrics.FailoverPeers { - if len(newMetrics.FailoverPeers[k]) > 0 && len(v) == 0 { - shouldUpdate = true - } - - if len(v) > 0 && len(newMetrics.FailoverPeers[k]) == 0 { - newMetrics.FailoverPeers[k] = v - } - } - for k := range oldMetrics.Connectivity { // cleanup any left over data, self healing delete(newMetrics.Connectivity, k) } - return shouldUpdate }