From 9f5596320a146f653938f30d06bc108a08cf899e Mon Sep 17 00:00:00 2001 From: Ben Jackson Date: Sun, 12 Jun 2022 15:41:51 +1000 Subject: [PATCH 1/8] feat: create a single queue for the controller to use --- controllers/messenger/events.go | 380 ++++++++++++++++++++++++++++++++ controllers/messenger/queues.go | 370 +++---------------------------- main.go | 10 + 3 files changed, 417 insertions(+), 343 deletions(-) create mode 100644 controllers/messenger/events.go diff --git a/controllers/messenger/events.go b/controllers/messenger/events.go new file mode 100644 index 00000000..fe5b6714 --- /dev/null +++ b/controllers/messenger/events.go @@ -0,0 +1,380 @@ +package messenger + +import ( + "context" + "encoding/json" + "fmt" + "strings" + + "github.com/go-logr/logr" + lagoonv1beta1 "github.com/uselagoon/remote-controller/apis/lagoon/v1beta1" + "github.com/uselagoon/remote-controller/internal/harbor" + "github.com/uselagoon/remote-controller/internal/helpers" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" +) + +// LagoonEvent defines a Lagoon event type +type LagoonEvent struct { + EventType string `json:"eventType"` + Payload interface{} `json:"payload"` +} + +const ( + lagoonBuild = "lagoon:build" + lagoonTask = "lagoon:task" + lagoonMisc = "lagoon:misc" + lagoonRemval = "lagoon:removal" +) + +func (h *Messaging) handleLagoonEvent(ctx context.Context, opLog logr.Logger, body []byte, event LagoonEvent) error { + // unmarshal the body of the message into a lagoonevent + lEvent := &LagoonEvent{} + err := json.Unmarshal(body, lEvent) + if err != nil { + return err + } + + // turn the payload back into bytes to be processed by the handlers + payloadBytes, err := json.Marshal(lEvent.Payload) + if err != nil { + return err + } + switch lEvent.EventType { + case lagoonBuild: + h.handleBuildEvent(ctx, opLog, payloadBytes) + case lagoonTask: + h.handleTaskEvent(ctx, opLog, payloadBytes) + case lagoonMisc: + h.handleMiscEvent(ctx, opLog, payloadBytes) + case lagoonRemval: + h.handleRemovalEvent(ctx, opLog, payloadBytes) + } + return nil +} + +func (h *Messaging) handleBuildEvent(ctx context.Context, opLog logr.Logger, payload []byte) error { + // unmarshal the body into a lagoonbuild + newBuild := &lagoonv1beta1.LagoonBuild{} + json.Unmarshal(payload, newBuild) + // new builds that come in should initially get created in the controllers own + // namespace before being handled and re-created in the correct namespace + // so set the controller namespace to the build namespace here + newBuild.ObjectMeta.Namespace = h.ControllerNamespace + newBuild.SetLabels( + map[string]string{ + "lagoon.sh/controller": h.ControllerNamespace, + }, + ) + opLog.Info( + fmt.Sprintf( + "Received builddeploy task for project %s, environment %s", + newBuild.Spec.Project.Name, + newBuild.Spec.Project.Environment, + ), + ) + // create it now + if err := h.Client.Create(ctx, newBuild); err != nil { + opLog.Error(err, + fmt.Sprintf( + "Failed to create builddeploy task for project %s, environment %s", + newBuild.Spec.Project.Name, + newBuild.Spec.Project.Environment, + ), + ) + //@TODO: send msg back to lagoon and update task to failed? + return err + } + return nil +} + +func (h *Messaging) handleTaskEvent(ctx context.Context, opLog logr.Logger, payload []byte) error { + // unmarshall the message into a remove task to be processed + jobSpec := &lagoonv1beta1.LagoonTaskSpec{} + json.Unmarshal(payload, jobSpec) + namespace := helpers.GenerateNamespaceName( + jobSpec.Project.NamespacePattern, // the namespace pattern or `openshiftProjectPattern` from Lagoon is never received by the controller + jobSpec.Environment.Name, + jobSpec.Project.Name, + h.NamespacePrefix, + h.ControllerNamespace, + h.RandomNamespacePrefix, + ) + opLog.Info( + fmt.Sprintf( + "Received task for project %s, environment %s - %s", + jobSpec.Project.Name, + jobSpec.Environment.Name, + namespace, + ), + ) + job := &lagoonv1beta1.LagoonTask{} + job.Spec = *jobSpec + // set the namespace to the `openshiftProjectName` from the environment + job.ObjectMeta.Namespace = namespace + job.SetLabels( + map[string]string{ + "lagoon.sh/taskType": string(lagoonv1beta1.TaskTypeStandard), + "lagoon.sh/taskStatus": string(lagoonv1beta1.TaskStatusPending), + "lagoon.sh/controller": h.ControllerNamespace, + }, + ) + job.ObjectMeta.Name = fmt.Sprintf("lagoon-task-%s-%s", job.Spec.Task.ID, helpers.HashString(job.Spec.Task.ID)[0:6]) + if job.Spec.Task.TaskName != "" { + job.ObjectMeta.Name = job.Spec.Task.TaskName + } + if err := h.Client.Create(ctx, job); err != nil { + opLog.Error(err, + fmt.Sprintf( + "Unable to create job task for project %s, environment %s", + job.Spec.Project.Name, + job.Spec.Environment.Name, + ), + ) + return err + } + return nil +} + +func (h *Messaging) handleMiscEvent(ctx context.Context, opLog logr.Logger, payload []byte) error { + // unmarshall the message into a remove task to be processed + jobSpec := &lagoonv1beta1.LagoonTaskSpec{} + json.Unmarshal(payload, jobSpec) + // check which key has been received + namespace := helpers.GenerateNamespaceName( + jobSpec.Project.NamespacePattern, // the namespace pattern or `openshiftProjectPattern` from Lagoon is never received by the controller + jobSpec.Environment.Name, + jobSpec.Project.Name, + h.NamespacePrefix, + h.ControllerNamespace, + h.RandomNamespacePrefix, + ) + switch jobSpec.Key { + case "kubernetes:build:cancel", "deploytarget:build:cancel": + opLog.Info( + fmt.Sprintf( + "Received build cancellation for project %s, environment %s - %s", + jobSpec.Project.Name, + jobSpec.Environment.Name, + namespace, + ), + ) + err := h.CancelBuild(namespace, jobSpec) + if err != nil { + return err + } + case "kubernetes:task:cancel", "deploytarget:task:cancel": + opLog.Info( + fmt.Sprintf( + "Received task cancellation for project %s, environment %s - %s", + jobSpec.Project.Name, + jobSpec.Environment.Name, + namespace, + ), + ) + err := h.CancelTask(namespace, jobSpec) + if err != nil { + return err + } + case "kubernetes:restic:backup:restore", "deploytarget:backup:restore": + opLog.Info( + fmt.Sprintf( + "Received backup restoration for project %s, environment %s", + jobSpec.Project.Name, + jobSpec.Environment.Name, + ), + ) + err := h.ResticRestore(namespace, jobSpec) + if err != nil { + return err + } + case "kubernetes:route:migrate", "deploytarget:ingress:migrate": + opLog.Info( + fmt.Sprintf( + "Received ingress migration for project %s", + jobSpec.Project.Name, + ), + ) + err := h.IngressRouteMigration(namespace, jobSpec) + if err != nil { + return err + } + case "kubernetes:task:advanced", "deploytarget:task:advanced": + opLog.Info( + fmt.Sprintf( + "Received advanced task for project %s", + jobSpec.Project.Name, + ), + ) + err := h.AdvancedTask(namespace, jobSpec) + if err != nil { + return err + } + default: + // if we get something that we don't know about, spit out the entire message + opLog.Info( + fmt.Sprintf( + "Received unknown message: %s", + string(payload), + ), + ) + } + return nil +} + +func (h *Messaging) handleRemovalEvent(ctx context.Context, opLog logr.Logger, payload []byte) error { + // unmarshall the message into a remove task to be processed + removeTask := &removeTask{} + json.Unmarshal(payload, removeTask) + // webhooks2tasks sends the `branch` field, but deletion from the API (UI/CLI) does not + // the tasks system crafts a field `branchName` which is passed through + // since webhooks2tasks uses the same underlying mechanism, we can still consume branchName even if branch is populated + if removeTask.Type == "pullrequest" { + removeTask.Branch = removeTask.BranchName + } + // generate the namespace name from the branch and project and any prefixes that the controller may add + ns := helpers.GenerateNamespaceName( + removeTask.NamespacePattern, // the namespace pattern or `openshiftProjectPattern` from Lagoon is never received by the controller + removeTask.Branch, + removeTask.ProjectName, + h.NamespacePrefix, + h.ControllerNamespace, + h.RandomNamespacePrefix, + ) + branch := removeTask.Branch + project := removeTask.ProjectName + opLog.WithName("RemoveTask").Info( + fmt.Sprintf( + "Received remove task for project %s, branch %s - %s", + project, + branch, + ns, + ), + ) + namespace := &corev1.Namespace{} + err := h.Client.Get(ctx, types.NamespacedName{ + Name: ns, + }, namespace) + if err != nil { + if strings.Contains(err.Error(), "not found") { + opLog.WithName("RemoveTask").Info( + fmt.Sprintf( + "Namespace %s for project %s, branch %s does not exist, marking deleted", + ns, + project, + branch, + ), + ) + msg := lagoonv1beta1.LagoonMessage{ + Type: "remove", + Namespace: ns, + Meta: &lagoonv1beta1.LagoonLogMeta{ + Project: project, + Environment: branch, + }, + } + msgBytes, _ := json.Marshal(msg) + h.Publish("lagoon-tasks:controller", msgBytes) + } else { + opLog.WithName("RemoveTask").Info( + fmt.Sprintf( + "Unable to get namespace %s for project %s, branch %s: %v", + ns, + project, + branch, + err, + ), + ) + } + //@TODO: send msg back to lagoon and update task to failed? + return nil + + } + // check that the namespace selected for deletion is owned by this controller + if value, ok := namespace.ObjectMeta.Labels["lagoon.sh/controller"]; ok { + if value == h.ControllerNamespace { + /* + get any deployments/statefulsets/daemonsets + then delete them + */ + if h.CleanupHarborRepositoryOnDelete { + lagoonHarbor, err := harbor.NewHarbor(h.Harbor) + if err != nil { + return err + } + curVer, err := lagoonHarbor.GetHarborVersion(ctx) + if err != nil { + return err + } + if lagoonHarbor.UseV2Functions(curVer) { + lagoonHarbor.DeleteRepository(ctx, project, branch) + } + } + if del := h.DeleteLagoonTasks(ctx, opLog.WithName("DeleteLagoonTasks"), ns, project, branch); del == false { + return nil + } + if del := h.DeleteLagoonBuilds(ctx, opLog.WithName("DeleteLagoonBuilds"), ns, project, branch); del == false { + return nil + } + if del := h.DeleteDeployments(ctx, opLog.WithName("DeleteDeployments"), ns, project, branch); del == false { + return nil + } + if del := h.DeleteStatefulSets(ctx, opLog.WithName("DeleteStatefulSets"), ns, project, branch); del == false { + return nil + } + if del := h.DeleteDaemonSets(ctx, opLog.WithName("DeleteDaemonSets"), ns, project, branch); del == false { + return nil + } + if del := h.DeletePVCs(ctx, opLog.WithName("DeletePVCs"), ns, project, branch); del == false { + return nil + } + /* + then delete the namespace + */ + if del := h.DeleteNamespace(ctx, opLog.WithName("DeleteNamespace"), namespace, project, branch); del == false { + return nil + } + opLog.WithName("DeleteNamespace").Info( + fmt.Sprintf( + "Deleted namespace %s for project %s, branch %s", + ns, + project, + branch, + ), + ) + msg := lagoonv1beta1.LagoonMessage{ + Type: "remove", + Namespace: ns, + Meta: &lagoonv1beta1.LagoonLogMeta{ + Project: project, + Environment: branch, + }, + } + msgBytes, _ := json.Marshal(msg) + h.Publish("lagoon-tasks:controller", msgBytes) + return nil + } + // controller label didn't match, log the message + opLog.WithName("RemoveTask").Info( + fmt.Sprintf( + "Selected namespace %s for project %s, branch %s: %v", + ns, + project, + branch, + fmt.Errorf("The controller label value %s does not match %s for this namespace", value, h.ControllerNamespace), + ), + ) + return nil + } + // controller label didn't match, log the message + opLog.WithName("RemoveTask").Info( + fmt.Sprintf( + "Selected namespace %s for project %s, branch %s: %v", + ns, + project, + branch, + fmt.Errorf("The controller ownership label does not exist on this namespace, nothing will be done for this removal request"), + ), + ) + return nil +} diff --git a/controllers/messenger/queues.go b/controllers/messenger/queues.go index 3491baa8..22df06bb 100644 --- a/controllers/messenger/queues.go +++ b/controllers/messenger/queues.go @@ -5,16 +5,13 @@ import ( "encoding/json" "fmt" "log" - "strings" "time" "github.com/cheshir/go-mq" "github.com/go-logr/logr" lagoonv1beta1 "github.com/uselagoon/remote-controller/apis/lagoon/v1beta1" "github.com/uselagoon/remote-controller/internal/harbor" - "github.com/uselagoon/remote-controller/internal/helpers" "gopkg.in/matryer/try.v1" - corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -127,37 +124,10 @@ func (h *Messaging) Consumer(targetName string) { //error { forever := make(chan bool) // Handle any tasks that go to the `builddeploy` queue - opLog.Info("Listening for lagoon-tasks:" + targetName + ":builddeploy") - err = messageQueue.SetConsumerHandler("builddeploy-queue", func(message mq.Message) { + opLog.Info(fmt.Sprintf("Listening for lagoon-controller:%s", targetName)) + err = messageQueue.SetConsumerHandler("controller-queue", func(message mq.Message) { if err == nil { - // unmarshal the body into a lagoonbuild - newBuild := &lagoonv1beta1.LagoonBuild{} - json.Unmarshal(message.Body(), newBuild) - // new builds that come in should initially get created in the controllers own - // namespace before being handled and re-created in the correct namespace - // so set the controller namespace to the build namespace here - newBuild.ObjectMeta.Namespace = h.ControllerNamespace - newBuild.SetLabels( - map[string]string{ - "lagoon.sh/controller": h.ControllerNamespace, - }, - ) - opLog.Info( - fmt.Sprintf( - "Received builddeploy task for project %s, environment %s", - newBuild.Spec.Project.Name, - newBuild.Spec.Project.Environment, - ), - ) - // create it now - if err := h.Client.Create(ctx, newBuild); err != nil { - opLog.Error(err, - fmt.Sprintf( - "Failed to create builddeploy task for project %s, environment %s", - newBuild.Spec.Project.Name, - newBuild.Spec.Project.Environment, - ), - ) + if err := h.handleLagoonEvent(ctx, opLog, message.Body()); err != nil { //@TODO: send msg back to lagoon and update task to failed? message.Ack(false) // ack to remove from queue return @@ -169,175 +139,31 @@ func (h *Messaging) Consumer(targetName string) { //error { log.Fatalf(fmt.Sprintf("Failed to set handler to consumer `%s`: %v", "builddeploy-queue", err)) } - // Handle any tasks that go to the `remove` queue - opLog.Info("Listening for lagoon-tasks:" + targetName + ":remove") - err = messageQueue.SetConsumerHandler("remove-queue", func(message mq.Message) { + // Handle any tasks that go to the `builddeploy` queue + opLog.Info(fmt.Sprintf("Listening for lagoon-tasks:%s:builddeploy", targetName)) + err = messageQueue.SetConsumerHandler("builddeploy-queue", func(message mq.Message) { if err == nil { - // unmarshall the message into a remove task to be processed - removeTask := &removeTask{} - json.Unmarshal(message.Body(), removeTask) - // webhooks2tasks sends the `branch` field, but deletion from the API (UI/CLI) does not - // the tasks system crafts a field `branchName` which is passed through - // since webhooks2tasks uses the same underlying mechanism, we can still consume branchName even if branch is populated - if removeTask.Type == "pullrequest" { - removeTask.Branch = removeTask.BranchName - } - // generate the namespace name from the branch and project and any prefixes that the controller may add - ns := helpers.GenerateNamespaceName( - removeTask.NamespacePattern, // the namespace pattern or `openshiftProjectPattern` from Lagoon is never received by the controller - removeTask.Branch, - removeTask.ProjectName, - h.NamespacePrefix, - h.ControllerNamespace, - h.RandomNamespacePrefix, - ) - branch := removeTask.Branch - project := removeTask.ProjectName - opLog.WithName("RemoveTask").Info( - fmt.Sprintf( - "Received remove task for project %s, branch %s - %s", - project, - branch, - ns, - ), - ) - namespace := &corev1.Namespace{} - err := h.Client.Get(ctx, types.NamespacedName{ - Name: ns, - }, namespace) - if err != nil { - if strings.Contains(err.Error(), "not found") { - opLog.WithName("RemoveTask").Info( - fmt.Sprintf( - "Namespace %s for project %s, branch %s does not exist, marking deleted", - ns, - project, - branch, - ), - ) - msg := lagoonv1beta1.LagoonMessage{ - Type: "remove", - Namespace: ns, - Meta: &lagoonv1beta1.LagoonLogMeta{ - Project: project, - Environment: branch, - }, - } - msgBytes, _ := json.Marshal(msg) - h.Publish("lagoon-tasks:controller", msgBytes) - } else { - opLog.WithName("RemoveTask").Info( - fmt.Sprintf( - "Unable to get namespace %s for project %s, branch %s: %v", - ns, - project, - branch, - err, - ), - ) - } + if err := h.handleBuildEvent(ctx, opLog, message.Body()); err != nil { //@TODO: send msg back to lagoon and update task to failed? message.Ack(false) // ack to remove from queue return - } - // check that the namespace selected for deletion is owned by this controller - if value, ok := namespace.ObjectMeta.Labels["lagoon.sh/controller"]; ok { - if value == h.ControllerNamespace { - /* - get any deployments/statefulsets/daemonsets - then delete them - */ - if h.CleanupHarborRepositoryOnDelete { - lagoonHarbor, err := harbor.NewHarbor(h.Harbor) - if err != nil { - message.Ack(false) // ack to remove from queue - return - } - curVer, err := lagoonHarbor.GetHarborVersion(ctx) - if err != nil { - message.Ack(false) // ack to remove from queue - return - } - if lagoonHarbor.UseV2Functions(curVer) { - lagoonHarbor.DeleteRepository(ctx, project, branch) - } - } - if del := h.DeleteLagoonTasks(ctx, opLog.WithName("DeleteLagoonTasks"), ns, project, branch); del == false { - message.Ack(false) // ack to remove from queue - return - } - if del := h.DeleteLagoonBuilds(ctx, opLog.WithName("DeleteLagoonBuilds"), ns, project, branch); del == false { - message.Ack(false) // ack to remove from queue - return - } - if del := h.DeleteDeployments(ctx, opLog.WithName("DeleteDeployments"), ns, project, branch); del == false { - message.Ack(false) // ack to remove from queue - return - } - if del := h.DeleteStatefulSets(ctx, opLog.WithName("DeleteStatefulSets"), ns, project, branch); del == false { - message.Ack(false) // ack to remove from queue - return - } - if del := h.DeleteDaemonSets(ctx, opLog.WithName("DeleteDaemonSets"), ns, project, branch); del == false { - message.Ack(false) // ack to remove from queue - return - } - if del := h.DeletePVCs(ctx, opLog.WithName("DeletePVCs"), ns, project, branch); del == false { - message.Ack(false) // ack to remove from queue - return - } - /* - then delete the namespace - */ - if del := h.DeleteNamespace(ctx, opLog.WithName("DeleteNamespace"), namespace, project, branch); del == false { - message.Ack(false) // ack to remove from queue - return - } - opLog.WithName("DeleteNamespace").Info( - fmt.Sprintf( - "Deleted namespace %s for project %s, branch %s", - ns, - project, - branch, - ), - ) - msg := lagoonv1beta1.LagoonMessage{ - Type: "remove", - Namespace: ns, - Meta: &lagoonv1beta1.LagoonLogMeta{ - Project: project, - Environment: branch, - }, - } - msgBytes, _ := json.Marshal(msg) - h.Publish("lagoon-tasks:controller", msgBytes) - message.Ack(false) // ack to remove from queue - return - } - // controller label didn't match, log the message - opLog.WithName("RemoveTask").Info( - fmt.Sprintf( - "Selected namespace %s for project %s, branch %s: %v", - ns, - project, - branch, - fmt.Errorf("The controller label value %s does not match %s for this namespace", value, h.ControllerNamespace), - ), - ) + } + message.Ack(false) // ack to remove from queue + }) + if err != nil { + log.Fatalf(fmt.Sprintf("Failed to set handler to consumer `%s`: %v", "builddeploy-queue", err)) + } + + // Handle any tasks that go to the `remove` queue + opLog.Info(fmt.Sprintf("Listening for lagoon-tasks:%s:remove", targetName)) + err = messageQueue.SetConsumerHandler("remove-queue", func(message mq.Message) { + if err == nil { + if err := h.handleRemovalEvent(ctx, opLog, message.Body()); err != nil { + //@TODO: send msg back to lagoon and update task to failed? message.Ack(false) // ack to remove from queue return } - // controller label didn't match, log the message - opLog.WithName("RemoveTask").Info( - fmt.Sprintf( - "Selected namespace %s for project %s, branch %s: %v", - ns, - project, - branch, - fmt.Errorf("The controller ownership label does not exist on this namespace, nothing will be done for this removal request"), - ), - ) } message.Ack(false) // ack to remove from queue }) @@ -346,51 +172,10 @@ func (h *Messaging) Consumer(targetName string) { //error { } // Handle any tasks that go to the `jobs` queue - opLog.Info("Listening for lagoon-tasks:" + targetName + ":jobs") + opLog.Info(fmt.Sprintf("Listening for lagoon-tasks:%s:jobs", targetName)) err = messageQueue.SetConsumerHandler("jobs-queue", func(message mq.Message) { if err == nil { - // unmarshall the message into a remove task to be processed - jobSpec := &lagoonv1beta1.LagoonTaskSpec{} - json.Unmarshal(message.Body(), jobSpec) - namespace := helpers.GenerateNamespaceName( - jobSpec.Project.NamespacePattern, // the namespace pattern or `openshiftProjectPattern` from Lagoon is never received by the controller - jobSpec.Environment.Name, - jobSpec.Project.Name, - h.NamespacePrefix, - h.ControllerNamespace, - h.RandomNamespacePrefix, - ) - opLog.Info( - fmt.Sprintf( - "Received task for project %s, environment %s - %s", - jobSpec.Project.Name, - jobSpec.Environment.Name, - namespace, - ), - ) - job := &lagoonv1beta1.LagoonTask{} - job.Spec = *jobSpec - // set the namespace to the `openshiftProjectName` from the environment - job.ObjectMeta.Namespace = namespace - job.SetLabels( - map[string]string{ - "lagoon.sh/taskType": string(lagoonv1beta1.TaskTypeStandard), - "lagoon.sh/taskStatus": string(lagoonv1beta1.TaskStatusPending), - "lagoon.sh/controller": h.ControllerNamespace, - }, - ) - job.ObjectMeta.Name = fmt.Sprintf("lagoon-task-%s-%s", job.Spec.Task.ID, helpers.HashString(job.Spec.Task.ID)[0:6]) - if job.Spec.Task.TaskName != "" { - job.ObjectMeta.Name = job.Spec.Task.TaskName - } - if err := h.Client.Create(ctx, job); err != nil { - opLog.Error(err, - fmt.Sprintf( - "Unable to create job task for project %s, environment %s", - job.Spec.Project.Name, - job.Spec.Environment.Name, - ), - ) + if err := h.handleTaskEvent(ctx, opLog, message.Body()); err != nil { //@TODO: send msg back to lagoon and update task to failed? message.Ack(false) // ack to remove from queue return @@ -403,114 +188,13 @@ func (h *Messaging) Consumer(targetName string) { //error { } // Handle any tasks that go to the `misc` queue - opLog.Info("Listening for lagoon-tasks:" + targetName + ":misc") + opLog.Info(fmt.Sprintf("Listening for lagoon-tasks:%s:misc", targetName)) err = messageQueue.SetConsumerHandler("misc-queue", func(message mq.Message) { if err == nil { - opLog := ctrl.Log.WithName("handlers").WithName("LagoonTasks") - // unmarshall the message into a remove task to be processed - jobSpec := &lagoonv1beta1.LagoonTaskSpec{} - json.Unmarshal(message.Body(), jobSpec) - // check which key has been received - namespace := helpers.GenerateNamespaceName( - jobSpec.Project.NamespacePattern, // the namespace pattern or `openshiftProjectPattern` from Lagoon is never received by the controller - jobSpec.Environment.Name, - jobSpec.Project.Name, - h.NamespacePrefix, - h.ControllerNamespace, - h.RandomNamespacePrefix, - ) - switch jobSpec.Key { - case "kubernetes:build:cancel": - opLog.Info( - fmt.Sprintf( - "Received build cancellation for project %s, environment %s - %s", - jobSpec.Project.Name, - jobSpec.Environment.Name, - namespace, - ), - ) - err := h.CancelBuild(namespace, jobSpec) - if err != nil { - //@TODO: send msg back to lagoon and update task to failed? - message.Ack(false) // ack to remove from queue - return - } - case "kubernetes:task:cancel": - opLog.Info( - fmt.Sprintf( - "Received task cancellation for project %s, environment %s - %s", - jobSpec.Project.Name, - jobSpec.Environment.Name, - namespace, - ), - ) - err := h.CancelTask(namespace, jobSpec) - if err != nil { - //@TODO: send msg back to lagoon and update task to failed? - message.Ack(false) // ack to remove from queue - return - } - case "kubernetes:restic:backup:restore": - opLog.Info( - fmt.Sprintf( - "Received backup restoration for project %s, environment %s", - jobSpec.Project.Name, - jobSpec.Environment.Name, - ), - ) - err := h.ResticRestore(namespace, jobSpec) - if err != nil { - //@TODO: send msg back to lagoon and update task to failed? - message.Ack(false) // ack to remove from queue - return - } - case "kubernetes:route:migrate": - opLog.Info( - fmt.Sprintf( - "Received ingress migration for project %s", - jobSpec.Project.Name, - ), - ) - err := h.IngressRouteMigration(namespace, jobSpec) - if err != nil { - //@TODO: send msg back to lagoon and update task to failed? - message.Ack(false) // ack to remove from queue - return - } - case "openshift:route:migrate": - opLog.Info( - fmt.Sprintf( - "Received route migration for project %s", - jobSpec.Project.Name, - ), - ) - err := h.IngressRouteMigration(namespace, jobSpec) - if err != nil { - //@TODO: send msg back to lagoon and update task to failed? - message.Ack(false) // ack to remove from queue - return - } - case "kubernetes:task:advanced": - opLog.Info( - fmt.Sprintf( - "Received advanced task for project %s", - jobSpec.Project.Name, - ), - ) - err := h.AdvancedTask(namespace, jobSpec) - if err != nil { - //@TODO: send msg back to lagoon and update task to failed? - message.Ack(false) // ack to remove from queue - return - } - default: - // if we get something that we don't know about, spit out the entire message - opLog.Info( - fmt.Sprintf( - "Received unknown message: %s", - string(message.Body()), - ), - ) + if err := h.handleMiscEvent(ctx, opLog, message.Body()); err != nil { + //@TODO: send msg back to lagoon and update task to failed? + message.Ack(false) // ack to remove from queue + return } } message.Ack(false) // ack to remove from queue diff --git a/main.go b/main.go index 4a614947..8838be68 100644 --- a/main.go +++ b/main.go @@ -439,6 +439,16 @@ func main() { }, Consumers: mq.Consumers{ { + Name: "controller-queue", + Queue: fmt.Sprintf("lagoon-controller:%s", lagoonTargetName), + Workers: mqWorkers, + Options: mq.Options{ + "durable": true, + "delivery_mode": "2", + "headers": "", + "content_type": "", + }, + }, { Name: "remove-queue", Queue: fmt.Sprintf("lagoon-tasks:%s:remove", lagoonTargetName), Workers: mqWorkers, From 2ecb04d190602442ca2375ede1b6ba04dd055db1 Mon Sep 17 00:00:00 2001 From: Ben Jackson Date: Sun, 12 Jun 2022 16:23:12 +1000 Subject: [PATCH 2/8] fix: remove unneeded argument --- controllers/messenger/events.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/controllers/messenger/events.go b/controllers/messenger/events.go index fe5b6714..5cff0f7a 100644 --- a/controllers/messenger/events.go +++ b/controllers/messenger/events.go @@ -27,7 +27,7 @@ const ( lagoonRemval = "lagoon:removal" ) -func (h *Messaging) handleLagoonEvent(ctx context.Context, opLog logr.Logger, body []byte, event LagoonEvent) error { +func (h *Messaging) handleLagoonEvent(ctx context.Context, opLog logr.Logger, body []byte) error { // unmarshal the body of the message into a lagoonevent lEvent := &LagoonEvent{} err := json.Unmarshal(body, lEvent) From b97b4bfd0c9aea52a306bb6406a0454125389694 Mon Sep 17 00:00:00 2001 From: Ben Jackson Date: Sun, 12 Jun 2022 16:49:24 +1000 Subject: [PATCH 3/8] fix: queue and exchanges --- main.go | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/main.go b/main.go index 8838be68..e6cc0dfa 100644 --- a/main.go +++ b/main.go @@ -492,6 +492,16 @@ func main() { }, Queues: mq.Queues{ { + Name: fmt.Sprintf("lagoon-controller:%s", lagoonTargetName), + Exchange: "lagoon-controller", + RoutingKey: fmt.Sprintf("controller:%s", lagoonTargetName), + Options: mq.Options{ + "durable": true, + "delivery_mode": "2", + "headers": "", + "content_type": "", + }, + }, { Name: fmt.Sprintf("lagoon-tasks:%s:builddeploy", lagoonTargetName), Exchange: "lagoon-tasks", RoutingKey: fmt.Sprintf("%s:builddeploy", lagoonTargetName), @@ -555,6 +565,17 @@ func main() { "content_type": "", }, }, + { + Name: "lagoon-controller:controller", + Exchange: "lagoon-controller", + RoutingKey: "controller", + Options: mq.Options{ + "app_id": lagoonAppID, + "delivery_mode": "2", + "headers": "", + "content_type": "", + }, + }, }, DSN: fmt.Sprintf("amqp://%s:%s@%s/", mqUser, mqPass, mqHost), } From 142067cbb25cbf0e4d550fb0e329416aafedcd5d Mon Sep 17 00:00:00 2001 From: Ben Jackson Date: Sun, 12 Jun 2022 17:08:57 +1000 Subject: [PATCH 4/8] fix: add the exchange --- main.go | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/main.go b/main.go index e6cc0dfa..2daa163a 100644 --- a/main.go +++ b/main.go @@ -436,6 +436,16 @@ func main() { "content_type": "", }, }, + { + Name: "lagoon-controller", + Type: "direct", + Options: mq.Options{ + "durable": true, + "delivery_mode": "2", + "headers": "", + "content_type": "", + }, + }, }, Consumers: mq.Consumers{ { @@ -565,17 +575,6 @@ func main() { "content_type": "", }, }, - { - Name: "lagoon-controller:controller", - Exchange: "lagoon-controller", - RoutingKey: "controller", - Options: mq.Options{ - "app_id": lagoonAppID, - "delivery_mode": "2", - "headers": "", - "content_type": "", - }, - }, }, DSN: fmt.Sprintf("amqp://%s:%s@%s/", mqUser, mqPass, mqHost), } From efee2fac5fe96f84d433585c6edca9d234c0d4f0 Mon Sep 17 00:00:00 2001 From: Ben Jackson Date: Tue, 14 Jun 2022 19:26:00 +1000 Subject: [PATCH 5/8] feat: flag single queue configuration --- controllers/messenger/queues.go | 31 ++-- main.go | 302 +++++++++++++++++--------------- 2 files changed, 175 insertions(+), 158 deletions(-) diff --git a/controllers/messenger/queues.go b/controllers/messenger/queues.go index 22df06bb..ec10359c 100644 --- a/controllers/messenger/queues.go +++ b/controllers/messenger/queues.go @@ -47,6 +47,7 @@ type Messaging struct { Harbor harbor.Harbor CleanupHarborRepositoryOnDelete bool EnableDebug bool + EnableSingleQueue bool } // NewMessaging returns a messaging with config and controller-runtime client. @@ -61,6 +62,7 @@ func NewMessaging(config mq.Config, advancedTaskDeployTokenInjection bool, harborConfig harbor.Harbor, cleanupHarborOnDelete bool, + enableSingleQueue bool, enableDebug bool, ) *Messaging { return &Messaging{ @@ -75,6 +77,7 @@ func NewMessaging(config mq.Config, AdvancedTaskDeployTokenInjection: advancedTaskDeployTokenInjection, Harbor: harborConfig, CleanupHarborRepositoryOnDelete: cleanupHarborOnDelete, + EnableSingleQueue: enableSingleQueue, EnableDebug: enableDebug, } } @@ -123,22 +126,24 @@ func (h *Messaging) Consumer(targetName string) { //error { forever := make(chan bool) - // Handle any tasks that go to the `builddeploy` queue - opLog.Info(fmt.Sprintf("Listening for lagoon-controller:%s", targetName)) - err = messageQueue.SetConsumerHandler("controller-queue", func(message mq.Message) { - if err == nil { - if err := h.handleLagoonEvent(ctx, opLog, message.Body()); err != nil { - //@TODO: send msg back to lagoon and update task to failed? - message.Ack(false) // ack to remove from queue - return + // if this controller is set up for single queue only, then only start the single queue listener + if h.EnableSingleQueue { + // Handle any tasks that go to the `lagoon-controller` queue + opLog.Info(fmt.Sprintf("Listening for lagoon-controller:%s", targetName)) + err = messageQueue.SetConsumerHandler("controller-queue", func(message mq.Message) { + if err == nil { + if err := h.handleLagoonEvent(ctx, opLog, message.Body()); err != nil { + //@TODO: send msg back to lagoon and update task to failed? + message.Ack(false) // ack to remove from queue + return + } } + message.Ack(false) // ack to remove from queue + }) + if err != nil { + log.Fatalf(fmt.Sprintf("Failed to set handler to consumer `%s`: %v", "builddeploy-queue", err)) } - message.Ack(false) // ack to remove from queue - }) - if err != nil { - log.Fatalf(fmt.Sprintf("Failed to set handler to consumer `%s`: %v", "builddeploy-queue", err)) } - // Handle any tasks that go to the `builddeploy` queue opLog.Info(fmt.Sprintf("Listening for lagoon-tasks:%s:builddeploy", targetName)) err = messageQueue.SetConsumerHandler("builddeploy-queue", func(message mq.Message) { diff --git a/main.go b/main.go index 2daa163a..787914b5 100644 --- a/main.go +++ b/main.go @@ -78,6 +78,7 @@ func main() { var pendingMessageCron string var mqWorkers int var rabbitRetryInterval int + var enableSingleQueue bool var startupConnectionAttempts int var startupConnectionInterval int var overrideBuildDeployImage string @@ -171,6 +172,7 @@ func main() { "The number of workers to start with.") flag.IntVar(&rabbitRetryInterval, "rabbitmq-retry-interval", 30, "The retry interval for rabbitmq.") + flag.BoolVar(&enableSingleQueue, "enable-single-queue", false, "Flag to have this controller use the single queue option.") flag.StringVar(&leaderElectionID, "leader-election-id", "lagoon-builddeploy-leader-election-helper", "The ID to use for leader election.") flag.StringVar(&pendingMessageCron, "pending-message-cron", "15,45 * * * *", @@ -423,160 +425,169 @@ func main() { os.Exit(1) } - config := mq.Config{ - ReconnectDelay: time.Duration(rabbitRetryInterval) * time.Second, - Exchanges: mq.Exchanges{ - { - Name: "lagoon-tasks", - Type: "direct", - Options: mq.Options{ - "durable": true, - "delivery_mode": "2", - "headers": "", - "content_type": "", - }, - }, - { - Name: "lagoon-controller", - Type: "direct", - Options: mq.Options{ - "durable": true, - "delivery_mode": "2", - "headers": "", - "content_type": "", - }, + exchanges := mq.Exchanges{ + { + Name: "lagoon-tasks", + Type: "direct", + Options: mq.Options{ + "durable": true, + "delivery_mode": "2", + "headers": "", + "content_type": "", }, }, - Consumers: mq.Consumers{ - { - Name: "controller-queue", - Queue: fmt.Sprintf("lagoon-controller:%s", lagoonTargetName), - Workers: mqWorkers, - Options: mq.Options{ - "durable": true, - "delivery_mode": "2", - "headers": "", - "content_type": "", - }, - }, { - Name: "remove-queue", - Queue: fmt.Sprintf("lagoon-tasks:%s:remove", lagoonTargetName), - Workers: mqWorkers, - Options: mq.Options{ - "durable": true, - "delivery_mode": "2", - "headers": "", - "content_type": "", - }, - }, { - Name: "builddeploy-queue", - Queue: fmt.Sprintf("lagoon-tasks:%s:builddeploy", lagoonTargetName), - Workers: mqWorkers, - Options: mq.Options{ - "durable": true, - "delivery_mode": "2", - "headers": "", - "content_type": "", - }, - }, { - Name: "jobs-queue", - Queue: fmt.Sprintf("lagoon-tasks:%s:jobs", lagoonTargetName), - Workers: mqWorkers, - Options: mq.Options{ - "durable": true, - "delivery_mode": "2", - "headers": "", - "content_type": "", - }, - }, { - Name: "misc-queue", - Queue: fmt.Sprintf("lagoon-tasks:%s:misc", lagoonTargetName), - Workers: mqWorkers, - Options: mq.Options{ - "durable": true, - "delivery_mode": "2", - "headers": "", - "content_type": "", - }, + } + consumers := mq.Consumers{ + { + Name: "remove-queue", + Queue: fmt.Sprintf("lagoon-tasks:%s:remove", lagoonTargetName), + Workers: mqWorkers, + Options: mq.Options{ + "durable": true, + "delivery_mode": "2", + "headers": "", + "content_type": "", + }, + }, { + Name: "builddeploy-queue", + Queue: fmt.Sprintf("lagoon-tasks:%s:builddeploy", lagoonTargetName), + Workers: mqWorkers, + Options: mq.Options{ + "durable": true, + "delivery_mode": "2", + "headers": "", + "content_type": "", + }, + }, { + Name: "jobs-queue", + Queue: fmt.Sprintf("lagoon-tasks:%s:jobs", lagoonTargetName), + Workers: mqWorkers, + Options: mq.Options{ + "durable": true, + "delivery_mode": "2", + "headers": "", + "content_type": "", + }, + }, { + Name: "misc-queue", + Queue: fmt.Sprintf("lagoon-tasks:%s:misc", lagoonTargetName), + Workers: mqWorkers, + Options: mq.Options{ + "durable": true, + "delivery_mode": "2", + "headers": "", + "content_type": "", }, }, - Queues: mq.Queues{ - { - Name: fmt.Sprintf("lagoon-controller:%s", lagoonTargetName), - Exchange: "lagoon-controller", - RoutingKey: fmt.Sprintf("controller:%s", lagoonTargetName), - Options: mq.Options{ - "durable": true, - "delivery_mode": "2", - "headers": "", - "content_type": "", - }, - }, { - Name: fmt.Sprintf("lagoon-tasks:%s:builddeploy", lagoonTargetName), - Exchange: "lagoon-tasks", - RoutingKey: fmt.Sprintf("%s:builddeploy", lagoonTargetName), - Options: mq.Options{ - "durable": true, - "delivery_mode": "2", - "headers": "", - "content_type": "", - }, - }, { - Name: fmt.Sprintf("lagoon-tasks:%s:remove", lagoonTargetName), - Exchange: "lagoon-tasks", - RoutingKey: fmt.Sprintf("%s:remove", lagoonTargetName), - Options: mq.Options{ - "durable": true, - "delivery_mode": "2", - "headers": "", - "content_type": "", - }, - }, { - Name: fmt.Sprintf("lagoon-tasks:%s:jobs", lagoonTargetName), - Exchange: "lagoon-tasks", - RoutingKey: fmt.Sprintf("%s:jobs", lagoonTargetName), - Options: mq.Options{ - "durable": true, - "delivery_mode": "2", - "headers": "", - "content_type": "", - }, - }, { - Name: fmt.Sprintf("lagoon-tasks:%s:misc", lagoonTargetName), - Exchange: "lagoon-tasks", - RoutingKey: fmt.Sprintf("%s:misc", lagoonTargetName), - Options: mq.Options{ - "durable": true, - "delivery_mode": "2", - "headers": "", - "content_type": "", - }, + } + queues := mq.Queues{ + { + Name: fmt.Sprintf("lagoon-tasks:%s:builddeploy", lagoonTargetName), + Exchange: "lagoon-tasks", + RoutingKey: fmt.Sprintf("%s:builddeploy", lagoonTargetName), + Options: mq.Options{ + "durable": true, + "delivery_mode": "2", + "headers": "", + "content_type": "", + }, + }, { + Name: fmt.Sprintf("lagoon-tasks:%s:remove", lagoonTargetName), + Exchange: "lagoon-tasks", + RoutingKey: fmt.Sprintf("%s:remove", lagoonTargetName), + Options: mq.Options{ + "durable": true, + "delivery_mode": "2", + "headers": "", + "content_type": "", + }, + }, { + Name: fmt.Sprintf("lagoon-tasks:%s:jobs", lagoonTargetName), + Exchange: "lagoon-tasks", + RoutingKey: fmt.Sprintf("%s:jobs", lagoonTargetName), + Options: mq.Options{ + "durable": true, + "delivery_mode": "2", + "headers": "", + "content_type": "", + }, + }, { + Name: fmt.Sprintf("lagoon-tasks:%s:misc", lagoonTargetName), + Exchange: "lagoon-tasks", + RoutingKey: fmt.Sprintf("%s:misc", lagoonTargetName), + Options: mq.Options{ + "durable": true, + "delivery_mode": "2", + "headers": "", + "content_type": "", }, }, - Producers: mq.Producers{ - { - Name: "lagoon-logs", - Exchange: "lagoon-logs", - Options: mq.Options{ - "app_id": lagoonAppID, - "delivery_mode": "2", - "headers": "", - "content_type": "", - }, + } + producers := mq.Producers{ + { + Name: "lagoon-logs", + Exchange: "lagoon-logs", + Options: mq.Options{ + "app_id": lagoonAppID, + "delivery_mode": "2", + "headers": "", + "content_type": "", }, - { - Name: "lagoon-tasks:controller", - Exchange: "lagoon-tasks", - RoutingKey: "controller", - Options: mq.Options{ - "app_id": lagoonAppID, - "delivery_mode": "2", - "headers": "", - "content_type": "", - }, + }, + { + Name: "lagoon-tasks:controller", + Exchange: "lagoon-tasks", + RoutingKey: "controller", + Options: mq.Options{ + "app_id": lagoonAppID, + "delivery_mode": "2", + "headers": "", + "content_type": "", }, }, - DSN: fmt.Sprintf("amqp://%s:%s@%s/", mqUser, mqPass, mqHost), + } + if enableSingleQueue { + // if this controller is set up for single queue only, then add the configuration for the single queue + exchanges = append(exchanges, mq.ExchangeConfig{ + Name: "lagoon-controller", + Type: "direct", + Options: mq.Options{ + "durable": true, + "delivery_mode": "2", + "headers": "", + "content_type": "", + }, + }) + consumers = append(consumers, mq.ConsumerConfig{ + Name: "controller-queue", + Queue: fmt.Sprintf("lagoon-controller:%s", lagoonTargetName), + Workers: mqWorkers, + Options: mq.Options{ + "durable": true, + "delivery_mode": "2", + "headers": "", + "content_type": "", + }, + }) + queues = append(queues, mq.QueueConfig{ + Name: fmt.Sprintf("lagoon-controller:%s", lagoonTargetName), + Exchange: "lagoon-controller", + RoutingKey: fmt.Sprintf("controller:%s", lagoonTargetName), + Options: mq.Options{ + "durable": true, + "delivery_mode": "2", + "headers": "", + "content_type": "", + }, + }) + } + config := mq.Config{ + ReconnectDelay: time.Duration(rabbitRetryInterval) * time.Second, + Exchanges: exchanges, + Consumers: consumers, + Queues: queues, + Producers: producers, + DSN: fmt.Sprintf("amqp://%s:%s@%s/", mqUser, mqPass, mqHost), } harborURLParsed, _ := url.Parse(harborURL) @@ -614,6 +625,7 @@ func main() { advancedTaskDeployToken, harborConfig, cleanupHarborRepositoryOnDelete, + enableSingleQueue, enableDebug, ) c := cron.New() From 4b3a3264ddaf907cfaf7127a3a8eb2406b38afcb Mon Sep 17 00:00:00 2001 From: shreddedbacon Date: Thu, 29 Sep 2022 15:30:44 +1000 Subject: [PATCH 6/8] chore: fix other conflicts --- internal/messenger/consumer.go | 163 --------------------------------- main.go | 5 - 2 files changed, 168 deletions(-) diff --git a/internal/messenger/consumer.go b/internal/messenger/consumer.go index 917f6ba7..7f8df610 100644 --- a/internal/messenger/consumer.go +++ b/internal/messenger/consumer.go @@ -2,18 +2,13 @@ package messenger import ( "context" - "encoding/json" "fmt" "log" "time" "github.com/cheshir/go-mq" - "github.com/go-logr/logr" - lagoonv1beta1 "github.com/uselagoon/remote-controller/apis/lagoon/v1beta1" "gopkg.in/matryer/try.v1" - "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" ) // Consumer handles consuming messages sent to the queue that these controllers are connected to and processes them accordingly @@ -143,161 +138,3 @@ func (m *Messenger) Consumer(targetName string) { //error { } <-forever } - -// Publish publishes a message to a given queue -func (h *Messaging) Publish(queue string, message []byte) error { - opLog := ctrl.Log.WithName("handlers").WithName("LagoonTasks") - // no need to re-try here, this is on a cron schedule and the error is returned, cron will try again whenever it is set to run next - messageQueue, err := mq.New(m.Config) - if err != nil { - opLog.Info(fmt.Sprintf("Failed to initialize message queue manager: %v", err)) - return err - } - defer messageQueue.Close() - - producer, err := messageQueue.AsyncProducer(queue) - if err != nil { - opLog.Info(fmt.Sprintf("Failed to get async producer: %v", err)) - return err - } - producer.Produce([]byte(fmt.Sprintf("%s", message))) - return nil -} - -// GetPendingMessages will get any pending messages from the queue and attempt to publish them if possible -func (h *Messaging) GetPendingMessages() { - opLog := ctrl.Log.WithName("handlers").WithName("PendingMessages") - ctx := context.Background() - opLog.Info(fmt.Sprintf("Checking pending build messages across all namespaces")) - m.pendingBuildLogMessages(ctx, opLog) - opLog.Info(fmt.Sprintf("Checking pending task messages across all namespaces")) - m.pendingTaskLogMessages(ctx, opLog) -} - -func (h *Messaging) pendingBuildLogMessages(ctx context.Context, opLog logr.Logger) { - pendingMsgs := &lagoonv1beta1.LagoonBuildList{} - listOption := (&client.ListOptions{}).ApplyOptions([]client.ListOption{ - client.MatchingLabels(map[string]string{ - "lagoon.sh/pendingMessages": "true", - "lagoon.sh/controller": m.ControllerNamespace, - }), - }) - if err := m.Client.List(ctx, pendingMsgs, listOption); err != nil { - opLog.Error(err, fmt.Sprintf("Unable to list LagoonBuilds, there may be none or something went wrong")) - return - } - for _, build := range pendingMsgs.Items { - // get the latest resource in case it has been updated since the loop started - if err := m.Client.Get(ctx, types.NamespacedName{ - Name: build.ObjectMeta.Name, - Namespace: build.ObjectMeta.Namespace, - }, &build); err != nil { - opLog.Error(err, fmt.Sprintf("Unable to get LagoonBuild, something went wrong")) - break - } - opLog.Info(fmt.Sprintf("LagoonBuild %s has pending messages, attempting to re-send", build.ObjectMeta.Name)) - - // try to re-publish message or break and try the next build with pending message - if build.StatusMessages.StatusMessage != nil { - statusBytes, _ := json.Marshal(build.StatusMessages.StatusMessage) - if err := m.Publish("lagoon-logs", statusBytes); err != nil { - opLog.Error(err, fmt.Sprintf("Unable to publush message")) - break - } - } - if build.StatusMessages.BuildLogMessage != nil { - logBytes, _ := json.Marshal(build.StatusMessages.BuildLogMessage) - if err := m.Publish("lagoon-logs", logBytes); err != nil { - opLog.Error(err, fmt.Sprintf("Unable to publush message")) - break - } - } - if build.StatusMessages.EnvironmentMessage != nil { - envBytes, _ := json.Marshal(build.StatusMessages.EnvironmentMessage) - if err := m.Publish("lagoon-tasks:controller", envBytes); err != nil { - opLog.Error(err, fmt.Sprintf("Unable to publush message")) - break - } - } - // if we managed to send all the pending messages, then update the resource to remove the pending state - // so we don't send the same message multiple times - opLog.Info(fmt.Sprintf("Sent pending messages for LagoonBuild %s", build.ObjectMeta.Name)) - mergePatch, _ := json.Marshal(map[string]interface{}{ - "metadata": map[string]interface{}{ - "labels": map[string]interface{}{ - "lagoon.sh/pendingMessages": "false", - }, - }, - "statusMessages": nil, - }) - if err := m.Client.Patch(ctx, &build, client.RawPatch(types.MergePatchType, mergePatch)); err != nil { - opLog.Error(err, fmt.Sprintf("Unable to update status condition")) - break - } - } - return -} - -func (h *Messaging) pendingTaskLogMessages(ctx context.Context, opLog logr.Logger) { - pendingMsgs := &lagoonv1beta1.LagoonTaskList{} - listOption := (&client.ListOptions{}).ApplyOptions([]client.ListOption{ - client.MatchingLabels(map[string]string{ - "lagoon.sh/pendingMessages": "true", - "lagoon.sh/controller": m.ControllerNamespace, - }), - }) - if err := m.Client.List(ctx, pendingMsgs, listOption); err != nil { - opLog.Error(err, fmt.Sprintf("Unable to list LagoonBuilds, there may be none or something went wrong")) - return - } - for _, task := range pendingMsgs.Items { - // get the latest resource in case it has been updated since the loop started - if err := m.Client.Get(ctx, types.NamespacedName{ - Name: task.ObjectMeta.Name, - Namespace: task.ObjectMeta.Namespace, - }, &task); err != nil { - opLog.Error(err, fmt.Sprintf("Unable to get LagoonBuild, something went wrong")) - break - } - opLog.Info(fmt.Sprintf("LagoonTasl %s has pending messages, attempting to re-send", task.ObjectMeta.Name)) - - // try to re-publish message or break and try the next build with pending message - if task.StatusMessages.StatusMessage != nil { - statusBytes, _ := json.Marshal(task.StatusMessages.StatusMessage) - if err := m.Publish("lagoon-logs", statusBytes); err != nil { - opLog.Error(err, fmt.Sprintf("Unable to publush message")) - break - } - } - if task.StatusMessages.TaskLogMessage != nil { - taskLogBytes, _ := json.Marshal(task.StatusMessages.TaskLogMessage) - if err := m.Publish("lagoon-logs", taskLogBytes); err != nil { - opLog.Error(err, fmt.Sprintf("Unable to publush message")) - break - } - } - if task.StatusMessages.EnvironmentMessage != nil { - envBytes, _ := json.Marshal(task.StatusMessages.EnvironmentMessage) - if err := m.Publish("lagoon-tasks:controller", envBytes); err != nil { - opLog.Error(err, fmt.Sprintf("Unable to publush message")) - break - } - } - // if we managed to send all the pending messages, then update the resource to remove the pending state - // so we don't send the same message multiple times - opLog.Info(fmt.Sprintf("Sent pending messages for LagoonTask %s", task.ObjectMeta.Name)) - mergePatch, _ := json.Marshal(map[string]interface{}{ - "metadata": map[string]interface{}{ - "labels": map[string]interface{}{ - "lagoon.sh/pendingMessages": "false", - }, - }, - "statusMessages": nil, - }) - if err := m.Client.Patch(ctx, &task, client.RawPatch(types.MergePatchType, mergePatch)); err != nil { - opLog.Error(err, fmt.Sprintf("Unable to update status condition")) - break - } - } - return -} diff --git a/main.go b/main.go index 607ea700..d629e205 100644 --- a/main.go +++ b/main.go @@ -650,13 +650,8 @@ func main() { randomPrefix, advancedTaskSSHKeyInjection, advancedTaskDeployToken, -<<<<<<< HEAD - harborConfig, - cleanupHarborRepositoryOnDelete, enableSingleQueue, -======= deletion, ->>>>>>> main enableDebug, ) From 7f88c452d42ed6aada1059b0170f24b17b7fb854 Mon Sep 17 00:00:00 2001 From: shreddedbacon Date: Wed, 2 Nov 2022 16:48:40 +1100 Subject: [PATCH 7/8] test: update test to use the new single queue --- .github/workflows/remote-controller.yaml | 1 + config/default/config.properties | 3 +- config/default/envs.yaml | 7 +- controller-test.sh | 88 +++++++++++++----------- internal/messenger/messenger.go | 1 + main.go | 1 + 6 files changed, 60 insertions(+), 41 deletions(-) diff --git a/.github/workflows/remote-controller.yaml b/.github/workflows/remote-controller.yaml index 29638014..e5d7fcf2 100644 --- a/.github/workflows/remote-controller.yaml +++ b/.github/workflows/remote-controller.yaml @@ -111,5 +111,6 @@ jobs: export HARBOR_API="http://harbor.$(kubectl get nodes -o jsonpath='{.items[0].status.addresses[0].address}').nip.io:32080/api" export KIND_NODE_IP="$(kubectl get nodes -o jsonpath='{.items[0].status.addresses[0].address}')" export HARBOR_VERSION=${{matrix.harbor}} + export ENABLE_SINGLE_QUEUE=true # export GO111MODULE=on make controller-test \ No newline at end of file diff --git a/config/default/config.properties b/config/default/config.properties index 8d18a393..959af732 100644 --- a/config/default/config.properties +++ b/config/default/config.properties @@ -1,3 +1,4 @@ OVERRIDE_BUILD_DEPLOY_DIND_IMAGE HARBOR_URL -HARBOR_API \ No newline at end of file +HARBOR_API +ENABLE_SINGLE_QUEUE \ No newline at end of file diff --git a/config/default/envs.yaml b/config/default/envs.yaml index da121b7e..719272a4 100644 --- a/config/default/envs.yaml +++ b/config/default/envs.yaml @@ -35,4 +35,9 @@ spec: valueFrom: configMapKeyRef: name: overrides - key: HARBOR_API \ No newline at end of file + key: HARBOR_API + - name: ENABLE_SINGLE_QUEUE + valueFrom: + configMapKeyRef: + name: overrides + key: ENABLE_SINGLE_QUEUE \ No newline at end of file diff --git a/controller-test.sh b/controller-test.sh index 4bd0cfe1..ca0831c2 100755 --- a/controller-test.sh +++ b/controller-test.sh @@ -165,52 +165,55 @@ sleep 10 check_lagoon_build lagoon-build-${LBUILD} -echo "==> Trigger a lagoon build using rabbitmq" +echo "==> Trigger a lagoon build using rabbitmq to the single queue" echo ' { "properties":{ "delivery_mode":2 }, - "routing_key":"ci-local-controller-kubernetes:builddeploy", + "routing_key":"controller:ci-local-controller-kubernetes", "payload":"{ - \"metadata\": { - \"name\": \"lagoon-build-8m5zypx\" - }, - \"spec\": { - \"build\": { - \"ci\": \"true\", - \"type\": \"branch\" + \"eventType\": \"lagoon:build\", + \"payload\": { + \"metadata\": { + \"name\": \"lagoon-build-8m5zypx\" }, - \"gitReference\": \"origin\/main\", - \"project\": { - \"name\": \"nginx-example\", - \"environment\": \"main\", - \"uiLink\": \"https:\/\/dashboard.amazeeio.cloud\/projects\/project\/project-environment\/deployments\/lagoon-build-8m5zypx\", - \"routerPattern\": \"main-nginx-example\", - \"environmentType\": \"production\", - \"productionEnvironment\": \"main\", - \"standbyEnvironment\": \"\", - \"gitUrl\": \"https:\/\/github.com\/shreddedbacon\/lagoon-nginx-example.git\", - \"deployTarget\": \"kind\", - \"projectSecret\": \"4d6e7dd0f013a75d62a0680139fa82d350c2a1285f43f867535bad1143f228b1\", - \"key\": \"LS0tLS1CRUdJTiBSU0EgUFJJVkFURSBLRVktLS0tLQpNSUlDWFFJQkFBS0JnUUNjc1g2RG5KNXpNb0RqQ2R6a1JFOEg2TEh2TDQzaUhsekJLTWo4T1VNV05ZZG5YekdqCkR5Mkp1anQ3ZDNlMTVLeC8zOFo5UzJLdHNnVFVtWi9lUlRQSTdabE1idHRJK250UmtyblZLblBWNzhEeEFKNW8KTGZtQndmdWE2MnlVYnl0cnpYQ2pwVVJrQUlBMEZiR2VqS2Rvd3cxcnZGMzJoZFUzQ3ZIcG5rKzE2d0lEQVFBQgpBb0dCQUkrV0dyL1NDbVMzdCtIVkRPVGtMNk9vdVR6Y1QrRVFQNkVGbGIrRFhaV0JjZFhwSnB3c2NXZFBEK2poCkhnTEJUTTFWS3hkdnVEcEE4aW83cUlMTzJWYm1MeGpNWGk4TUdwY212dXJFNVJydTZTMXJzRDl2R0c5TGxoR3UKK0pUSmViMVdaZFduWFZ2am5LbExrWEV1eUthbXR2Z253Um5xNld5V05OazJ6SktoQWtFQThFenpxYnowcFVuTApLc241K2k0NUdoRGVpRTQvajRtamo1b1FHVzJxbUZWT2pHaHR1UGpaM2lwTis0RGlTRkFyMkl0b2VlK085d1pyCkRINHBkdU5YOFFKQkFLYnVOQ3dXK29sYXA4R2pUSk1TQjV1MW8wMVRHWFdFOGhVZG1leFBBdjl0cTBBT0gzUUQKUTIrM0RsaVY0ektoTlMra2xaSkVjNndzS0YyQmJIby81NXNDUVFETlBJd24vdERja3loSkJYVFJyc1RxZEZuOApCUWpZYVhBZTZEQ3o1eXg3S3ZFSmp1K1h1a01xTXV1ajBUSnpITFkySHVzK3FkSnJQVG9VMDNSS3JHV2hBa0JFCnB3aXI3Vk5pYy9jMFN2MnVLcWNZWWM1a2ViMnB1R0I3VUs1Q0lvaWdGakZzNmFJRDYyZXJwVVJ3S0V6RlFNbUgKNjQ5Y0ZXemhMVlA0aU1iZFREVHJBa0FFMTZXU1A3WXBWOHV1eFVGMGV0L3lFR3dURVpVU2R1OEppSTBHN0tqagpqcVR6RjQ3YkJZc0pIYTRYcWpVb2E3TXgwcS9FSUtRWkJ2NGFvQm42bGFOQwotLS0tLUVORCBSU0EgUFJJVkFURSBLRVktLS0tLQ==\", - \"monitoring\": { - \"contact\": \"1234\", - \"statuspageID\": \"1234\" + \"spec\": { + \"build\": { + \"ci\": \"true\", + \"type\": \"branch\" }, - \"variables\": { - \"project\": \"W3sibmFtZSI6IkxBR09PTl9TWVNURU1fUk9VVEVSX1BBVFRFUk4iLCJ2YWx1ZSI6IiR7ZW52aXJvbm1lbnR9LiR7cHJvamVjdH0uZXhhbXBsZS5jb20iLCJzY29wZSI6ImludGVybmFsX3N5c3RlbSJ9XQ==\", - \"environment\": \"W10=\" + \"gitReference\": \"origin\/main\", + \"project\": { + \"name\": \"nginx-example\", + \"environment\": \"main\", + \"uiLink\": \"https:\/\/dashboard.amazeeio.cloud\/projects\/project\/project-environment\/deployments\/lagoon-build-8m5zypx\", + \"routerPattern\": \"main-nginx-example\", + \"environmentType\": \"production\", + \"productionEnvironment\": \"main\", + \"standbyEnvironment\": \"\", + \"gitUrl\": \"https:\/\/github.com\/shreddedbacon\/lagoon-nginx-example.git\", + \"deployTarget\": \"kind\", + \"projectSecret\": \"4d6e7dd0f013a75d62a0680139fa82d350c2a1285f43f867535bad1143f228b1\", + \"key\": \"LS0tLS1CRUdJTiBSU0EgUFJJVkFURSBLRVktLS0tLQpNSUlDWFFJQkFBS0JnUUNjc1g2RG5KNXpNb0RqQ2R6a1JFOEg2TEh2TDQzaUhsekJLTWo4T1VNV05ZZG5YekdqCkR5Mkp1anQ3ZDNlMTVLeC8zOFo5UzJLdHNnVFVtWi9lUlRQSTdabE1idHRJK250UmtyblZLblBWNzhEeEFKNW8KTGZtQndmdWE2MnlVYnl0cnpYQ2pwVVJrQUlBMEZiR2VqS2Rvd3cxcnZGMzJoZFUzQ3ZIcG5rKzE2d0lEQVFBQgpBb0dCQUkrV0dyL1NDbVMzdCtIVkRPVGtMNk9vdVR6Y1QrRVFQNkVGbGIrRFhaV0JjZFhwSnB3c2NXZFBEK2poCkhnTEJUTTFWS3hkdnVEcEE4aW83cUlMTzJWYm1MeGpNWGk4TUdwY212dXJFNVJydTZTMXJzRDl2R0c5TGxoR3UKK0pUSmViMVdaZFduWFZ2am5LbExrWEV1eUthbXR2Z253Um5xNld5V05OazJ6SktoQWtFQThFenpxYnowcFVuTApLc241K2k0NUdoRGVpRTQvajRtamo1b1FHVzJxbUZWT2pHaHR1UGpaM2lwTis0RGlTRkFyMkl0b2VlK085d1pyCkRINHBkdU5YOFFKQkFLYnVOQ3dXK29sYXA4R2pUSk1TQjV1MW8wMVRHWFdFOGhVZG1leFBBdjl0cTBBT0gzUUQKUTIrM0RsaVY0ektoTlMra2xaSkVjNndzS0YyQmJIby81NXNDUVFETlBJd24vdERja3loSkJYVFJyc1RxZEZuOApCUWpZYVhBZTZEQ3o1eXg3S3ZFSmp1K1h1a01xTXV1ajBUSnpITFkySHVzK3FkSnJQVG9VMDNSS3JHV2hBa0JFCnB3aXI3Vk5pYy9jMFN2MnVLcWNZWWM1a2ViMnB1R0I3VUs1Q0lvaWdGakZzNmFJRDYyZXJwVVJ3S0V6RlFNbUgKNjQ5Y0ZXemhMVlA0aU1iZFREVHJBa0FFMTZXU1A3WXBWOHV1eFVGMGV0L3lFR3dURVpVU2R1OEppSTBHN0tqagpqcVR6RjQ3YkJZc0pIYTRYcWpVb2E3TXgwcS9FSUtRWkJ2NGFvQm42bGFOQwotLS0tLUVORCBSU0EgUFJJVkFURSBLRVktLS0tLQ==\", + \"monitoring\": { + \"contact\": \"1234\", + \"statuspageID\": \"1234\" + }, + \"variables\": { + \"project\": \"W3sibmFtZSI6IkxBR09PTl9TWVNURU1fUk9VVEVSX1BBVFRFUk4iLCJ2YWx1ZSI6IiR7ZW52aXJvbm1lbnR9LiR7cHJvamVjdH0uZXhhbXBsZS5jb20iLCJzY29wZSI6ImludGVybmFsX3N5c3RlbSJ9XQ==\", + \"environment\": \"W10=\" + } + }, + \"branch\": { + \"name\": \"main\" } - }, - \"branch\": { - \"name\": \"main\" } } }", "payload_encoding":"string" }' >payload.json -curl -s -u guest:guest -H "Accept: application/json" -H "Content-Type:application/json" -X POST -d @payload.json http://172.17.0.1:15672/api/exchanges/%2f/lagoon-tasks/publish +curl -s -u guest:guest -H "Accept: application/json" -H "Content-Type:application/json" -X POST -d @payload.json http://172.17.0.1:15672/api/exchanges/%2f/lagoon-controller/publish echo "" sleep 10 check_lagoon_build lagoon-build-${LBUILD2} @@ -268,17 +271,24 @@ kubectl logs $(kubectl get pods -n ${CONTROLLER_NAMESPACE} --no-headers | awk ' echo "==> Delete the environment" echo ' -{"properties":{"delivery_mode":2},"routing_key":"ci-local-controller-kubernetes:remove", +{ + "properties":{ + "delivery_mode":2 + }, + "routing_key":"controller:ci-local-controller-kubernetes", "payload":"{ - \"projectName\": \"nginx-example\", - \"type\":\"branch\", - \"forceDeleteProductionEnvironment\":true, - \"branch\":\"main\", - \"openshiftProjectName\":\"nginx-example-main\" + \"eventType\": \"lagoon:removal\", + \"payload\": { + \"projectName\": \"nginx-example\", + \"type\":\"branch\", + \"forceDeleteProductionEnvironment\":true, + \"branch\":\"main\", + \"openshiftProjectName\":\"nginx-example-main\" + } }", "payload_encoding":"string" }' >payload.json -curl -s -u guest:guest -H "Accept: application/json" -H "Content-Type:application/json" -X POST -d @payload.json http://172.17.0.1:15672/api/exchanges/%2f/lagoon-tasks/publish +curl -s -u guest:guest -H "Accept: application/json" -H "Content-Type:application/json" -X POST -d @payload.json http://172.17.0.1:15672/api/exchanges/%2f/lagoon-controller/publish echo "" CHECK_COUNTER=1 until $(kubectl logs $(kubectl get pods -n ${CONTROLLER_NAMESPACE} --no-headers | awk '{print $1}') -c manager -n ${CONTROLLER_NAMESPACE} | grep -q "Deleted namespace nginx-example-main for project nginx-example, environment main") diff --git a/internal/messenger/messenger.go b/internal/messenger/messenger.go index 2a5b5648..9771c8a6 100644 --- a/internal/messenger/messenger.go +++ b/internal/messenger/messenger.go @@ -63,6 +63,7 @@ func New(config mq.Config, AdvancedTaskSSHKeyInjection: advancedTaskSSHKeyInjection, AdvancedTaskDeployTokenInjection: advancedTaskDeployTokenInjection, DeletionHandler: deletionHandler, + EnableSingleQueue: enableSingleQueue, EnableDebug: enableDebug, } } diff --git a/main.go b/main.go index 23406808..d6d1d03e 100644 --- a/main.go +++ b/main.go @@ -362,6 +362,7 @@ func main() { mqUser = helpers.GetEnv("RABBITMQ_USERNAME", mqUser) mqPass = helpers.GetEnv("RABBITMQ_PASSWORD", mqPass) mqHost = helpers.GetEnv("RABBITMQ_HOSTNAME", mqHost) + enableSingleQueue = helpers.GetEnvBool("ENABLE_SINGLE_QUEUE", enableSingleQueue) lagoonTargetName = helpers.GetEnv("LAGOON_TARGET_NAME", lagoonTargetName) lagoonAppID = helpers.GetEnv("LAGOON_APP_ID", lagoonAppID) pendingMessageCron = helpers.GetEnv("PENDING_MESSAGE_CRON", pendingMessageCron) From 4ef498b097e05d27b12a87b5feab2695dddb46fc Mon Sep 17 00:00:00 2001 From: shreddedbacon Date: Wed, 2 Nov 2022 18:52:25 +1100 Subject: [PATCH 8/8] refactor: enable single queue by default so the controller listens to all queues --- .github/workflows/remote-controller.yaml | 1 - config/default/config.properties | 3 +-- config/default/envs.yaml | 7 +------ main.go | 2 +- 4 files changed, 3 insertions(+), 10 deletions(-) diff --git a/.github/workflows/remote-controller.yaml b/.github/workflows/remote-controller.yaml index e5d7fcf2..29638014 100644 --- a/.github/workflows/remote-controller.yaml +++ b/.github/workflows/remote-controller.yaml @@ -111,6 +111,5 @@ jobs: export HARBOR_API="http://harbor.$(kubectl get nodes -o jsonpath='{.items[0].status.addresses[0].address}').nip.io:32080/api" export KIND_NODE_IP="$(kubectl get nodes -o jsonpath='{.items[0].status.addresses[0].address}')" export HARBOR_VERSION=${{matrix.harbor}} - export ENABLE_SINGLE_QUEUE=true # export GO111MODULE=on make controller-test \ No newline at end of file diff --git a/config/default/config.properties b/config/default/config.properties index 959af732..8d18a393 100644 --- a/config/default/config.properties +++ b/config/default/config.properties @@ -1,4 +1,3 @@ OVERRIDE_BUILD_DEPLOY_DIND_IMAGE HARBOR_URL -HARBOR_API -ENABLE_SINGLE_QUEUE \ No newline at end of file +HARBOR_API \ No newline at end of file diff --git a/config/default/envs.yaml b/config/default/envs.yaml index 719272a4..da121b7e 100644 --- a/config/default/envs.yaml +++ b/config/default/envs.yaml @@ -35,9 +35,4 @@ spec: valueFrom: configMapKeyRef: name: overrides - key: HARBOR_API - - name: ENABLE_SINGLE_QUEUE - valueFrom: - configMapKeyRef: - name: overrides - key: ENABLE_SINGLE_QUEUE \ No newline at end of file + key: HARBOR_API \ No newline at end of file diff --git a/main.go b/main.go index d6d1d03e..ec32f35e 100644 --- a/main.go +++ b/main.go @@ -183,7 +183,7 @@ func main() { "The number of workers to start with.") flag.IntVar(&rabbitRetryInterval, "rabbitmq-retry-interval", 30, "The retry interval for rabbitmq.") - flag.BoolVar(&enableSingleQueue, "enable-single-queue", false, "Flag to have this controller use the single queue option.") + flag.BoolVar(&enableSingleQueue, "enable-single-queue", true, "Flag to have this controller use the single queue option.") flag.StringVar(&leaderElectionID, "leader-election-id", "lagoon-builddeploy-leader-election-helper", "The ID to use for leader election.") flag.StringVar(&pendingMessageCron, "pending-message-cron", "15,45 * * * *",