diff --git a/controller-test.sh b/controller-test.sh index 4bd0cfe1..ca0831c2 100755 --- a/controller-test.sh +++ b/controller-test.sh @@ -165,52 +165,55 @@ sleep 10 check_lagoon_build lagoon-build-${LBUILD} -echo "==> Trigger a lagoon build using rabbitmq" +echo "==> Trigger a lagoon build using rabbitmq to the single queue" echo ' { "properties":{ "delivery_mode":2 }, - "routing_key":"ci-local-controller-kubernetes:builddeploy", + "routing_key":"controller:ci-local-controller-kubernetes", "payload":"{ - \"metadata\": { - \"name\": \"lagoon-build-8m5zypx\" - }, - \"spec\": { - \"build\": { - \"ci\": \"true\", - \"type\": \"branch\" + \"eventType\": \"lagoon:build\", + \"payload\": { + \"metadata\": { + \"name\": \"lagoon-build-8m5zypx\" }, - \"gitReference\": \"origin\/main\", - \"project\": { - \"name\": \"nginx-example\", - \"environment\": \"main\", - \"uiLink\": \"https:\/\/dashboard.amazeeio.cloud\/projects\/project\/project-environment\/deployments\/lagoon-build-8m5zypx\", - \"routerPattern\": \"main-nginx-example\", - \"environmentType\": \"production\", - \"productionEnvironment\": \"main\", - \"standbyEnvironment\": \"\", - \"gitUrl\": \"https:\/\/github.com\/shreddedbacon\/lagoon-nginx-example.git\", - \"deployTarget\": \"kind\", - \"projectSecret\": \"4d6e7dd0f013a75d62a0680139fa82d350c2a1285f43f867535bad1143f228b1\", - \"key\": \"LS0tLS1CRUdJTiBSU0EgUFJJVkFURSBLRVktLS0tLQpNSUlDWFFJQkFBS0JnUUNjc1g2RG5KNXpNb0RqQ2R6a1JFOEg2TEh2TDQzaUhsekJLTWo4T1VNV05ZZG5YekdqCkR5Mkp1anQ3ZDNlMTVLeC8zOFo5UzJLdHNnVFVtWi9lUlRQSTdabE1idHRJK250UmtyblZLblBWNzhEeEFKNW8KTGZtQndmdWE2MnlVYnl0cnpYQ2pwVVJrQUlBMEZiR2VqS2Rvd3cxcnZGMzJoZFUzQ3ZIcG5rKzE2d0lEQVFBQgpBb0dCQUkrV0dyL1NDbVMzdCtIVkRPVGtMNk9vdVR6Y1QrRVFQNkVGbGIrRFhaV0JjZFhwSnB3c2NXZFBEK2poCkhnTEJUTTFWS3hkdnVEcEE4aW83cUlMTzJWYm1MeGpNWGk4TUdwY212dXJFNVJydTZTMXJzRDl2R0c5TGxoR3UKK0pUSmViMVdaZFduWFZ2am5LbExrWEV1eUthbXR2Z253Um5xNld5V05OazJ6SktoQWtFQThFenpxYnowcFVuTApLc241K2k0NUdoRGVpRTQvajRtamo1b1FHVzJxbUZWT2pHaHR1UGpaM2lwTis0RGlTRkFyMkl0b2VlK085d1pyCkRINHBkdU5YOFFKQkFLYnVOQ3dXK29sYXA4R2pUSk1TQjV1MW8wMVRHWFdFOGhVZG1leFBBdjl0cTBBT0gzUUQKUTIrM0RsaVY0ektoTlMra2xaSkVjNndzS0YyQmJIby81NXNDUVFETlBJd24vdERja3loSkJYVFJyc1RxZEZuOApCUWpZYVhBZTZEQ3o1eXg3S3ZFSmp1K1h1a01xTXV1ajBUSnpITFkySHVzK3FkSnJQVG9VMDNSS3JHV2hBa0JFCnB3aXI3Vk5pYy9jMFN2MnVLcWNZWWM1a2ViMnB1R0I3VUs1Q0lvaWdGakZzNmFJRDYyZXJwVVJ3S0V6RlFNbUgKNjQ5Y0ZXemhMVlA0aU1iZFREVHJBa0FFMTZXU1A3WXBWOHV1eFVGMGV0L3lFR3dURVpVU2R1OEppSTBHN0tqagpqcVR6RjQ3YkJZc0pIYTRYcWpVb2E3TXgwcS9FSUtRWkJ2NGFvQm42bGFOQwotLS0tLUVORCBSU0EgUFJJVkFURSBLRVktLS0tLQ==\", - \"monitoring\": { - \"contact\": \"1234\", - \"statuspageID\": \"1234\" + \"spec\": { + \"build\": { + \"ci\": \"true\", + \"type\": \"branch\" }, - \"variables\": { - \"project\": \"W3sibmFtZSI6IkxBR09PTl9TWVNURU1fUk9VVEVSX1BBVFRFUk4iLCJ2YWx1ZSI6IiR7ZW52aXJvbm1lbnR9LiR7cHJvamVjdH0uZXhhbXBsZS5jb20iLCJzY29wZSI6ImludGVybmFsX3N5c3RlbSJ9XQ==\", - \"environment\": \"W10=\" + \"gitReference\": \"origin\/main\", + \"project\": { + \"name\": \"nginx-example\", + \"environment\": \"main\", + \"uiLink\": \"https:\/\/dashboard.amazeeio.cloud\/projects\/project\/project-environment\/deployments\/lagoon-build-8m5zypx\", + \"routerPattern\": \"main-nginx-example\", + \"environmentType\": \"production\", + \"productionEnvironment\": \"main\", + \"standbyEnvironment\": \"\", + \"gitUrl\": \"https:\/\/github.com\/shreddedbacon\/lagoon-nginx-example.git\", + \"deployTarget\": \"kind\", + \"projectSecret\": \"4d6e7dd0f013a75d62a0680139fa82d350c2a1285f43f867535bad1143f228b1\", + \"key\": \"LS0tLS1CRUdJTiBSU0EgUFJJVkFURSBLRVktLS0tLQpNSUlDWFFJQkFBS0JnUUNjc1g2RG5KNXpNb0RqQ2R6a1JFOEg2TEh2TDQzaUhsekJLTWo4T1VNV05ZZG5YekdqCkR5Mkp1anQ3ZDNlMTVLeC8zOFo5UzJLdHNnVFVtWi9lUlRQSTdabE1idHRJK250UmtyblZLblBWNzhEeEFKNW8KTGZtQndmdWE2MnlVYnl0cnpYQ2pwVVJrQUlBMEZiR2VqS2Rvd3cxcnZGMzJoZFUzQ3ZIcG5rKzE2d0lEQVFBQgpBb0dCQUkrV0dyL1NDbVMzdCtIVkRPVGtMNk9vdVR6Y1QrRVFQNkVGbGIrRFhaV0JjZFhwSnB3c2NXZFBEK2poCkhnTEJUTTFWS3hkdnVEcEE4aW83cUlMTzJWYm1MeGpNWGk4TUdwY212dXJFNVJydTZTMXJzRDl2R0c5TGxoR3UKK0pUSmViMVdaZFduWFZ2am5LbExrWEV1eUthbXR2Z253Um5xNld5V05OazJ6SktoQWtFQThFenpxYnowcFVuTApLc241K2k0NUdoRGVpRTQvajRtamo1b1FHVzJxbUZWT2pHaHR1UGpaM2lwTis0RGlTRkFyMkl0b2VlK085d1pyCkRINHBkdU5YOFFKQkFLYnVOQ3dXK29sYXA4R2pUSk1TQjV1MW8wMVRHWFdFOGhVZG1leFBBdjl0cTBBT0gzUUQKUTIrM0RsaVY0ektoTlMra2xaSkVjNndzS0YyQmJIby81NXNDUVFETlBJd24vdERja3loSkJYVFJyc1RxZEZuOApCUWpZYVhBZTZEQ3o1eXg3S3ZFSmp1K1h1a01xTXV1ajBUSnpITFkySHVzK3FkSnJQVG9VMDNSS3JHV2hBa0JFCnB3aXI3Vk5pYy9jMFN2MnVLcWNZWWM1a2ViMnB1R0I3VUs1Q0lvaWdGakZzNmFJRDYyZXJwVVJ3S0V6RlFNbUgKNjQ5Y0ZXemhMVlA0aU1iZFREVHJBa0FFMTZXU1A3WXBWOHV1eFVGMGV0L3lFR3dURVpVU2R1OEppSTBHN0tqagpqcVR6RjQ3YkJZc0pIYTRYcWpVb2E3TXgwcS9FSUtRWkJ2NGFvQm42bGFOQwotLS0tLUVORCBSU0EgUFJJVkFURSBLRVktLS0tLQ==\", + \"monitoring\": { + \"contact\": \"1234\", + \"statuspageID\": \"1234\" + }, + \"variables\": { + \"project\": \"W3sibmFtZSI6IkxBR09PTl9TWVNURU1fUk9VVEVSX1BBVFRFUk4iLCJ2YWx1ZSI6IiR7ZW52aXJvbm1lbnR9LiR7cHJvamVjdH0uZXhhbXBsZS5jb20iLCJzY29wZSI6ImludGVybmFsX3N5c3RlbSJ9XQ==\", + \"environment\": \"W10=\" + } + }, + \"branch\": { + \"name\": \"main\" } - }, - \"branch\": { - \"name\": \"main\" } } }", "payload_encoding":"string" }' >payload.json -curl -s -u guest:guest -H "Accept: application/json" -H "Content-Type:application/json" -X POST -d @payload.json http://172.17.0.1:15672/api/exchanges/%2f/lagoon-tasks/publish +curl -s -u guest:guest -H "Accept: application/json" -H "Content-Type:application/json" -X POST -d @payload.json http://172.17.0.1:15672/api/exchanges/%2f/lagoon-controller/publish echo "" sleep 10 check_lagoon_build lagoon-build-${LBUILD2} @@ -268,17 +271,24 @@ kubectl logs $(kubectl get pods -n ${CONTROLLER_NAMESPACE} --no-headers | awk ' echo "==> Delete the environment" echo ' -{"properties":{"delivery_mode":2},"routing_key":"ci-local-controller-kubernetes:remove", +{ + "properties":{ + "delivery_mode":2 + }, + "routing_key":"controller:ci-local-controller-kubernetes", "payload":"{ - \"projectName\": \"nginx-example\", - \"type\":\"branch\", - \"forceDeleteProductionEnvironment\":true, - \"branch\":\"main\", - \"openshiftProjectName\":\"nginx-example-main\" + \"eventType\": \"lagoon:removal\", + \"payload\": { + \"projectName\": \"nginx-example\", + \"type\":\"branch\", + \"forceDeleteProductionEnvironment\":true, + \"branch\":\"main\", + \"openshiftProjectName\":\"nginx-example-main\" + } }", "payload_encoding":"string" }' >payload.json -curl -s -u guest:guest -H "Accept: application/json" -H "Content-Type:application/json" -X POST -d @payload.json http://172.17.0.1:15672/api/exchanges/%2f/lagoon-tasks/publish +curl -s -u guest:guest -H "Accept: application/json" -H "Content-Type:application/json" -X POST -d @payload.json http://172.17.0.1:15672/api/exchanges/%2f/lagoon-controller/publish echo "" CHECK_COUNTER=1 until $(kubectl logs $(kubectl get pods -n ${CONTROLLER_NAMESPACE} --no-headers | awk '{print $1}') -c manager -n ${CONTROLLER_NAMESPACE} | grep -q "Deleted namespace nginx-example-main for project nginx-example, environment main") diff --git a/internal/messenger/consumer.go b/internal/messenger/consumer.go index 4251cb65..7f8df610 100644 --- a/internal/messenger/consumer.go +++ b/internal/messenger/consumer.go @@ -2,18 +2,12 @@ package messenger import ( "context" - "encoding/json" "fmt" "log" - "strings" "time" "github.com/cheshir/go-mq" - lagoonv1beta1 "github.com/uselagoon/remote-controller/apis/lagoon/v1beta1" - "github.com/uselagoon/remote-controller/internal/helpers" "gopkg.in/matryer/try.v1" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" ) @@ -61,38 +55,29 @@ func (m *Messenger) Consumer(targetName string) { //error { forever := make(chan bool) + // if this controller is set up for single queue only, then only start the single queue listener + if m.EnableSingleQueue { + // Handle any tasks that go to the `lagoon-controller` queue + opLog.Info(fmt.Sprintf("Listening for lagoon-controller:%s", targetName)) + err = messageQueue.SetConsumerHandler("controller-queue", func(message mq.Message) { + if err == nil { + if err := m.handleLagoonEvent(ctx, opLog, message.Body()); err != nil { + //@TODO: send msg back to lagoon and update task to failed? + message.Ack(false) // ack to remove from queue + return + } + } + message.Ack(false) // ack to remove from queue + }) + if err != nil { + log.Fatalf(fmt.Sprintf("Failed to set handler to consumer `%s`: %v", "builddeploy-queue", err)) + } + } // Handle any tasks that go to the `builddeploy` queue - opLog.Info("Listening for lagoon-tasks:" + targetName + ":builddeploy") + opLog.Info(fmt.Sprintf("Listening for lagoon-tasks:%s:builddeploy", targetName)) err = messageQueue.SetConsumerHandler("builddeploy-queue", func(message mq.Message) { if err == nil { - // unmarshal the body into a lagoonbuild - newBuild := &lagoonv1beta1.LagoonBuild{} - json.Unmarshal(message.Body(), newBuild) - // new builds that come in should initially get created in the controllers own - // namespace before being handled and re-created in the correct namespace - // so set the controller namespace to the build namespace here - newBuild.ObjectMeta.Namespace = m.ControllerNamespace - newBuild.SetLabels( - map[string]string{ - "lagoon.sh/controller": m.ControllerNamespace, - }, - ) - opLog.Info( - fmt.Sprintf( - "Received builddeploy task for project %s, environment %s", - newBuild.Spec.Project.Name, - newBuild.Spec.Project.Environment, - ), - ) - // create it now - if err := m.Client.Create(ctx, newBuild); err != nil { - opLog.Error(err, - fmt.Sprintf( - "Failed to create builddeploy task for project %s, environment %s", - newBuild.Spec.Project.Name, - newBuild.Spec.Project.Environment, - ), - ) + if err := m.handleBuildEvent(ctx, opLog, message.Body()); err != nil { //@TODO: send msg back to lagoon and update task to failed? message.Ack(false) // ack to remove from queue return @@ -105,122 +90,14 @@ func (m *Messenger) Consumer(targetName string) { //error { } // Handle any tasks that go to the `remove` queue - opLog.Info("Listening for lagoon-tasks:" + targetName + ":remove") + opLog.Info(fmt.Sprintf("Listening for lagoon-tasks:%s:remove", targetName)) err = messageQueue.SetConsumerHandler("remove-queue", func(message mq.Message) { if err == nil { - // unmarshall the message into a remove task to be processed - removeTask := &removeTask{} - json.Unmarshal(message.Body(), removeTask) - // webhooks2tasks sends the `branch` field, but deletion from the API (UI/CLI) does not - // the tasks system crafts a field `branchName` which is passed through - // since webhooks2tasks uses the same underlying mechanism, we can still consume branchName even if branch is populated - if removeTask.Type == "pullrequest" { - removeTask.Branch = removeTask.BranchName - } - // generate the namespace name from the branch and project and any prefixes that the controller may add - ns := helpers.GenerateNamespaceName( - removeTask.NamespacePattern, // the namespace pattern or `openshiftProjectPattern` from Lagoon is never received by the controller - removeTask.Branch, - removeTask.ProjectName, - m.NamespacePrefix, - m.ControllerNamespace, - m.RandomNamespacePrefix, - ) - branch := removeTask.Branch - project := removeTask.ProjectName - opLog.WithName("RemoveTask").Info( - fmt.Sprintf( - "Received remove task for project %s, branch %s - %s", - project, - branch, - ns, - ), - ) - namespace := &corev1.Namespace{} - err := m.Client.Get(ctx, types.NamespacedName{ - Name: ns, - }, namespace) - if err != nil { - if strings.Contains(err.Error(), "not found") { - opLog.WithName("RemoveTask").Info( - fmt.Sprintf( - "Namespace %s for project %s, branch %s does not exist, marking deleted", - ns, - project, - branch, - ), - ) - msg := lagoonv1beta1.LagoonMessage{ - Type: "remove", - Namespace: ns, - Meta: &lagoonv1beta1.LagoonLogMeta{ - Project: project, - Environment: branch, - }, - } - msgBytes, _ := json.Marshal(msg) - m.Publish("lagoon-tasks:controller", msgBytes) - } else { - opLog.WithName("RemoveTask").Info( - fmt.Sprintf( - "Unable to get namespace %s for project %s, branch %s: %v", - ns, - project, - branch, - err, - ), - ) - } + if err := m.handleRemovalEvent(ctx, opLog, message.Body()); err != nil { //@TODO: send msg back to lagoon and update task to failed? message.Ack(false) // ack to remove from queue return - } - // check that the namespace selected for deletion is owned by this controller - if value, ok := namespace.ObjectMeta.Labels["lagoon.sh/controller"]; ok { - if value == m.ControllerNamespace { - // spawn the deletion process for this namespace - go func() { - err := m.DeletionHandler.ProcessDeletion(ctx, opLog, namespace) - if err == nil { - msg := lagoonv1beta1.LagoonMessage{ - Type: "remove", - Namespace: namespace.ObjectMeta.Name, - Meta: &lagoonv1beta1.LagoonLogMeta{ - Project: project, - Environment: branch, - }, - } - msgBytes, _ := json.Marshal(msg) - m.Publish("lagoon-tasks:controller", msgBytes) - } - }() - message.Ack(false) // ack to remove from queue - return - } - // controller label didn't match, log the message - opLog.WithName("RemoveTask").Info( - fmt.Sprintf( - "Selected namespace %s for project %s, branch %s: %v", - ns, - project, - branch, - fmt.Errorf("The controller label value %s does not match %s for this namespace", value, m.ControllerNamespace), - ), - ) - message.Ack(false) // ack to remove from queue - return - } - // controller label didn't match, log the message - opLog.WithName("RemoveTask").Info( - fmt.Sprintf( - "Selected namespace %s for project %s, branch %s: %v", - ns, - project, - branch, - fmt.Errorf("The controller ownership label does not exist on this namespace, nothing will be done for this removal request"), - ), - ) } message.Ack(false) // ack to remove from queue }) @@ -229,51 +106,10 @@ func (m *Messenger) Consumer(targetName string) { //error { } // Handle any tasks that go to the `jobs` queue - opLog.Info("Listening for lagoon-tasks:" + targetName + ":jobs") + opLog.Info(fmt.Sprintf("Listening for lagoon-tasks:%s:jobs", targetName)) err = messageQueue.SetConsumerHandler("jobs-queue", func(message mq.Message) { if err == nil { - // unmarshall the message into a remove task to be processed - jobSpec := &lagoonv1beta1.LagoonTaskSpec{} - json.Unmarshal(message.Body(), jobSpec) - namespace := helpers.GenerateNamespaceName( - jobSpec.Project.NamespacePattern, // the namespace pattern or `openshiftProjectPattern` from Lagoon is never received by the controller - jobSpec.Environment.Name, - jobSpec.Project.Name, - m.NamespacePrefix, - m.ControllerNamespace, - m.RandomNamespacePrefix, - ) - opLog.Info( - fmt.Sprintf( - "Received task for project %s, environment %s - %s", - jobSpec.Project.Name, - jobSpec.Environment.Name, - namespace, - ), - ) - job := &lagoonv1beta1.LagoonTask{} - job.Spec = *jobSpec - // set the namespace to the `openshiftProjectName` from the environment - job.ObjectMeta.Namespace = namespace - job.SetLabels( - map[string]string{ - "lagoon.sh/taskType": string(lagoonv1beta1.TaskTypeStandard), - "lagoon.sh/taskStatus": string(lagoonv1beta1.TaskStatusPending), - "lagoon.sh/controller": m.ControllerNamespace, - }, - ) - job.ObjectMeta.Name = fmt.Sprintf("lagoon-task-%s-%s", job.Spec.Task.ID, helpers.HashString(job.Spec.Task.ID)[0:6]) - if job.Spec.Task.TaskName != "" { - job.ObjectMeta.Name = job.Spec.Task.TaskName - } - if err := m.Client.Create(ctx, job); err != nil { - opLog.Error(err, - fmt.Sprintf( - "Unable to create job task for project %s, environment %s", - job.Spec.Project.Name, - job.Spec.Environment.Name, - ), - ) + if err := m.handleTaskEvent(ctx, opLog, message.Body()); err != nil { //@TODO: send msg back to lagoon and update task to failed? message.Ack(false) // ack to remove from queue return @@ -286,114 +122,13 @@ func (m *Messenger) Consumer(targetName string) { //error { } // Handle any tasks that go to the `misc` queue - opLog.Info("Listening for lagoon-tasks:" + targetName + ":misc") + opLog.Info(fmt.Sprintf("Listening for lagoon-tasks:%s:misc", targetName)) err = messageQueue.SetConsumerHandler("misc-queue", func(message mq.Message) { if err == nil { - opLog := ctrl.Log.WithName("handlers").WithName("LagoonTasks") - // unmarshall the message into a remove task to be processed - jobSpec := &lagoonv1beta1.LagoonTaskSpec{} - json.Unmarshal(message.Body(), jobSpec) - // check which key has been received - namespace := helpers.GenerateNamespaceName( - jobSpec.Project.NamespacePattern, // the namespace pattern or `openshiftProjectPattern` from Lagoon is never received by the controller - jobSpec.Environment.Name, - jobSpec.Project.Name, - m.NamespacePrefix, - m.ControllerNamespace, - m.RandomNamespacePrefix, - ) - switch jobSpec.Key { - case "kubernetes:build:cancel": - opLog.Info( - fmt.Sprintf( - "Received build cancellation for project %s, environment %s - %s", - jobSpec.Project.Name, - jobSpec.Environment.Name, - namespace, - ), - ) - err := m.CancelBuild(namespace, jobSpec) - if err != nil { - //@TODO: send msg back to lagoon and update task to failed? - message.Ack(false) // ack to remove from queue - return - } - case "kubernetes:task:cancel": - opLog.Info( - fmt.Sprintf( - "Received task cancellation for project %s, environment %s - %s", - jobSpec.Project.Name, - jobSpec.Environment.Name, - namespace, - ), - ) - err := m.CancelTask(namespace, jobSpec) - if err != nil { - //@TODO: send msg back to lagoon and update task to failed? - message.Ack(false) // ack to remove from queue - return - } - case "kubernetes:restic:backup:restore": - opLog.Info( - fmt.Sprintf( - "Received backup restoration for project %s, environment %s", - jobSpec.Project.Name, - jobSpec.Environment.Name, - ), - ) - err := m.ResticRestore(namespace, jobSpec) - if err != nil { - //@TODO: send msg back to lagoon and update task to failed? - message.Ack(false) // ack to remove from queue - return - } - case "kubernetes:route:migrate": - opLog.Info( - fmt.Sprintf( - "Received ingress migration for project %s", - jobSpec.Project.Name, - ), - ) - err := m.IngressRouteMigration(namespace, jobSpec) - if err != nil { - //@TODO: send msg back to lagoon and update task to failed? - message.Ack(false) // ack to remove from queue - return - } - case "openshift:route:migrate": - opLog.Info( - fmt.Sprintf( - "Received route migration for project %s", - jobSpec.Project.Name, - ), - ) - err := m.IngressRouteMigration(namespace, jobSpec) - if err != nil { - //@TODO: send msg back to lagoon and update task to failed? - message.Ack(false) // ack to remove from queue - return - } - case "kubernetes:task:advanced": - opLog.Info( - fmt.Sprintf( - "Received advanced task for project %s", - jobSpec.Project.Name, - ), - ) - err := m.AdvancedTask(namespace, jobSpec) - if err != nil { - //@TODO: send msg back to lagoon and update task to failed? - message.Ack(false) // ack to remove from queue - return - } - default: - // if we get something that we don't know about, spit out the entire message - opLog.Info( - fmt.Sprintf( - "Received unknown message: %s", - string(message.Body()), - ), - ) + if err := m.handleMiscEvent(ctx, opLog, message.Body()); err != nil { + //@TODO: send msg back to lagoon and update task to failed? + message.Ack(false) // ack to remove from queue + return } } message.Ack(false) // ack to remove from queue diff --git a/internal/messenger/events.go b/internal/messenger/events.go new file mode 100644 index 00000000..95695a0a --- /dev/null +++ b/internal/messenger/events.go @@ -0,0 +1,336 @@ +package messenger + +import ( + "context" + "encoding/json" + "fmt" + "strings" + + "github.com/go-logr/logr" + lagoonv1beta1 "github.com/uselagoon/remote-controller/apis/lagoon/v1beta1" + "github.com/uselagoon/remote-controller/internal/helpers" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" +) + +// LagoonEvent defines a Lagoon event type +type LagoonEvent struct { + EventType string `json:"eventType"` + Payload interface{} `json:"payload"` +} + +const ( + lagoonBuild = "lagoon:build" + lagoonTask = "lagoon:task" + lagoonMisc = "lagoon:misc" + lagoonRemval = "lagoon:removal" +) + +func (m *Messenger) handleLagoonEvent(ctx context.Context, opLog logr.Logger, body []byte) error { + // unmarshal the body of the message into a lagoonevent + lEvent := &LagoonEvent{} + err := json.Unmarshal(body, lEvent) + if err != nil { + return err + } + + // turn the payload back into bytes to be processed by the handlers + payloadBytes, err := json.Marshal(lEvent.Payload) + if err != nil { + return err + } + switch lEvent.EventType { + case lagoonBuild: + m.handleBuildEvent(ctx, opLog, payloadBytes) + case lagoonTask: + m.handleTaskEvent(ctx, opLog, payloadBytes) + case lagoonMisc: + m.handleMiscEvent(ctx, opLog, payloadBytes) + case lagoonRemval: + m.handleRemovalEvent(ctx, opLog, payloadBytes) + } + return nil +} + +func (m *Messenger) handleBuildEvent(ctx context.Context, opLog logr.Logger, payload []byte) error { + // unmarshal the body into a lagoonbuild + newBuild := &lagoonv1beta1.LagoonBuild{} + json.Unmarshal(payload, newBuild) + // new builds that come in should initially get created in the controllers own + // namespace before being handled and re-created in the correct namespace + // so set the controller namespace to the build namespace here + newBuild.ObjectMeta.Namespace = m.ControllerNamespace + newBuild.SetLabels( + map[string]string{ + "lagoon.sh/controller": m.ControllerNamespace, + }, + ) + opLog.Info( + fmt.Sprintf( + "Received builddeploy task for project %s, environment %s", + newBuild.Spec.Project.Name, + newBuild.Spec.Project.Environment, + ), + ) + // create it now + if err := m.Client.Create(ctx, newBuild); err != nil { + opLog.Error(err, + fmt.Sprintf( + "Failed to create builddeploy task for project %s, environment %s", + newBuild.Spec.Project.Name, + newBuild.Spec.Project.Environment, + ), + ) + //@TODO: send msg back to lagoon and update task to failed? + return err + } + return nil +} + +func (m *Messenger) handleTaskEvent(ctx context.Context, opLog logr.Logger, payload []byte) error { + // unmarshall the message into a remove task to be processed + jobSpec := &lagoonv1beta1.LagoonTaskSpec{} + json.Unmarshal(payload, jobSpec) + namespace := helpers.GenerateNamespaceName( + jobSpec.Project.NamespacePattern, // the namespace pattern or `openshiftProjectPattern` from Lagoon is never received by the controller + jobSpec.Environment.Name, + jobSpec.Project.Name, + m.NamespacePrefix, + m.ControllerNamespace, + m.RandomNamespacePrefix, + ) + opLog.Info( + fmt.Sprintf( + "Received task for project %s, environment %s - %s", + jobSpec.Project.Name, + jobSpec.Environment.Name, + namespace, + ), + ) + job := &lagoonv1beta1.LagoonTask{} + job.Spec = *jobSpec + // set the namespace to the `openshiftProjectName` from the environment + job.ObjectMeta.Namespace = namespace + job.SetLabels( + map[string]string{ + "lagoon.sh/taskType": string(lagoonv1beta1.TaskTypeStandard), + "lagoon.sh/taskStatus": string(lagoonv1beta1.TaskStatusPending), + "lagoon.sh/controller": m.ControllerNamespace, + }, + ) + job.ObjectMeta.Name = fmt.Sprintf("lagoon-task-%s-%s", job.Spec.Task.ID, helpers.HashString(job.Spec.Task.ID)[0:6]) + if job.Spec.Task.TaskName != "" { + job.ObjectMeta.Name = job.Spec.Task.TaskName + } + if err := m.Client.Create(ctx, job); err != nil { + opLog.Error(err, + fmt.Sprintf( + "Unable to create job task for project %s, environment %s", + job.Spec.Project.Name, + job.Spec.Environment.Name, + ), + ) + return err + } + return nil +} + +func (m *Messenger) handleMiscEvent(ctx context.Context, opLog logr.Logger, payload []byte) error { + // unmarshall the message into a remove task to be processed + jobSpec := &lagoonv1beta1.LagoonTaskSpec{} + json.Unmarshal(payload, jobSpec) + // check which key has been received + namespace := helpers.GenerateNamespaceName( + jobSpec.Project.NamespacePattern, // the namespace pattern or `openshiftProjectPattern` from Lagoon is never received by the controller + jobSpec.Environment.Name, + jobSpec.Project.Name, + m.NamespacePrefix, + m.ControllerNamespace, + m.RandomNamespacePrefix, + ) + switch jobSpec.Key { + case "kubernetes:build:cancel", "deploytarget:build:cancel": + opLog.Info( + fmt.Sprintf( + "Received build cancellation for project %s, environment %s - %s", + jobSpec.Project.Name, + jobSpec.Environment.Name, + namespace, + ), + ) + err := m.CancelBuild(namespace, jobSpec) + if err != nil { + return err + } + case "kubernetes:task:cancel", "deploytarget:task:cancel": + opLog.Info( + fmt.Sprintf( + "Received task cancellation for project %s, environment %s - %s", + jobSpec.Project.Name, + jobSpec.Environment.Name, + namespace, + ), + ) + err := m.CancelTask(namespace, jobSpec) + if err != nil { + return err + } + case "kubernetes:restic:backup:restore", "deploytarget:backup:restore": + opLog.Info( + fmt.Sprintf( + "Received backup restoration for project %s, environment %s", + jobSpec.Project.Name, + jobSpec.Environment.Name, + ), + ) + err := m.ResticRestore(namespace, jobSpec) + if err != nil { + return err + } + case "kubernetes:route:migrate", "deploytarget:ingress:migrate": + opLog.Info( + fmt.Sprintf( + "Received ingress migration for project %s", + jobSpec.Project.Name, + ), + ) + err := m.IngressRouteMigration(namespace, jobSpec) + if err != nil { + return err + } + case "kubernetes:task:advanced", "deploytarget:task:advanced": + opLog.Info( + fmt.Sprintf( + "Received advanced task for project %s", + jobSpec.Project.Name, + ), + ) + err := m.AdvancedTask(namespace, jobSpec) + if err != nil { + return err + } + default: + // if we get something that we don't know about, spit out the entire message + opLog.Info( + fmt.Sprintf( + "Received unknown message: %s", + string(payload), + ), + ) + } + return nil +} + +func (m *Messenger) handleRemovalEvent(ctx context.Context, opLog logr.Logger, payload []byte) error { + // unmarshall the message into a remove task to be processed + removeTask := &removeTask{} + json.Unmarshal(payload, removeTask) + // webhooks2tasks sends the `branch` field, but deletion from the API (UI/CLI) does not + // the tasks system crafts a field `branchName` which is passed through + // since webhooks2tasks uses the same underlying mechanism, we can still consume branchName even if branch is populated + if removeTask.Type == "pullrequest" { + removeTask.Branch = removeTask.BranchName + } + // generate the namespace name from the branch and project and any prefixes that the controller may add + ns := helpers.GenerateNamespaceName( + removeTask.NamespacePattern, // the namespace pattern or `openshiftProjectPattern` from Lagoon is never received by the controller + removeTask.Branch, + removeTask.ProjectName, + m.NamespacePrefix, + m.ControllerNamespace, + m.RandomNamespacePrefix, + ) + branch := removeTask.Branch + project := removeTask.ProjectName + opLog.WithName("RemoveTask").Info( + fmt.Sprintf( + "Received remove task for project %s, branch %s - %s", + project, + branch, + ns, + ), + ) + namespace := &corev1.Namespace{} + err := m.Client.Get(ctx, types.NamespacedName{ + Name: ns, + }, namespace) + if err != nil { + if strings.Contains(err.Error(), "not found") { + opLog.WithName("RemoveTask").Info( + fmt.Sprintf( + "Namespace %s for project %s, branch %s does not exist, marking deleted", + ns, + project, + branch, + ), + ) + msg := lagoonv1beta1.LagoonMessage{ + Type: "remove", + Namespace: ns, + Meta: &lagoonv1beta1.LagoonLogMeta{ + Project: project, + Environment: branch, + }, + } + msgBytes, _ := json.Marshal(msg) + m.Publish("lagoon-tasks:controller", msgBytes) + } else { + opLog.WithName("RemoveTask").Info( + fmt.Sprintf( + "Unable to get namespace %s for project %s, branch %s: %v", + ns, + project, + branch, + err, + ), + ) + } + //@TODO: send msg back to lagoon and update task to failed? + return nil + + } + // check that the namespace selected for deletion is owned by this controller + if value, ok := namespace.ObjectMeta.Labels["lagoon.sh/controller"]; ok { + if value == m.ControllerNamespace { + // spawn the deletion process for this namespace + go func() { + err := m.DeletionHandler.ProcessDeletion(ctx, opLog, namespace) + if err == nil { + msg := lagoonv1beta1.LagoonMessage{ + Type: "remove", + Namespace: namespace.ObjectMeta.Name, + Meta: &lagoonv1beta1.LagoonLogMeta{ + Project: project, + Environment: branch, + }, + } + msgBytes, _ := json.Marshal(msg) + m.Publish("lagoon-tasks:controller", msgBytes) + } + }() + return nil + } + // controller label didn't match, log the message + opLog.WithName("RemoveTask").Info( + fmt.Sprintf( + "Selected namespace %s for project %s, branch %s: %v", + ns, + project, + branch, + fmt.Errorf("The controller label value %s does not match %s for this namespace", value, m.ControllerNamespace), + ), + ) + return nil + } + // controller label didn't match, log the message + opLog.WithName("RemoveTask").Info( + fmt.Sprintf( + "Selected namespace %s for project %s, branch %s: %v", + ns, + project, + branch, + fmt.Errorf("The controller ownership label does not exist on this namespace, nothing will be done for this removal request"), + ), + ) + return nil +} diff --git a/internal/messenger/messenger.go b/internal/messenger/messenger.go index 61b4e43f..9771c8a6 100644 --- a/internal/messenger/messenger.go +++ b/internal/messenger/messenger.go @@ -34,6 +34,7 @@ type Messenger struct { AdvancedTaskSSHKeyInjection bool AdvancedTaskDeployTokenInjection bool DeletionHandler *deletions.Deletions + EnableSingleQueue bool EnableDebug bool } @@ -47,6 +48,7 @@ func New(config mq.Config, randomNamespacePrefix, advancedTaskSSHKeyInjection bool, advancedTaskDeployTokenInjection bool, + enableSingleQueue bool, deletionHandler *deletions.Deletions, enableDebug bool, ) *Messenger { @@ -61,6 +63,7 @@ func New(config mq.Config, AdvancedTaskSSHKeyInjection: advancedTaskSSHKeyInjection, AdvancedTaskDeployTokenInjection: advancedTaskDeployTokenInjection, DeletionHandler: deletionHandler, + EnableSingleQueue: enableSingleQueue, EnableDebug: enableDebug, } } diff --git a/main.go b/main.go index cc85ba57..ec32f35e 100644 --- a/main.go +++ b/main.go @@ -80,6 +80,7 @@ func main() { var pendingMessageCron string var mqWorkers int var rabbitRetryInterval int + var enableSingleQueue bool var startupConnectionAttempts int var startupConnectionInterval int var overrideBuildDeployImage string @@ -182,6 +183,7 @@ func main() { "The number of workers to start with.") flag.IntVar(&rabbitRetryInterval, "rabbitmq-retry-interval", 30, "The retry interval for rabbitmq.") + flag.BoolVar(&enableSingleQueue, "enable-single-queue", true, "Flag to have this controller use the single queue option.") flag.StringVar(&leaderElectionID, "leader-election-id", "lagoon-builddeploy-leader-election-helper", "The ID to use for leader election.") flag.StringVar(&pendingMessageCron, "pending-message-cron", "15,45 * * * *", @@ -360,6 +362,7 @@ func main() { mqUser = helpers.GetEnv("RABBITMQ_USERNAME", mqUser) mqPass = helpers.GetEnv("RABBITMQ_PASSWORD", mqPass) mqHost = helpers.GetEnv("RABBITMQ_HOSTNAME", mqHost) + enableSingleQueue = helpers.GetEnvBool("ENABLE_SINGLE_QUEUE", enableSingleQueue) lagoonTargetName = helpers.GetEnv("LAGOON_TARGET_NAME", lagoonTargetName) lagoonAppID = helpers.GetEnv("LAGOON_APP_ID", lagoonAppID) pendingMessageCron = helpers.GetEnv("PENDING_MESSAGE_CRON", pendingMessageCron) @@ -454,130 +457,169 @@ func main() { os.Exit(1) } - config := mq.Config{ - ReconnectDelay: time.Duration(rabbitRetryInterval) * time.Second, - Exchanges: mq.Exchanges{ - { - Name: "lagoon-tasks", - Type: "direct", - Options: mq.Options{ - "durable": true, - "delivery_mode": "2", - "headers": "", - "content_type": "", - }, + exchanges := mq.Exchanges{ + { + Name: "lagoon-tasks", + Type: "direct", + Options: mq.Options{ + "durable": true, + "delivery_mode": "2", + "headers": "", + "content_type": "", }, }, - Consumers: mq.Consumers{ - { - Name: "remove-queue", - Queue: fmt.Sprintf("lagoon-tasks:%s:remove", lagoonTargetName), - Workers: mqWorkers, - Options: mq.Options{ - "durable": true, - "delivery_mode": "2", - "headers": "", - "content_type": "", - }, - }, { - Name: "builddeploy-queue", - Queue: fmt.Sprintf("lagoon-tasks:%s:builddeploy", lagoonTargetName), - Workers: mqWorkers, - Options: mq.Options{ - "durable": true, - "delivery_mode": "2", - "headers": "", - "content_type": "", - }, - }, { - Name: "jobs-queue", - Queue: fmt.Sprintf("lagoon-tasks:%s:jobs", lagoonTargetName), - Workers: mqWorkers, - Options: mq.Options{ - "durable": true, - "delivery_mode": "2", - "headers": "", - "content_type": "", - }, - }, { - Name: "misc-queue", - Queue: fmt.Sprintf("lagoon-tasks:%s:misc", lagoonTargetName), - Workers: mqWorkers, - Options: mq.Options{ - "durable": true, - "delivery_mode": "2", - "headers": "", - "content_type": "", - }, + } + consumers := mq.Consumers{ + { + Name: "remove-queue", + Queue: fmt.Sprintf("lagoon-tasks:%s:remove", lagoonTargetName), + Workers: mqWorkers, + Options: mq.Options{ + "durable": true, + "delivery_mode": "2", + "headers": "", + "content_type": "", + }, + }, { + Name: "builddeploy-queue", + Queue: fmt.Sprintf("lagoon-tasks:%s:builddeploy", lagoonTargetName), + Workers: mqWorkers, + Options: mq.Options{ + "durable": true, + "delivery_mode": "2", + "headers": "", + "content_type": "", + }, + }, { + Name: "jobs-queue", + Queue: fmt.Sprintf("lagoon-tasks:%s:jobs", lagoonTargetName), + Workers: mqWorkers, + Options: mq.Options{ + "durable": true, + "delivery_mode": "2", + "headers": "", + "content_type": "", + }, + }, { + Name: "misc-queue", + Queue: fmt.Sprintf("lagoon-tasks:%s:misc", lagoonTargetName), + Workers: mqWorkers, + Options: mq.Options{ + "durable": true, + "delivery_mode": "2", + "headers": "", + "content_type": "", }, }, - Queues: mq.Queues{ - { - Name: fmt.Sprintf("lagoon-tasks:%s:builddeploy", lagoonTargetName), - Exchange: "lagoon-tasks", - RoutingKey: fmt.Sprintf("%s:builddeploy", lagoonTargetName), - Options: mq.Options{ - "durable": true, - "delivery_mode": "2", - "headers": "", - "content_type": "", - }, - }, { - Name: fmt.Sprintf("lagoon-tasks:%s:remove", lagoonTargetName), - Exchange: "lagoon-tasks", - RoutingKey: fmt.Sprintf("%s:remove", lagoonTargetName), - Options: mq.Options{ - "durable": true, - "delivery_mode": "2", - "headers": "", - "content_type": "", - }, - }, { - Name: fmt.Sprintf("lagoon-tasks:%s:jobs", lagoonTargetName), - Exchange: "lagoon-tasks", - RoutingKey: fmt.Sprintf("%s:jobs", lagoonTargetName), - Options: mq.Options{ - "durable": true, - "delivery_mode": "2", - "headers": "", - "content_type": "", - }, - }, { - Name: fmt.Sprintf("lagoon-tasks:%s:misc", lagoonTargetName), - Exchange: "lagoon-tasks", - RoutingKey: fmt.Sprintf("%s:misc", lagoonTargetName), - Options: mq.Options{ - "durable": true, - "delivery_mode": "2", - "headers": "", - "content_type": "", - }, + } + queues := mq.Queues{ + { + Name: fmt.Sprintf("lagoon-tasks:%s:builddeploy", lagoonTargetName), + Exchange: "lagoon-tasks", + RoutingKey: fmt.Sprintf("%s:builddeploy", lagoonTargetName), + Options: mq.Options{ + "durable": true, + "delivery_mode": "2", + "headers": "", + "content_type": "", + }, + }, { + Name: fmt.Sprintf("lagoon-tasks:%s:remove", lagoonTargetName), + Exchange: "lagoon-tasks", + RoutingKey: fmt.Sprintf("%s:remove", lagoonTargetName), + Options: mq.Options{ + "durable": true, + "delivery_mode": "2", + "headers": "", + "content_type": "", + }, + }, { + Name: fmt.Sprintf("lagoon-tasks:%s:jobs", lagoonTargetName), + Exchange: "lagoon-tasks", + RoutingKey: fmt.Sprintf("%s:jobs", lagoonTargetName), + Options: mq.Options{ + "durable": true, + "delivery_mode": "2", + "headers": "", + "content_type": "", + }, + }, { + Name: fmt.Sprintf("lagoon-tasks:%s:misc", lagoonTargetName), + Exchange: "lagoon-tasks", + RoutingKey: fmt.Sprintf("%s:misc", lagoonTargetName), + Options: mq.Options{ + "durable": true, + "delivery_mode": "2", + "headers": "", + "content_type": "", }, }, - Producers: mq.Producers{ - { - Name: "lagoon-logs", - Exchange: "lagoon-logs", - Options: mq.Options{ - "app_id": lagoonAppID, - "delivery_mode": "2", - "headers": "", - "content_type": "", - }, + } + producers := mq.Producers{ + { + Name: "lagoon-logs", + Exchange: "lagoon-logs", + Options: mq.Options{ + "app_id": lagoonAppID, + "delivery_mode": "2", + "headers": "", + "content_type": "", }, - { - Name: "lagoon-tasks:controller", - Exchange: "lagoon-tasks", - RoutingKey: "controller", - Options: mq.Options{ - "app_id": lagoonAppID, - "delivery_mode": "2", - "headers": "", - "content_type": "", - }, + }, + { + Name: "lagoon-tasks:controller", + Exchange: "lagoon-tasks", + RoutingKey: "controller", + Options: mq.Options{ + "app_id": lagoonAppID, + "delivery_mode": "2", + "headers": "", + "content_type": "", }, }, - DSN: fmt.Sprintf("amqp://%s:%s@%s/", mqUser, mqPass, mqHost), + } + if enableSingleQueue { + // if this controller is set up for single queue only, then add the configuration for the single queue + exchanges = append(exchanges, mq.ExchangeConfig{ + Name: "lagoon-controller", + Type: "direct", + Options: mq.Options{ + "durable": true, + "delivery_mode": "2", + "headers": "", + "content_type": "", + }, + }) + consumers = append(consumers, mq.ConsumerConfig{ + Name: "controller-queue", + Queue: fmt.Sprintf("lagoon-controller:%s", lagoonTargetName), + Workers: mqWorkers, + Options: mq.Options{ + "durable": true, + "delivery_mode": "2", + "headers": "", + "content_type": "", + }, + }) + queues = append(queues, mq.QueueConfig{ + Name: fmt.Sprintf("lagoon-controller:%s", lagoonTargetName), + Exchange: "lagoon-controller", + RoutingKey: fmt.Sprintf("controller:%s", lagoonTargetName), + Options: mq.Options{ + "durable": true, + "delivery_mode": "2", + "headers": "", + "content_type": "", + }, + }) + } + config := mq.Config{ + ReconnectDelay: time.Duration(rabbitRetryInterval) * time.Second, + Exchanges: exchanges, + Consumers: consumers, + Queues: queues, + Producers: producers, + DSN: fmt.Sprintf("amqp://%s:%s@%s/", mqUser, mqPass, mqHost), } harborURLParsed, _ := url.Parse(harborURL) @@ -624,6 +666,7 @@ func main() { randomPrefix, advancedTaskSSHKeyInjection, advancedTaskDeployToken, + enableSingleQueue, deletion, enableDebug, )