From c8d4209b6544733e6d9860f061f1f577cf7da811 Mon Sep 17 00:00:00 2001 From: Michael Weibel Date: Thu, 26 Jan 2023 16:33:07 +0100 Subject: [PATCH] fix owner reconciliation fixes #806 Signed-off-by: Michael Weibel --- cmd/operator/main.go | 2 - config/rbac/role.yaml | 9 -- operator/archivecontroller/controller.go | 34 +++-- operator/archivecontroller/executor.go | 6 +- operator/archivecontroller/setup.go | 3 + operator/backupcontroller/controller.go | 19 ++- .../controller_integration_test.go | 60 ++++++++ .../controller_utils_integration_test.go | 11 ++ operator/backupcontroller/executor.go | 6 +- operator/backupcontroller/setup.go | 2 + operator/checkcontroller/controller.go | 30 ++-- operator/checkcontroller/executor.go | 6 +- operator/checkcontroller/setup.go | 3 + operator/job/job.go | 77 ++++++++++ operator/jobcontroller/conditions.go | 14 -- operator/jobcontroller/controller.go | 142 ------------------ .../controller_integration_test.go | 134 ----------------- operator/jobcontroller/setup.go | 32 ---- operator/prunecontroller/controller.go | 28 ++-- operator/prunecontroller/executor.go | 7 +- operator/prunecontroller/setup.go | 3 + operator/restorecontroller/controller.go | 30 ++-- operator/restorecontroller/executor.go | 6 +- operator/restorecontroller/setup.go | 3 + 24 files changed, 282 insertions(+), 385 deletions(-) delete mode 100644 operator/jobcontroller/conditions.go delete mode 100644 operator/jobcontroller/controller.go delete mode 100644 operator/jobcontroller/controller_integration_test.go delete mode 100644 operator/jobcontroller/setup.go diff --git a/cmd/operator/main.go b/cmd/operator/main.go index 802a98d6f..cef62ab82 100644 --- a/cmd/operator/main.go +++ b/cmd/operator/main.go @@ -10,7 +10,6 @@ import ( "github.com/k8up-io/k8up/v2/operator/backupcontroller" "github.com/k8up-io/k8up/v2/operator/cfg" "github.com/k8up-io/k8up/v2/operator/checkcontroller" - "github.com/k8up-io/k8up/v2/operator/jobcontroller" "github.com/k8up-io/k8up/v2/operator/prunecontroller" "github.com/k8up-io/k8up/v2/operator/restorecontroller" "github.com/k8up-io/k8up/v2/operator/schedulecontroller" @@ -117,7 +116,6 @@ func operatorMain(c *cli.Context) error { "Archive": archivecontroller.SetupWithManager, "Check": checkcontroller.SetupWithManager, "Prune": prunecontroller.SetupWithManager, - "Job": jobcontroller.SetupWithManager, } { if setupErr := setupFn(mgr); setupErr != nil { operatorLog.Error(setupErr, "unable to initialize operator mode", "step", "controller", "controller", name) diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 45d501e0b..24ad5dcac 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -29,15 +29,6 @@ rules: - patch - update - watch -- apiGroups: - - batch - resources: - - jobs/finalizers - - jobs/status - verbs: - - get - - patch - - update - apiGroups: - coordination.k8s.io resources: diff --git a/operator/archivecontroller/controller.go b/operator/archivecontroller/controller.go index 59324dae2..ebbe61c52 100644 --- a/operator/archivecontroller/controller.go +++ b/operator/archivecontroller/controller.go @@ -8,6 +8,7 @@ import ( "github.com/k8up-io/k8up/v2/operator/cfg" "github.com/k8up-io/k8up/v2/operator/job" "github.com/k8up-io/k8up/v2/operator/locker" + "k8s.io/apimachinery/pkg/types" controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -25,25 +26,34 @@ func (r *ArchiveReconciler) NewObjectList() *k8upv1.ArchiveList { return &k8upv1.ArchiveList{} } -func (r *ArchiveReconciler) Provision(ctx context.Context, archive *k8upv1.Archive) (controllerruntime.Result, error) { +func (r *ArchiveReconciler) Provision(ctx context.Context, obj *k8upv1.Archive) (controllerruntime.Result, error) { log := controllerruntime.LoggerFrom(ctx) - if archive.Status.HasStarted() { - return controllerruntime.Result{}, nil - } - repository := cfg.Config.GetGlobalRepository() - if archive.Spec.Backend != nil { - repository = archive.Spec.Backend.String() + if obj.Spec.Backend != nil { + repository = obj.Spec.Backend.String() } - if archive.Spec.RestoreSpec == nil { - archive.Spec.RestoreSpec = &k8upv1.RestoreSpec{} + if obj.Spec.RestoreSpec == nil { + obj.Spec.RestoreSpec = &k8upv1.RestoreSpec{} } - config := job.NewConfig(r.Kube, archive, repository) + config := job.NewConfig(r.Kube, obj, repository) executor := NewArchiveExecutor(config) - if archive.Status.HasFinished() { - executor.cleanupOldArchives(ctx, archive) + jobKey := types.NamespacedName{ + Namespace: obj.GetNamespace(), + Name: executor.jobName(), + } + if err := job.ReconcileJobStatus(ctx, jobKey, r.Kube, obj); err != nil { + return controllerruntime.Result{}, err + } + + if obj.Status.HasStarted() { + log.V(1).Info("archive just started, waiting") + return controllerruntime.Result{}, nil + } + + if obj.Status.HasFinished() { + executor.cleanupOldArchives(ctx, obj) return controllerruntime.Result{}, nil } diff --git a/operator/archivecontroller/executor.go b/operator/archivecontroller/executor.go index 8dbe39dca..bdff7c4d0 100644 --- a/operator/archivecontroller/executor.go +++ b/operator/archivecontroller/executor.go @@ -39,7 +39,7 @@ func (a *ArchiveExecutor) Execute(ctx context.Context) error { archive := a.Obj.(*k8upv1.Archive) batchJob := &batchv1.Job{} - batchJob.Name = k8upv1.ArchiveType.String() + "-" + a.Obj.GetName() + batchJob.Name = a.jobName() batchJob.Namespace = archive.Namespace _, err := controllerutil.CreateOrUpdate(ctx, a.Client, batchJob, func() error { @@ -63,6 +63,10 @@ func (a *ArchiveExecutor) Execute(ctx context.Context) error { return nil } +func (a *ArchiveExecutor) jobName() string { + return k8upv1.ArchiveType.String() + "-" + a.Obj.GetName() +} + func (a *ArchiveExecutor) setupArgs(archive *k8upv1.Archive) []string { args := []string{"-archive", "-restoreType", "s3"} diff --git a/operator/archivecontroller/setup.go b/operator/archivecontroller/setup.go index 94bf9249d..48271024d 100644 --- a/operator/archivecontroller/setup.go +++ b/operator/archivecontroller/setup.go @@ -3,12 +3,14 @@ package archivecontroller import ( k8upv1 "github.com/k8up-io/k8up/v2/api/v1" "github.com/k8up-io/k8up/v2/operator/reconciler" + batchv1 "k8s.io/api/batch/v1" controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/predicate" ) // +kubebuilder:rbac:groups=k8up.io,resources=archives,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=k8up.io,resources=archives/status;archives/finalizers,verbs=get;update;patch +// +kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;update;patch;delete // SetupWithManager configures the reconciler. func SetupWithManager(mgr controllerruntime.Manager) error { @@ -18,6 +20,7 @@ func SetupWithManager(mgr controllerruntime.Manager) error { }) return controllerruntime.NewControllerManagedBy(mgr). For(&k8upv1.Archive{}). + Owns(&batchv1.Job{}). Named(name). WithEventFilter(predicate.GenerationChangedPredicate{}). Complete(r) diff --git a/operator/backupcontroller/controller.go b/operator/backupcontroller/controller.go index 3dfc298b8..d9cf7c1f0 100644 --- a/operator/backupcontroller/controller.go +++ b/operator/backupcontroller/controller.go @@ -9,6 +9,7 @@ import ( "github.com/k8up-io/k8up/v2/operator/job" "github.com/k8up-io/k8up/v2/operator/locker" "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/types" controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -30,10 +31,6 @@ func (r *BackupReconciler) NewObjectList() *k8upv1.BackupList { func (r *BackupReconciler) Provision(ctx context.Context, obj *k8upv1.Backup) (reconcile.Result, error) { log := controllerruntime.LoggerFrom(ctx) - if obj.Status.HasStarted() { - return controllerruntime.Result{RequeueAfter: 30 * time.Second}, nil // nothing to do, wait until finished - } - repository := cfg.Config.GetGlobalRepository() if obj.Spec.Backend != nil { repository = obj.Spec.Backend.String() @@ -41,6 +38,18 @@ func (r *BackupReconciler) Provision(ctx context.Context, obj *k8upv1.Backup) (r config := job.NewConfig(r.Kube, obj, repository) executor := NewBackupExecutor(config) + jobKey := types.NamespacedName{ + Namespace: obj.GetNamespace(), + Name: executor.jobName(), + } + if err := job.ReconcileJobStatus(ctx, jobKey, r.Kube, obj); err != nil { + return controllerruntime.Result{}, err + } + + if obj.Status.HasStarted() { + log.V(1).Info("backup just started, waiting") + return controllerruntime.Result{RequeueAfter: 30 * time.Second}, nil + } if obj.Status.HasFinished() || isPrebackupFailed(obj) { cleanupCond := meta.FindStatusCondition(obj.Status.Conditions, k8upv1.ConditionScrubbed.String()) if cleanupCond == nil || cleanupCond.Reason != k8upv1.ReasonSucceeded.String() { @@ -48,7 +57,7 @@ func (r *BackupReconciler) Provision(ctx context.Context, obj *k8upv1.Backup) (r } prebackupCond := meta.FindStatusCondition(obj.Status.Conditions, k8upv1.ConditionPreBackupPodReady.String()) - if prebackupCond.Reason == k8upv1.ReasonFinished.String() || prebackupCond.Reason == k8upv1.ReasonFailed.String() || prebackupCond.Reason == k8upv1.ReasonNoPreBackupPodsFound.String() { + if prebackupCond != nil && (prebackupCond.Reason == k8upv1.ReasonFinished.String() || prebackupCond.Reason == k8upv1.ReasonFailed.String() || prebackupCond.Reason == k8upv1.ReasonNoPreBackupPodsFound.String()) { // only ignore future reconciles if we have stopped all prebackup deployments in an earlier reconciliation. return controllerruntime.Result{}, nil } diff --git a/operator/backupcontroller/controller_integration_test.go b/operator/backupcontroller/controller_integration_test.go index e4be092cc..b06215439 100644 --- a/operator/backupcontroller/controller_integration_test.go +++ b/operator/backupcontroller/controller_integration_test.go @@ -10,6 +10,9 @@ import ( k8upv1 "github.com/k8up-io/k8up/v2/api/v1" "github.com/k8up-io/k8up/v2/envtest" "github.com/stretchr/testify/suite" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" ) @@ -44,6 +47,56 @@ func (ts *BackupTestSuite) Test_GivenBackup_ExpectBackupJob() { ts.expectABackupJob() } +func (ts *BackupTestSuite) Test_GivenBackup_AndJob_KeepBackupProgressing() { + backupJob := ts.newJob(ts.BackupResource) + ts.EnsureResources(ts.BackupResource, backupJob) + ts.BackupResource.Status.Started = true + backupJob.Status.Active = 1 + ts.UpdateStatus(ts.BackupResource, backupJob) + + ts.whenReconciling(ts.BackupResource) + + result := &k8upv1.Backup{} + err := ts.Client.Get(ts.Ctx, k8upv1.MapToNamespacedName(ts.BackupResource), result) + ts.Require().NoError(err) + ts.assertCondition(result.Status.Conditions, k8upv1.ConditionProgressing, k8upv1.ReasonStarted, metav1.ConditionTrue) + ts.assertCondition(result.Status.Conditions, k8upv1.ConditionReady, k8upv1.ReasonReady, metav1.ConditionTrue) + ts.Assert().Len(result.Status.Conditions, 2, "amount of conditions") +} + +func (ts *BackupTestSuite) Test_GivenBackup_AndCompletedJob_ThenCompleteBackup() { + backupJob := ts.newJob(ts.BackupResource) + ts.EnsureResources(ts.BackupResource, backupJob) + ts.BackupResource.Status.Started = true + backupJob.Status.Conditions = []batchv1.JobCondition{{Type: batchv1.JobComplete, Status: corev1.ConditionTrue}} + ts.UpdateStatus(ts.BackupResource, backupJob) + + ts.whenReconciling(ts.BackupResource) + + result := &k8upv1.Backup{} + ts.FetchResource(types.NamespacedName{Namespace: ts.BackupResource.Namespace, Name: ts.BackupResource.Name}, result) + + ts.assertCondition(result.Status.Conditions, k8upv1.ConditionCompleted, k8upv1.ReasonSucceeded, metav1.ConditionTrue) + ts.assertCondition(result.Status.Conditions, k8upv1.ConditionProgressing, k8upv1.ReasonFinished, metav1.ConditionFalse) + ts.Assert().Len(result.Status.Conditions, 4, "amount of conditions") +} + +func (ts *BackupTestSuite) Test_GivenBackup_AndFailedJob_ThenCompleteBackup() { + backupJob := ts.newJob(ts.BackupResource) + ts.EnsureResources(ts.BackupResource, backupJob) + ts.BackupResource.Status.Started = true + backupJob.Status.Conditions = []batchv1.JobCondition{{Type: batchv1.JobFailed, Status: corev1.ConditionTrue}} + ts.UpdateStatus(ts.BackupResource, backupJob) + + ts.whenReconciling(ts.BackupResource) + + result := &k8upv1.Backup{} + ts.FetchResource(types.NamespacedName{Namespace: ts.BackupResource.Namespace, Name: ts.BackupResource.Name}, result) + ts.assertCondition(result.Status.Conditions, k8upv1.ConditionCompleted, k8upv1.ReasonFailed, metav1.ConditionTrue) + ts.assertCondition(result.Status.Conditions, k8upv1.ConditionProgressing, k8upv1.ReasonFinished, metav1.ConditionFalse) + ts.Assert().Len(result.Status.Conditions, 4, "amount of conditions") +} + func (ts *BackupTestSuite) Test_GivenBackupWithSecurityContext_ExpectBackupJobWithSecurityContext() { ts.BackupResource = ts.newBackupWithSecurityContext() ts.EnsureResources(ts.BackupResource) @@ -166,3 +219,10 @@ func (ts *BackupTestSuite) Test_GivenBackupWithTags_WhenCreatingBackupjob_ThenHa backupJob := ts.expectABackupJob() ts.assertJobHasTagArguments(backupJob) } + +func (ts *BackupTestSuite) assertCondition(conditions []metav1.Condition, condType k8upv1.ConditionType, reason k8upv1.ConditionReason, status metav1.ConditionStatus) { + cond := meta.FindStatusCondition(conditions, condType.String()) + ts.Require().NotNil(cond, "condition of type %s missing", condType) + ts.Assert().Equal(reason.String(), cond.Reason, "condition %s doesn't contain reason %s", condType, reason) + ts.Assert().Equal(status, cond.Status, "condition %s isn't %s", condType, status) +} diff --git a/operator/backupcontroller/controller_utils_integration_test.go b/operator/backupcontroller/controller_utils_integration_test.go index 0a6ef5115..581905a72 100644 --- a/operator/backupcontroller/controller_utils_integration_test.go +++ b/operator/backupcontroller/controller_utils_integration_test.go @@ -238,3 +238,14 @@ func (ts *BackupTestSuite) assertJobHasTagArguments(job *batchv1.Job) { jobArguments := job.Spec.Template.Spec.Containers[0].Args ts.Assert().Contains(jobArguments, backupTag, "backup tag in job args") } + +func (ts *BackupTestSuite) newJob(owner client.Object) *batchv1.Job { + jb := &batchv1.Job{} + jb.Name = k8upv1.BackupType.String() + "-" + ts.BackupResource.Name + jb.Namespace = ts.NS + jb.Labels = labels.Set{k8upv1.LabelK8upType: k8upv1.BackupType.String()} + jb.Spec.Template.Spec.Containers = []corev1.Container{{Name: "container", Image: "image"}} + jb.Spec.Template.Spec.RestartPolicy = corev1.RestartPolicyOnFailure + ts.Assert().NoError(controllerruntime.SetControllerReference(owner, jb, ts.Scheme), "set controller ref") + return jb +} diff --git a/operator/backupcontroller/executor.go b/operator/backupcontroller/executor.go index d07f893e8..1658e40f2 100644 --- a/operator/backupcontroller/executor.go +++ b/operator/backupcontroller/executor.go @@ -105,7 +105,7 @@ func (b *BackupExecutor) startBackup(ctx context.Context) error { } batchJob := &batchv1.Job{} - batchJob.Name = k8upv1.BackupType.String() + "-" + b.backup.Name + batchJob.Name = b.jobName() batchJob.Namespace = b.backup.Namespace _, err = controllerruntime.CreateOrUpdate(ctx, b.Generic.Config.Client, batchJob, func() error { @@ -133,3 +133,7 @@ func (b *BackupExecutor) startBackup(ctx context.Context) error { func (b *BackupExecutor) cleanupOldBackups(ctx context.Context) { b.Generic.CleanupOldResources(ctx, &k8upv1.BackupList{}, b.backup.Namespace, b.backup) } + +func (b *BackupExecutor) jobName() string { + return k8upv1.BackupType.String() + "-" + b.backup.Name +} diff --git a/operator/backupcontroller/setup.go b/operator/backupcontroller/setup.go index 5660edd3c..36e09a120 100644 --- a/operator/backupcontroller/setup.go +++ b/operator/backupcontroller/setup.go @@ -3,6 +3,7 @@ package backupcontroller import ( k8upv1 "github.com/k8up-io/k8up/v2/api/v1" "github.com/k8up-io/k8up/v2/operator/reconciler" + batchv1 "k8s.io/api/batch/v1" "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/predicate" ) @@ -28,6 +29,7 @@ func SetupWithManager(mgr controllerruntime.Manager) error { return controllerruntime.NewControllerManagedBy(mgr). Named(name). For(&k8upv1.Backup{}). + Owns(&batchv1.Job{}). WithEventFilter(predicate.GenerationChangedPredicate{}). Complete(r) } diff --git a/operator/checkcontroller/controller.go b/operator/checkcontroller/controller.go index 99600d943..b1e18c97b 100644 --- a/operator/checkcontroller/controller.go +++ b/operator/checkcontroller/controller.go @@ -8,6 +8,7 @@ import ( "github.com/k8up-io/k8up/v2/operator/cfg" "github.com/k8up-io/k8up/v2/operator/job" "github.com/k8up-io/k8up/v2/operator/locker" + "k8s.io/apimachinery/pkg/types" controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -25,24 +26,33 @@ func (r *CheckReconciler) NewObjectList() *k8upv1.CheckList { return &k8upv1.CheckList{} } -func (r *CheckReconciler) Provision(ctx context.Context, check *k8upv1.Check) (controllerruntime.Result, error) { +func (r *CheckReconciler) Provision(ctx context.Context, obj *k8upv1.Check) (controllerruntime.Result, error) { log := controllerruntime.LoggerFrom(ctx) - if check.Status.HasStarted() { - return controllerruntime.Result{}, nil - } - repository := cfg.Config.GetGlobalRepository() - if check.Spec.Backend != nil { - repository = check.Spec.Backend.String() + if obj.Spec.Backend != nil { + repository = obj.Spec.Backend.String() } - config := job.NewConfig(r.Kube, check, repository) + config := job.NewConfig(r.Kube, obj, repository) executor := NewCheckExecutor(config) - if check.Status.HasFinished() { - executor.cleanupOldChecks(ctx, check) + jobKey := types.NamespacedName{ + Namespace: obj.GetNamespace(), + Name: executor.jobName(), + } + if err := job.ReconcileJobStatus(ctx, jobKey, r.Kube, obj); err != nil { + return controllerruntime.Result{}, err + } + + if obj.Status.HasStarted() { + log.V(1).Info("check just started, waiting") + return controllerruntime.Result{}, nil + } + + if obj.Status.HasFinished() { + executor.cleanupOldChecks(ctx, obj) return controllerruntime.Result{}, nil } diff --git a/operator/checkcontroller/executor.go b/operator/checkcontroller/executor.go index 5018baa2f..58b9e95cb 100644 --- a/operator/checkcontroller/executor.go +++ b/operator/checkcontroller/executor.go @@ -40,7 +40,7 @@ func (*CheckExecutor) Exclusive() bool { // Execute creates the actual batch.job on the k8s api. func (c *CheckExecutor) Execute(ctx context.Context) error { batchJob := &batchv1.Job{} - batchJob.Name = k8upv1.CheckType.String() + "-" + c.check.Name + batchJob.Name = c.jobName() batchJob.Namespace = c.check.Namespace _, err := controllerruntime.CreateOrUpdate(ctx, c.Client, batchJob, func() error { @@ -63,6 +63,10 @@ func (c *CheckExecutor) Execute(ctx context.Context) error { return nil } +func (c *CheckExecutor) jobName() string { + return k8upv1.CheckType.String() + "-" + c.check.Name +} + func (c *CheckExecutor) setupEnvVars(ctx context.Context) []corev1.EnvVar { log := controllerruntime.LoggerFrom(ctx) vars := executor.NewEnvVarConverter() diff --git a/operator/checkcontroller/setup.go b/operator/checkcontroller/setup.go index 4c0b20c93..43f91ca4f 100644 --- a/operator/checkcontroller/setup.go +++ b/operator/checkcontroller/setup.go @@ -3,12 +3,14 @@ package checkcontroller import ( k8upv1 "github.com/k8up-io/k8up/v2/api/v1" "github.com/k8up-io/k8up/v2/operator/reconciler" + batchv1 "k8s.io/api/batch/v1" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/predicate" ) // +kubebuilder:rbac:groups=k8up.io,resources=checks,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=k8up.io,resources=checks/status;checks/finalizers,verbs=get;update;patch +// +kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;update;patch;delete // SetupWithManager configures the reconciler. func SetupWithManager(mgr ctrl.Manager) error { @@ -18,6 +20,7 @@ func SetupWithManager(mgr ctrl.Manager) error { }) return ctrl.NewControllerManagedBy(mgr). For(&k8upv1.Check{}). + Owns(&batchv1.Job{}). Named(name). WithEventFilter(predicate.GenerationChangedPredicate{}). Complete(r) diff --git a/operator/job/job.go b/operator/job/job.go index 8a81562d0..f978c7957 100644 --- a/operator/job/job.go +++ b/operator/job/job.go @@ -3,14 +3,18 @@ package job import ( + "context" "crypto/sha256" "fmt" k8upv1 "github.com/k8up-io/k8up/v2/api/v1" "github.com/k8up-io/k8up/v2/operator/cfg" + "github.com/k8up-io/k8up/v2/operator/monitoring" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -68,6 +72,79 @@ func MutateBatchJob(batchJob *batchv1.Job, jobObj k8upv1.JobObject, config Confi return controllerruntime.SetControllerReference(jobObj, batchJob, config.Client.Scheme()) } +func ReconcileJobStatus(ctx context.Context, key types.NamespacedName, client client.Client, obj k8upv1.JobObject) error { + log := controllerruntime.LoggerFrom(ctx) + log.V(1).Info("reconciling job", "key", key) + + batchJob := &batchv1.Job{} + err := client.Get(ctx, key, batchJob) + if err != nil { + if !errors.IsNotFound(err) { + return fmt.Errorf("unable to get job: %w", err) + } + log.V(1).Info("job not found", "key", key) + return nil + } + + if err := UpdateStatus(ctx, batchJob, obj); err != nil { + return fmt.Errorf("unable to update status in object: %w", err) + } + + log.V(1).Info("updating status") + if err := client.Status().Update(ctx, obj); err != nil { + return fmt.Errorf("obj status update failed: %w", err) + } + return nil +} + +// UpdateStatus retrieves status of batchJob and sets status of obj accordingly. +func UpdateStatus(ctx context.Context, batchJob *batchv1.Job, obj k8upv1.JobObject) error { + log := controllerruntime.LoggerFrom(ctx) + + // update status conditions based on Job status + objStatus := obj.GetStatus() + message := fmt.Sprintf("job '%s' has %d active, %d succeeded and %d failed pods", + batchJob.Name, batchJob.Status.Active, batchJob.Status.Succeeded, batchJob.Status.Failed) + + successCond := FindStatusCondition(batchJob.Status.Conditions, batchv1.JobComplete) + if successCond != nil && successCond.Status == corev1.ConditionTrue { + if !objStatus.HasSucceeded() { + // only increase success counter if new condition + monitoring.IncSuccessCounters(batchJob.Namespace, obj.GetType()) + log.Info("Job succeeded") + } + objStatus.SetSucceeded(message) + objStatus.SetFinished(fmt.Sprintf("job '%s' completed successfully", batchJob.Name)) + } + failedCond := FindStatusCondition(batchJob.Status.Conditions, batchv1.JobFailed) + if failedCond != nil && failedCond.Status == corev1.ConditionTrue { + if !objStatus.HasFailed() { + // only increase fail counter if new condition + monitoring.IncFailureCounters(batchJob.Namespace, obj.GetType()) + log.Info("Job failed") + } + objStatus.SetFailed(message) + objStatus.SetFinished(fmt.Sprintf("job '%s' has failed", batchJob.Name)) + } + if successCond == nil && failedCond == nil { + objStatus.SetStarted(message) + } + obj.SetStatus(objStatus) + + return nil +} + +// FindStatusCondition finds the condition with the given type in the batchv1.JobCondition slice. +// Returns nil if not found. +func FindStatusCondition(conditions []batchv1.JobCondition, conditionType batchv1.JobConditionType) *batchv1.JobCondition { + for _, condition := range conditions { + if condition.Type == conditionType { + return &condition + } + } + return nil +} + // Sha256Hash returns the SHA256 hash string of the given string // Returns empty string if v is empty. // The returned hash is shortened to 63 characters to fit into a label. diff --git a/operator/jobcontroller/conditions.go b/operator/jobcontroller/conditions.go deleted file mode 100644 index 86a6a651f..000000000 --- a/operator/jobcontroller/conditions.go +++ /dev/null @@ -1,14 +0,0 @@ -package jobcontroller - -import batchv1 "k8s.io/api/batch/v1" - -// FindStatusCondition finds the condition with the given type in the batchv1.JobCondition slice. -// Returns nil if not found. -func FindStatusCondition(conditions []batchv1.JobCondition, conditionType batchv1.JobConditionType) *batchv1.JobCondition { - for _, condition := range conditions { - if condition.Type == conditionType { - return &condition - } - } - return nil -} diff --git a/operator/jobcontroller/controller.go b/operator/jobcontroller/controller.go deleted file mode 100644 index 014eb4bd8..000000000 --- a/operator/jobcontroller/controller.go +++ /dev/null @@ -1,142 +0,0 @@ -package jobcontroller - -import ( - "context" - "fmt" - - k8upv1 "github.com/k8up-io/k8up/v2/api/v1" - "github.com/k8up-io/k8up/v2/operator/monitoring" - batchv1 "k8s.io/api/batch/v1" - corev1 "k8s.io/api/core/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" -) - -const ( - jobFinalizerName string = "k8up.io/jobobserver" -) - -// JobReconciler reconciles a Job object -type JobReconciler struct { - Kube client.Client -} - -func (r *JobReconciler) NewObject() *batchv1.Job { - return &batchv1.Job{} -} - -func (r *JobReconciler) NewObjectList() *batchv1.JobList { - return &batchv1.JobList{} -} - -func (r *JobReconciler) Deprovision(ctx context.Context, obj *batchv1.Job) (ctrl.Result, error) { - return ctrl.Result{}, r.removeFinalizer(ctx, obj) -} - -func (r *JobReconciler) Provision(ctx context.Context, obj *batchv1.Job) (ctrl.Result, error) { - finalizerErr := r.removeFinalizer(ctx, obj) - if finalizerErr != nil { - return ctrl.Result{}, finalizerErr - } - return ctrl.Result{}, r.Handle(ctx, obj) -} - -func (r *JobReconciler) Handle(ctx context.Context, obj *batchv1.Job) error { - owner, err := r.fetchOwner(ctx, obj) - if err != nil { - if apierrors.IsNotFound(err) { - return nil // owner doesn't exist anymore, nothing to do. - } - return err - } - if !owner.GetDeletionTimestamp().IsZero() { - return nil // owner got deleted, probably from cleanup. Nothing to do. - } - - if err := r.updateOwner(ctx, obj, owner); err != nil { - return fmt.Errorf("could not update owner: %w", err) - } - return nil -} - -func (r *JobReconciler) fetchOwner(ctx context.Context, batchJob *batchv1.Job) (k8upv1.JobObject, error) { - controllerReference := metav1.GetControllerOf(batchJob) - if controllerReference == nil { - return nil, fmt.Errorf("job has no controller reference: %s/%s", batchJob.Namespace, batchJob.Name) - } - - var result k8upv1.JobObject - switch controllerReference.Kind { - case k8upv1.BackupKind: - result = &k8upv1.Backup{} - case k8upv1.ArchiveKind: - result = &k8upv1.Archive{} - case k8upv1.RestoreKind: - result = &k8upv1.Restore{} - case k8upv1.CheckKind: - result = &k8upv1.Check{} - case k8upv1.PruneKind: - result = &k8upv1.Prune{} - default: - return nil, fmt.Errorf("unrecognized controller kind in owner reference: %s", controllerReference.Kind) - } - - // fetch the owner object - err := r.Kube.Get(ctx, types.NamespacedName{Name: controllerReference.Name, Namespace: batchJob.Namespace}, result) - if err != nil { - return nil, fmt.Errorf("cannot get resource: %s/%s/%s: %w", controllerReference.Kind, batchJob.Namespace, batchJob.Name, err) - } - return result, nil -} - -func (r *JobReconciler) updateOwner(ctx context.Context, batchJob *batchv1.Job, owner k8upv1.JobObject) error { - log := ctrl.LoggerFrom(ctx) - - // update status conditions based on Job status - ownerStatus := owner.GetStatus() - message := fmt.Sprintf("job '%s' has %d active, %d succeeded and %d failed pods", - batchJob.Name, batchJob.Status.Active, batchJob.Status.Succeeded, batchJob.Status.Failed) - - successCond := FindStatusCondition(batchJob.Status.Conditions, batchv1.JobComplete) - if successCond != nil && successCond.Status == corev1.ConditionTrue { - if !ownerStatus.HasSucceeded() { - // only increase success counter if new condition - monitoring.IncSuccessCounters(batchJob.Namespace, owner.GetType()) - log.Info("Job succeeded") - } - ownerStatus.SetSucceeded(message) - ownerStatus.SetFinished(fmt.Sprintf("job '%s' completed successfully", batchJob.Name)) - } - failedCond := FindStatusCondition(batchJob.Status.Conditions, batchv1.JobFailed) - if failedCond != nil && failedCond.Status == corev1.ConditionTrue { - if !ownerStatus.HasFailed() { - // only increase fail counter if new condition - monitoring.IncFailureCounters(batchJob.Namespace, owner.GetType()) - log.Info("Job failed") - } - ownerStatus.SetFailed(message) - ownerStatus.SetFinished(fmt.Sprintf("job '%s' has failed", batchJob.Name)) - } - if successCond == nil && failedCond == nil { - ownerStatus.SetStarted(message) - } - owner.SetStatus(ownerStatus) - return r.Kube.Status().Update(ctx, owner) -} - -func (r *JobReconciler) removeFinalizer(ctx context.Context, obj *batchv1.Job) error { - _, err := controllerutil.CreateOrUpdate(ctx, r.Kube, obj, func() error { - // update to a new K8up version: Ensure that all finalizers get removed. - controllerutil.RemoveFinalizer(obj, jobFinalizerName) - controllerutil.RemoveFinalizer(obj, "k8up.syn.tools/jobobserver") // legacy finalizer - return nil - }) - if err != nil { - return fmt.Errorf("could not update finalizers: %w", err) - } - return nil -} diff --git a/operator/jobcontroller/controller_integration_test.go b/operator/jobcontroller/controller_integration_test.go deleted file mode 100644 index ed9db044c..000000000 --- a/operator/jobcontroller/controller_integration_test.go +++ /dev/null @@ -1,134 +0,0 @@ -//go:build integration - -package jobcontroller - -import ( - "context" - "testing" - - k8upv1 "github.com/k8up-io/k8up/v2/api/v1" - "github.com/k8up-io/k8up/v2/envtest" - "github.com/stretchr/testify/suite" - batchv1 "k8s.io/api/batch/v1" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/meta" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/uuid" - controllerruntime "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" -) - -type JobTestSuite struct { - envtest.Suite - - CancelCtx context.CancelFunc - Controller JobReconciler -} - -func Test_Backup(t *testing.T) { - suite.Run(t, new(JobTestSuite)) -} - -func (ts *JobTestSuite) BeforeTest(_, _ string) { - ts.Controller = JobReconciler{ - Kube: ts.Client, - } - ts.Ctx, ts.CancelCtx = context.WithCancel(context.Background()) -} - -func (ts *JobTestSuite) Test_GivenRunningJob_ThenKeepBackupProgressing() { - // Arrange - backup := ts.newBackup() - backupJob := ts.newJob(backup) - ts.EnsureResources(backup, backupJob) - backup.Status.Started = true - backupJob.Status.Active = 1 - ts.UpdateStatus(backup, backupJob) - - // Act - ts.whenReconciling(backupJob) - - // Assert - result := &k8upv1.Backup{} - ts.FetchResource(types.NamespacedName{Namespace: backup.Namespace, Name: backup.Name}, result) - - ts.assertCondition(result.Status.Conditions, k8upv1.ConditionProgressing, k8upv1.ReasonStarted, metav1.ConditionTrue) - ts.assertCondition(result.Status.Conditions, k8upv1.ConditionReady, k8upv1.ReasonReady, metav1.ConditionTrue) - ts.Assert().Len(result.Status.Conditions, 2, "amount of conditions") -} - -func (ts *JobTestSuite) Test_GivenCompletedJob_ThenCompleteBackup() { - // Arrange - backup := ts.newBackup() - backupJob := ts.newJob(backup) - ts.EnsureResources(backup, backupJob) - backup.Status.Started = true - backupJob.Status.Conditions = []batchv1.JobCondition{{Type: batchv1.JobComplete, Status: corev1.ConditionTrue}} - ts.UpdateStatus(backup, backupJob) - - // Act - ts.whenReconciling(backupJob) - - // Assert - result := &k8upv1.Backup{} - ts.FetchResource(types.NamespacedName{Namespace: backup.Namespace, Name: backup.Name}, result) - - ts.assertCondition(result.Status.Conditions, k8upv1.ConditionCompleted, k8upv1.ReasonSucceeded, metav1.ConditionTrue) - ts.assertCondition(result.Status.Conditions, k8upv1.ConditionProgressing, k8upv1.ReasonFinished, metav1.ConditionFalse) - ts.Assert().Len(result.Status.Conditions, 2, "amount of conditions") -} - -func (ts *JobTestSuite) Test_GivenFailedJob_ThenCompleteBackup() { - // Arrange - backup := ts.newBackup() - backupJob := ts.newJob(backup) - ts.EnsureResources(backup, backupJob) - backup.Status.Started = true - backupJob.Status.Conditions = []batchv1.JobCondition{{Type: batchv1.JobFailed, Status: corev1.ConditionTrue}} - ts.UpdateStatus(backup, backupJob) - - // Act - ts.whenReconciling(backupJob) - - // Assert - result := &k8upv1.Backup{} - ts.FetchResource(types.NamespacedName{Namespace: backup.Namespace, Name: backup.Name}, result) - ts.assertCondition(result.Status.Conditions, k8upv1.ConditionCompleted, k8upv1.ReasonFailed, metav1.ConditionTrue) - ts.assertCondition(result.Status.Conditions, k8upv1.ConditionProgressing, k8upv1.ReasonFinished, metav1.ConditionFalse) - ts.Assert().Len(result.Status.Conditions, 2, "amount of conditions") -} - -func (ts *JobTestSuite) assertCondition(conditions []metav1.Condition, condType k8upv1.ConditionType, reason k8upv1.ConditionReason, status metav1.ConditionStatus) { - cond := meta.FindStatusCondition(conditions, condType.String()) - ts.Require().NotNil(cond, "condition of type %s missing", condType) - ts.Assert().Equal(reason.String(), cond.Reason, "condition %s doesn't contain reason %s", condType, reason) - ts.Assert().Equal(status, cond.Status, "condition %s isn't %s", condType, status) -} - -func (ts *JobTestSuite) whenReconciling(object *batchv1.Job) controllerruntime.Result { - result, err := ts.Controller.Provision(ts.Ctx, object) - ts.Require().NoError(err) - - return result -} - -func (ts *JobTestSuite) newBackup() *k8upv1.Backup { - obj := &k8upv1.Backup{} - obj.Name = "backup" - obj.Namespace = ts.NS - obj.UID = uuid.NewUUID() - return obj -} - -func (ts *JobTestSuite) newJob(owner client.Object) *batchv1.Job { - jb := &batchv1.Job{} - jb.Name = "backup-job" - jb.Namespace = ts.NS - jb.Labels = labels.Set{k8upv1.LabelK8upType: k8upv1.BackupType.String()} - jb.Spec.Template.Spec.Containers = []corev1.Container{{Name: "container", Image: "image"}} - jb.Spec.Template.Spec.RestartPolicy = corev1.RestartPolicyOnFailure - ts.Assert().NoError(controllerruntime.SetControllerReference(owner, jb, ts.Scheme), "set controller ref") - return jb -} diff --git a/operator/jobcontroller/setup.go b/operator/jobcontroller/setup.go deleted file mode 100644 index c412450d9..000000000 --- a/operator/jobcontroller/setup.go +++ /dev/null @@ -1,32 +0,0 @@ -package jobcontroller - -import ( - "github.com/k8up-io/k8up/v2/operator/job" - "github.com/k8up-io/k8up/v2/operator/reconciler" - batchv1 "k8s.io/api/batch/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/builder" - "sigs.k8s.io/controller-runtime/pkg/predicate" -) - -// +kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;update;patch;delete -// +kubebuilder:rbac:groups=batch,resources=jobs/status;jobs/finalizers,verbs=get;update;patch - -// SetupWithManager configures the reconciler. -func SetupWithManager(mgr ctrl.Manager) error { - name := "job.k8up.io" - pred, err := predicate.LabelSelectorPredicate(metav1.LabelSelector{MatchLabels: map[string]string{ - job.K8uplabel: "true", - }}) - if err != nil { - return err - } - r := reconciler.NewReconciler[*batchv1.Job, *batchv1.JobList](mgr.GetClient(), &JobReconciler{ - Kube: mgr.GetClient(), - }) - return ctrl.NewControllerManagedBy(mgr). - Named(name). - For(&batchv1.Job{}, builder.WithPredicates(pred)). - Complete(r) -} diff --git a/operator/prunecontroller/controller.go b/operator/prunecontroller/controller.go index 80fa1c982..3399b9bb7 100644 --- a/operator/prunecontroller/controller.go +++ b/operator/prunecontroller/controller.go @@ -8,6 +8,7 @@ import ( "github.com/k8up-io/k8up/v2/operator/cfg" "github.com/k8up-io/k8up/v2/operator/job" "github.com/k8up-io/k8up/v2/operator/locker" + "k8s.io/apimachinery/pkg/types" controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -25,21 +26,30 @@ func (r *PruneReconciler) NewObjectList() *k8upv1.PruneList { return &k8upv1.PruneList{} } -func (r *PruneReconciler) Provision(ctx context.Context, prune *k8upv1.Prune) (controllerruntime.Result, error) { +func (r *PruneReconciler) Provision(ctx context.Context, obj *k8upv1.Prune) (controllerruntime.Result, error) { log := controllerruntime.LoggerFrom(ctx) - if prune.Status.HasStarted() { - return controllerruntime.Result{}, nil - } repository := cfg.Config.GetGlobalRepository() - if prune.Spec.Backend != nil { - repository = prune.Spec.Backend.String() + if obj.Spec.Backend != nil { + repository = obj.Spec.Backend.String() } - config := job.NewConfig(r.Kube, prune, repository) + config := job.NewConfig(r.Kube, obj, repository) executor := NewPruneExecutor(config) - if prune.Status.HasFinished() { - executor.cleanupOldPrunes(ctx, prune) + jobKey := types.NamespacedName{ + Namespace: obj.GetNamespace(), + Name: executor.jobName(), + } + if err := job.ReconcileJobStatus(ctx, jobKey, r.Kube, obj); err != nil { + return controllerruntime.Result{}, err + } + + if obj.Status.HasStarted() { + log.V(1).Info("prune just started, waiting") + return controllerruntime.Result{}, nil + } + if obj.Status.HasFinished() { + executor.cleanupOldPrunes(ctx, obj) return controllerruntime.Result{}, nil } diff --git a/operator/prunecontroller/executor.go b/operator/prunecontroller/executor.go index a599d168a..e86f59521 100644 --- a/operator/prunecontroller/executor.go +++ b/operator/prunecontroller/executor.go @@ -32,9 +32,8 @@ func NewPruneExecutor(config job.Config) *PruneExecutor { // Execute creates the actual batch.job on the k8s api. func (p *PruneExecutor) Execute(ctx context.Context) error { - batchJob := &batchv1.Job{} - batchJob.Name = k8upv1.PruneType.String() + "-" + p.prune.Name + batchJob.Name = p.jobName() batchJob.Namespace = p.prune.Namespace _, err := controllerutil.CreateOrUpdate(ctx, p.Client, batchJob, func() error { @@ -58,6 +57,10 @@ func (p *PruneExecutor) Execute(ctx context.Context) error { return nil } +func (p *PruneExecutor) jobName() string { + return k8upv1.PruneType.String() + "-" + p.prune.Name +} + // Exclusive should return true for jobs that can't run while other jobs run. func (p *PruneExecutor) Exclusive() bool { return true diff --git a/operator/prunecontroller/setup.go b/operator/prunecontroller/setup.go index f4e8d3ada..e69529b3a 100644 --- a/operator/prunecontroller/setup.go +++ b/operator/prunecontroller/setup.go @@ -3,12 +3,14 @@ package prunecontroller import ( k8upv1 "github.com/k8up-io/k8up/v2/api/v1" "github.com/k8up-io/k8up/v2/operator/reconciler" + batchv1 "k8s.io/api/batch/v1" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/predicate" ) // +kubebuilder:rbac:groups=k8up.io,resources=prunes,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=k8up.io,resources=prunes/status;prunes/finalizers,verbs=get;update;patch +// +kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;update;patch;delete // SetupWithManager configures the reconciler. func SetupWithManager(mgr ctrl.Manager) error { @@ -18,6 +20,7 @@ func SetupWithManager(mgr ctrl.Manager) error { }) return ctrl.NewControllerManagedBy(mgr). For(&k8upv1.Prune{}). + Owns(&batchv1.Job{}). Named(name). WithEventFilter(predicate.GenerationChangedPredicate{}). Complete(r) diff --git a/operator/restorecontroller/controller.go b/operator/restorecontroller/controller.go index 673e3d831..6ed6fbc87 100644 --- a/operator/restorecontroller/controller.go +++ b/operator/restorecontroller/controller.go @@ -8,6 +8,7 @@ import ( "github.com/k8up-io/k8up/v2/operator/cfg" "github.com/k8up-io/k8up/v2/operator/job" "github.com/k8up-io/k8up/v2/operator/locker" + "k8s.io/apimachinery/pkg/types" controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -25,22 +26,31 @@ func (r *RestoreReconciler) NewObjectList() *k8upv1.RestoreList { return &k8upv1.RestoreList{} } -func (r *RestoreReconciler) Provision(ctx context.Context, restore *k8upv1.Restore) (controllerruntime.Result, error) { +func (r *RestoreReconciler) Provision(ctx context.Context, obj *k8upv1.Restore) (controllerruntime.Result, error) { log := controllerruntime.LoggerFrom(ctx) - if restore.Status.HasStarted() { - return controllerruntime.Result{}, nil - } - repository := cfg.Config.GetGlobalRepository() - if restore.Spec.Backend != nil { - repository = restore.Spec.Backend.String() + if obj.Spec.Backend != nil { + repository = obj.Spec.Backend.String() } - config := job.NewConfig(r.Kube, restore, repository) + config := job.NewConfig(r.Kube, obj, repository) executor := NewRestoreExecutor(config) - if restore.Status.HasFinished() { - executor.cleanupOldRestores(ctx, restore) + jobKey := types.NamespacedName{ + Namespace: obj.GetNamespace(), + Name: executor.jobName(), + } + if err := job.ReconcileJobStatus(ctx, jobKey, r.Kube, obj); err != nil { + return controllerruntime.Result{}, err + } + + if obj.Status.HasStarted() { + log.V(1).Info("restore just started, waiting") + return controllerruntime.Result{}, nil + } + + if obj.Status.HasFinished() { + executor.cleanupOldRestores(ctx, obj) return controllerruntime.Result{}, nil } diff --git a/operator/restorecontroller/executor.go b/operator/restorecontroller/executor.go index 61161b0a7..ecfb0677f 100644 --- a/operator/restorecontroller/executor.go +++ b/operator/restorecontroller/executor.go @@ -60,7 +60,7 @@ func (r *RestoreExecutor) cleanupOldRestores(ctx context.Context, restore *k8upv func (r *RestoreExecutor) createRestoreObject(ctx context.Context, restore *k8upv1.Restore) (*batchv1.Job, error) { batchJob := &batchv1.Job{} - batchJob.Name = k8upv1.RestoreType.String() + "-" + r.Obj.GetName() + batchJob.Name = r.jobName() batchJob.Namespace = restore.Namespace _, err := controllerutil.CreateOrUpdate(ctx, r.Client, batchJob, func() error { mutateErr := job.MutateBatchJob(batchJob, restore, r.Config) @@ -83,6 +83,10 @@ func (r *RestoreExecutor) createRestoreObject(ctx context.Context, restore *k8up return batchJob, err } +func (r *RestoreExecutor) jobName() string { + return k8upv1.RestoreType.String() + "-" + r.Obj.GetName() +} + func (r *RestoreExecutor) args(restore *k8upv1.Restore) ([]string, error) { args := []string{"-restore"} diff --git a/operator/restorecontroller/setup.go b/operator/restorecontroller/setup.go index c5d10950c..0dd0dd725 100644 --- a/operator/restorecontroller/setup.go +++ b/operator/restorecontroller/setup.go @@ -3,12 +3,14 @@ package restorecontroller import ( k8upv1 "github.com/k8up-io/k8up/v2/api/v1" "github.com/k8up-io/k8up/v2/operator/reconciler" + batchv1 "k8s.io/api/batch/v1" "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/predicate" ) // +kubebuilder:rbac:groups=k8up.io,resources=restores,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=k8up.io,resources=restores/status;restores/finalizers,verbs=get;update;patch +// +kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;update;patch;delete // SetupWithManager configures the reconciler. func SetupWithManager(mgr controllerruntime.Manager) error { @@ -18,6 +20,7 @@ func SetupWithManager(mgr controllerruntime.Manager) error { }) return controllerruntime.NewControllerManagedBy(mgr). For(&k8upv1.Restore{}). + Owns(&batchv1.Job{}). Named(name). WithEventFilter(predicate.GenerationChangedPredicate{}). Complete(r)