Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NET-1778:comment ACL call and add debug message #3203

Open
wants to merge 17 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion controllers/acls.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func aclDebug(w http.ResponseWriter, r *http.Request) {
logic.ReturnErrorResponse(w, r, logic.FormatError(err, "badrequest"))
return
}
allowed := logic.IsNodeAllowedToCommunicate(node, peer)
allowed := logic.IsNodeAllowedToCommunicate(node, peer, true)
logic.ReturnSuccessResponseWithJson(w, r, allowed, "fetched all acls in the network ")
}

Expand Down
6 changes: 6 additions & 0 deletions controllers/hosts.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,13 +319,19 @@ func hostUpdateFallback(w http.ResponseWriter, r *http.Request) {
switch hostUpdate.Action {
case models.CheckIn:
sendPeerUpdate = mq.HandleHostCheckin(&hostUpdate.Host, currentHost)
if sendPeerUpdate {
slog.Error("sendPeerUpdate from CheckIn", "Debug", sendPeerUpdate, &hostUpdate.Host.Name, &hostUpdate.Host.ID)
}

case models.UpdateHost:
if hostUpdate.Host.PublicKey != currentHost.PublicKey {
//remove old peer entry
replacePeers = true
}
sendPeerUpdate = logic.UpdateHostFromClient(&hostUpdate.Host, currentHost)
if sendPeerUpdate {
slog.Error("sendPeerUpdate from UpdateHost", "Debug", sendPeerUpdate, &hostUpdate.Host.Name, &hostUpdate.Host.ID)
}
err := logic.UpsertHost(currentHost)
if err != nil {
slog.Error("failed to update host", "id", currentHost.ID, "error", err)
Expand Down
34 changes: 24 additions & 10 deletions logic/acls.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,14 +346,20 @@ func GetDefaultPolicy(netID models.NetworkID, ruleType models.AclPolicyType) (mo
return acl, nil
}
// check if there are any custom all policies
srcMap := make(map[string]struct{})
dstMap := make(map[string]struct{})
defer func() {
srcMap = nil
dstMap = nil
}()
policies, _ := ListAcls(netID)
for _, policy := range policies {
if !policy.Enabled {
continue
}
if policy.RuleType == ruleType {
dstMap := convAclTagToValueMap(policy.Dst)
srcMap := convAclTagToValueMap(policy.Src)
dstMap = convAclTagToValueMap(policy.Dst)
srcMap = convAclTagToValueMap(policy.Src)
if _, ok := srcMap["*"]; ok {
if _, ok := dstMap["*"]; ok {
return policy, nil
Expand Down Expand Up @@ -511,29 +517,37 @@ func IsUserAllowedToCommunicate(userName string, peer models.Node) bool {
}

// IsNodeAllowedToCommunicate - check node is allowed to communicate with the peer
func IsNodeAllowedToCommunicate(node, peer models.Node) bool {
func IsNodeAllowedToCommunicate(node, peer models.Node, checkDefaultPolicy bool) bool {
if node.IsStatic {
node = node.StaticNode.ConvertToStaticNode()
}
if peer.IsStatic {
peer = peer.StaticNode.ConvertToStaticNode()
}
// check default policy if all allowed return true
defaultPolicy, err := GetDefaultPolicy(models.NetworkID(node.Network), models.DevicePolicy)
if err == nil {
if defaultPolicy.Enabled {
return true
if checkDefaultPolicy {
// check default policy if all allowed return true
defaultPolicy, err := GetDefaultPolicy(models.NetworkID(node.Network), models.DevicePolicy)
if err == nil {
if defaultPolicy.Enabled {
return true
}
}
}

// list device policies
policies := listDevicePolicies(models.NetworkID(peer.Network))
srcMap := make(map[string]struct{})
dstMap := make(map[string]struct{})
defer func() {
srcMap = nil
dstMap = nil
}()
for _, policy := range policies {
if !policy.Enabled {
continue
}
srcMap := convAclTagToValueMap(policy.Src)
dstMap := convAclTagToValueMap(policy.Dst)
srcMap = convAclTagToValueMap(policy.Src)
dstMap = convAclTagToValueMap(policy.Dst)
// fmt.Printf("\n======> SRCMAP: %+v\n", srcMap)
// fmt.Printf("\n======> DSTMAP: %+v\n", dstMap)
// fmt.Printf("\n======> node Tags: %+v\n", node.Tags)
Expand Down
27 changes: 14 additions & 13 deletions logic/acls/nodeacls/retrieve.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,20 @@ var NodesAllowedACLMutex = &sync.Mutex{}

// AreNodesAllowed - checks if nodes are allowed to communicate in their network ACL
func AreNodesAllowed(networkID NetworkID, node1, node2 NodeID) bool {
NodesAllowedACLMutex.Lock()
defer NodesAllowedACLMutex.Unlock()
var currentNetworkACL, err = FetchAllACLs(networkID)
if err != nil {
return false
}
var allowed bool
acls.AclMutex.Lock()
currNetworkACLNode1 := currentNetworkACL[acls.AclID(node1)]
currNetworkACLNode2 := currentNetworkACL[acls.AclID(node2)]
acls.AclMutex.Unlock()
allowed = currNetworkACLNode1.IsAllowed(acls.AclID(node2)) && currNetworkACLNode2.IsAllowed(acls.AclID(node1))
return allowed
return true
// NodesAllowedACLMutex.Lock()
// defer NodesAllowedACLMutex.Unlock()
// var currentNetworkACL, err = FetchAllACLs(networkID)
// if err != nil {
// return false
// }
// var allowed bool
// acls.AclMutex.Lock()
// currNetworkACLNode1 := currentNetworkACL[acls.AclID(node1)]
// currNetworkACLNode2 := currentNetworkACL[acls.AclID(node2)]
// acls.AclMutex.Unlock()
// allowed = currNetworkACLNode1.IsAllowed(acls.AclID(node2)) && currNetworkACLNode2.IsAllowed(acls.AclID(node1))
// return allowed
}

// FetchNodeACL - fetches a specific node's ACL in a given network
Expand Down
8 changes: 4 additions & 4 deletions logic/extpeers.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ func GetFwRulesOnIngressGateway(node models.Node) (rules []models.FwRule) {
if peer.StaticNode.ClientID == nodeI.StaticNode.ClientID || peer.IsUserNode {
continue
}
if IsNodeAllowedToCommunicate(nodeI, peer) {
if IsNodeAllowedToCommunicate(nodeI, peer, true) {
if peer.IsStatic {
if nodeI.StaticNode.Address != "" {
rules = append(rules, models.FwRule{
Expand Down Expand Up @@ -650,7 +650,7 @@ func GetExtPeers(node, peer *models.Node) ([]wgtypes.PeerConfig, []models.IDandA
continue
}
if extPeer.RemoteAccessClientID == "" {
if !IsNodeAllowedToCommunicate(extPeer.ConvertToStaticNode(), *peer) {
if !IsNodeAllowedToCommunicate(extPeer.ConvertToStaticNode(), *peer, true) {
continue
}
} else {
Expand Down Expand Up @@ -739,7 +739,7 @@ func getExtpeerEgressRanges(node models.Node) (ranges, ranges6 []net.IPNet) {
if len(extPeer.ExtraAllowedIPs) == 0 {
continue
}
if !IsNodeAllowedToCommunicate(extPeer.ConvertToStaticNode(), node) {
if !IsNodeAllowedToCommunicate(extPeer.ConvertToStaticNode(), node, true) {
continue
}
for _, allowedRange := range extPeer.ExtraAllowedIPs {
Expand All @@ -766,7 +766,7 @@ func getExtpeersExtraRoutes(node models.Node) (egressRoutes []models.EgressNetwo
if len(extPeer.ExtraAllowedIPs) == 0 {
continue
}
if !IsNodeAllowedToCommunicate(extPeer.ConvertToStaticNode(), node) {
if !IsNodeAllowedToCommunicate(extPeer.ConvertToStaticNode(), node, true) {
continue
}
egressRoutes = append(egressRoutes, getExtPeerEgressRoute(node, extPeer)...)
Expand Down
6 changes: 6 additions & 0 deletions logic/hosts.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,25 +243,30 @@ func UpdateHost(newHost, currentHost *models.Host) {
// UpdateHostFromClient - used for updating host on server with update recieved from client
func UpdateHostFromClient(newHost, currHost *models.Host) (sendPeerUpdate bool) {
if newHost.PublicKey != currHost.PublicKey {
slog.Error("PublicKey changed:", "Debug")
currHost.PublicKey = newHost.PublicKey
sendPeerUpdate = true
}
if newHost.ListenPort != 0 && currHost.ListenPort != newHost.ListenPort {
slog.Error("ListenPort changed:", "Debug", currHost.ListenPort, newHost.ListenPort)
currHost.ListenPort = newHost.ListenPort
sendPeerUpdate = true
}
if newHost.WgPublicListenPort != 0 &&
currHost.WgPublicListenPort != newHost.WgPublicListenPort {
slog.Error("WgPublicListenPort changed:", "Debug", currHost.WgPublicListenPort, newHost.WgPublicListenPort)
currHost.WgPublicListenPort = newHost.WgPublicListenPort
sendPeerUpdate = true
}
isEndpointChanged := false
if currHost.EndpointIP.String() != newHost.EndpointIP.String() {
slog.Error("EndpointIP changed:", "Debug", currHost.EndpointIP, newHost.EndpointIP)
currHost.EndpointIP = newHost.EndpointIP
sendPeerUpdate = true
isEndpointChanged = true
}
if currHost.EndpointIPv6.String() != newHost.EndpointIPv6.String() {
slog.Error("EndpointIPv6 changed:", "Debug", currHost.EndpointIPv6, newHost.EndpointIPv6)
currHost.EndpointIPv6 = newHost.EndpointIPv6
sendPeerUpdate = true
isEndpointChanged = true
Expand Down Expand Up @@ -290,6 +295,7 @@ func UpdateHostFromClient(newHost, currHost *models.Host) (sendPeerUpdate bool)

currHost.Name = newHost.Name
if len(newHost.NatType) > 0 && newHost.NatType != currHost.NatType {
slog.Error("NatType changed:", "Debug", currHost.NatType, newHost.NatType)
currHost.NatType = newHost.NatType
sendPeerUpdate = true
}
Expand Down
53 changes: 51 additions & 2 deletions logic/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"encoding/json"
"errors"
"fmt"
"maps"
"net"
"slices"
"sort"
"sync"
"time"
Expand All @@ -24,8 +26,10 @@ import (
)

var (
nodeCacheMutex = &sync.RWMutex{}
nodesCacheMap = make(map[string]models.Node)
nodeCacheMutex = &sync.RWMutex{}
nodeNetworkCacheMutex = &sync.RWMutex{}
nodesCacheMap = make(map[string]models.Node)
nodesNetworkCacheMap = make(map[string]map[string]models.Node)
)

func getNodeFromCache(nodeID string) (node models.Node, ok bool) {
Expand All @@ -48,12 +52,37 @@ func deleteNodeFromCache(nodeID string) {
delete(nodesCacheMap, nodeID)
nodeCacheMutex.Unlock()
}
func deleteNodeFromNetworkCache(nodeID string, network string) {
nodeNetworkCacheMutex.Lock()
delete(nodesNetworkCacheMap[network], nodeID)
nodeNetworkCacheMutex.Unlock()
}

func storeNodeInNetworkCache(node models.Node, network string) {
nodeNetworkCacheMutex.Lock()
if nodesNetworkCacheMap[network] == nil {
nodesNetworkCacheMap[network] = make(map[string]models.Node)
}
nodesNetworkCacheMap[network][node.ID.String()] = node
nodeNetworkCacheMutex.Unlock()
}

func storeNodeInCache(node models.Node) {
nodeCacheMutex.Lock()
nodesCacheMap[node.ID.String()] = node
nodeCacheMutex.Unlock()
}
func loadNodesIntoNetworkCache(nMap map[string]models.Node) {
nodeNetworkCacheMutex.Lock()
for _, v := range nMap {
network := v.Network
if nodesNetworkCacheMap[network] == nil {
nodesNetworkCacheMap[network] = make(map[string]models.Node)
}
nodesNetworkCacheMap[network][v.ID.String()] = v
}
nodeNetworkCacheMutex.Unlock()
}

func loadNodesIntoCache(nMap map[string]models.Node) {
nodeCacheMutex.Lock()
Expand All @@ -63,6 +92,7 @@ func loadNodesIntoCache(nMap map[string]models.Node) {
func ClearNodeCache() {
nodeCacheMutex.Lock()
nodesCacheMap = make(map[string]models.Node)
nodesNetworkCacheMap = make(map[string]map[string]models.Node)
nodeCacheMutex.Unlock()
}

Expand All @@ -77,6 +107,12 @@ const (

// GetNetworkNodes - gets the nodes of a network
func GetNetworkNodes(network string) ([]models.Node, error) {

if networkNodes, ok := nodesNetworkCacheMap[network]; ok {
nodeNetworkCacheMutex.Lock()
defer nodeNetworkCacheMutex.Unlock()
return slices.Collect(maps.Values(networkNodes)), nil
}
allnodes, err := GetAllNodes()
if err != nil {
return []models.Node{}, err
Expand All @@ -99,6 +135,12 @@ func GetHostNodes(host *models.Host) []models.Node {

// GetNetworkNodesMemory - gets all nodes belonging to a network from list in memory
func GetNetworkNodesMemory(allNodes []models.Node, network string) []models.Node {

if networkNodes, ok := nodesNetworkCacheMap[network]; ok {
nodeNetworkCacheMutex.Lock()
defer nodeNetworkCacheMutex.Unlock()
return slices.Collect(maps.Values(networkNodes))
}
var nodes = []models.Node{}
for i := range allNodes {
node := allNodes[i]
Expand All @@ -123,6 +165,7 @@ func UpdateNodeCheckin(node *models.Node) error {
}
if servercfg.CacheEnabled() {
storeNodeInCache(*node)
storeNodeInNetworkCache(*node, node.Network)
}
return nil
}
Expand All @@ -140,6 +183,7 @@ func UpsertNode(newNode *models.Node) error {
}
if servercfg.CacheEnabled() {
storeNodeInCache(*newNode)
storeNodeInNetworkCache(*newNode, newNode.Network)
}
return nil
}
Expand Down Expand Up @@ -179,6 +223,7 @@ func UpdateNode(currentNode *models.Node, newNode *models.Node) error {
}
if servercfg.CacheEnabled() {
storeNodeInCache(*newNode)
storeNodeInNetworkCache(*newNode, newNode.Network)
}
return nil
}
Expand Down Expand Up @@ -288,6 +333,7 @@ func DeleteNodeByID(node *models.Node) error {
}
if servercfg.CacheEnabled() {
deleteNodeFromCache(node.ID.String())
deleteNodeFromNetworkCache(node.ID.String(), node.Network)
}
if servercfg.IsDNSMode() {
SetDNS()
Expand Down Expand Up @@ -350,6 +396,7 @@ func GetAllNodes() ([]models.Node, error) {
nodesMap := make(map[string]models.Node)
if servercfg.CacheEnabled() {
defer loadNodesIntoCache(nodesMap)
defer loadNodesIntoNetworkCache(nodesMap)
}
collection, err := database.FetchRecords(database.NODES_TABLE_NAME)
if err != nil {
Expand Down Expand Up @@ -459,6 +506,7 @@ func GetNodeByID(uuid string) (models.Node, error) {
}
if servercfg.CacheEnabled() {
storeNodeInCache(node)
storeNodeInNetworkCache(node, node.Network)
}
return node, nil
}
Expand Down Expand Up @@ -612,6 +660,7 @@ func createNode(node *models.Node) error {
}
if servercfg.CacheEnabled() {
storeNodeInCache(*node)
storeNodeInNetworkCache(*node, node.Network)
}
if _, ok := allocatedIpMap[node.Network]; ok {
if node.Address.IP != nil {
Expand Down
4 changes: 3 additions & 1 deletion logic/peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ func GetPeerUpdateForHost(network string, host *models.Host, allNodes []models.N
if !node.Connected || node.PendingDelete || node.Action == models.NODE_DELETE {
continue
}
// check default policy if all allowed return true
defaultPolicy, _ := GetDefaultPolicy(models.NetworkID(node.Network), models.DevicePolicy)
if host.OS == models.OS_Types.IoT {
hostPeerUpdate.NodeAddrs = append(hostPeerUpdate.NodeAddrs, node.PrimaryAddressIPNet())
if node.IsRelayed {
Expand Down Expand Up @@ -259,7 +261,7 @@ func GetPeerUpdateForHost(network string, host *models.Host, allNodes []models.N
!peer.PendingDelete &&
peer.Connected &&
nodeacls.AreNodesAllowed(nodeacls.NetworkID(node.Network), nodeacls.NodeID(node.ID.String()), nodeacls.NodeID(peer.ID.String())) &&
IsNodeAllowedToCommunicate(node, peer) &&
(defaultPolicy.Enabled || IsNodeAllowedToCommunicate(node, peer, false)) &&
(deletedNode == nil || (deletedNode != nil && peer.ID.String() != deletedNode.ID.String())) {
peerConfig.AllowedIPs = allowedips // only append allowed IPs if valid connection
}
Expand Down
2 changes: 1 addition & 1 deletion mq/emqx_on_prem.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func (e *EmqxOnPrem) CreateDefaultAllowRule() error {
if err != nil {
return err
}
req, err := http.NewRequest(http.MethodPost, servercfg.GetEmqxRestEndpoint()+"/api/v5/authorization/sources/built_in_database/all", bytes.NewReader(payload))
req, err := http.NewRequest(http.MethodPost, servercfg.GetEmqxRestEndpoint()+"/api/v5/authorization/sources/built_in_database/rules/all", bytes.NewReader(payload))
if err != nil {
return err
}
Expand Down
Loading
Loading