Skip to content

Commit

Permalink
Merge pull request #2843 from gravitl/ACC-468
Browse files Browse the repository at this point in the history
ACC-468: EMQX Migration Helpers
  • Loading branch information
abhishek9686 authored Mar 4, 2024
2 parents c45f7bf + 778d025 commit e846881
Show file tree
Hide file tree
Showing 3 changed files with 157 additions and 0 deletions.
7 changes: 7 additions & 0 deletions controllers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ import (
"context"
"fmt"
"net/http"
"os"
"strings"
"sync"
"time"

"github.com/gorilla/handlers"
"github.com/gorilla/mux"
"github.com/gravitl/netmaker/logger"
m "github.com/gravitl/netmaker/migrate"
"github.com/gravitl/netmaker/servercfg"
)

Expand Down Expand Up @@ -62,6 +64,11 @@ func HandleRESTRequests(wg *sync.WaitGroup, ctx context.Context) {
logger.Log(0, err.Error())
}
}()
if os.Getenv("MIGRATE_EMQX") == "true" {
logger.Log(0, "migrating emqx...")
time.Sleep(time.Second * 2)
m.MigrateEmqx()
}
logger.Log(0, "REST Server successfully started on port ", port, " (REST)")

// Block main routine until a signal is received
Expand Down
19 changes: 19 additions & 0 deletions migrate/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"log"
"time"

"golang.org/x/exp/slog"

Expand All @@ -12,6 +13,7 @@ import (
"github.com/gravitl/netmaker/logic"
"github.com/gravitl/netmaker/logic/acls"
"github.com/gravitl/netmaker/models"
"github.com/gravitl/netmaker/mq"
"github.com/gravitl/netmaker/servercfg"
)

Expand All @@ -22,6 +24,7 @@ func Run() {
updateHosts()
updateNodes()
updateAcls()

}

func assignSuperAdmin() {
Expand Down Expand Up @@ -292,3 +295,19 @@ func updateAcls() {
slog.Info(fmt.Sprintf("(migration) successfully saved new acls for network: %s", network.NetID))
}
}

func MigrateEmqx() {

err := mq.SendPullSYN()
if err != nil {
logger.Log(0, "failed to send pull syn to clients", "error", err.Error())

}
time.Sleep(time.Second * 3)
slog.Info("proceeding to kicking out clients from emqx")
err = mq.KickOutClients()
if err != nil {
logger.Log(0, "failed to migrate emqx: ", "kickout-error", err.Error())
}

}
131 changes: 131 additions & 0 deletions mq/migrate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package mq

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"os"
"time"

mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/gravitl/netmaker/logger"
"github.com/gravitl/netmaker/logic"
"github.com/gravitl/netmaker/models"
"github.com/gravitl/netmaker/servercfg"
"golang.org/x/exp/slog"
)

func setupmqtt_old() (mqtt.Client, error) {

opts := mqtt.NewClientOptions()
opts.AddBroker(os.Getenv("OLD_BROKER_ENDPOINT"))
id := logic.RandomString(23)
opts.ClientID = id
opts.SetUsername(os.Getenv("OLD_MQ_USERNAME"))
opts.SetPassword(os.Getenv("OLD_MQ_PASSWORD"))
opts.SetAutoReconnect(true)
opts.SetConnectRetry(true)
opts.SetConnectRetryInterval(time.Second << 2)
opts.SetKeepAlive(time.Minute)
opts.SetWriteTimeout(time.Minute)
mqclient := mqtt.NewClient(opts)

var connecterr error
if token := mqclient.Connect(); !token.WaitTimeout(30*time.Second) || token.Error() != nil {
if token.Error() == nil {
connecterr = errors.New("connect timeout")
} else {
connecterr = token.Error()
}
slog.Error("unable to connect to broker", "server", os.Getenv("OLD_BROKER_ENDPOINT"), "error", connecterr)
}
return mqclient, nil
}

func getEmqxAuthTokenOld() (string, error) {
payload, err := json.Marshal(&emqxLogin{
Username: os.Getenv("OLD_MQ_USERNAME"),
Password: os.Getenv("OLD_MQ_PASSWORD"),
})
if err != nil {
return "", err
}
resp, err := http.Post(os.Getenv("OLD_EMQX_REST_ENDPOINT")+"/api/v5/login", "application/json", bytes.NewReader(payload))
if err != nil {
return "", err
}
msg, err := io.ReadAll(resp.Body)
if err != nil {
return "", err
}
if resp.StatusCode != http.StatusOK {
return "", fmt.Errorf("error during EMQX login %v", string(msg))
}
var loginResp emqxLoginResponse
if err := json.Unmarshal(msg, &loginResp); err != nil {
return "", err
}
return loginResp.Token, nil
}

func SendPullSYN() error {
mqclient, err := setupmqtt_old()
if err != nil {
return err
}
hosts, err := logic.GetAllHosts()
if err != nil {
return err
}
for _, host := range hosts {
host := host
hostUpdate := models.HostUpdate{
Action: models.RequestPull,
Host: host,
}
msg, _ := json.Marshal(hostUpdate)
encrypted, encryptErr := encryptMsg(&host, msg)
if encryptErr != nil {
continue
}
logger.Log(0, "sending pull syn to", host.Name)
mqclient.Publish(fmt.Sprintf("host/update/%s/%s", hostUpdate.Host.ID.String(), servercfg.GetServer()), 0, true, encrypted)
}
return nil
}

func KickOutClients() error {
authToken, err := getEmqxAuthTokenOld()
if err != nil {
return err
}
hosts, err := logic.GetAllHosts()
if err != nil {
slog.Error("failed to migrate emqx: ", "error", err)
return err
}

for _, host := range hosts {
url := fmt.Sprintf("%s/api/v5/clients/%s", os.Getenv("OLD_EMQX_REST_ENDPOINT"), host.ID.String())
client := &http.Client{}
req, err := http.NewRequest(http.MethodDelete, url, nil)
if err != nil {
slog.Error("failed to kick out client:", "client", host.ID.String(), "error", err)
continue
}
req.Header.Add("Authorization", "Bearer "+authToken)
res, err := client.Do(req)
if err != nil {
slog.Error("failed to kick out client:", "client", host.ID.String(), "req-error", err)
continue
}
if res.StatusCode != http.StatusNoContent {
slog.Error("failed to kick out client:", "client", host.ID.String(), "status-code", res.StatusCode)
}
res.Body.Close()
}
return nil
}

0 comments on commit e846881

Please sign in to comment.