diff --git a/auth/host_session.go b/auth/host_session.go index 60cf963dc..0bc7000af 100644 --- a/auth/host_session.go +++ b/auth/host_session.go @@ -253,7 +253,7 @@ func CheckNetRegAndHostUpdate(networks []string, h *models.Host, relayNodeId uui Action: models.RequestAck, Host: *h, }) - if err := mq.PublishPeerUpdate(); err != nil { + if err := mq.PublishPeerUpdate(false); err != nil { logger.Log(0, "failed to publish peer update during registration -", err.Error()) } } diff --git a/controllers/ext_client.go b/controllers/ext_client.go index bea4a34a3..89a3501c7 100644 --- a/controllers/ext_client.go +++ b/controllers/ext_client.go @@ -422,7 +422,7 @@ func createExtClient(w http.ResponseWriter, r *http.Request) { slog.Info("created extclient", "user", r.Header.Get("user"), "network", node.Network, "clientid", extclient.ClientID) w.WriteHeader(http.StatusOK) go func() { - if err := mq.PublishPeerUpdate(); err != nil { + if err := mq.PublishPeerUpdate(false); err != nil { logger.Log(1, "error setting ext peers on "+nodeid+": "+err.Error()) } if servercfg.IsDNSMode() { @@ -510,7 +510,6 @@ func updateExtClient(w http.ResponseWriter, r *http.Request) { return } logger.Log(0, r.Header.Get("user"), "updated ext client", update.ClientID) - w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(newclient) @@ -521,7 +520,7 @@ func updateExtClient(w http.ResponseWriter, r *http.Request) { if sendPeerUpdate { // need to send a peer update to the ingress node as enablement of one of it's clients has changed ingressNode, err := logic.GetNodeByID(newclient.IngressGatewayID) if err == nil { - if err = mq.PublishPeerUpdate(); err != nil { + if err = mq.PublishPeerUpdate(false); err != nil { logger.Log(1, "error setting ext peers on", ingressNode.ID.String(), ":", err.Error()) } } @@ -536,7 +535,7 @@ func updateExtClient(w http.ResponseWriter, r *http.Request) { slog.Error("Failed to get nodes", "error", err) return } - go mq.PublishSingleHostPeerUpdate(ingressHost, nodes, nil, []models.ExtClient{oldExtClient}) + go mq.PublishSingleHostPeerUpdate(ingressHost, nodes, nil, []models.ExtClient{oldExtClient}, false) } } diff --git a/controllers/hosts.go b/controllers/hosts.go index 5ae3cb8a8..588bbb929 100644 --- a/controllers/hosts.go +++ b/controllers/hosts.go @@ -195,7 +195,7 @@ func updateHost(w http.ResponseWriter, r *http.Request) { logger.Log(0, r.Header.Get("user"), "failed to send host update: ", currHost.ID.String(), err.Error()) } go func() { - if err := mq.PublishPeerUpdate(); err != nil { + if err := mq.PublishPeerUpdate(false); err != nil { logger.Log(0, "fail to publish peer update: ", err.Error()) } if newHost.Name != currHost.Name { @@ -356,7 +356,7 @@ func addHostToNetwork(w http.ResponseWriter, r *http.Request) { Host: *currHost, Node: *newNode, }) - mq.PublishPeerUpdate() + mq.PublishPeerUpdate(false) if servercfg.IsDNSMode() { logic.SetDNS() } diff --git a/controllers/migrate.go b/controllers/migrate.go index e3cf729be..6efa72bb0 100644 --- a/controllers/migrate.go +++ b/controllers/migrate.go @@ -95,7 +95,7 @@ func migrate(w http.ResponseWriter, r *http.Request) { if err := logic.UpsertHost(&host); err != nil { slog.Error("save host", "error", err) } - go mq.PublishPeerUpdate() + go mq.PublishPeerUpdate(false) response := models.HostPull{ Host: host, Nodes: nodes, diff --git a/controllers/network.go b/controllers/network.go index 17a9cbea0..305ac5662 100644 --- a/controllers/network.go +++ b/controllers/network.go @@ -128,7 +128,7 @@ func updateNetworkACL(w http.ResponseWriter, r *http.Request) { // send peer updates go func() { - if err = mq.PublishPeerUpdate(); err != nil { + if err = mq.PublishPeerUpdate(false); err != nil { logger.Log(0, "failed to publish peer update after ACL update on", netname) } }() diff --git a/controllers/node.go b/controllers/node.go index 5a6b065eb..3981dede9 100644 --- a/controllers/node.go +++ b/controllers/node.go @@ -436,7 +436,7 @@ func createEgressGateway(w http.ResponseWriter, r *http.Request) { if err := mq.NodeUpdate(&node); err != nil { slog.Error("error publishing node update to node", "node", node.ID, "error", err) } - mq.PublishPeerUpdate() + mq.PublishPeerUpdate(false) }() } @@ -479,7 +479,7 @@ func deleteEgressGateway(w http.ResponseWriter, r *http.Request) { if err := mq.NodeUpdate(&node); err != nil { slog.Error("error publishing node update to node", "node", node.ID, "error", err) } - mq.PublishPeerUpdate() + mq.PublishPeerUpdate(false) }() } @@ -590,7 +590,7 @@ func deleteIngressGateway(w http.ResponseWriter, r *http.Request) { return } go func() { - if err := mq.PublishSingleHostPeerUpdate(host, allNodes, nil, removedClients[:]); err != nil { + if err := mq.PublishSingleHostPeerUpdate(host, allNodes, nil, removedClients[:], false); err != nil { slog.Error("publishSingleHostUpdate", "host", host.Name, "error", err) } if err := mq.NodeUpdate(&node); err != nil { @@ -667,7 +667,7 @@ func updateNode(w http.ResponseWriter, r *http.Request) { slog.Error("error publishing node update to node", "node", newNode.ID, "error", err) } if aclUpdate || relayupdate || ifaceDelta { - if err := mq.PublishPeerUpdate(); err != nil { + if err := mq.PublishPeerUpdate(false); err != nil { logger.Log(0, "error during node ACL update for node", newNode.ID.String()) } } diff --git a/controllers/user.go b/controllers/user.go index 509125026..9548afb71 100644 --- a/controllers/user.go +++ b/controllers/user.go @@ -138,7 +138,7 @@ func authenticateUser(response http.ResponseWriter, request *http.Request) { } else { // publish peer update to ingress gateway if ingressNode, err := logic.GetNodeByID(newClient.IngressGatewayID); err == nil { - if err = mq.PublishPeerUpdate(); err != nil { + if err = mq.PublishPeerUpdate(false); err != nil { slog.Error("error updating ext clients on", "ingress", ingressNode.ID.String(), "err", err.Error()) } } diff --git a/logic/peers.go b/logic/peers.go index 0a260cf1e..8359c35d5 100644 --- a/logic/peers.go +++ b/logic/peers.go @@ -210,7 +210,7 @@ func GetPeerUpdateForHost(network string, host *models.Host, allNodes []models.N nodePeer = hostPeerUpdate.Peers[peerIndexMap[peerHost.PublicKey.String()]] } - if node.Network == network && !peerConfig.Remove { // add to peers map for metrics + if node.Network == network && !peerConfig.Remove && len(peerConfig.AllowedIPs) > 0 { // add to peers map for metrics hostPeerUpdate.PeerIDs[peerHost.PublicKey.String()] = models.IDandAddr{ ID: peer.ID.String(), HostID: peerHost.ID.String(), diff --git a/models/mqtt.go b/models/mqtt.go index 5dc1927ea..89924acc7 100644 --- a/models/mqtt.go +++ b/models/mqtt.go @@ -19,6 +19,7 @@ type HostPeerUpdate struct { HostNetworkInfo HostInfoMap `json:"host_network_info,omitempty" bson:"host_network_info,omitempty" yaml:"host_network_info,omitempty"` EgressRoutes []EgressNetworkRoutes `json:"egress_network_routes"` FwUpdate FwUpdate `json:"fw_update"` + ReplacePeers bool `json:"replace_peers"` } // IngressInfo - struct for ingress info diff --git a/mq/handlers.go b/mq/handlers.go index 066b6f825..1fcf725fb 100644 --- a/mq/handlers.go +++ b/mq/handlers.go @@ -2,7 +2,6 @@ package mq import ( "encoding/json" - "fmt" mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/google/uuid" @@ -14,7 +13,6 @@ import ( "github.com/gravitl/netmaker/netclient/ncutils" "github.com/gravitl/netmaker/servercfg" "golang.org/x/exp/slog" - "golang.zx2c4.com/wireguard/wgctrl/wgtypes" ) // UpdateMetrics message Handler -- handles updates from client nodes for metrics @@ -65,10 +63,10 @@ func UpdateNode(client mqtt.Client, msg mqtt.Message) { } allNodes, err := logic.GetAllNodes() if err == nil { - PublishSingleHostPeerUpdate(host, allNodes, nil, nil) + PublishSingleHostPeerUpdate(host, allNodes, nil, nil, false) } } else { - err = PublishPeerUpdate() + err = PublishPeerUpdate(false) } if err != nil { slog.Warn("error updating peers when node informed the server of an interface change", "nodeid", currentNode.ID, "error", err) @@ -102,6 +100,7 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) { } slog.Info("recieved host update", "name", hostUpdate.Host.Name, "id", hostUpdate.Host.ID) var sendPeerUpdate bool + var replacePeers bool switch hostUpdate.Action { case models.CheckIn: sendPeerUpdate = HandleHostCheckin(&hostUpdate.Host, currentHost) @@ -122,7 +121,7 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) { if err != nil { return } - if err = PublishSingleHostPeerUpdate(currentHost, nodes, nil, nil); err != nil { + if err = PublishSingleHostPeerUpdate(currentHost, nodes, nil, nil, false); err != nil { slog.Error("failed peers publish after join acknowledged", "name", hostUpdate.Host.Name, "id", currentHost.ID, "error", err) return } @@ -131,25 +130,7 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) { case models.UpdateHost: if hostUpdate.Host.PublicKey != currentHost.PublicKey { //remove old peer entry - peerUpdate := models.HostPeerUpdate{ - ServerVersion: servercfg.GetVersion(), - Peers: []wgtypes.PeerConfig{ - { - PublicKey: currentHost.PublicKey, - Remove: true, - }, - }, - } - data, err := json.Marshal(&peerUpdate) - if err != nil { - slog.Error("failed to marshal peer update", "error", err) - } - hosts := logic.GetRelatedHosts(hostUpdate.Host.ID.String()) - server := servercfg.GetServer() - for _, host := range hosts { - publish(&host, fmt.Sprintf("peers/host/%s/%s", host.ID.String(), server), data) - } - + replacePeers = true } sendPeerUpdate = logic.UpdateHostFromClient(&hostUpdate.Host, currentHost) err := logic.UpsertHost(currentHost) @@ -198,7 +179,7 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) { } if sendPeerUpdate { - err := PublishPeerUpdate() + err := PublishPeerUpdate(replacePeers) if err != nil { slog.Error("failed to publish peer update", "error", err) } @@ -249,7 +230,7 @@ func ClientPeerUpdate(client mqtt.Client, msg mqtt.Message) { case ncutils.ACK: // do we still need this case ncutils.DONE: - if err = PublishPeerUpdate(); err != nil { + if err = PublishPeerUpdate(false); err != nil { slog.Error("error publishing peer update for node", "id", currentNode.ID, "error", err) return } diff --git a/mq/publishers.go b/mq/publishers.go index f75de8ad7..e8532fcc0 100644 --- a/mq/publishers.go +++ b/mq/publishers.go @@ -14,7 +14,7 @@ import ( ) // PublishPeerUpdate --- determines and publishes a peer update to all the hosts -func PublishPeerUpdate() error { +func PublishPeerUpdate(replacePeers bool) error { if !servercfg.IsMessageQueueBackend() { return nil } @@ -30,7 +30,7 @@ func PublishPeerUpdate() error { } for _, host := range hosts { host := host - if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil); err != nil { + if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil, replacePeers); err != nil { logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error()) } } @@ -55,7 +55,7 @@ func PublishDeletedNodePeerUpdate(delNode *models.Node) error { } for _, host := range hosts { host := host - if err = PublishSingleHostPeerUpdate(&host, allNodes, delNode, nil); err != nil { + if err = PublishSingleHostPeerUpdate(&host, allNodes, delNode, nil, false); err != nil { logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error()) } } @@ -81,7 +81,7 @@ func PublishDeletedClientPeerUpdate(delClient *models.ExtClient) error { for _, host := range hosts { host := host if host.OS != models.OS_Types.IoT { - if err = PublishSingleHostPeerUpdate(&host, nodes, nil, []models.ExtClient{*delClient}); err != nil { + if err = PublishSingleHostPeerUpdate(&host, nodes, nil, []models.ExtClient{*delClient}, false); err != nil { logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error()) } } @@ -90,12 +90,13 @@ func PublishDeletedClientPeerUpdate(delClient *models.ExtClient) error { } // PublishSingleHostPeerUpdate --- determines and publishes a peer update to one host -func PublishSingleHostPeerUpdate(host *models.Host, allNodes []models.Node, deletedNode *models.Node, deletedClients []models.ExtClient) error { +func PublishSingleHostPeerUpdate(host *models.Host, allNodes []models.Node, deletedNode *models.Node, deletedClients []models.ExtClient, replacePeers bool) error { peerUpdate, err := logic.GetPeerUpdateForHost("", host, allNodes, deletedNode, deletedClients) if err != nil { return err } + peerUpdate.ReplacePeers = replacePeers data, err := json.Marshal(&peerUpdate) if err != nil { return err @@ -231,7 +232,7 @@ func sendPeers() { for _, host := range hosts { host := host logger.Log(2, "sending scheduled peer update (5 min)") - if err = PublishSingleHostPeerUpdate(&host, nodes, nil, nil); err != nil { + if err = PublishSingleHostPeerUpdate(&host, nodes, nil, nil, false); err != nil { logger.Log(1, "error publishing peer updates for host: ", host.ID.String(), " Err: ", err.Error()) } } diff --git a/pro/controllers/failover.go b/pro/controllers/failover.go index 9519afc7c..91fa1b6f5 100644 --- a/pro/controllers/failover.go +++ b/pro/controllers/failover.go @@ -70,7 +70,7 @@ func createfailOver(w http.ResponseWriter, r *http.Request) { logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal")) return } - go mq.PublishPeerUpdate() + go mq.PublishPeerUpdate(false) w.Header().Set("Content-Type", "application/json") logic.ReturnSuccessResponseWithJson(w, r, node, "created failover successfully") } @@ -90,7 +90,7 @@ func resetFailOver(w http.ResponseWriter, r *http.Request) { logic.UpsertNode(&node) } } - go mq.PublishPeerUpdate() + go mq.PublishPeerUpdate(false) w.Header().Set("Content-Type", "application/json") logic.ReturnSuccessResponse(w, r, "failover has been reset successfully") } @@ -126,7 +126,7 @@ func deletefailOver(w http.ResponseWriter, r *http.Request) { } go func() { proLogic.ResetFailOver(&node) - mq.PublishPeerUpdate() + mq.PublishPeerUpdate(false) }() w.Header().Set("Content-Type", "application/json") logic.ReturnSuccessResponseWithJson(w, r, node, "deleted failover successfully") @@ -193,7 +193,7 @@ func failOverME(w http.ResponseWriter, r *http.Request) { sendPeerUpdate = true if sendPeerUpdate { - go mq.PublishPeerUpdate() + go mq.PublishPeerUpdate(false) } w.Header().Set("Content-Type", "application/json") diff --git a/pro/controllers/relay.go b/pro/controllers/relay.go index c707be7e0..a3bacab6e 100644 --- a/pro/controllers/relay.go +++ b/pro/controllers/relay.go @@ -63,7 +63,7 @@ func createRelay(w http.ResponseWriter, r *http.Request) { } } - go mq.PublishPeerUpdate() + go mq.PublishPeerUpdate(false) logger.Log(1, r.Header.Get("user"), "created relay on node", relayRequest.NodeID, "on network", relayRequest.NetID) apiNode := relayNode.ConvertToAPINode() w.WriteHeader(http.StatusOK) @@ -108,13 +108,13 @@ func deleteRelay(w http.ResponseWriter, r *http.Request) { return } node.IsRelay = true // for iot update to recognise that it has to delete relay peer - if err = mq.PublishSingleHostPeerUpdate(h, nodes, &node, nil); err != nil { + if err = mq.PublishSingleHostPeerUpdate(h, nodes, &node, nil, false); err != nil { logger.Log(1, "failed to publish peer update to host", h.ID.String(), ": ", err.Error()) } } } } - mq.PublishPeerUpdate() + mq.PublishPeerUpdate(false) }() logger.Log(1, r.Header.Get("user"), "deleted relay on node", node.ID.String(), "on network", node.Network) apiNode := node.ConvertToAPINode() diff --git a/pro/logic/metrics.go b/pro/logic/metrics.go index b8ee71ac6..c2aa159de 100644 --- a/pro/logic/metrics.go +++ b/pro/logic/metrics.go @@ -89,7 +89,7 @@ func MQUpdateMetrics(client mqtt.Client, msg mqtt.Message) { if err != nil { return } - if err = mq.PublishSingleHostPeerUpdate(host, nodes, nil, nil); err != nil { + if err = mq.PublishSingleHostPeerUpdate(host, nodes, nil, nil, false); err != nil { slog.Warn("failed to publish update after failover peer change for node", "id", currentNode.ID, "network", currentNode.Network, "error", err) } } diff --git a/pro/remote_access_client.go b/pro/remote_access_client.go index e37361743..b773a388a 100644 --- a/pro/remote_access_client.go +++ b/pro/remote_access_client.go @@ -64,7 +64,7 @@ func disableExtClient(client *models.ExtClient) error { } else { // publish peer update to ingress gateway if ingressNode, err := logic.GetNodeByID(newClient.IngressGatewayID); err == nil { - if err = mq.PublishPeerUpdate(); err != nil { + if err = mq.PublishPeerUpdate(false); err != nil { slog.Error("error updating ext clients on", "ingress", ingressNode.ID.String(), "err", err.Error()) } ingressHost, err := logic.GetHost(ingressNode.HostID.String()) @@ -75,7 +75,7 @@ func disableExtClient(client *models.ExtClient) error { if err != nil { return err } - go mq.PublishSingleHostPeerUpdate(ingressHost, nodes, nil, []models.ExtClient{*client}) + go mq.PublishSingleHostPeerUpdate(ingressHost, nodes, nil, []models.ExtClient{*client}, false) } else { return err }