Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release v0.23.0 #2844

Merged
merged 22 commits into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions controllers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ import (
"context"
"fmt"
"net/http"
"os"
"strings"
"sync"
"time"

"github.com/gorilla/handlers"
"github.com/gorilla/mux"
"github.com/gravitl/netmaker/logger"
m "github.com/gravitl/netmaker/migrate"
"github.com/gravitl/netmaker/servercfg"
)

Expand Down Expand Up @@ -62,6 +64,11 @@ func HandleRESTRequests(wg *sync.WaitGroup, ctx context.Context) {
logger.Log(0, err.Error())
}
}()
if os.Getenv("MIGRATE_EMQX") == "true" {
logger.Log(0, "migrating emqx...")
time.Sleep(time.Second * 2)
m.MigrateEmqx()
}
logger.Log(0, "REST Server successfully started on port ", port, " (REST)")

// Block main routine until a signal is received
Expand Down
11 changes: 11 additions & 0 deletions controllers/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,17 @@ func updateNode(w http.ResponseWriter, r *http.Request) {

}
relayUpdate := logic.RelayUpdates(&currentNode, newNode)
if relayUpdate && newNode.IsRelay {
err = logic.ValidateRelay(models.RelayRequest{
NodeID: newNode.ID.String(),
NetID: newNode.Network,
RelayedNodes: newNode.RelayedNodes,
}, true)
if err != nil {
logic.ReturnErrorResponse(w, r, logic.FormatError(err, "badrequest"))
return
}
}
_, err = logic.GetHost(newNode.HostID.String())
if err != nil {
logger.Log(0, r.Header.Get("user"),
Expand Down
2 changes: 0 additions & 2 deletions logic/acls/nodeacls/modify.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,12 @@ func RemoveNodeACL(networkID NetworkID, nodeID NodeID) (acls.ACLContainer, error
if err != nil {
return nil, err
}
acls.AclMutex.Lock()
for currentNodeID := range currentNetworkACL {
if NodeID(currentNodeID) != nodeID {
currentNetworkACL[currentNodeID].Remove(acls.AclID(nodeID))
}
}
delete(currentNetworkACL, acls.AclID(nodeID))
acls.AclMutex.Unlock()
return currentNetworkACL.Save(acls.ContainerID(networkID))
}

Expand Down
4 changes: 3 additions & 1 deletion logic/acls/nodeacls/retrieve.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ func AreNodesAllowed(networkID NetworkID, node1, node2 NodeID) bool {
}
var allowed bool
acls.AclMutex.RLock()
allowed = currentNetworkACL[acls.AclID(node1)].IsAllowed(acls.AclID(node2)) && currentNetworkACL[acls.AclID(node2)].IsAllowed(acls.AclID(node1))
currNetworkACLNode1 := currentNetworkACL[acls.AclID(node1)]
currNetworkACLNode2 := currentNetworkACL[acls.AclID(node2)]
acls.AclMutex.RUnlock()
allowed = currNetworkACLNode1.IsAllowed(acls.AclID(node2)) && currNetworkACLNode2.IsAllowed(acls.AclID(node1))
return allowed
}

Expand Down
1 change: 0 additions & 1 deletion logic/hosts.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,6 @@ func DissasociateNodeFromHost(n *models.Node, h *models.Host) error {
if err := DeleteNodeByID(n); err != nil {
return err
}

return UpsertHost(h)
}

Expand Down
3 changes: 0 additions & 3 deletions logic/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,6 @@ func UpdateNode(currentNode *models.Node, newNode *models.Node) error {
func DeleteNode(node *models.Node, purge bool) error {
alreadyDeleted := node.PendingDelete || node.Action == models.NODE_DELETE
node.Action = models.NODE_DELETE

//delete ext clients if node is ingress gw
if node.IsIngressGateway {
if err := DeleteGatewayExtClients(node.ID.String(), node.Network); err != nil {
Expand Down Expand Up @@ -235,7 +234,6 @@ func DeleteNode(node *models.Node, purge bool) error {
if node.IsInternetGateway {
UnsetInternetGw(node)
}

if !purge && !alreadyDeleted {
newnode := *node
newnode.PendingDelete = true
Expand Down Expand Up @@ -281,7 +279,6 @@ func GetNodeByHostRef(hostid, network string) (node models.Node, err error) {
func DeleteNodeByID(node *models.Node) error {
var err error
var key = node.ID.String()

if err = database.DeleteRecord(database.NODES_TABLE_NAME, key); err != nil {
if !database.IsEmptyRecord(err) {
return err
Expand Down
4 changes: 4 additions & 0 deletions logic/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,7 @@ var SetRelayedNodes = func(setRelayed bool, relay string, relayed []string) []mo
var RelayUpdates = func(currentNode, newNode *models.Node) bool {
return false
}

var ValidateRelay = func(relay models.RelayRequest, update bool) error {
return nil
}
19 changes: 19 additions & 0 deletions migrate/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"log"
"time"

"golang.org/x/exp/slog"

Expand All @@ -12,6 +13,7 @@ import (
"github.com/gravitl/netmaker/logic"
"github.com/gravitl/netmaker/logic/acls"
"github.com/gravitl/netmaker/models"
"github.com/gravitl/netmaker/mq"
"github.com/gravitl/netmaker/servercfg"
)

Expand All @@ -22,6 +24,7 @@ func Run() {
updateHosts()
updateNodes()
updateAcls()

}

func assignSuperAdmin() {
Expand Down Expand Up @@ -292,3 +295,19 @@ func updateAcls() {
slog.Info(fmt.Sprintf("(migration) successfully saved new acls for network: %s", network.NetID))
}
}

func MigrateEmqx() {

err := mq.SendPullSYN()
if err != nil {
logger.Log(0, "failed to send pull syn to clients", "error", err.Error())

}
time.Sleep(time.Second * 3)
slog.Info("proceeding to kicking out clients from emqx")
err = mq.KickOutClients()
if err != nil {
logger.Log(0, "failed to migrate emqx: ", "kickout-error", err.Error())
}

}
131 changes: 131 additions & 0 deletions mq/migrate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package mq

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"os"
"time"

mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/gravitl/netmaker/logger"
"github.com/gravitl/netmaker/logic"
"github.com/gravitl/netmaker/models"
"github.com/gravitl/netmaker/servercfg"
"golang.org/x/exp/slog"
)

func setupmqtt_old() (mqtt.Client, error) {

opts := mqtt.NewClientOptions()
opts.AddBroker(os.Getenv("OLD_BROKER_ENDPOINT"))
id := logic.RandomString(23)
opts.ClientID = id
opts.SetUsername(os.Getenv("OLD_MQ_USERNAME"))
opts.SetPassword(os.Getenv("OLD_MQ_PASSWORD"))
opts.SetAutoReconnect(true)
opts.SetConnectRetry(true)
opts.SetConnectRetryInterval(time.Second << 2)
opts.SetKeepAlive(time.Minute)
opts.SetWriteTimeout(time.Minute)
mqclient := mqtt.NewClient(opts)

var connecterr error
if token := mqclient.Connect(); !token.WaitTimeout(30*time.Second) || token.Error() != nil {
if token.Error() == nil {
connecterr = errors.New("connect timeout")
} else {
connecterr = token.Error()
}
slog.Error("unable to connect to broker", "server", os.Getenv("OLD_BROKER_ENDPOINT"), "error", connecterr)
}
return mqclient, nil
}

func getEmqxAuthTokenOld() (string, error) {
payload, err := json.Marshal(&emqxLogin{
Username: os.Getenv("OLD_MQ_USERNAME"),
Password: os.Getenv("OLD_MQ_PASSWORD"),
})
if err != nil {
return "", err
}
resp, err := http.Post(os.Getenv("OLD_EMQX_REST_ENDPOINT")+"/api/v5/login", "application/json", bytes.NewReader(payload))
if err != nil {
return "", err
}
msg, err := io.ReadAll(resp.Body)
if err != nil {
return "", err
}
if resp.StatusCode != http.StatusOK {
return "", fmt.Errorf("error during EMQX login %v", string(msg))
}
var loginResp emqxLoginResponse
if err := json.Unmarshal(msg, &loginResp); err != nil {
return "", err
}
return loginResp.Token, nil
}

func SendPullSYN() error {
mqclient, err := setupmqtt_old()
if err != nil {
return err
}
hosts, err := logic.GetAllHosts()
if err != nil {
return err
}
for _, host := range hosts {
host := host
hostUpdate := models.HostUpdate{
Action: models.RequestPull,
Host: host,
}
msg, _ := json.Marshal(hostUpdate)
encrypted, encryptErr := encryptMsg(&host, msg)
if encryptErr != nil {
continue
}
logger.Log(0, "sending pull syn to", host.Name)
mqclient.Publish(fmt.Sprintf("host/update/%s/%s", hostUpdate.Host.ID.String(), servercfg.GetServer()), 0, true, encrypted)
}
return nil
}

func KickOutClients() error {
authToken, err := getEmqxAuthTokenOld()
if err != nil {
return err
}
hosts, err := logic.GetAllHosts()
if err != nil {
slog.Error("failed to migrate emqx: ", "error", err)
return err
}

for _, host := range hosts {
url := fmt.Sprintf("%s/api/v5/clients/%s", os.Getenv("OLD_EMQX_REST_ENDPOINT"), host.ID.String())
client := &http.Client{}
req, err := http.NewRequest(http.MethodDelete, url, nil)
if err != nil {
slog.Error("failed to kick out client:", "client", host.ID.String(), "error", err)
continue
}
req.Header.Add("Authorization", "Bearer "+authToken)
res, err := client.Do(req)
if err != nil {
slog.Error("failed to kick out client:", "client", host.ID.String(), "req-error", err)
continue
}
if res.StatusCode != http.StatusNoContent {
slog.Error("failed to kick out client:", "client", host.ID.String(), "status-code", res.StatusCode)
}
res.Body.Close()
}
return nil
}
1 change: 1 addition & 0 deletions pro/initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func InitPro() {
logic.UpdateRelayed = proLogic.UpdateRelayed
logic.SetRelayedNodes = proLogic.SetRelayedNodes
logic.RelayUpdates = proLogic.RelayUpdates
logic.ValidateRelay = proLogic.ValidateRelay
logic.GetTrialEndDate = getTrialEndDate
logic.SetDefaultGw = proLogic.SetDefaultGw
logic.SetDefaultGwForRelayedUpdate = proLogic.SetDefaultGwForRelayedUpdate
Expand Down
9 changes: 6 additions & 3 deletions pro/logic/relays.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func CreateRelay(relay models.RelayRequest) ([]models.Node, models.Node, error)
if host.OS != "linux" {
return returnnodes, models.Node{}, fmt.Errorf("only linux machines can be relay nodes")
}
err = ValidateRelay(relay)
err = ValidateRelay(relay, false)
if err != nil {
return returnnodes, models.Node{}, err
}
Expand Down Expand Up @@ -101,14 +101,14 @@ func SetRelayedNodes(setRelayed bool, relay string, relayed []string) []models.N
// }

// ValidateRelay - checks if relay is valid
func ValidateRelay(relay models.RelayRequest) error {
func ValidateRelay(relay models.RelayRequest, update bool) error {
var err error

node, err := logic.GetNodeByID(relay.NodeID)
if err != nil {
return err
}
if node.IsRelay {
if !update && node.IsRelay {
return errors.New("node is already acting as a relay")
}
for _, relayedNodeID := range relay.RelayedNodes {
Expand All @@ -119,6 +119,9 @@ func ValidateRelay(relay models.RelayRequest) error {
if relayedNode.IsIngressGateway {
return errors.New("cannot relay an ingress gateway (" + relayedNodeID + ")")
}
if relayedNode.IsInternetGateway {
return errors.New("cannot relay an internet gateway (" + relayedNodeID + ")")
}
}
return err
}
Expand Down