Skip to content

Commit

Permalink
NET-896: Scale test bug fixes (#2764)
Browse files Browse the repository at this point in the history
* send peer update in async

* update metrics on fallback

* return http json response
  • Loading branch information
abhishek9686 authored Jan 15, 2024
1 parent 718f739 commit 465f2bd
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 50 deletions.
6 changes: 5 additions & 1 deletion controllers/hosts.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ func hostUpdateFallback(w http.ResponseWriter, r *http.Request) {
currentHost, err := logic.GetHost(hostid)
if err != nil {
slog.Error("error getting host", "id", hostid, "error", err)
logic.ReturnErrorResponse(w, r, logic.FormatError(err, "badrequest"))
return
}

Expand All @@ -249,10 +250,13 @@ func hostUpdateFallback(w http.ResponseWriter, r *http.Request) {
err := logic.UpsertHost(currentHost)
if err != nil {
slog.Error("failed to update host", "id", currentHost.ID, "error", err)
logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
return
}

case models.UpdateMetrics:
mq.UpdateMetricsFallBack(hostUpdate.Node.ID.String(), hostUpdate.NewMetrics)
}
logic.ReturnSuccessResponse(w, r, "updated host data")

}

Expand Down
11 changes: 7 additions & 4 deletions models/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ const (
UpdateKeys HostMqAction = "UPDATE_KEYS"
// RequestPull - request a pull from a host
RequestPull HostMqAction = "REQ_PULL"
// UpdateMetrics - updates metrics data
UpdateMetrics HostMqAction = "UPDATE_METRICS"
)

// SignalAction - turn peer signal action
Expand All @@ -128,10 +130,11 @@ const (

// HostUpdate - struct for host update
type HostUpdate struct {
Action HostMqAction
Host Host
Node Node
Signal Signal
Action HostMqAction
Host Host
Node Node
Signal Signal
NewMetrics Metrics
}

// HostTurnRegister - struct for host turn registration
Expand Down
9 changes: 4 additions & 5 deletions models/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@ import (

// Metrics - metrics struct
type Metrics struct {
Network string `json:"network" bson:"network" yaml:"network"`
NodeID string `json:"node_id" bson:"node_id" yaml:"node_id"`
NodeName string `json:"node_name" bson:"node_name" yaml:"node_name"`
Connectivity map[string]Metric `json:"connectivity" bson:"connectivity" yaml:"connectivity"`
FailoverPeers map[string]string `json:"needsfailover" bson:"needsfailover" yaml:"needsfailover"`
Network string `json:"network" bson:"network" yaml:"network"`
NodeID string `json:"node_id" bson:"node_id" yaml:"node_id"`
NodeName string `json:"node_name" bson:"node_name" yaml:"node_name"`
Connectivity map[string]Metric `json:"connectivity" bson:"connectivity" yaml:"connectivity"`
}

// Metric - holds a metric for data between nodes
Expand Down
2 changes: 2 additions & 0 deletions mq/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
var UpdateMetrics = func(client mqtt.Client, msg mqtt.Message) {
}

var UpdateMetricsFallBack = func(nodeid string, newMetrics models.Metrics) {}

// DefaultHandler default message queue handler -- NOT USED
func DefaultHandler(client mqtt.Client, msg mqtt.Message) {
slog.Info("mqtt default handler", "topic", msg.Topic(), "message", msg.Payload())
Expand Down
9 changes: 6 additions & 3 deletions mq/publishers.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,12 @@ func PublishPeerUpdate(replacePeers bool) error {
}
for _, host := range hosts {
host := host
if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil, replacePeers); err != nil {
logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
}
go func(host models.Host) {
if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil, replacePeers); err != nil {
logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
}
}(host)

}
return err
}
Expand Down
1 change: 1 addition & 0 deletions pro/initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func InitPro() {
logic.IsInternetGw = proLogic.IsInternetGw
logic.SetInternetGw = proLogic.SetInternetGw
mq.UpdateMetrics = proLogic.MQUpdateMetrics
mq.UpdateMetricsFallBack = proLogic.MQUpdateMetricsFallBack
}

func retrieveProLogo() string {
Expand Down
63 changes: 26 additions & 37 deletions pro/logic/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,28 @@ func DeleteMetrics(nodeid string) error {
return database.DeleteRecord(database.METRICS_TABLE_NAME, nodeid)
}

// MQUpdateMetricsFallBack - called when mq fallback thread is triggered on client
func MQUpdateMetricsFallBack(nodeid string, newMetrics models.Metrics) {

currentNode, err := logic.GetNodeByID(nodeid)
if err != nil {
slog.Error("error getting node", "id", nodeid, "error", err)
return
}

updateNodeMetrics(&currentNode, &newMetrics)
if err = logic.UpdateMetrics(nodeid, &newMetrics); err != nil {
slog.Error("failed to update node metrics", "id", nodeid, "error", err)
return
}
if servercfg.IsMetricsExporter() {
if err := mq.PushMetricsToExporter(newMetrics); err != nil {
slog.Error("failed to push node metrics to exporter", "id", currentNode.ID, "error", err)
}
}
slog.Debug("updated node metrics", "id", nodeid)
}

func MQUpdateMetrics(client mqtt.Client, msg mqtt.Message) {
id, err := mq.GetID(msg.Topic())
if err != nil {
Expand All @@ -68,9 +90,7 @@ func MQUpdateMetrics(client mqtt.Client, msg mqtt.Message) {
slog.Error("error unmarshaling payload", "error", err)
return
}

shouldUpdate := updateNodeMetrics(&currentNode, &newMetrics)

updateNodeMetrics(&currentNode, &newMetrics)
if err = logic.UpdateMetrics(id, &newMetrics); err != nil {
slog.Error("failed to update node metrics", "id", id, "error", err)
return
Expand All @@ -80,34 +100,15 @@ func MQUpdateMetrics(client mqtt.Client, msg mqtt.Message) {
slog.Error("failed to push node metrics to exporter", "id", currentNode.ID, "error", err)
}
}

if shouldUpdate {
slog.Info("updating peers after node detected connectivity issues", "id", currentNode.ID, "network", currentNode.Network)
host, err := logic.GetHost(currentNode.HostID.String())
if err == nil {
nodes, err := logic.GetAllNodes()
if err != nil {
return
}
if err = mq.PublishSingleHostPeerUpdate(host, nodes, nil, nil, false); err != nil {
slog.Warn("failed to publish update after failover peer change for node", "id", currentNode.ID, "network", currentNode.Network, "error", err)
}
}
}
slog.Debug("updated node metrics", "id", id)
}

func updateNodeMetrics(currentNode *models.Node, newMetrics *models.Metrics) bool {
if newMetrics.FailoverPeers == nil {
newMetrics.FailoverPeers = make(map[string]string)
}
func updateNodeMetrics(currentNode *models.Node, newMetrics *models.Metrics) {

oldMetrics, err := logic.GetMetrics(currentNode.ID.String())
if err != nil {
slog.Error("error finding old metrics for node", "id", currentNode.ID, "error", err)
return false
}
if oldMetrics.FailoverPeers == nil {
oldMetrics.FailoverPeers = make(map[string]string)
return
}

var attachedClients []models.ExtClient
Expand Down Expand Up @@ -164,19 +165,7 @@ func updateNodeMetrics(currentNode *models.Node, newMetrics *models.Metrics) boo

}

shouldUpdate := len(oldMetrics.FailoverPeers) == 0 && len(newMetrics.FailoverPeers) > 0
for k, v := range oldMetrics.FailoverPeers {
if len(newMetrics.FailoverPeers[k]) > 0 && len(v) == 0 {
shouldUpdate = true
}

if len(v) > 0 && len(newMetrics.FailoverPeers[k]) == 0 {
newMetrics.FailoverPeers[k] = v
}
}

for k := range oldMetrics.Connectivity { // cleanup any left over data, self healing
delete(newMetrics.Connectivity, k)
}
return shouldUpdate
}

0 comments on commit 465f2bd

Please sign in to comment.