Skip to content

Commit

Permalink
feat(controller, vmop): wait for the desired state of the vm (#84)
Browse files Browse the repository at this point in the history
wait for the desired state of the vm
---------

Signed-off-by: yaroslavborbat <[email protected]>
  • Loading branch information
yaroslavborbat authored May 16, 2024
1 parent 1ffcab7 commit 94fac98
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 45 deletions.
2 changes: 1 addition & 1 deletion images/virtualization-artifact/hack/mirrord.sh
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ if ! kubectl -n "${NAMESPACE}" get "deployment/${NEW_NAME}" &>/dev/null; then
(.spec.template.spec.containers[] | select(.name == $CONTAINER_NAME) ) |= (.command= [ "/bin/bash", "-c", "--" ] | .args = [ "while true; do sleep 60; done;" ] ) |
.spec.replicas = 1 |
.spec.template.metadata.labels.mirror = "true" |
.spec.template.metadata.labels.ownerName = $NEW_NAME' \
.spec.template.metadata.labels.ownerName = $NEW_NAME' | \
kubectl create -f -
fi

Expand Down
121 changes: 89 additions & 32 deletions images/virtualization-artifact/pkg/controller/vmop/vmop_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ import (
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

Expand All @@ -27,62 +30,104 @@ func NewReconciler() *Reconciler {
}

func (r *Reconciler) SetupController(_ context.Context, mgr manager.Manager, ctr controller.Controller) error {
return ctr.Watch(source.Kind(mgr.GetCache(), &virtv2.VirtualMachineOperation{}), &handler.EnqueueRequestForObject{})
err := ctr.Watch(source.Kind(mgr.GetCache(), &virtv2.VirtualMachineOperation{}), &handler.EnqueueRequestForObject{})
if err != nil {
return fmt.Errorf("error setting watch on VMOP: %w", err)
}
// Subscribe on VirtualMachines.
if err = ctr.Watch(
source.Kind(mgr.GetCache(), &virtv2.VirtualMachine{}),
handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, vm client.Object) []reconcile.Request {
c := mgr.GetClient()
vmops := &virtv2.VirtualMachineOperationList{}
if err := c.List(ctx, vmops, client.InNamespace(vm.GetNamespace())); err != nil {
return nil
}
var requests []reconcile.Request
for _, vmop := range vmops.Items {
if vmop.Spec.VirtualMachine == vm.GetName() && vmop.Status.Phase == virtv2.VMOPPhaseInProgress {
requests = append(requests, reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: vmop.GetNamespace(),
Name: vmop.GetName(),
},
})
break
}
}
return requests
}),
predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool { return true },
DeleteFunc: func(e event.DeleteEvent) bool { return true },
UpdateFunc: func(e event.UpdateEvent) bool {
oldVM := e.ObjectOld.(*virtv2.VirtualMachine)
newVM := e.ObjectNew.(*virtv2.VirtualMachine)
return oldVM.Status.Phase != newVM.Status.Phase
},
},
); err != nil {
return fmt.Errorf("error setting watch on VirtualMachine: %w", err)
}
return nil
}

func (r *Reconciler) Sync(ctx context.Context, req reconcile.Request, state *ReconcilerState, opts two_phase_reconciler.ReconcilerOptions) error {
log := opts.Log.WithValues("vmop.name", state.VMOP.Current().GetName())

switch {
case state.IsDeletion():
log.V(1).Info("Delete VMOP, remove protective finalizers")
return r.cleanupOnDeletion(ctx, state, opts)
return r.removeFinalizers(ctx, state, opts)
case state.IsCompleted():
log.V(2).Info("VMOP completed", "namespacedName", req.String())
return r.removeFinalizers(ctx, state, opts)

case state.IsFailed():
log.V(2).Info("VMOP failed", "namespacedName", req.String())
return r.removeFinalizers(ctx, state, opts)
case !state.IsProtected():
// Set protective finalizer atomically.
if controllerutil.AddFinalizer(state.VMOP.Changed(), virtv2.FinalizerVMOPCleanup) {
state.SetReconcilerResult(&reconcile.Result{Requeue: true})
return nil
}
case state.IsCompleted():
log.V(2).Info("VMOP completed", "namespacedName", req.String())
return r.removeVMFinalizers(ctx, state, opts)

case state.IsFailed():
log.V(2).Info("VMOP failed", "namespacedName", req.String())
return r.removeVMFinalizers(ctx, state, opts)
case state.VmIsEmpty():
state.SetReconcilerResult(&reconcile.Result{RequeueAfter: 2 * time.Second})
return nil
}
found, err := state.OtherVMOPInProgress(ctx)
if err != nil {
return err
}
if found {
state.SetReconcilerResult(&reconcile.Result{Requeue: true})
state.SetReconcilerResult(&reconcile.Result{RequeueAfter: 15 * time.Second})
return nil
}
if !state.IsInProgress() {
state.SetInProgress()
state.SetReconcilerResult(&reconcile.Result{Requeue: true})
return r.ensureVMFinalizers(ctx, state, opts)
}

if !r.isOperationAllowed(state.VMOP.Current().Spec.Type, state) {
return nil
}
err = r.doOperation(ctx, state.VMOP.Current().Spec, state)
if err != nil {
msg := "The operation completed with an error."
state.SetOperationResult(false, fmt.Sprintf("%s %s", msg, err.Error()))
opts.Recorder.Event(state.VMOP.Current(), corev1.EventTypeWarning, virtv2.ReasonErrVMOPFailed, msg)
log.V(1).Error(err, msg, "vmop.name", state.VMOP.Current().GetName(), "vmop.namespace", state.VMOP.Current().GetNamespace())
} else {
err = r.ensureVMFinalizers(ctx, state, opts)
if err != nil {
return err
}
if !r.isOperationAllowed(state.VMOP.Current().Spec.Type, state) {
return nil
}
err = r.doOperation(ctx, state.VMOP.Current().Spec, state)
if err != nil {
msg := "The operation completed with an error."
state.SetOperationResult(false, fmt.Sprintf("%s %s", msg, err.Error()))
opts.Recorder.Event(state.VMOP.Current(), corev1.EventTypeWarning, virtv2.ReasonErrVMOPFailed, msg)
log.V(1).Error(err, msg, "vmop.name", state.VMOP.Current().GetName(), "vmop.namespace", state.VMOP.Current().GetNamespace())
return nil
}
state.SetOperationResult(true, "")
msg := "The operation completed without errors."
opts.Recorder.Event(state.VMOP.Current(), corev1.EventTypeNormal, virtv2.ReasonVMOPSucceeded, msg)
log.V(2).Info(msg, "vmop.name", state.VMOP.Current().GetName(), "vmop.namespace", state.VMOP.Current().GetNamespace())
return nil
}
if r.IsCompleted(state.VMOP.Current().Spec.Type, state.VM.Status.Phase) {
return nil
}
state.SetReconcilerResult(&reconcile.Result{RequeueAfter: 60 * time.Second})
return nil
}

Expand All @@ -97,7 +142,7 @@ func (r *Reconciler) UpdateStatus(_ context.Context, _ reconcile.Request, state
vmopStatus := state.VMOP.Current().Status.DeepCopy()

switch {
case state.IsFailed(), state.IsCompleted():
case state.IsFailed(), state.IsCompleted(), state.IsInProgress():
// No need to update status.
break
case vmopStatus.Phase == "":
Expand All @@ -113,19 +158,20 @@ func (r *Reconciler) UpdateStatus(_ context.Context, _ reconcile.Request, state
vmopStatus.Phase = virtv2.VMOPPhaseFailed
vmopStatus.FailureReason = virtv2.ReasonErrVMOPNotPermitted
vmopStatus.FailureMessage = fmt.Sprintf("operation %q not permitted for vm.status.phase=%q", state.VMOP.Current().Spec.Type, state.VM.Status.Phase)
case state.GetInProgress():
vmopStatus.Phase = virtv2.VMOPPhaseInProgress
}

if result := state.GetOperationResult(); result != nil {
if result.WasSuccessful() {
vmopStatus.Phase = virtv2.VMOPPhaseCompleted
vmopStatus.Phase = virtv2.VMOPPhaseInProgress
} else {
vmopStatus.Phase = virtv2.VMOPPhaseFailed
vmopStatus.FailureReason = virtv2.ReasonErrVMOPFailed
vmopStatus.FailureMessage = result.Message()
}
}
if state.IsInProgress() && r.IsCompleted(state.VMOP.Current().Spec.Type, state.VM.Status.Phase) {
vmopStatus.Phase = virtv2.VMOPPhaseCompleted
}
state.VMOP.Changed().Status = *vmopStatus
return nil
}
Expand All @@ -152,7 +198,7 @@ func (r *Reconciler) removeVMFinalizers(ctx context.Context, state *ReconcilerSt
return nil
}

func (r *Reconciler) cleanupOnDeletion(ctx context.Context, state *ReconcilerState, opts two_phase_reconciler.ReconcilerOptions) error {
func (r *Reconciler) removeFinalizers(ctx context.Context, state *ReconcilerState, opts two_phase_reconciler.ReconcilerOptions) error {
if err := r.removeVMFinalizers(ctx, state, opts); err != nil {
return err
}
Expand Down Expand Up @@ -240,3 +286,14 @@ func (r *Reconciler) isOperationAllowedForVmPhase(op virtv2.VMOPOperation, phase
return false
}
}

func (r *Reconciler) IsCompleted(op virtv2.VMOPOperation, phase virtv2.MachinePhase) bool {
switch op {
case virtv2.VMOPOperationTypeRestart, virtv2.VMOPOperationTypeStart:
return phase == virtv2.MachineRunning
case virtv2.VMOPOperationTypeStop:
return phase == virtv2.MachineStopped
default:
return false
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ type ReconcilerState struct {
VM *virtv2.VirtualMachine

operationResult *OperationResult
inProgress bool
}

type OperationResult struct {
Expand Down Expand Up @@ -102,7 +101,10 @@ func (state *ReconcilerState) IsDeletion() bool {
if state.VMOP.IsEmpty() {
return false
}
return state.VMOP.Current().DeletionTimestamp != nil
if !state.VmIsEmpty() && state.VM.DeletionTimestamp != nil {
return true
}
return state.VMOP.Current().DeletionTimestamp != nil && !state.IsInProgress()
}

func (state *ReconcilerState) IsProtected() bool {
Expand Down Expand Up @@ -135,8 +137,8 @@ func (state *ReconcilerState) VmIsEmpty() bool {
}

func (state *ReconcilerState) OtherVMOPInProgress(ctx context.Context) (bool, error) {
vmops := virtv2.VirtualMachineOperationList{}
err := state.Client.List(ctx, &vmops, &client.ListOptions{Namespace: state.VMOP.Current().Namespace})
var vmops virtv2.VirtualMachineOperationList
err := state.Client.List(ctx, &vmops, client.InNamespace(state.VMOP.Current().GetNamespace()))
if err != nil {
return false, err
}
Expand All @@ -161,14 +163,6 @@ func (state *ReconcilerState) GetOperationResult() *OperationResult {
return state.operationResult
}

func (state *ReconcilerState) SetInProgress() {
state.inProgress = true
}

func (state *ReconcilerState) GetInProgress() bool {
return state.inProgress
}

func (state *ReconcilerState) GetKVVM(ctx context.Context) (*virtv1.VirtualMachine, error) {
if state.VmIsEmpty() {
return nil, fmt.Errorf("VM %s not found", state.VMOP.Current().Spec.VirtualMachine)
Expand Down

0 comments on commit 94fac98

Please sign in to comment.