diff --git a/controllers/controller.go b/controllers/controller.go index d80093beb..4ce41e47c 100644 --- a/controllers/controller.go +++ b/controllers/controller.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net/http" + "os" "strings" "sync" "time" @@ -11,6 +12,7 @@ import ( "github.com/gorilla/handlers" "github.com/gorilla/mux" "github.com/gravitl/netmaker/logger" + m "github.com/gravitl/netmaker/migrate" "github.com/gravitl/netmaker/servercfg" ) @@ -62,6 +64,11 @@ func HandleRESTRequests(wg *sync.WaitGroup, ctx context.Context) { logger.Log(0, err.Error()) } }() + if os.Getenv("MIGRATE_EMQX") == "true" { + logger.Log(0, "migrating emqx...") + time.Sleep(time.Second * 2) + m.MigrateEmqx() + } logger.Log(0, "REST Server successfully started on port ", port, " (REST)") // Block main routine until a signal is received diff --git a/migrate/migrate.go b/migrate/migrate.go index ce4cc8407..0dd8ba5cb 100644 --- a/migrate/migrate.go +++ b/migrate/migrate.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "log" + "time" "golang.org/x/exp/slog" @@ -12,6 +13,7 @@ import ( "github.com/gravitl/netmaker/logic" "github.com/gravitl/netmaker/logic/acls" "github.com/gravitl/netmaker/models" + "github.com/gravitl/netmaker/mq" "github.com/gravitl/netmaker/servercfg" ) @@ -22,6 +24,7 @@ func Run() { updateHosts() updateNodes() updateAcls() + } func assignSuperAdmin() { @@ -292,3 +295,19 @@ func updateAcls() { slog.Info(fmt.Sprintf("(migration) successfully saved new acls for network: %s", network.NetID)) } } + +func MigrateEmqx() { + + err := mq.SendPullSYN() + if err != nil { + logger.Log(0, "failed to send pull syn to clients", "error", err.Error()) + + } + time.Sleep(time.Second * 3) + slog.Info("proceeding to kicking out clients from emqx") + err = mq.KickOutClients() + if err != nil { + logger.Log(0, "failed to migrate emqx: ", "kickout-error", err.Error()) + } + +} diff --git a/mq/migrate.go b/mq/migrate.go new file mode 100644 index 000000000..068d785f0 --- /dev/null +++ b/mq/migrate.go @@ -0,0 +1,131 @@ +package mq + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "os" + "time" + + mqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/gravitl/netmaker/logger" + "github.com/gravitl/netmaker/logic" + "github.com/gravitl/netmaker/models" + "github.com/gravitl/netmaker/servercfg" + "golang.org/x/exp/slog" +) + +func setupmqtt_old() (mqtt.Client, error) { + + opts := mqtt.NewClientOptions() + opts.AddBroker(os.Getenv("OLD_BROKER_ENDPOINT")) + id := logic.RandomString(23) + opts.ClientID = id + opts.SetUsername(os.Getenv("OLD_MQ_USERNAME")) + opts.SetPassword(os.Getenv("OLD_MQ_PASSWORD")) + opts.SetAutoReconnect(true) + opts.SetConnectRetry(true) + opts.SetConnectRetryInterval(time.Second << 2) + opts.SetKeepAlive(time.Minute) + opts.SetWriteTimeout(time.Minute) + mqclient := mqtt.NewClient(opts) + + var connecterr error + if token := mqclient.Connect(); !token.WaitTimeout(30*time.Second) || token.Error() != nil { + if token.Error() == nil { + connecterr = errors.New("connect timeout") + } else { + connecterr = token.Error() + } + slog.Error("unable to connect to broker", "server", os.Getenv("OLD_BROKER_ENDPOINT"), "error", connecterr) + } + return mqclient, nil +} + +func getEmqxAuthTokenOld() (string, error) { + payload, err := json.Marshal(&emqxLogin{ + Username: os.Getenv("OLD_MQ_USERNAME"), + Password: os.Getenv("OLD_MQ_PASSWORD"), + }) + if err != nil { + return "", err + } + resp, err := http.Post(os.Getenv("OLD_EMQX_REST_ENDPOINT")+"/api/v5/login", "application/json", bytes.NewReader(payload)) + if err != nil { + return "", err + } + msg, err := io.ReadAll(resp.Body) + if err != nil { + return "", err + } + if resp.StatusCode != http.StatusOK { + return "", fmt.Errorf("error during EMQX login %v", string(msg)) + } + var loginResp emqxLoginResponse + if err := json.Unmarshal(msg, &loginResp); err != nil { + return "", err + } + return loginResp.Token, nil +} + +func SendPullSYN() error { + mqclient, err := setupmqtt_old() + if err != nil { + return err + } + hosts, err := logic.GetAllHosts() + if err != nil { + return err + } + for _, host := range hosts { + host := host + hostUpdate := models.HostUpdate{ + Action: models.RequestPull, + Host: host, + } + msg, _ := json.Marshal(hostUpdate) + encrypted, encryptErr := encryptMsg(&host, msg) + if encryptErr != nil { + continue + } + logger.Log(0, "sending pull syn to", host.Name) + mqclient.Publish(fmt.Sprintf("host/update/%s/%s", hostUpdate.Host.ID.String(), servercfg.GetServer()), 0, true, encrypted) + } + return nil +} + +func KickOutClients() error { + authToken, err := getEmqxAuthTokenOld() + if err != nil { + return err + } + hosts, err := logic.GetAllHosts() + if err != nil { + slog.Error("failed to migrate emqx: ", "error", err) + return err + } + + for _, host := range hosts { + url := fmt.Sprintf("%s/api/v5/clients/%s", os.Getenv("OLD_EMQX_REST_ENDPOINT"), host.ID.String()) + client := &http.Client{} + req, err := http.NewRequest(http.MethodDelete, url, nil) + if err != nil { + slog.Error("failed to kick out client:", "client", host.ID.String(), "error", err) + continue + } + req.Header.Add("Authorization", "Bearer "+authToken) + res, err := client.Do(req) + if err != nil { + slog.Error("failed to kick out client:", "client", host.ID.String(), "req-error", err) + continue + } + if res.StatusCode != http.StatusNoContent { + slog.Error("failed to kick out client:", "client", host.ID.String(), "status-code", res.StatusCode) + } + res.Body.Close() + } + return nil +}