From 6da97a0ce4e69de64b6c709186bd6521c6d2aa50 Mon Sep 17 00:00:00 2001 From: shreddedbacon Date: Mon, 26 Feb 2024 19:44:27 +1100 Subject: [PATCH] feat: support cancelling restores --- internal/helpers/helper_types.go | 5 + internal/messenger/consumer.go | 24 ++++- internal/messenger/messenger.go | 4 + internal/messenger/tasks_restore.go | 162 ++++++++++++++++++++-------- main.go | 31 ++++++ 5 files changed, 183 insertions(+), 43 deletions(-) diff --git a/internal/helpers/helper_types.go b/internal/helpers/helper_types.go index 61ec1fce..27fc8bf3 100644 --- a/internal/helpers/helper_types.go +++ b/internal/helpers/helper_types.go @@ -31,3 +31,8 @@ type LagoonAPIConfiguration struct { SSHHost string SSHPort string } + +type K8UPVersions struct { + V1 bool + V2 bool +} diff --git a/internal/messenger/consumer.go b/internal/messenger/consumer.go index a0b90853..c5a580ec 100644 --- a/internal/messenger/consumer.go +++ b/internal/messenger/consumer.go @@ -343,7 +343,7 @@ func (m *Messenger) Consumer(targetName string) { //error { jobSpec.Environment.Name, ), ) - err := m.ResticRestore(namespace, jobSpec) + err := m.ResticRestore(ctx, namespace, jobSpec, false) if err != nil { opLog.Error(err, fmt.Sprintf( @@ -356,6 +356,28 @@ func (m *Messenger) Consumer(targetName string) { //error { message.Ack(false) // ack to remove from queue return } + case "deploytarget:restic:cancel:restore": + // if this is a request to cancel a restore attempt + opLog.Info( + fmt.Sprintf( + "Received restore cancellation for project %s, environment %s", + jobSpec.Project.Name, + jobSpec.Environment.Name, + ), + ) + err := m.ResticRestore(ctx, namespace, jobSpec, true) + if err != nil { + opLog.Error(err, + fmt.Sprintf( + "Cancel restore for project %s, environment %s failed", + jobSpec.Project.Name, + jobSpec.Environment.Name, + ), + ) + //@TODO: send msg back to lagoon and update task to failed? + message.Ack(false) // ack to remove from queue + return + } case "deploytarget:route:migrate", "kubernetes:route:migrate", "openshift:route:migrate": opLog.Info( fmt.Sprintf( diff --git a/internal/messenger/messenger.go b/internal/messenger/messenger.go index 2ce0eee6..dbfde019 100644 --- a/internal/messenger/messenger.go +++ b/internal/messenger/messenger.go @@ -3,6 +3,7 @@ package messenger import ( "github.com/cheshir/go-mq/v2" "github.com/hashicorp/golang-lru/v2/expirable" + "github.com/uselagoon/remote-controller/internal/helpers" "github.com/uselagoon/remote-controller/internal/utilities/deletions" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -37,6 +38,7 @@ type Messenger struct { DeletionHandler *deletions.Deletions EnableDebug bool SupportK8upV2 bool + K8upVersions helpers.K8UPVersions Cache *expirable.LRU[string, string] } @@ -54,6 +56,7 @@ func New(config mq.Config, enableDebug bool, supportK8upV2 bool, cache *expirable.LRU[string, string], + k8upVersions helpers.K8UPVersions, ) *Messenger { return &Messenger{ Config: config, @@ -69,5 +72,6 @@ func New(config mq.Config, EnableDebug: enableDebug, SupportK8upV2: supportK8upV2, Cache: cache, + K8upVersions: k8upVersions, } } diff --git a/internal/messenger/tasks_restore.go b/internal/messenger/tasks_restore.go index 315ddb4d..4e8b2e5d 100644 --- a/internal/messenger/tasks_restore.go +++ b/internal/messenger/tasks_restore.go @@ -8,16 +8,20 @@ import ( "github.com/go-logr/logr" lagoonv1beta1 "github.com/uselagoon/remote-controller/apis/lagoon/v1beta1" "github.com/uselagoon/remote-controller/internal/helpers" + "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" k8upv1 "github.com/k8up-io/k8up/v2/api/v1" k8upv1alpha1 "github.com/vshn/k8up/api/v1alpha1" - apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" - "k8s.io/apimachinery/pkg/types" ) +type cancelRestore struct { + RestoreName string `json:"restoreName"` + BackupID string `json:"backupId"` +} + // ResticRestore handles creating the restic restore jobs. -func (m *Messenger) ResticRestore(namespace string, jobSpec *lagoonv1beta1.LagoonTaskSpec) error { +func (m *Messenger) ResticRestore(ctx context.Context, namespace string, jobSpec *lagoonv1beta1.LagoonTaskSpec, cancel bool) error { opLog := ctrl.Log.WithName("handlers").WithName("LagoonTasks") vers, err := checkRestoreVersionFromCore(jobSpec.Misc.MiscResource) if err != nil { @@ -31,51 +35,41 @@ func (m *Messenger) ResticRestore(namespace string, jobSpec *lagoonv1beta1.Lagoo return nil } - // check if k8up crds exist in the cluster - k8upv1alpha1Exists := false - k8upv1Exists := false - crdv1alpha1 := &apiextensionsv1.CustomResourceDefinition{} - if err = m.Client.Get(context.TODO(), types.NamespacedName{Name: "restores.backup.appuio.ch"}, crdv1alpha1); err != nil { - if err := helpers.IgnoreNotFound(err); err != nil { - return err - } - } - if crdv1alpha1.ObjectMeta.Name == "restores.backup.appuio.ch" { - k8upv1alpha1Exists = true - } - crdv1 := &apiextensionsv1.CustomResourceDefinition{} - if err = m.Client.Get(context.TODO(), types.NamespacedName{Name: "restores.k8up.io"}, crdv1); err != nil { - if err := helpers.IgnoreNotFound(err); err != nil { - return err - } - } - if crdv1.ObjectMeta.Name == "restores.k8up.io" { - k8upv1Exists = true - } + v1alpha1 := false + v1 := false // check the version, if there is no version in the payload, assume it is k8up v2 if m.SupportK8upV2 { if vers == "backup.appuio.ch/v1alpha1" { - if k8upv1alpha1Exists { - return m.createv1alpha1Restore(opLog, namespace, jobSpec) + if m.K8upVersions.V1 { + v1alpha1 = true } } else { - if k8upv1Exists { - if err := m.createv1Restore(opLog, namespace, jobSpec); err != nil { - return err - } + if m.K8upVersions.V2 { + v1 = true } else { - if k8upv1alpha1Exists { - if err := m.createv1alpha1Restore(opLog, namespace, jobSpec); err != nil { - return err - } + if m.K8upVersions.V1 { + v1alpha1 = true } } } } else { - if k8upv1alpha1Exists { - if err := m.createv1alpha1Restore(opLog, namespace, jobSpec); err != nil { - return err - } + if m.K8upVersions.V1 { + v1alpha1 = true + } + } + + if v1alpha1 { + if cancel { + return m.cancelv1alpha1Restore(ctx, opLog, namespace, jobSpec) + } else { + return m.createv1alpha1Restore(ctx, opLog, namespace, jobSpec) + } + } + if v1 { + if cancel { + return m.cancelv1Restore(ctx, opLog, namespace, jobSpec) + } else { + return m.createv1Restore(ctx, opLog, namespace, jobSpec) } } return nil @@ -97,7 +91,7 @@ func checkRestoreVersionFromCore(resource []byte) (string, error) { } // createv1alpha1Restore will create a restore task using the restores.backup.appuio.ch v1alpha1 api (k8up v1) -func (m *Messenger) createv1alpha1Restore(opLog logr.Logger, namespace string, jobSpec *lagoonv1beta1.LagoonTaskSpec) error { +func (m *Messenger) createv1alpha1Restore(ctx context.Context, opLog logr.Logger, namespace string, jobSpec *lagoonv1beta1.LagoonTaskSpec) error { restorev1alpha1 := &k8upv1alpha1.Restore{} if err := json.Unmarshal(jobSpec.Misc.MiscResource, restorev1alpha1); err != nil { opLog.Error(err, @@ -109,7 +103,7 @@ func (m *Messenger) createv1alpha1Restore(opLog logr.Logger, namespace string, j return err } restorev1alpha1.SetNamespace(namespace) - if err := m.Client.Create(context.Background(), restorev1alpha1); err != nil { + if err := m.Client.Create(ctx, restorev1alpha1); err != nil { opLog.Error(err, fmt.Sprintf( "Unable to create restore %s with k8up v1alpha1 api.", @@ -122,7 +116,7 @@ func (m *Messenger) createv1alpha1Restore(opLog logr.Logger, namespace string, j } // createv1Restore will create a restore task using the restores.k8up.io v1 api (k8up v2) -func (m *Messenger) createv1Restore(opLog logr.Logger, namespace string, jobSpec *lagoonv1beta1.LagoonTaskSpec) error { +func (m *Messenger) createv1Restore(ctx context.Context, opLog logr.Logger, namespace string, jobSpec *lagoonv1beta1.LagoonTaskSpec) error { restorev1 := &k8upv1.Restore{} if err := json.Unmarshal(jobSpec.Misc.MiscResource, restorev1); err != nil { opLog.Error(err, @@ -134,7 +128,7 @@ func (m *Messenger) createv1Restore(opLog logr.Logger, namespace string, jobSpec return err } restorev1.SetNamespace(namespace) - if err := m.Client.Create(context.Background(), restorev1); err != nil { + if err := m.Client.Create(ctx, restorev1); err != nil { opLog.Error(err, fmt.Sprintf( "Unable to create restore %s with k8up v1 api.", @@ -145,3 +139,87 @@ func (m *Messenger) createv1Restore(opLog logr.Logger, namespace string, jobSpec } return nil } + +// cancelv1alpha1Restore will attempt to cancel a restore task using the restores.backup.appuio.ch v1alpha1 api (k8up v1) +func (m *Messenger) cancelv1alpha1Restore(ctx context.Context, opLog logr.Logger, namespace string, jobSpec *lagoonv1beta1.LagoonTaskSpec) error { + restorev1alpha1 := &k8upv1alpha1.Restore{} + cr := &cancelRestore{} + if err := json.Unmarshal(jobSpec.Misc.MiscResource, &cr); err != nil { + return err + } + if err := m.Client.Get(ctx, types.NamespacedName{Namespace: namespace, Name: cr.RestoreName}, restorev1alpha1); helpers.IgnoreNotFound(err) != nil { + opLog.Error(err, + fmt.Sprintf( + "Unable to get restore %s with k8up v1alpha1 api.", + cr.RestoreName, + ), + ) + return err + } + if restorev1alpha1.Name != "" { + if err := m.Client.Delete(ctx, restorev1alpha1); err != nil { + opLog.Error(err, + fmt.Sprintf( + "Unable to delete restore %s with k8up v1alpha1 api.", + cr.RestoreName, + ), + ) + return err + } + } + // if no matching restore found, or the restore is deleted, send the cancellation message back to core + m.pubRestoreCancel(opLog, namespace, cr.RestoreName, jobSpec) + return nil +} + +// cancelv1Restore will attempt to cancel a restore task using the restores.k8up.io v1 api (k8up v2) +func (m *Messenger) cancelv1Restore(ctx context.Context, opLog logr.Logger, namespace string, jobSpec *lagoonv1beta1.LagoonTaskSpec) error { + restorev1 := &k8upv1.Restore{} + cr := &cancelRestore{} + if err := json.Unmarshal(jobSpec.Misc.MiscResource, &cr); err != nil { + return err + } + if err := m.Client.Get(ctx, types.NamespacedName{Namespace: namespace, Name: cr.RestoreName}, restorev1); helpers.IgnoreNotFound(err) != nil { + opLog.Error(err, + fmt.Sprintf( + "Unable to get restore %s with k8up v1 api.", + cr.RestoreName, + ), + ) + return err + } + if restorev1.Name != "" { + if err := m.Client.Delete(ctx, restorev1); err != nil { + opLog.Error(err, + fmt.Sprintf( + "Unable to delete restore %s with k8up v1alpha1 api.", + cr.RestoreName, + ), + ) + return err + } + } + // if no matching restore found, or the restore is deleted, send the cancellation message back to core + m.pubRestoreCancel(opLog, namespace, cr.RestoreName, jobSpec) + return nil +} + +func (m *Messenger) pubRestoreCancel(opLog logr.Logger, namespace, restorename string, jobSpec *lagoonv1beta1.LagoonTaskSpec) { + msg := lagoonv1beta1.LagoonMessage{ + Type: "restore:cancel", + Namespace: namespace, + Meta: &lagoonv1beta1.LagoonLogMeta{ + Environment: jobSpec.Environment.Name, + Project: jobSpec.Project.Name, + JobName: restorename, + }, + } + msgBytes, err := json.Marshal(msg) + if err != nil { + opLog.Error(err, "Unable to encode message as JSON") + } + // publish the cancellation result back to lagoon + if err := m.Publish("lagoon-tasks:controller", msgBytes); err != nil { + opLog.Error(err, "Unable to publish message.") + } +} diff --git a/main.go b/main.go index 60f4463b..8e001bd9 100644 --- a/main.go +++ b/main.go @@ -29,6 +29,7 @@ import ( "github.com/cheshir/go-mq/v2" str2duration "github.com/xhit/go-str2duration/v2" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" clientgoscheme "k8s.io/client-go/kubernetes/scheme" _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" ctrl "sigs.k8s.io/controller-runtime" @@ -481,6 +482,35 @@ func main() { os.Exit(1) } + // check which version(s) of k8up may be installed in the remote, if installed + k8upv1alpha1Exists := false + k8upv1Exists := false + crdv1alpha1 := &apiextensionsv1.CustomResourceDefinition{} + if err := mgr.GetClient().Get(context.TODO(), types.NamespacedName{Name: "restores.backup.appuio.ch"}, crdv1alpha1); err != nil { + if err := helpers.IgnoreNotFound(err); err != nil { + setupLog.Error(fmt.Errorf("harbor-robot-account-expiry unable to convert to duration"), "unable to start manager") + os.Exit(1) + } + } + if crdv1alpha1.ObjectMeta.Name == "restores.backup.appuio.ch" { + k8upv1alpha1Exists = true + } + crdv1 := &apiextensionsv1.CustomResourceDefinition{} + if err = mgr.GetClient().Get(context.TODO(), types.NamespacedName{Name: "restores.k8up.io"}, crdv1); err != nil { + if err := helpers.IgnoreNotFound(err); err != nil { + setupLog.Error(fmt.Errorf("harbor-robot-account-expiry unable to convert to duration"), "unable to start manager") + os.Exit(1) + } + } + if crdv1.ObjectMeta.Name == "restores.k8up.io" { + k8upv1Exists = true + } + + k8upVersions := helpers.K8UPVersions{ + V1: k8upv1alpha1Exists, + V2: k8upv1Exists, + } + // create the cache cache := expirable.NewLRU[string, string](1000, nil, time.Minute*60) @@ -656,6 +686,7 @@ func main() { enableDebug, lffSupportK8UPv2, cache, + k8upVersions, ) c := cron.New()