Skip to content

Commit

Permalink
fix owner reconciliation
Browse files Browse the repository at this point in the history
fixes #806

Signed-off-by: Michael Weibel <[email protected]>
  • Loading branch information
mweibel committed Jan 30, 2023
1 parent 4d91ba8 commit c8d4209
Show file tree
Hide file tree
Showing 24 changed files with 282 additions and 385 deletions.
2 changes: 0 additions & 2 deletions cmd/operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/k8up-io/k8up/v2/operator/backupcontroller"
"github.com/k8up-io/k8up/v2/operator/cfg"
"github.com/k8up-io/k8up/v2/operator/checkcontroller"
"github.com/k8up-io/k8up/v2/operator/jobcontroller"
"github.com/k8up-io/k8up/v2/operator/prunecontroller"
"github.com/k8up-io/k8up/v2/operator/restorecontroller"
"github.com/k8up-io/k8up/v2/operator/schedulecontroller"
Expand Down Expand Up @@ -117,7 +116,6 @@ func operatorMain(c *cli.Context) error {
"Archive": archivecontroller.SetupWithManager,
"Check": checkcontroller.SetupWithManager,
"Prune": prunecontroller.SetupWithManager,
"Job": jobcontroller.SetupWithManager,
} {
if setupErr := setupFn(mgr); setupErr != nil {
operatorLog.Error(setupErr, "unable to initialize operator mode", "step", "controller", "controller", name)
Expand Down
9 changes: 0 additions & 9 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,6 @@ rules:
- patch
- update
- watch
- apiGroups:
- batch
resources:
- jobs/finalizers
- jobs/status
verbs:
- get
- patch
- update
- apiGroups:
- coordination.k8s.io
resources:
Expand Down
34 changes: 22 additions & 12 deletions operator/archivecontroller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/k8up-io/k8up/v2/operator/cfg"
"github.com/k8up-io/k8up/v2/operator/job"
"github.com/k8up-io/k8up/v2/operator/locker"
"k8s.io/apimachinery/pkg/types"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)
Expand All @@ -25,25 +26,34 @@ func (r *ArchiveReconciler) NewObjectList() *k8upv1.ArchiveList {
return &k8upv1.ArchiveList{}
}

func (r *ArchiveReconciler) Provision(ctx context.Context, archive *k8upv1.Archive) (controllerruntime.Result, error) {
func (r *ArchiveReconciler) Provision(ctx context.Context, obj *k8upv1.Archive) (controllerruntime.Result, error) {
log := controllerruntime.LoggerFrom(ctx)

if archive.Status.HasStarted() {
return controllerruntime.Result{}, nil
}

repository := cfg.Config.GetGlobalRepository()
if archive.Spec.Backend != nil {
repository = archive.Spec.Backend.String()
if obj.Spec.Backend != nil {
repository = obj.Spec.Backend.String()
}
if archive.Spec.RestoreSpec == nil {
archive.Spec.RestoreSpec = &k8upv1.RestoreSpec{}
if obj.Spec.RestoreSpec == nil {
obj.Spec.RestoreSpec = &k8upv1.RestoreSpec{}
}
config := job.NewConfig(r.Kube, archive, repository)
config := job.NewConfig(r.Kube, obj, repository)
executor := NewArchiveExecutor(config)

if archive.Status.HasFinished() {
executor.cleanupOldArchives(ctx, archive)
jobKey := types.NamespacedName{
Namespace: obj.GetNamespace(),
Name: executor.jobName(),
}
if err := job.ReconcileJobStatus(ctx, jobKey, r.Kube, obj); err != nil {
return controllerruntime.Result{}, err
}

if obj.Status.HasStarted() {
log.V(1).Info("archive just started, waiting")
return controllerruntime.Result{}, nil
}

if obj.Status.HasFinished() {
executor.cleanupOldArchives(ctx, obj)
return controllerruntime.Result{}, nil
}

Expand Down
6 changes: 5 additions & 1 deletion operator/archivecontroller/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (a *ArchiveExecutor) Execute(ctx context.Context) error {
archive := a.Obj.(*k8upv1.Archive)

batchJob := &batchv1.Job{}
batchJob.Name = k8upv1.ArchiveType.String() + "-" + a.Obj.GetName()
batchJob.Name = a.jobName()
batchJob.Namespace = archive.Namespace

_, err := controllerutil.CreateOrUpdate(ctx, a.Client, batchJob, func() error {
Expand All @@ -63,6 +63,10 @@ func (a *ArchiveExecutor) Execute(ctx context.Context) error {
return nil
}

func (a *ArchiveExecutor) jobName() string {
return k8upv1.ArchiveType.String() + "-" + a.Obj.GetName()
}

func (a *ArchiveExecutor) setupArgs(archive *k8upv1.Archive) []string {
args := []string{"-archive", "-restoreType", "s3"}

Expand Down
3 changes: 3 additions & 0 deletions operator/archivecontroller/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package archivecontroller
import (
k8upv1 "github.com/k8up-io/k8up/v2/api/v1"
"github.com/k8up-io/k8up/v2/operator/reconciler"
batchv1 "k8s.io/api/batch/v1"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

// +kubebuilder:rbac:groups=k8up.io,resources=archives,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=k8up.io,resources=archives/status;archives/finalizers,verbs=get;update;patch
// +kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;update;patch;delete

// SetupWithManager configures the reconciler.
func SetupWithManager(mgr controllerruntime.Manager) error {
Expand All @@ -18,6 +20,7 @@ func SetupWithManager(mgr controllerruntime.Manager) error {
})
return controllerruntime.NewControllerManagedBy(mgr).
For(&k8upv1.Archive{}).
Owns(&batchv1.Job{}).
Named(name).
WithEventFilter(predicate.GenerationChangedPredicate{}).
Complete(r)
Expand Down
19 changes: 14 additions & 5 deletions operator/backupcontroller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/k8up-io/k8up/v2/operator/job"
"github.com/k8up-io/k8up/v2/operator/locker"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/types"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand All @@ -30,25 +31,33 @@ func (r *BackupReconciler) NewObjectList() *k8upv1.BackupList {
func (r *BackupReconciler) Provision(ctx context.Context, obj *k8upv1.Backup) (reconcile.Result, error) {
log := controllerruntime.LoggerFrom(ctx)

if obj.Status.HasStarted() {
return controllerruntime.Result{RequeueAfter: 30 * time.Second}, nil // nothing to do, wait until finished
}

repository := cfg.Config.GetGlobalRepository()
if obj.Spec.Backend != nil {
repository = obj.Spec.Backend.String()
}
config := job.NewConfig(r.Kube, obj, repository)
executor := NewBackupExecutor(config)

jobKey := types.NamespacedName{
Namespace: obj.GetNamespace(),
Name: executor.jobName(),
}
if err := job.ReconcileJobStatus(ctx, jobKey, r.Kube, obj); err != nil {
return controllerruntime.Result{}, err
}

if obj.Status.HasStarted() {
log.V(1).Info("backup just started, waiting")
return controllerruntime.Result{RequeueAfter: 30 * time.Second}, nil
}
if obj.Status.HasFinished() || isPrebackupFailed(obj) {
cleanupCond := meta.FindStatusCondition(obj.Status.Conditions, k8upv1.ConditionScrubbed.String())
if cleanupCond == nil || cleanupCond.Reason != k8upv1.ReasonSucceeded.String() {
executor.cleanupOldBackups(ctx)
}

prebackupCond := meta.FindStatusCondition(obj.Status.Conditions, k8upv1.ConditionPreBackupPodReady.String())
if prebackupCond.Reason == k8upv1.ReasonFinished.String() || prebackupCond.Reason == k8upv1.ReasonFailed.String() || prebackupCond.Reason == k8upv1.ReasonNoPreBackupPodsFound.String() {
if prebackupCond != nil && (prebackupCond.Reason == k8upv1.ReasonFinished.String() || prebackupCond.Reason == k8upv1.ReasonFailed.String() || prebackupCond.Reason == k8upv1.ReasonNoPreBackupPodsFound.String()) {
// only ignore future reconciles if we have stopped all prebackup deployments in an earlier reconciliation.
return controllerruntime.Result{}, nil
}
Expand Down
60 changes: 60 additions & 0 deletions operator/backupcontroller/controller_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import (
k8upv1 "github.com/k8up-io/k8up/v2/api/v1"
"github.com/k8up-io/k8up/v2/envtest"
"github.com/stretchr/testify/suite"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)
Expand Down Expand Up @@ -44,6 +47,56 @@ func (ts *BackupTestSuite) Test_GivenBackup_ExpectBackupJob() {
ts.expectABackupJob()
}

func (ts *BackupTestSuite) Test_GivenBackup_AndJob_KeepBackupProgressing() {
backupJob := ts.newJob(ts.BackupResource)
ts.EnsureResources(ts.BackupResource, backupJob)
ts.BackupResource.Status.Started = true
backupJob.Status.Active = 1
ts.UpdateStatus(ts.BackupResource, backupJob)

ts.whenReconciling(ts.BackupResource)

result := &k8upv1.Backup{}
err := ts.Client.Get(ts.Ctx, k8upv1.MapToNamespacedName(ts.BackupResource), result)
ts.Require().NoError(err)
ts.assertCondition(result.Status.Conditions, k8upv1.ConditionProgressing, k8upv1.ReasonStarted, metav1.ConditionTrue)
ts.assertCondition(result.Status.Conditions, k8upv1.ConditionReady, k8upv1.ReasonReady, metav1.ConditionTrue)
ts.Assert().Len(result.Status.Conditions, 2, "amount of conditions")
}

func (ts *BackupTestSuite) Test_GivenBackup_AndCompletedJob_ThenCompleteBackup() {
backupJob := ts.newJob(ts.BackupResource)
ts.EnsureResources(ts.BackupResource, backupJob)
ts.BackupResource.Status.Started = true
backupJob.Status.Conditions = []batchv1.JobCondition{{Type: batchv1.JobComplete, Status: corev1.ConditionTrue}}
ts.UpdateStatus(ts.BackupResource, backupJob)

ts.whenReconciling(ts.BackupResource)

result := &k8upv1.Backup{}
ts.FetchResource(types.NamespacedName{Namespace: ts.BackupResource.Namespace, Name: ts.BackupResource.Name}, result)

ts.assertCondition(result.Status.Conditions, k8upv1.ConditionCompleted, k8upv1.ReasonSucceeded, metav1.ConditionTrue)
ts.assertCondition(result.Status.Conditions, k8upv1.ConditionProgressing, k8upv1.ReasonFinished, metav1.ConditionFalse)
ts.Assert().Len(result.Status.Conditions, 4, "amount of conditions")
}

func (ts *BackupTestSuite) Test_GivenBackup_AndFailedJob_ThenCompleteBackup() {
backupJob := ts.newJob(ts.BackupResource)
ts.EnsureResources(ts.BackupResource, backupJob)
ts.BackupResource.Status.Started = true
backupJob.Status.Conditions = []batchv1.JobCondition{{Type: batchv1.JobFailed, Status: corev1.ConditionTrue}}
ts.UpdateStatus(ts.BackupResource, backupJob)

ts.whenReconciling(ts.BackupResource)

result := &k8upv1.Backup{}
ts.FetchResource(types.NamespacedName{Namespace: ts.BackupResource.Namespace, Name: ts.BackupResource.Name}, result)
ts.assertCondition(result.Status.Conditions, k8upv1.ConditionCompleted, k8upv1.ReasonFailed, metav1.ConditionTrue)
ts.assertCondition(result.Status.Conditions, k8upv1.ConditionProgressing, k8upv1.ReasonFinished, metav1.ConditionFalse)
ts.Assert().Len(result.Status.Conditions, 4, "amount of conditions")
}

func (ts *BackupTestSuite) Test_GivenBackupWithSecurityContext_ExpectBackupJobWithSecurityContext() {
ts.BackupResource = ts.newBackupWithSecurityContext()
ts.EnsureResources(ts.BackupResource)
Expand Down Expand Up @@ -166,3 +219,10 @@ func (ts *BackupTestSuite) Test_GivenBackupWithTags_WhenCreatingBackupjob_ThenHa
backupJob := ts.expectABackupJob()
ts.assertJobHasTagArguments(backupJob)
}

func (ts *BackupTestSuite) assertCondition(conditions []metav1.Condition, condType k8upv1.ConditionType, reason k8upv1.ConditionReason, status metav1.ConditionStatus) {
cond := meta.FindStatusCondition(conditions, condType.String())
ts.Require().NotNil(cond, "condition of type %s missing", condType)
ts.Assert().Equal(reason.String(), cond.Reason, "condition %s doesn't contain reason %s", condType, reason)
ts.Assert().Equal(status, cond.Status, "condition %s isn't %s", condType, status)
}
11 changes: 11 additions & 0 deletions operator/backupcontroller/controller_utils_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,3 +238,14 @@ func (ts *BackupTestSuite) assertJobHasTagArguments(job *batchv1.Job) {
jobArguments := job.Spec.Template.Spec.Containers[0].Args
ts.Assert().Contains(jobArguments, backupTag, "backup tag in job args")
}

func (ts *BackupTestSuite) newJob(owner client.Object) *batchv1.Job {
jb := &batchv1.Job{}
jb.Name = k8upv1.BackupType.String() + "-" + ts.BackupResource.Name
jb.Namespace = ts.NS
jb.Labels = labels.Set{k8upv1.LabelK8upType: k8upv1.BackupType.String()}
jb.Spec.Template.Spec.Containers = []corev1.Container{{Name: "container", Image: "image"}}
jb.Spec.Template.Spec.RestartPolicy = corev1.RestartPolicyOnFailure
ts.Assert().NoError(controllerruntime.SetControllerReference(owner, jb, ts.Scheme), "set controller ref")
return jb
}
6 changes: 5 additions & 1 deletion operator/backupcontroller/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (b *BackupExecutor) startBackup(ctx context.Context) error {
}

batchJob := &batchv1.Job{}
batchJob.Name = k8upv1.BackupType.String() + "-" + b.backup.Name
batchJob.Name = b.jobName()
batchJob.Namespace = b.backup.Namespace

_, err = controllerruntime.CreateOrUpdate(ctx, b.Generic.Config.Client, batchJob, func() error {
Expand Down Expand Up @@ -133,3 +133,7 @@ func (b *BackupExecutor) startBackup(ctx context.Context) error {
func (b *BackupExecutor) cleanupOldBackups(ctx context.Context) {
b.Generic.CleanupOldResources(ctx, &k8upv1.BackupList{}, b.backup.Namespace, b.backup)
}

func (b *BackupExecutor) jobName() string {
return k8upv1.BackupType.String() + "-" + b.backup.Name
}
2 changes: 2 additions & 0 deletions operator/backupcontroller/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package backupcontroller
import (
k8upv1 "github.com/k8up-io/k8up/v2/api/v1"
"github.com/k8up-io/k8up/v2/operator/reconciler"
batchv1 "k8s.io/api/batch/v1"
"sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)
Expand All @@ -28,6 +29,7 @@ func SetupWithManager(mgr controllerruntime.Manager) error {
return controllerruntime.NewControllerManagedBy(mgr).
Named(name).
For(&k8upv1.Backup{}).
Owns(&batchv1.Job{}).
WithEventFilter(predicate.GenerationChangedPredicate{}).
Complete(r)
}
30 changes: 20 additions & 10 deletions operator/checkcontroller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/k8up-io/k8up/v2/operator/cfg"
"github.com/k8up-io/k8up/v2/operator/job"
"github.com/k8up-io/k8up/v2/operator/locker"
"k8s.io/apimachinery/pkg/types"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)
Expand All @@ -25,24 +26,33 @@ func (r *CheckReconciler) NewObjectList() *k8upv1.CheckList {
return &k8upv1.CheckList{}
}

func (r *CheckReconciler) Provision(ctx context.Context, check *k8upv1.Check) (controllerruntime.Result, error) {
func (r *CheckReconciler) Provision(ctx context.Context, obj *k8upv1.Check) (controllerruntime.Result, error) {
log := controllerruntime.LoggerFrom(ctx)

if check.Status.HasStarted() {
return controllerruntime.Result{}, nil
}

repository := cfg.Config.GetGlobalRepository()
if check.Spec.Backend != nil {
repository = check.Spec.Backend.String()
if obj.Spec.Backend != nil {
repository = obj.Spec.Backend.String()
}

config := job.NewConfig(r.Kube, check, repository)
config := job.NewConfig(r.Kube, obj, repository)

executor := NewCheckExecutor(config)

if check.Status.HasFinished() {
executor.cleanupOldChecks(ctx, check)
jobKey := types.NamespacedName{
Namespace: obj.GetNamespace(),
Name: executor.jobName(),
}
if err := job.ReconcileJobStatus(ctx, jobKey, r.Kube, obj); err != nil {
return controllerruntime.Result{}, err
}

if obj.Status.HasStarted() {
log.V(1).Info("check just started, waiting")
return controllerruntime.Result{}, nil
}

if obj.Status.HasFinished() {
executor.cleanupOldChecks(ctx, obj)
return controllerruntime.Result{}, nil
}

Expand Down
6 changes: 5 additions & 1 deletion operator/checkcontroller/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (*CheckExecutor) Exclusive() bool {
// Execute creates the actual batch.job on the k8s api.
func (c *CheckExecutor) Execute(ctx context.Context) error {
batchJob := &batchv1.Job{}
batchJob.Name = k8upv1.CheckType.String() + "-" + c.check.Name
batchJob.Name = c.jobName()
batchJob.Namespace = c.check.Namespace

_, err := controllerruntime.CreateOrUpdate(ctx, c.Client, batchJob, func() error {
Expand All @@ -63,6 +63,10 @@ func (c *CheckExecutor) Execute(ctx context.Context) error {
return nil
}

func (c *CheckExecutor) jobName() string {
return k8upv1.CheckType.String() + "-" + c.check.Name
}

func (c *CheckExecutor) setupEnvVars(ctx context.Context) []corev1.EnvVar {
log := controllerruntime.LoggerFrom(ctx)
vars := executor.NewEnvVarConverter()
Expand Down
3 changes: 3 additions & 0 deletions operator/checkcontroller/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package checkcontroller
import (
k8upv1 "github.com/k8up-io/k8up/v2/api/v1"
"github.com/k8up-io/k8up/v2/operator/reconciler"
batchv1 "k8s.io/api/batch/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

// +kubebuilder:rbac:groups=k8up.io,resources=checks,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=k8up.io,resources=checks/status;checks/finalizers,verbs=get;update;patch
// +kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;update;patch;delete

// SetupWithManager configures the reconciler.
func SetupWithManager(mgr ctrl.Manager) error {
Expand All @@ -18,6 +20,7 @@ func SetupWithManager(mgr ctrl.Manager) error {
})
return ctrl.NewControllerManagedBy(mgr).
For(&k8upv1.Check{}).
Owns(&batchv1.Job{}).
Named(name).
WithEventFilter(predicate.GenerationChangedPredicate{}).
Complete(r)
Expand Down
Loading

0 comments on commit c8d4209

Please sign in to comment.