diff --git a/controllers/acls.go b/controllers/acls.go index 727811fb5..f5bcad851 100644 --- a/controllers/acls.go +++ b/controllers/acls.go @@ -69,7 +69,7 @@ func aclDebug(w http.ResponseWriter, r *http.Request) { logic.ReturnErrorResponse(w, r, logic.FormatError(err, "badrequest")) return } - allowed := logic.IsNodeAllowedToCommunicate(node, peer) + allowed := logic.IsNodeAllowedToCommunicate(node, peer, true) logic.ReturnSuccessResponseWithJson(w, r, allowed, "fetched all acls in the network ") } diff --git a/controllers/hosts.go b/controllers/hosts.go index a1015cf95..e9c8d1a8a 100644 --- a/controllers/hosts.go +++ b/controllers/hosts.go @@ -319,6 +319,9 @@ func hostUpdateFallback(w http.ResponseWriter, r *http.Request) { switch hostUpdate.Action { case models.CheckIn: sendPeerUpdate = mq.HandleHostCheckin(&hostUpdate.Host, currentHost) + if sendPeerUpdate { + slog.Error("sendPeerUpdate from CheckIn", "Debug", sendPeerUpdate, &hostUpdate.Host.Name, &hostUpdate.Host.ID) + } case models.UpdateHost: if hostUpdate.Host.PublicKey != currentHost.PublicKey { @@ -326,6 +329,9 @@ func hostUpdateFallback(w http.ResponseWriter, r *http.Request) { replacePeers = true } sendPeerUpdate = logic.UpdateHostFromClient(&hostUpdate.Host, currentHost) + if sendPeerUpdate { + slog.Error("sendPeerUpdate from UpdateHost", "Debug", sendPeerUpdate, &hostUpdate.Host.Name, &hostUpdate.Host.ID) + } err := logic.UpsertHost(currentHost) if err != nil { slog.Error("failed to update host", "id", currentHost.ID, "error", err) diff --git a/logic/acls.go b/logic/acls.go index 8863ce86d..8292792ae 100644 --- a/logic/acls.go +++ b/logic/acls.go @@ -346,14 +346,20 @@ func GetDefaultPolicy(netID models.NetworkID, ruleType models.AclPolicyType) (mo return acl, nil } // check if there are any custom all policies + srcMap := make(map[string]struct{}) + dstMap := make(map[string]struct{}) + defer func() { + srcMap = nil + dstMap = nil + }() policies, _ := ListAcls(netID) for _, policy := range policies { if !policy.Enabled { continue } if policy.RuleType == ruleType { - dstMap := convAclTagToValueMap(policy.Dst) - srcMap := convAclTagToValueMap(policy.Src) + dstMap = convAclTagToValueMap(policy.Dst) + srcMap = convAclTagToValueMap(policy.Src) if _, ok := srcMap["*"]; ok { if _, ok := dstMap["*"]; ok { return policy, nil @@ -511,29 +517,37 @@ func IsUserAllowedToCommunicate(userName string, peer models.Node) bool { } // IsNodeAllowedToCommunicate - check node is allowed to communicate with the peer -func IsNodeAllowedToCommunicate(node, peer models.Node) bool { +func IsNodeAllowedToCommunicate(node, peer models.Node, checkDefaultPolicy bool) bool { if node.IsStatic { node = node.StaticNode.ConvertToStaticNode() } if peer.IsStatic { peer = peer.StaticNode.ConvertToStaticNode() } - // check default policy if all allowed return true - defaultPolicy, err := GetDefaultPolicy(models.NetworkID(node.Network), models.DevicePolicy) - if err == nil { - if defaultPolicy.Enabled { - return true + if checkDefaultPolicy { + // check default policy if all allowed return true + defaultPolicy, err := GetDefaultPolicy(models.NetworkID(node.Network), models.DevicePolicy) + if err == nil { + if defaultPolicy.Enabled { + return true + } } } // list device policies policies := listDevicePolicies(models.NetworkID(peer.Network)) + srcMap := make(map[string]struct{}) + dstMap := make(map[string]struct{}) + defer func() { + srcMap = nil + dstMap = nil + }() for _, policy := range policies { if !policy.Enabled { continue } - srcMap := convAclTagToValueMap(policy.Src) - dstMap := convAclTagToValueMap(policy.Dst) + srcMap = convAclTagToValueMap(policy.Src) + dstMap = convAclTagToValueMap(policy.Dst) // fmt.Printf("\n======> SRCMAP: %+v\n", srcMap) // fmt.Printf("\n======> DSTMAP: %+v\n", dstMap) // fmt.Printf("\n======> node Tags: %+v\n", node.Tags) diff --git a/logic/acls/nodeacls/retrieve.go b/logic/acls/nodeacls/retrieve.go index 4411c5b22..fb0849119 100644 --- a/logic/acls/nodeacls/retrieve.go +++ b/logic/acls/nodeacls/retrieve.go @@ -13,19 +13,20 @@ var NodesAllowedACLMutex = &sync.Mutex{} // AreNodesAllowed - checks if nodes are allowed to communicate in their network ACL func AreNodesAllowed(networkID NetworkID, node1, node2 NodeID) bool { - NodesAllowedACLMutex.Lock() - defer NodesAllowedACLMutex.Unlock() - var currentNetworkACL, err = FetchAllACLs(networkID) - if err != nil { - return false - } - var allowed bool - acls.AclMutex.Lock() - currNetworkACLNode1 := currentNetworkACL[acls.AclID(node1)] - currNetworkACLNode2 := currentNetworkACL[acls.AclID(node2)] - acls.AclMutex.Unlock() - allowed = currNetworkACLNode1.IsAllowed(acls.AclID(node2)) && currNetworkACLNode2.IsAllowed(acls.AclID(node1)) - return allowed + return true + // NodesAllowedACLMutex.Lock() + // defer NodesAllowedACLMutex.Unlock() + // var currentNetworkACL, err = FetchAllACLs(networkID) + // if err != nil { + // return false + // } + // var allowed bool + // acls.AclMutex.Lock() + // currNetworkACLNode1 := currentNetworkACL[acls.AclID(node1)] + // currNetworkACLNode2 := currentNetworkACL[acls.AclID(node2)] + // acls.AclMutex.Unlock() + // allowed = currNetworkACLNode1.IsAllowed(acls.AclID(node2)) && currNetworkACLNode2.IsAllowed(acls.AclID(node1)) + // return allowed } // FetchNodeACL - fetches a specific node's ACL in a given network diff --git a/logic/extpeers.go b/logic/extpeers.go index c03a0efa7..6cb352231 100644 --- a/logic/extpeers.go +++ b/logic/extpeers.go @@ -564,7 +564,7 @@ func GetFwRulesOnIngressGateway(node models.Node) (rules []models.FwRule) { if peer.StaticNode.ClientID == nodeI.StaticNode.ClientID || peer.IsUserNode { continue } - if IsNodeAllowedToCommunicate(nodeI, peer) { + if IsNodeAllowedToCommunicate(nodeI, peer, true) { if peer.IsStatic { if nodeI.StaticNode.Address != "" { rules = append(rules, models.FwRule{ @@ -650,7 +650,7 @@ func GetExtPeers(node, peer *models.Node) ([]wgtypes.PeerConfig, []models.IDandA continue } if extPeer.RemoteAccessClientID == "" { - if !IsNodeAllowedToCommunicate(extPeer.ConvertToStaticNode(), *peer) { + if !IsNodeAllowedToCommunicate(extPeer.ConvertToStaticNode(), *peer, true) { continue } } else { @@ -739,7 +739,7 @@ func getExtpeerEgressRanges(node models.Node) (ranges, ranges6 []net.IPNet) { if len(extPeer.ExtraAllowedIPs) == 0 { continue } - if !IsNodeAllowedToCommunicate(extPeer.ConvertToStaticNode(), node) { + if !IsNodeAllowedToCommunicate(extPeer.ConvertToStaticNode(), node, true) { continue } for _, allowedRange := range extPeer.ExtraAllowedIPs { @@ -766,7 +766,7 @@ func getExtpeersExtraRoutes(node models.Node) (egressRoutes []models.EgressNetwo if len(extPeer.ExtraAllowedIPs) == 0 { continue } - if !IsNodeAllowedToCommunicate(extPeer.ConvertToStaticNode(), node) { + if !IsNodeAllowedToCommunicate(extPeer.ConvertToStaticNode(), node, true) { continue } egressRoutes = append(egressRoutes, getExtPeerEgressRoute(node, extPeer)...) diff --git a/logic/hosts.go b/logic/hosts.go index 8fd125c52..c182e68ba 100644 --- a/logic/hosts.go +++ b/logic/hosts.go @@ -243,25 +243,30 @@ func UpdateHost(newHost, currentHost *models.Host) { // UpdateHostFromClient - used for updating host on server with update recieved from client func UpdateHostFromClient(newHost, currHost *models.Host) (sendPeerUpdate bool) { if newHost.PublicKey != currHost.PublicKey { + slog.Error("PublicKey changed:", "Debug") currHost.PublicKey = newHost.PublicKey sendPeerUpdate = true } if newHost.ListenPort != 0 && currHost.ListenPort != newHost.ListenPort { + slog.Error("ListenPort changed:", "Debug", currHost.ListenPort, newHost.ListenPort) currHost.ListenPort = newHost.ListenPort sendPeerUpdate = true } if newHost.WgPublicListenPort != 0 && currHost.WgPublicListenPort != newHost.WgPublicListenPort { + slog.Error("WgPublicListenPort changed:", "Debug", currHost.WgPublicListenPort, newHost.WgPublicListenPort) currHost.WgPublicListenPort = newHost.WgPublicListenPort sendPeerUpdate = true } isEndpointChanged := false if currHost.EndpointIP.String() != newHost.EndpointIP.String() { + slog.Error("EndpointIP changed:", "Debug", currHost.EndpointIP, newHost.EndpointIP) currHost.EndpointIP = newHost.EndpointIP sendPeerUpdate = true isEndpointChanged = true } if currHost.EndpointIPv6.String() != newHost.EndpointIPv6.String() { + slog.Error("EndpointIPv6 changed:", "Debug", currHost.EndpointIPv6, newHost.EndpointIPv6) currHost.EndpointIPv6 = newHost.EndpointIPv6 sendPeerUpdate = true isEndpointChanged = true @@ -290,6 +295,7 @@ func UpdateHostFromClient(newHost, currHost *models.Host) (sendPeerUpdate bool) currHost.Name = newHost.Name if len(newHost.NatType) > 0 && newHost.NatType != currHost.NatType { + slog.Error("NatType changed:", "Debug", currHost.NatType, newHost.NatType) currHost.NatType = newHost.NatType sendPeerUpdate = true } diff --git a/logic/nodes.go b/logic/nodes.go index 34eebe2e4..d4f9df087 100644 --- a/logic/nodes.go +++ b/logic/nodes.go @@ -5,7 +5,9 @@ import ( "encoding/json" "errors" "fmt" + "maps" "net" + "slices" "sort" "sync" "time" @@ -24,8 +26,10 @@ import ( ) var ( - nodeCacheMutex = &sync.RWMutex{} - nodesCacheMap = make(map[string]models.Node) + nodeCacheMutex = &sync.RWMutex{} + nodeNetworkCacheMutex = &sync.RWMutex{} + nodesCacheMap = make(map[string]models.Node) + nodesNetworkCacheMap = make(map[string]map[string]models.Node) ) func getNodeFromCache(nodeID string) (node models.Node, ok bool) { @@ -48,12 +52,37 @@ func deleteNodeFromCache(nodeID string) { delete(nodesCacheMap, nodeID) nodeCacheMutex.Unlock() } +func deleteNodeFromNetworkCache(nodeID string, network string) { + nodeNetworkCacheMutex.Lock() + delete(nodesNetworkCacheMap[network], nodeID) + nodeNetworkCacheMutex.Unlock() +} + +func storeNodeInNetworkCache(node models.Node, network string) { + nodeNetworkCacheMutex.Lock() + if nodesNetworkCacheMap[network] == nil { + nodesNetworkCacheMap[network] = make(map[string]models.Node) + } + nodesNetworkCacheMap[network][node.ID.String()] = node + nodeNetworkCacheMutex.Unlock() +} func storeNodeInCache(node models.Node) { nodeCacheMutex.Lock() nodesCacheMap[node.ID.String()] = node nodeCacheMutex.Unlock() } +func loadNodesIntoNetworkCache(nMap map[string]models.Node) { + nodeNetworkCacheMutex.Lock() + for _, v := range nMap { + network := v.Network + if nodesNetworkCacheMap[network] == nil { + nodesNetworkCacheMap[network] = make(map[string]models.Node) + } + nodesNetworkCacheMap[network][v.ID.String()] = v + } + nodeNetworkCacheMutex.Unlock() +} func loadNodesIntoCache(nMap map[string]models.Node) { nodeCacheMutex.Lock() @@ -63,6 +92,7 @@ func loadNodesIntoCache(nMap map[string]models.Node) { func ClearNodeCache() { nodeCacheMutex.Lock() nodesCacheMap = make(map[string]models.Node) + nodesNetworkCacheMap = make(map[string]map[string]models.Node) nodeCacheMutex.Unlock() } @@ -77,6 +107,12 @@ const ( // GetNetworkNodes - gets the nodes of a network func GetNetworkNodes(network string) ([]models.Node, error) { + + if networkNodes, ok := nodesNetworkCacheMap[network]; ok { + nodeNetworkCacheMutex.Lock() + defer nodeNetworkCacheMutex.Unlock() + return slices.Collect(maps.Values(networkNodes)), nil + } allnodes, err := GetAllNodes() if err != nil { return []models.Node{}, err @@ -99,6 +135,12 @@ func GetHostNodes(host *models.Host) []models.Node { // GetNetworkNodesMemory - gets all nodes belonging to a network from list in memory func GetNetworkNodesMemory(allNodes []models.Node, network string) []models.Node { + + if networkNodes, ok := nodesNetworkCacheMap[network]; ok { + nodeNetworkCacheMutex.Lock() + defer nodeNetworkCacheMutex.Unlock() + return slices.Collect(maps.Values(networkNodes)) + } var nodes = []models.Node{} for i := range allNodes { node := allNodes[i] @@ -123,6 +165,7 @@ func UpdateNodeCheckin(node *models.Node) error { } if servercfg.CacheEnabled() { storeNodeInCache(*node) + storeNodeInNetworkCache(*node, node.Network) } return nil } @@ -140,6 +183,7 @@ func UpsertNode(newNode *models.Node) error { } if servercfg.CacheEnabled() { storeNodeInCache(*newNode) + storeNodeInNetworkCache(*newNode, newNode.Network) } return nil } @@ -179,6 +223,7 @@ func UpdateNode(currentNode *models.Node, newNode *models.Node) error { } if servercfg.CacheEnabled() { storeNodeInCache(*newNode) + storeNodeInNetworkCache(*newNode, newNode.Network) } return nil } @@ -288,6 +333,7 @@ func DeleteNodeByID(node *models.Node) error { } if servercfg.CacheEnabled() { deleteNodeFromCache(node.ID.String()) + deleteNodeFromNetworkCache(node.ID.String(), node.Network) } if servercfg.IsDNSMode() { SetDNS() @@ -350,6 +396,7 @@ func GetAllNodes() ([]models.Node, error) { nodesMap := make(map[string]models.Node) if servercfg.CacheEnabled() { defer loadNodesIntoCache(nodesMap) + defer loadNodesIntoNetworkCache(nodesMap) } collection, err := database.FetchRecords(database.NODES_TABLE_NAME) if err != nil { @@ -459,6 +506,7 @@ func GetNodeByID(uuid string) (models.Node, error) { } if servercfg.CacheEnabled() { storeNodeInCache(node) + storeNodeInNetworkCache(node, node.Network) } return node, nil } @@ -612,6 +660,7 @@ func createNode(node *models.Node) error { } if servercfg.CacheEnabled() { storeNodeInCache(*node) + storeNodeInNetworkCache(*node, node.Network) } if _, ok := allocatedIpMap[node.Network]; ok { if node.Address.IP != nil { diff --git a/logic/peers.go b/logic/peers.go index f67162636..1669fa528 100644 --- a/logic/peers.go +++ b/logic/peers.go @@ -96,6 +96,8 @@ func GetPeerUpdateForHost(network string, host *models.Host, allNodes []models.N if !node.Connected || node.PendingDelete || node.Action == models.NODE_DELETE { continue } + // check default policy if all allowed return true + defaultPolicy, _ := GetDefaultPolicy(models.NetworkID(node.Network), models.DevicePolicy) if host.OS == models.OS_Types.IoT { hostPeerUpdate.NodeAddrs = append(hostPeerUpdate.NodeAddrs, node.PrimaryAddressIPNet()) if node.IsRelayed { @@ -259,7 +261,7 @@ func GetPeerUpdateForHost(network string, host *models.Host, allNodes []models.N !peer.PendingDelete && peer.Connected && nodeacls.AreNodesAllowed(nodeacls.NetworkID(node.Network), nodeacls.NodeID(node.ID.String()), nodeacls.NodeID(peer.ID.String())) && - IsNodeAllowedToCommunicate(node, peer) && + (defaultPolicy.Enabled || IsNodeAllowedToCommunicate(node, peer, false)) && (deletedNode == nil || (deletedNode != nil && peer.ID.String() != deletedNode.ID.String())) { peerConfig.AllowedIPs = allowedips // only append allowed IPs if valid connection } diff --git a/mq/emqx_on_prem.go b/mq/emqx_on_prem.go index b9cd690cc..ff8c9bf1f 100644 --- a/mq/emqx_on_prem.go +++ b/mq/emqx_on_prem.go @@ -261,7 +261,7 @@ func (e *EmqxOnPrem) CreateDefaultAllowRule() error { if err != nil { return err } - req, err := http.NewRequest(http.MethodPost, servercfg.GetEmqxRestEndpoint()+"/api/v5/authorization/sources/built_in_database/all", bytes.NewReader(payload)) + req, err := http.NewRequest(http.MethodPost, servercfg.GetEmqxRestEndpoint()+"/api/v5/authorization/sources/built_in_database/rules/all", bytes.NewReader(payload)) if err != nil { return err } diff --git a/mq/handlers.go b/mq/handlers.go index 129133268..3e23047a4 100644 --- a/mq/handlers.go +++ b/mq/handlers.go @@ -106,6 +106,10 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) { switch hostUpdate.Action { case models.CheckIn: sendPeerUpdate = HandleHostCheckin(&hostUpdate.Host, currentHost) + if sendPeerUpdate { + slog.Error("sendPeerUpdate from UpdateHost.CheckIn", "Debug", sendPeerUpdate, &hostUpdate.Host.Name, &hostUpdate.Host.ID) + } + case models.Acknowledgement: hu := hostactions.GetAction(currentHost.ID.String()) if hu != nil { @@ -129,6 +133,9 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) { replacePeers = true } sendPeerUpdate = logic.UpdateHostFromClient(&hostUpdate.Host, currentHost) + if sendPeerUpdate { + slog.Error("sendPeerUpdate from UpdateHost.UpdateHost", "Debug", sendPeerUpdate, &hostUpdate.Host.Name, &hostUpdate.Host.ID) + } err := logic.UpsertHost(currentHost) if err != nil { slog.Error("failed to update host", "id", currentHost.ID, "error", err) diff --git a/mq/migrate.go b/mq/migrate.go index ec8b4afb0..5790e4721 100644 --- a/mq/migrate.go +++ b/mq/migrate.go @@ -88,7 +88,15 @@ func SendPullSYN() error { Host: host, } msg, _ := json.Marshal(hostUpdate) - encrypted, encryptErr := encryptMsg(&host, msg) + zipped, err := compressPayload(msg) + if err != nil { + return err + } + encrypted, encryptErr := encryptAESGCM(host.TrafficKeyPublic[0:32], zipped) + if encryptErr != nil { + return encryptErr + } + if encryptErr != nil { continue } diff --git a/mq/publishers.go b/mq/publishers.go index 3b47390a8..2073eccac 100644 --- a/mq/publishers.go +++ b/mq/publishers.go @@ -4,9 +4,11 @@ import ( "encoding/json" "errors" "fmt" + "runtime" "sync" "time" + "github.com/google/uuid" "github.com/gravitl/netmaker/logger" "github.com/gravitl/netmaker/logic" "github.com/gravitl/netmaker/models" @@ -14,11 +16,25 @@ import ( "golang.org/x/exp/slog" ) -var batchSize = servercfg.GetPeerUpdateBatchSize() -var batchUpdate = servercfg.GetBatchPeerUpdate() +var running bool // PublishPeerUpdate --- determines and publishes a peer update to all the hosts func PublishPeerUpdate(replacePeers bool) error { + slog.Error("entering PublishPeerUpdate", "Debug") + if running { + return nil + } + running = true + t1 := time.Now().Unix() + + pc, file, no, ok := runtime.Caller(1) + if ok { + slog.Error("called from ", file, no) + } + details := runtime.FuncForPC(pc) + if ok && details != nil { + slog.Error("called from ", details.Name()) + } if !servercfg.IsMessageQueueBackend() { return nil } @@ -37,35 +53,21 @@ func PublishPeerUpdate(replacePeers bool) error { return err } - //if batch peer update disabled - if !batchUpdate { - for _, host := range hosts { - host := host - go func(host models.Host) { - if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil, replacePeers, nil); err != nil { - logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error()) - } - }(host) - } - return nil - } - - //if batch peer update enabled - batchHost := BatchItems(hosts, batchSize) - var wg sync.WaitGroup - for _, v := range batchHost { - hostLen := len(v) - wg.Add(hostLen) - for i := 0; i < hostLen; i++ { - host := hosts[i] - go func(host models.Host) { - if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil, replacePeers, &wg); err != nil { - logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error()) + for _, host := range hosts { + host := host + time.Sleep(5 * time.Millisecond) + go func(host models.Host) { + if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil, replacePeers, nil); err != nil { + id := host.Name + if host.ID != uuid.Nil { + id = host.ID.String() } - }(host) - } - wg.Wait() + slog.Error("failed to publish peer update to host", id, ": ", err) + } + }(host) } + running = false + slog.Error("leaving PublishPeerUpdate, time cost: ", "Debug", time.Now().Unix()-t1) return nil } diff --git a/mq/util.go b/mq/util.go index a38cd7d75..67596dd91 100644 --- a/mq/util.go +++ b/mq/util.go @@ -1,8 +1,14 @@ package mq import ( + "bytes" + "compress/gzip" + "crypto/aes" + "crypto/cipher" + "crypto/rand" "errors" "fmt" + "io" "math" "strings" "time" @@ -66,40 +72,49 @@ func BatchItems[T any](items []T, batchSize int) [][]T { return batches } -func encryptMsg(host *models.Host, msg []byte) ([]byte, error) { - if host.OS == models.OS_Types.IoT { - return msg, nil - } - - // fetch server public key to be certain hasn't changed in transit - trafficKey, trafficErr := logic.RetrievePrivateTrafficKey() - if trafficErr != nil { - return nil, trafficErr +func compressPayload(data []byte) ([]byte, error) { + var buf bytes.Buffer + zw := gzip.NewWriter(&buf) + if _, err := zw.Write(data); err != nil { + return nil, err } - - serverPrivKey, err := ncutils.ConvertBytesToKey(trafficKey) + zw.Close() + return buf.Bytes(), nil +} +func encryptAESGCM(key, plaintext []byte) ([]byte, error) { + // Create AES block cipher + block, err := aes.NewCipher(key) if err != nil { return nil, err } - nodePubKey, err := ncutils.ConvertBytesToKey(host.TrafficKeyPublic) + // Create GCM (Galois/Counter Mode) cipher + aesGCM, err := cipher.NewGCM(block) if err != nil { return nil, err } - if strings.Contains(host.Version, "0.10.0") { - return ncutils.BoxEncrypt(msg, nodePubKey, serverPrivKey) + // Create a random nonce + nonce := make([]byte, aesGCM.NonceSize()) + if _, err := io.ReadFull(rand.Reader, nonce); err != nil { + return nil, err } - return ncutils.Chunk(msg, nodePubKey, serverPrivKey) + // Encrypt the data + ciphertext := aesGCM.Seal(nonce, nonce, plaintext, nil) + return ciphertext, nil } func publish(host *models.Host, dest string, msg []byte) error { - - encrypted, encryptErr := encryptMsg(host, msg) + zipped, err := compressPayload(msg) + if err != nil { + return err + } + encrypted, encryptErr := encryptAESGCM(host.TrafficKeyPublic[0:32], zipped) if encryptErr != nil { return encryptErr } + if mqclient == nil || !mqclient.IsConnectionOpen() { return errors.New("cannot publish ... mqclient not connected") } diff --git a/netclient/ncutils/netclientutils.go b/netclient/ncutils/netclientutils.go index 9f79892d0..3ec17647b 100644 --- a/netclient/ncutils/netclientutils.go +++ b/netclient/ncutils/netclientutils.go @@ -31,3 +31,15 @@ func ConvertBytesToKey(data []byte) (*[32]byte, error) { } return result, err } + +// ConvertBytesToKey - util to convert bytes to a key to use elsewhere +func ConvertBytesToKey1(data []byte) ([]byte, error) { + var buffer = bytes.NewBuffer(data) + var dec = gob.NewDecoder(buffer) + var result = []byte{} + var err = dec.Decode(result) + if err != nil { + return nil, err + } + return result, err +} diff --git a/scripts/netmaker.default.env b/scripts/netmaker.default.env index b8a65a1fa..38d631462 100644 --- a/scripts/netmaker.default.env +++ b/scripts/netmaker.default.env @@ -86,10 +86,6 @@ EMAIL_SENDER_ADDR= EMAIL_SENDER_USER= # sender smtp password EMAIL_SENDER_PASSWORD= -# if batch peer update enable or not -PEER_UPDATE_BATCH=true -# batch peer update size when PEER_UPDATE_BATCH is enabled -PEER_UPDATE_BATCH_SIZE=50 # default domain for internal DNS lookup DEFAULT_DOMAIN=netmaker.hosted # managed dns setting, set to true to resolve dns entries on netmaker network diff --git a/servercfg/serverconf.go b/servercfg/serverconf.go index 2cd80e983..6c7afa018 100644 --- a/servercfg/serverconf.go +++ b/servercfg/serverconf.go @@ -686,28 +686,6 @@ func validateDomain(domain string) bool { return exp.MatchString(domain) } -// GetBatchPeerUpdate - if batch peer update -func GetBatchPeerUpdate() bool { - enabled := true - if os.Getenv("PEER_UPDATE_BATCH") != "" { - enabled = os.Getenv("PEER_UPDATE_BATCH") == "true" - } - return enabled -} - -// GetPeerUpdateBatchSize - get the batch size for peer update -func GetPeerUpdateBatchSize() int { - //default 50 - batchSize := 50 - if os.Getenv("PEER_UPDATE_BATCH_SIZE") != "" { - b, e := strconv.Atoi(os.Getenv("PEER_UPDATE_BATCH_SIZE")) - if e == nil && b > 0 && b < 1000 { - batchSize = b - } - } - return batchSize -} - // GetEmqxRestEndpoint - returns the REST API Endpoint of EMQX func GetEmqxRestEndpoint() string { return os.Getenv("EMQX_REST_ENDPOINT")