Skip to content

Commit

Permalink
NET-877: Replace peers on Refreshkeys peer update (#2761)
Browse files Browse the repository at this point in the history
* replace peers on key refresh

* add peer conf to metrics map only when allowed
  • Loading branch information
abhishek9686 authored Jan 11, 2024
1 parent 015a628 commit 5bf30b2
Show file tree
Hide file tree
Showing 15 changed files with 39 additions and 57 deletions.
2 changes: 1 addition & 1 deletion auth/host_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func CheckNetRegAndHostUpdate(networks []string, h *models.Host, relayNodeId uui
Action: models.RequestAck,
Host: *h,
})
if err := mq.PublishPeerUpdate(); err != nil {
if err := mq.PublishPeerUpdate(false); err != nil {
logger.Log(0, "failed to publish peer update during registration -", err.Error())
}
}
Expand Down
7 changes: 3 additions & 4 deletions controllers/ext_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ func createExtClient(w http.ResponseWriter, r *http.Request) {
slog.Info("created extclient", "user", r.Header.Get("user"), "network", node.Network, "clientid", extclient.ClientID)
w.WriteHeader(http.StatusOK)
go func() {
if err := mq.PublishPeerUpdate(); err != nil {
if err := mq.PublishPeerUpdate(false); err != nil {
logger.Log(1, "error setting ext peers on "+nodeid+": "+err.Error())
}
if servercfg.IsDNSMode() {
Expand Down Expand Up @@ -510,7 +510,6 @@ func updateExtClient(w http.ResponseWriter, r *http.Request) {
return
}
logger.Log(0, r.Header.Get("user"), "updated ext client", update.ClientID)

w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(newclient)

Expand All @@ -521,7 +520,7 @@ func updateExtClient(w http.ResponseWriter, r *http.Request) {
if sendPeerUpdate { // need to send a peer update to the ingress node as enablement of one of it's clients has changed
ingressNode, err := logic.GetNodeByID(newclient.IngressGatewayID)
if err == nil {
if err = mq.PublishPeerUpdate(); err != nil {
if err = mq.PublishPeerUpdate(false); err != nil {
logger.Log(1, "error setting ext peers on", ingressNode.ID.String(), ":", err.Error())
}
}
Expand All @@ -536,7 +535,7 @@ func updateExtClient(w http.ResponseWriter, r *http.Request) {
slog.Error("Failed to get nodes", "error", err)
return
}
go mq.PublishSingleHostPeerUpdate(ingressHost, nodes, nil, []models.ExtClient{oldExtClient})
go mq.PublishSingleHostPeerUpdate(ingressHost, nodes, nil, []models.ExtClient{oldExtClient}, false)
}
}

Expand Down
4 changes: 2 additions & 2 deletions controllers/hosts.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func updateHost(w http.ResponseWriter, r *http.Request) {
logger.Log(0, r.Header.Get("user"), "failed to send host update: ", currHost.ID.String(), err.Error())
}
go func() {
if err := mq.PublishPeerUpdate(); err != nil {
if err := mq.PublishPeerUpdate(false); err != nil {
logger.Log(0, "fail to publish peer update: ", err.Error())
}
if newHost.Name != currHost.Name {
Expand Down Expand Up @@ -356,7 +356,7 @@ func addHostToNetwork(w http.ResponseWriter, r *http.Request) {
Host: *currHost,
Node: *newNode,
})
mq.PublishPeerUpdate()
mq.PublishPeerUpdate(false)
if servercfg.IsDNSMode() {
logic.SetDNS()
}
Expand Down
2 changes: 1 addition & 1 deletion controllers/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func migrate(w http.ResponseWriter, r *http.Request) {
if err := logic.UpsertHost(&host); err != nil {
slog.Error("save host", "error", err)
}
go mq.PublishPeerUpdate()
go mq.PublishPeerUpdate(false)
response := models.HostPull{
Host: host,
Nodes: nodes,
Expand Down
2 changes: 1 addition & 1 deletion controllers/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func updateNetworkACL(w http.ResponseWriter, r *http.Request) {

// send peer updates
go func() {
if err = mq.PublishPeerUpdate(); err != nil {
if err = mq.PublishPeerUpdate(false); err != nil {
logger.Log(0, "failed to publish peer update after ACL update on", netname)
}
}()
Expand Down
8 changes: 4 additions & 4 deletions controllers/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ func createEgressGateway(w http.ResponseWriter, r *http.Request) {
if err := mq.NodeUpdate(&node); err != nil {
slog.Error("error publishing node update to node", "node", node.ID, "error", err)
}
mq.PublishPeerUpdate()
mq.PublishPeerUpdate(false)
}()
}

Expand Down Expand Up @@ -479,7 +479,7 @@ func deleteEgressGateway(w http.ResponseWriter, r *http.Request) {
if err := mq.NodeUpdate(&node); err != nil {
slog.Error("error publishing node update to node", "node", node.ID, "error", err)
}
mq.PublishPeerUpdate()
mq.PublishPeerUpdate(false)
}()
}

Expand Down Expand Up @@ -590,7 +590,7 @@ func deleteIngressGateway(w http.ResponseWriter, r *http.Request) {
return
}
go func() {
if err := mq.PublishSingleHostPeerUpdate(host, allNodes, nil, removedClients[:]); err != nil {
if err := mq.PublishSingleHostPeerUpdate(host, allNodes, nil, removedClients[:], false); err != nil {
slog.Error("publishSingleHostUpdate", "host", host.Name, "error", err)
}
if err := mq.NodeUpdate(&node); err != nil {
Expand Down Expand Up @@ -667,7 +667,7 @@ func updateNode(w http.ResponseWriter, r *http.Request) {
slog.Error("error publishing node update to node", "node", newNode.ID, "error", err)
}
if aclUpdate || relayupdate || ifaceDelta {
if err := mq.PublishPeerUpdate(); err != nil {
if err := mq.PublishPeerUpdate(false); err != nil {
logger.Log(0, "error during node ACL update for node", newNode.ID.String())
}
}
Expand Down
2 changes: 1 addition & 1 deletion controllers/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func authenticateUser(response http.ResponseWriter, request *http.Request) {
} else {
// publish peer update to ingress gateway
if ingressNode, err := logic.GetNodeByID(newClient.IngressGatewayID); err == nil {
if err = mq.PublishPeerUpdate(); err != nil {
if err = mq.PublishPeerUpdate(false); err != nil {
slog.Error("error updating ext clients on", "ingress", ingressNode.ID.String(), "err", err.Error())
}
}
Expand Down
2 changes: 1 addition & 1 deletion logic/peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func GetPeerUpdateForHost(network string, host *models.Host, allNodes []models.N
nodePeer = hostPeerUpdate.Peers[peerIndexMap[peerHost.PublicKey.String()]]
}

if node.Network == network && !peerConfig.Remove { // add to peers map for metrics
if node.Network == network && !peerConfig.Remove && len(peerConfig.AllowedIPs) > 0 { // add to peers map for metrics
hostPeerUpdate.PeerIDs[peerHost.PublicKey.String()] = models.IDandAddr{
ID: peer.ID.String(),
HostID: peerHost.ID.String(),
Expand Down
1 change: 1 addition & 0 deletions models/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type HostPeerUpdate struct {
HostNetworkInfo HostInfoMap `json:"host_network_info,omitempty" bson:"host_network_info,omitempty" yaml:"host_network_info,omitempty"`
EgressRoutes []EgressNetworkRoutes `json:"egress_network_routes"`
FwUpdate FwUpdate `json:"fw_update"`
ReplacePeers bool `json:"replace_peers"`
}

// IngressInfo - struct for ingress info
Expand Down
33 changes: 7 additions & 26 deletions mq/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package mq

import (
"encoding/json"
"fmt"

mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/google/uuid"
Expand All @@ -14,7 +13,6 @@ import (
"github.com/gravitl/netmaker/netclient/ncutils"
"github.com/gravitl/netmaker/servercfg"
"golang.org/x/exp/slog"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
)

// UpdateMetrics message Handler -- handles updates from client nodes for metrics
Expand Down Expand Up @@ -65,10 +63,10 @@ func UpdateNode(client mqtt.Client, msg mqtt.Message) {
}
allNodes, err := logic.GetAllNodes()
if err == nil {
PublishSingleHostPeerUpdate(host, allNodes, nil, nil)
PublishSingleHostPeerUpdate(host, allNodes, nil, nil, false)
}
} else {
err = PublishPeerUpdate()
err = PublishPeerUpdate(false)
}
if err != nil {
slog.Warn("error updating peers when node informed the server of an interface change", "nodeid", currentNode.ID, "error", err)
Expand Down Expand Up @@ -102,6 +100,7 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) {
}
slog.Info("recieved host update", "name", hostUpdate.Host.Name, "id", hostUpdate.Host.ID)
var sendPeerUpdate bool
var replacePeers bool
switch hostUpdate.Action {
case models.CheckIn:
sendPeerUpdate = HandleHostCheckin(&hostUpdate.Host, currentHost)
Expand All @@ -122,7 +121,7 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) {
if err != nil {
return
}
if err = PublishSingleHostPeerUpdate(currentHost, nodes, nil, nil); err != nil {
if err = PublishSingleHostPeerUpdate(currentHost, nodes, nil, nil, false); err != nil {
slog.Error("failed peers publish after join acknowledged", "name", hostUpdate.Host.Name, "id", currentHost.ID, "error", err)
return
}
Expand All @@ -131,25 +130,7 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) {
case models.UpdateHost:
if hostUpdate.Host.PublicKey != currentHost.PublicKey {
//remove old peer entry
peerUpdate := models.HostPeerUpdate{
ServerVersion: servercfg.GetVersion(),
Peers: []wgtypes.PeerConfig{
{
PublicKey: currentHost.PublicKey,
Remove: true,
},
},
}
data, err := json.Marshal(&peerUpdate)
if err != nil {
slog.Error("failed to marshal peer update", "error", err)
}
hosts := logic.GetRelatedHosts(hostUpdate.Host.ID.String())
server := servercfg.GetServer()
for _, host := range hosts {
publish(&host, fmt.Sprintf("peers/host/%s/%s", host.ID.String(), server), data)
}

replacePeers = true
}
sendPeerUpdate = logic.UpdateHostFromClient(&hostUpdate.Host, currentHost)
err := logic.UpsertHost(currentHost)
Expand Down Expand Up @@ -198,7 +179,7 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) {
}

if sendPeerUpdate {
err := PublishPeerUpdate()
err := PublishPeerUpdate(replacePeers)
if err != nil {
slog.Error("failed to publish peer update", "error", err)
}
Expand Down Expand Up @@ -249,7 +230,7 @@ func ClientPeerUpdate(client mqtt.Client, msg mqtt.Message) {
case ncutils.ACK:
// do we still need this
case ncutils.DONE:
if err = PublishPeerUpdate(); err != nil {
if err = PublishPeerUpdate(false); err != nil {
slog.Error("error publishing peer update for node", "id", currentNode.ID, "error", err)
return
}
Expand Down
13 changes: 7 additions & 6 deletions mq/publishers.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
)

// PublishPeerUpdate --- determines and publishes a peer update to all the hosts
func PublishPeerUpdate() error {
func PublishPeerUpdate(replacePeers bool) error {
if !servercfg.IsMessageQueueBackend() {
return nil
}
Expand All @@ -30,7 +30,7 @@ func PublishPeerUpdate() error {
}
for _, host := range hosts {
host := host
if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil); err != nil {
if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil, replacePeers); err != nil {
logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
}
}
Expand All @@ -55,7 +55,7 @@ func PublishDeletedNodePeerUpdate(delNode *models.Node) error {
}
for _, host := range hosts {
host := host
if err = PublishSingleHostPeerUpdate(&host, allNodes, delNode, nil); err != nil {
if err = PublishSingleHostPeerUpdate(&host, allNodes, delNode, nil, false); err != nil {
logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
}
}
Expand All @@ -81,7 +81,7 @@ func PublishDeletedClientPeerUpdate(delClient *models.ExtClient) error {
for _, host := range hosts {
host := host
if host.OS != models.OS_Types.IoT {
if err = PublishSingleHostPeerUpdate(&host, nodes, nil, []models.ExtClient{*delClient}); err != nil {
if err = PublishSingleHostPeerUpdate(&host, nodes, nil, []models.ExtClient{*delClient}, false); err != nil {
logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
}
}
Expand All @@ -90,12 +90,13 @@ func PublishDeletedClientPeerUpdate(delClient *models.ExtClient) error {
}

// PublishSingleHostPeerUpdate --- determines and publishes a peer update to one host
func PublishSingleHostPeerUpdate(host *models.Host, allNodes []models.Node, deletedNode *models.Node, deletedClients []models.ExtClient) error {
func PublishSingleHostPeerUpdate(host *models.Host, allNodes []models.Node, deletedNode *models.Node, deletedClients []models.ExtClient, replacePeers bool) error {

peerUpdate, err := logic.GetPeerUpdateForHost("", host, allNodes, deletedNode, deletedClients)
if err != nil {
return err
}
peerUpdate.ReplacePeers = replacePeers
data, err := json.Marshal(&peerUpdate)
if err != nil {
return err
Expand Down Expand Up @@ -231,7 +232,7 @@ func sendPeers() {
for _, host := range hosts {
host := host
logger.Log(2, "sending scheduled peer update (5 min)")
if err = PublishSingleHostPeerUpdate(&host, nodes, nil, nil); err != nil {
if err = PublishSingleHostPeerUpdate(&host, nodes, nil, nil, false); err != nil {
logger.Log(1, "error publishing peer updates for host: ", host.ID.String(), " Err: ", err.Error())
}
}
Expand Down
8 changes: 4 additions & 4 deletions pro/controllers/failover.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func createfailOver(w http.ResponseWriter, r *http.Request) {
logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
return
}
go mq.PublishPeerUpdate()
go mq.PublishPeerUpdate(false)
w.Header().Set("Content-Type", "application/json")
logic.ReturnSuccessResponseWithJson(w, r, node, "created failover successfully")
}
Expand All @@ -90,7 +90,7 @@ func resetFailOver(w http.ResponseWriter, r *http.Request) {
logic.UpsertNode(&node)
}
}
go mq.PublishPeerUpdate()
go mq.PublishPeerUpdate(false)
w.Header().Set("Content-Type", "application/json")
logic.ReturnSuccessResponse(w, r, "failover has been reset successfully")
}
Expand Down Expand Up @@ -126,7 +126,7 @@ func deletefailOver(w http.ResponseWriter, r *http.Request) {
}
go func() {
proLogic.ResetFailOver(&node)
mq.PublishPeerUpdate()
mq.PublishPeerUpdate(false)
}()
w.Header().Set("Content-Type", "application/json")
logic.ReturnSuccessResponseWithJson(w, r, node, "deleted failover successfully")
Expand Down Expand Up @@ -193,7 +193,7 @@ func failOverME(w http.ResponseWriter, r *http.Request) {
sendPeerUpdate = true

if sendPeerUpdate {
go mq.PublishPeerUpdate()
go mq.PublishPeerUpdate(false)
}

w.Header().Set("Content-Type", "application/json")
Expand Down
6 changes: 3 additions & 3 deletions pro/controllers/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func createRelay(w http.ResponseWriter, r *http.Request) {

}
}
go mq.PublishPeerUpdate()
go mq.PublishPeerUpdate(false)
logger.Log(1, r.Header.Get("user"), "created relay on node", relayRequest.NodeID, "on network", relayRequest.NetID)
apiNode := relayNode.ConvertToAPINode()
w.WriteHeader(http.StatusOK)
Expand Down Expand Up @@ -108,13 +108,13 @@ func deleteRelay(w http.ResponseWriter, r *http.Request) {
return
}
node.IsRelay = true // for iot update to recognise that it has to delete relay peer
if err = mq.PublishSingleHostPeerUpdate(h, nodes, &node, nil); err != nil {
if err = mq.PublishSingleHostPeerUpdate(h, nodes, &node, nil, false); err != nil {
logger.Log(1, "failed to publish peer update to host", h.ID.String(), ": ", err.Error())
}
}
}
}
mq.PublishPeerUpdate()
mq.PublishPeerUpdate(false)
}()
logger.Log(1, r.Header.Get("user"), "deleted relay on node", node.ID.String(), "on network", node.Network)
apiNode := node.ConvertToAPINode()
Expand Down
2 changes: 1 addition & 1 deletion pro/logic/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func MQUpdateMetrics(client mqtt.Client, msg mqtt.Message) {
if err != nil {
return
}
if err = mq.PublishSingleHostPeerUpdate(host, nodes, nil, nil); err != nil {
if err = mq.PublishSingleHostPeerUpdate(host, nodes, nil, nil, false); err != nil {
slog.Warn("failed to publish update after failover peer change for node", "id", currentNode.ID, "network", currentNode.Network, "error", err)
}
}
Expand Down
4 changes: 2 additions & 2 deletions pro/remote_access_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func disableExtClient(client *models.ExtClient) error {
} else {
// publish peer update to ingress gateway
if ingressNode, err := logic.GetNodeByID(newClient.IngressGatewayID); err == nil {
if err = mq.PublishPeerUpdate(); err != nil {
if err = mq.PublishPeerUpdate(false); err != nil {
slog.Error("error updating ext clients on", "ingress", ingressNode.ID.String(), "err", err.Error())
}
ingressHost, err := logic.GetHost(ingressNode.HostID.String())
Expand All @@ -75,7 +75,7 @@ func disableExtClient(client *models.ExtClient) error {
if err != nil {
return err
}
go mq.PublishSingleHostPeerUpdate(ingressHost, nodes, nil, []models.ExtClient{*client})
go mq.PublishSingleHostPeerUpdate(ingressHost, nodes, nil, []models.ExtClient{*client}, false)
} else {
return err
}
Expand Down

0 comments on commit 5bf30b2

Please sign in to comment.