From d028e488816c158513cb70590a1a57f7cf0e1dfd Mon Sep 17 00:00:00 2001 From: shreddedbacon Date: Tue, 19 Sep 2023 16:16:05 +1000 Subject: [PATCH] fix: cache cancellations to prevent race conditions in pod creation --- .github/workflows/remote-controller.yaml | 3 +-- Makefile | 2 +- controllers/v1beta1/build_controller.go | 6 +++--- controllers/v1beta1/podmonitor_buildhandlers.go | 17 ++++++++++++----- controllers/v1beta1/podmonitor_controller.go | 2 ++ controllers/v1beta1/podmonitor_taskhandlers.go | 16 ++++++++++++---- go.mod | 1 + go.sum | 2 ++ internal/messenger/consumer.go | 2 ++ internal/messenger/messenger.go | 4 ++++ main.go | 7 +++++++ 11 files changed, 47 insertions(+), 15 deletions(-) diff --git a/.github/workflows/remote-controller.yaml b/.github/workflows/remote-controller.yaml index f023aa9e..9cbbba24 100644 --- a/.github/workflows/remote-controller.yaml +++ b/.github/workflows/remote-controller.yaml @@ -47,7 +47,7 @@ jobs: - name: Setup correct Go version uses: actions/setup-go@v2 with: - go-version: '1.17' + go-version: '1.20' - name: Install kubebuilder run: | #kubebuilder @@ -122,7 +122,6 @@ jobs: run: | export PATH=$PATH:/usr/local/kubebuilder/bin export PATH=$PATH:/usr/local/go/bin - export GOPATH=$HOME/go export OVERRIDE_BUILD_DEPLOY_DIND_IMAGE="${{matrix.lagoon_build_image}}" export HARBOR_URL="http://harbor.$(kubectl get nodes -o jsonpath='{.items[0].status.addresses[0].address}').nip.io:32080" export HARBOR_API="http://harbor.$(kubectl get nodes -o jsonpath='{.items[0].status.addresses[0].address}').nip.io:32080/api" diff --git a/Makefile b/Makefile index 5e3fb5b7..9a4c9409 100644 --- a/Makefile +++ b/Makefile @@ -102,7 +102,7 @@ ifeq (, $(shell which controller-gen)) CONTROLLER_GEN_TMP_DIR=$$(mktemp -d) ;\ cd $$CONTROLLER_GEN_TMP_DIR ;\ go mod init tmp ;\ - go get sigs.k8s.io/controller-tools/cmd/controller-gen@v0.6.2 ;\ + go install sigs.k8s.io/controller-tools/cmd/controller-gen@v0.6.2 ;\ rm -rf $$CONTROLLER_GEN_TMP_DIR ;\ } CONTROLLER_GEN=$(GOBIN)/controller-gen diff --git a/controllers/v1beta1/build_controller.go b/controllers/v1beta1/build_controller.go index 03ebf734..421cdac3 100644 --- a/controllers/v1beta1/build_controller.go +++ b/controllers/v1beta1/build_controller.go @@ -315,8 +315,8 @@ func (r *LagoonBuildReconciler) createNamespaceBuild(ctx context.Context, // getOrCreateBuildResource will deepcopy the lagoon build into a new resource and push it to the new namespace // then clean up the old one. -func (r *LagoonBuildReconciler) getOrCreateBuildResource(ctx context.Context, build *lagoonv1beta1.LagoonBuild, ns string) error { - newBuild := build.DeepCopy() +func (r *LagoonBuildReconciler) getOrCreateBuildResource(ctx context.Context, lagoonBuild *lagoonv1beta1.LagoonBuild, ns string) error { + newBuild := lagoonBuild.DeepCopy() newBuild.SetNamespace(ns) newBuild.SetResourceVersion("") newBuild.SetLabels( @@ -334,7 +334,7 @@ func (r *LagoonBuildReconciler) getOrCreateBuildResource(ctx context.Context, bu return err } } - err = r.Delete(ctx, build) + err = r.Delete(ctx, lagoonBuild) if err != nil { return err } diff --git a/controllers/v1beta1/podmonitor_buildhandlers.go b/controllers/v1beta1/podmonitor_buildhandlers.go index 54017566..c5faabf8 100644 --- a/controllers/v1beta1/podmonitor_buildhandlers.go +++ b/controllers/v1beta1/podmonitor_buildhandlers.go @@ -38,12 +38,19 @@ func (r *LagoonMonitorReconciler) handleBuildMonitor(ctx context.Context, if err != nil { return err } + cancel := false if cancelBuild, ok := jobPod.ObjectMeta.Labels["lagoon.sh/cancelBuild"]; ok { - cancel, _ := strconv.ParseBool(cancelBuild) - if cancel { - opLog.Info(fmt.Sprintf("Attempting to cancel build %s", lagoonBuild.ObjectMeta.Name)) - return r.updateDeploymentWithLogs(ctx, req, lagoonBuild, jobPod, nil, cancel) - } + cancel, _ = strconv.ParseBool(cancelBuild) + } + _, ok := r.Cache.Get(lagoonBuild.ObjectMeta.Name) + if ok { + opLog.Info(fmt.Sprintf("Cached cancellation exists (in podmonitor): %s", lagoonBuild.ObjectMeta.Name)) + // this object exists in the cache meaning the task has been cancelled, set cancel to true and remove from cache + r.Cache.Remove(lagoonBuild.ObjectMeta.Name) + cancel = true + } + if cancel { + return r.updateDeploymentWithLogs(ctx, req, lagoonBuild, jobPod, nil, cancel) } // check if the build pod is in pending, a container in the pod could be failed in this state if jobPod.Status.Phase == corev1.PodPending { diff --git a/controllers/v1beta1/podmonitor_controller.go b/controllers/v1beta1/podmonitor_controller.go index 260a313c..185be749 100644 --- a/controllers/v1beta1/podmonitor_controller.go +++ b/controllers/v1beta1/podmonitor_controller.go @@ -23,6 +23,7 @@ import ( "strconv" "github.com/go-logr/logr" + "github.com/hashicorp/golang-lru/v2/expirable" lagoonv1beta1 "github.com/uselagoon/remote-controller/apis/lagoon/v1beta1" "github.com/uselagoon/remote-controller/internal/helpers" "github.com/uselagoon/remote-controller/internal/messenger" @@ -50,6 +51,7 @@ type LagoonMonitorReconciler struct { LagoonTargetName string LFFQoSEnabled bool BuildQoS BuildQoS + Cache *expirable.LRU[string, string] } // slice of the different failure states of pods that we care about diff --git a/controllers/v1beta1/podmonitor_taskhandlers.go b/controllers/v1beta1/podmonitor_taskhandlers.go index 5f105045..17eb247c 100644 --- a/controllers/v1beta1/podmonitor_taskhandlers.go +++ b/controllers/v1beta1/podmonitor_taskhandlers.go @@ -31,11 +31,19 @@ func (r *LagoonMonitorReconciler) handleTaskMonitor(ctx context.Context, opLog l if err != nil { return err } + cancel := false if cancelTask, ok := jobPod.ObjectMeta.Labels["lagoon.sh/cancelTask"]; ok { - cancel, _ := strconv.ParseBool(cancelTask) - if cancel { - return r.updateTaskWithLogs(ctx, req, lagoonTask, jobPod, nil, cancel) - } + cancel, _ = strconv.ParseBool(cancelTask) + } + _, ok := r.Cache.Get(lagoonTask.ObjectMeta.Name) + if ok { + opLog.Info(fmt.Sprintf("Cached cancellation exists (in podmonitor): %s", lagoonTask.ObjectMeta.Name)) + // this object exists in the cache meaning the task has been cancelled, set cancel to true and remove from cache + r.Cache.Remove(lagoonTask.ObjectMeta.Name) + cancel = true + } + if cancel { + return r.updateTaskWithLogs(ctx, req, lagoonTask, jobPod, nil, cancel) } if jobPod.Status.Phase == corev1.PodPending { for _, container := range jobPod.Status.ContainerStatuses { diff --git a/go.mod b/go.mod index 46085b36..88770eda 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/go-logr/logr v1.2.4 github.com/google/go-cmp v0.5.9 github.com/hashicorp/go-version v1.6.0 + github.com/hashicorp/golang-lru/v2 v2.0.6 github.com/k8up-io/k8up/v2 v2.7.1 github.com/mittwald/goharbor-client/v3 v3.3.0 github.com/mittwald/goharbor-client/v5 v5.3.1 diff --git a/go.sum b/go.sum index d52a8b60..53473ad0 100644 --- a/go.sum +++ b/go.sum @@ -779,6 +779,8 @@ github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= +github.com/hashicorp/golang-lru/v2 v2.0.6 h1:3xi/Cafd1NaoEnS/yDssIiuVeDVywU0QdFGl3aQaQHM= +github.com/hashicorp/golang-lru/v2 v2.0.6/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64= github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ= diff --git a/internal/messenger/consumer.go b/internal/messenger/consumer.go index 862284a9..49ed3210 100644 --- a/internal/messenger/consumer.go +++ b/internal/messenger/consumer.go @@ -312,6 +312,7 @@ func (m *Messenger) Consumer(targetName string) { //error { namespace, ), ) + m.Cache.Add(jobSpec.Misc.Name, jobSpec.Project.Name) err := m.CancelBuild(namespace, jobSpec) if err != nil { //@TODO: send msg back to lagoon and update task to failed? @@ -327,6 +328,7 @@ func (m *Messenger) Consumer(targetName string) { //error { namespace, ), ) + m.Cache.Add(jobSpec.Task.TaskName, jobSpec.Project.Name) err := m.CancelTask(namespace, jobSpec) if err != nil { //@TODO: send msg back to lagoon and update task to failed? diff --git a/internal/messenger/messenger.go b/internal/messenger/messenger.go index a95d1d33..f5a69d73 100644 --- a/internal/messenger/messenger.go +++ b/internal/messenger/messenger.go @@ -2,6 +2,7 @@ package messenger import ( "github.com/cheshir/go-mq/v2" + "github.com/hashicorp/golang-lru/v2/expirable" "github.com/uselagoon/remote-controller/internal/utilities/deletions" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -35,6 +36,7 @@ type Messenger struct { AdvancedTaskDeployTokenInjection bool DeletionHandler *deletions.Deletions EnableDebug bool + Cache *expirable.LRU[string, string] } // New returns a messaging with config and controller-runtime client. @@ -49,6 +51,7 @@ func New(config mq.Config, advancedTaskDeployTokenInjection bool, deletionHandler *deletions.Deletions, enableDebug bool, + cache *expirable.LRU[string, string], ) *Messenger { return &Messenger{ Config: config, @@ -62,5 +65,6 @@ func New(config mq.Config, AdvancedTaskDeployTokenInjection: advancedTaskDeployTokenInjection, DeletionHandler: deletionHandler, EnableDebug: enableDebug, + Cache: cache, } } diff --git a/main.go b/main.go index 04ac3694..d709103c 100644 --- a/main.go +++ b/main.go @@ -42,6 +42,7 @@ import ( cron "gopkg.in/robfig/cron.v2" + "github.com/hashicorp/golang-lru/v2/expirable" k8upv1 "github.com/k8up-io/k8up/v2/api/v1" lagoonv1beta1 "github.com/uselagoon/remote-controller/apis/lagoon/v1beta1" lagoonv1beta1ctrl "github.com/uselagoon/remote-controller/controllers/v1beta1" @@ -470,6 +471,9 @@ func main() { os.Exit(1) } + // create the cache + cache := expirable.NewLRU[string, string](1000, nil, time.Minute*60) + config := mq.Config{ ReconnectDelay: time.Duration(rabbitRetryInterval) * time.Second, Exchanges: mq.Exchanges{ @@ -640,10 +644,12 @@ func main() { advancedTaskDeployToken, deletion, enableDebug, + cache, ) c := cron.New() // if we are running with MQ support, then start the consumer handler + if enableMQ { setupLog.Info("starting messaging handler") go messaging.Consumer(lagoonTargetName) @@ -812,6 +818,7 @@ func main() { LagoonTargetName: lagoonTargetName, LFFQoSEnabled: lffQoSEnabled, BuildQoS: buildQoSConfig, + Cache: cache, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "LagoonMonitor") os.Exit(1)