Skip to content

Commit

Permalink
fix(vm): add sync metadata handler (#176)
Browse files Browse the repository at this point in the history
Add sync metadata handler.
---------

Signed-off-by: yaroslavborbat <[email protected]>
  • Loading branch information
yaroslavborbat authored Jul 3, 2024
1 parent d0a9659 commit c8660ac
Show file tree
Hide file tree
Showing 8 changed files with 257 additions and 23 deletions.
21 changes: 13 additions & 8 deletions images/virtualization-artifact/pkg/controller/vm/internal/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,12 @@ func (h *AgentHandler) syncAgentReady(vm *virtv2.VirtualMachine, kvvmi *virtv1.V
for _, c := range kvvmi.Status.Conditions {
// TODO: wrap kvvmi reasons
if c.Type == virtv1.VirtualMachineInstanceAgentConnected {
mgr.Update(cb.
Status(metav1.ConditionStatus(c.Status)).
Reason(c.Reason).
Message(c.Message).
Condition())
status := conditionStatus(string(c.Status))
cb.Status(status).Reason(c.Reason)
if status != metav1.ConditionTrue {
cb.Message(c.Message)
}
mgr.Update(cb.Condition())
vm.Status.Conditions = mgr.Generate()
return
}
Expand All @@ -116,7 +117,7 @@ func (h *AgentHandler) syncAgentVersionNotSupport(vm *virtv2.VirtualMachine, kvv
cb := conditions.NewConditionBuilder2(vmcondition.TypeAgentVersionNotSupported).Generation(vm.GetGeneration())

if kvvmi == nil {
mgr.Update(cb.Status(metav1.ConditionUnknown).
mgr.Update(cb.Status(metav1.ConditionFalse).
Reason2(vmcondition.ReasonAgentNotReady).
Message("Failed to check version, because Vm is not running.").
Condition())
Expand All @@ -125,8 +126,12 @@ func (h *AgentHandler) syncAgentVersionNotSupport(vm *virtv2.VirtualMachine, kvv
}
for _, c := range kvvmi.Status.Conditions {
if c.Type == virtv1.VirtualMachineInstanceUnsupportedAgent {
mgr.Update(cb.Status(conditionStatus(string(c.Status))).
Reason(c.Reason).Message(c.Message).Condition())
status := conditionStatus(string(c.Status))
cb.Status(status).Reason(c.Reason)
if status != metav1.ConditionTrue {
cb.Message(c.Message)
}
mgr.Update(cb.Condition())
vm.Status.Conditions = mgr.Generate()
return
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,12 +132,11 @@ func (h *BlockDeviceHandler) Handle(ctx context.Context, s state.VirtualMachineS
Message(msg).
Condition())
changed.Status.Conditions = mgr.Generate()
return reconcile.Result{RequeueAfter: 2 * time.Second}, nil
return reconcile.Result{RequeueAfter: 60 * time.Second}, nil
}

mgr.Update(cb.Status(metav1.ConditionTrue).
Reason2(vmcondition.ReasonBlockDevicesAttachmentReady).
Message(fmt.Sprintf("All block devices are ready: %d/%d", countBD, countBD)).
Condition())
changed.Status.Conditions = mgr.Generate()
return reconcile.Result{}, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ func (h *LifeCycleHandler) syncMigrationState(vm *virtv2.VirtualMachine, kvvmi *
vm.Status.MigrationState.EndTimestamp == nil {
mgr.Update(cb.
Reason2(vmcondition.ReasonVmIsMigrating).
Message(fmt.Sprintf("Migration started at %q", vm.Status.MigrationState.StartTimestamp)).
Status(metav1.ConditionTrue).
Condition())
vm.Status.Conditions = mgr.Generate()
Expand All @@ -160,7 +159,6 @@ func (h *LifeCycleHandler) syncPodStarted(vm *virtv2.VirtualMachine, pod *corev1
mgr.Update(cb.
Status(metav1.ConditionTrue).
Reason2(vmcondition.ReasonPodStarted).
Message(fmt.Sprintf("Pod started at %q", pod.Status.StartTime.String())).
Condition())
vm.Status.Conditions = mgr.Generate()
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ func (h *ProvisioningHandler) Handle(ctx context.Context, s state.VirtualMachine
if current.Spec.Provisioning == nil {
mgr.Update(cb.Status(metav1.ConditionTrue).
Reason2(vmcondition.ReasonProvisioningReady).
Message("Provisioning is not defined.").
Condition())
changed.Status.Conditions = mgr.Generate()
return reconcile.Result{}, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (h *SyncKvvmHandler) syncKVVM(ctx context.Context, s state.VirtualMachineSt
Generation(current.GetGeneration())
err = h.createKVVM(ctx, s)
if err != nil {
cb.Status(metav1.ConditionTrue).
cb.Status(metav1.ConditionFalse).
Reason2(vmcondition.ReasonConfigurationNotApplied).
Message(fmt.Sprintf("Failed to apply configuration: %s", err.Error()))
} else {
Expand Down Expand Up @@ -235,13 +235,11 @@ func (h *SyncKvvmHandler) syncKVVM(ctx context.Context, s state.VirtualMachineSt
Generation(current.GetGeneration()).
Status(metav1.ConditionTrue).
Reason2(vmcondition.ReasonConfigurationApplied).
Message("No changes found to apply.").
Condition())
mgr.Update(conditions.NewConditionBuilder2(vmcondition.TypeAwaitingRestartToApplyConfiguration).
Generation(current.GetGeneration()).
Status(metav1.ConditionFalse).
Reason2(vmcondition.ReasonRestartNoNeed).
Message("No changes found to apply.").
Condition())
}
changed.Status.Conditions = mgr.Generate()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
/*
Copyright 2024 Flant JSC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package internal

import (
"context"
"encoding/json"
"fmt"
"strings"

corev1 "k8s.io/api/core/v1"
virtv1 "kubevirt.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

merger "github.com/deckhouse/virtualization-controller/pkg/common"
"github.com/deckhouse/virtualization-controller/pkg/controller/common"
"github.com/deckhouse/virtualization-controller/pkg/controller/vm/internal/state"
virtv2 "github.com/deckhouse/virtualization/api/core/v1alpha2"
)

const nameSyncMetadataHandler = "SyncMetadataHandler"

func NewSyncMetadataHandler(client client.Client) *SyncMetadataHandler {
return &SyncMetadataHandler{client: client}
}

type SyncMetadataHandler struct {
client client.Client
}

func (h *SyncMetadataHandler) Handle(ctx context.Context, s state.VirtualMachineState) (reconcile.Result, error) {
if isDeletion(s.VirtualMachine().Current()) {
return reconcile.Result{}, nil
}

kvvm, err := s.KVVM(ctx)
if err != nil {
return reconcile.Result{}, err
}
if kvvm == nil {
return reconcile.Result{}, nil
}

current := s.VirtualMachine().Current()

// Propagate user specified labels and annotations from the d8 VM to kubevirt VM.
metaUpdated, err := PropagateVMMetadata(current, kvvm, kvvm)
if err != nil {
return reconcile.Result{}, err
}

if metaUpdated {
if err = h.client.Update(ctx, kvvm); err != nil {
return reconcile.Result{}, fmt.Errorf("failed to update metadata KubeVirt VM %q: %w", kvvm.GetName(), err)
}
}

kvvmi, err := s.KVVMI(ctx)
if err != nil {
return reconcile.Result{}, err
}
// Propagate user specified labels and annotations from the d8 VM to the kubevirt VirtualMachineInstance.
if kvvmi != nil {
metaUpdated, err = PropagateVMMetadata(current, kvvm, kvvmi)
if err != nil {
return reconcile.Result{}, err
}

if metaUpdated {
if err = h.client.Update(ctx, kvvmi); err != nil {
return reconcile.Result{}, fmt.Errorf("failed to update metadata KubeVirt VMI %q: %w", kvvmi.GetName(), err)
}
}
}

pods, err := s.Pods(ctx)
if err != nil {
return reconcile.Result{}, err
}

// Propagate user specified labels and annotations from the d8 VM to the kubevirt virtual machine Pods.
if pods != nil {
for _, pod := range pods.Items {
// Update only Running pods.
if pod.Status.Phase != corev1.PodRunning {
continue
}
metaUpdated, err = PropagateVMMetadata(current, kvvm, &pod)
if err != nil {
return reconcile.Result{}, err
}

if metaUpdated {
if err = h.client.Update(ctx, &pod); err != nil {
return reconcile.Result{}, fmt.Errorf("fauled to update KubeVirt Pod %q: %w", pod.GetName(), err)
}
}
}
}
err = SetLastPropagatedLabels(kvvm, current)
if err != nil {
return reconcile.Result{}, fmt.Errorf("failed to set last propagated labels: %w", err)
}

err = SetLastPropagatedAnnotations(kvvm, current)
if err != nil {
return reconcile.Result{}, fmt.Errorf("failed to set last propagated annotations: %w", err)
}

return reconcile.Result{}, nil
}

func (h *SyncMetadataHandler) Name() string {
return nameSyncMetadataHandler
}

// PropagateVMMetadata merges labels and annotations from the input VM into destination object.
// Attach related labels and some dangerous annotations are not copied.
// Return true if destination object was changed.
func PropagateVMMetadata(vm *virtv2.VirtualMachine, kvvm *virtv1.VirtualMachine, destObj client.Object) (bool, error) {
// No changes if dest is nil.
if destObj == nil {
return false, nil
}

// 1. Propagate labels.
lastPropagatedLabels, err := GetLastPropagatedLabels(kvvm)
if err != nil {
return false, err
}

newLabels, labelsChanged := merger.ApplyMapChanges(destObj.GetLabels(), lastPropagatedLabels, vm.GetLabels())
if labelsChanged {
destObj.SetLabels(newLabels)
}

// 1. Propagate annotations.
lastPropagatedAnno, err := GetLastPropagatedAnnotations(kvvm)
if err != nil {
return false, err
}

// Remove dangerous annotations.
curAnno := RemoveNonPropagatableAnnotations(vm.GetAnnotations())

newAnno, annoChanged := merger.ApplyMapChanges(destObj.GetAnnotations(), lastPropagatedAnno, curAnno)
if annoChanged {
destObj.SetAnnotations(newAnno)
}

return labelsChanged || annoChanged, nil
}

func GetLastPropagatedLabels(kvvm *virtv1.VirtualMachine) (map[string]string, error) {
var lastPropagatedLabels map[string]string

if kvvm.Annotations[common.LastPropagatedVMLabelsAnnotation] != "" {
err := json.Unmarshal([]byte(kvvm.Annotations[common.LastPropagatedVMLabelsAnnotation]), &lastPropagatedLabels)
if err != nil {
return nil, err
}
}

return lastPropagatedLabels, nil
}

func SetLastPropagatedLabels(kvvm *virtv1.VirtualMachine, vm *virtv2.VirtualMachine) error {
data, err := json.Marshal(vm.GetLabels())
if err != nil {
return err
}

common.AddLabel(kvvm, common.LastPropagatedVMLabelsAnnotation, string(data))

return nil
}

func GetLastPropagatedAnnotations(kvvm *virtv1.VirtualMachine) (map[string]string, error) {
var lastPropagatedAnno map[string]string

if kvvm.Annotations[common.LastPropagatedVMAnnotationsAnnotation] != "" {
err := json.Unmarshal([]byte(kvvm.Annotations[common.LastPropagatedVMAnnotationsAnnotation]), &lastPropagatedAnno)
if err != nil {
return nil, err
}
}

return lastPropagatedAnno, nil
}

func SetLastPropagatedAnnotations(kvvm *virtv1.VirtualMachine, vm *virtv2.VirtualMachine) error {
data, err := json.Marshal(RemoveNonPropagatableAnnotations(vm.GetAnnotations()))
if err != nil {
return err
}

common.AddLabel(kvvm, common.LastPropagatedVMAnnotationsAnnotation, string(data))

return nil
}

// RemoveNonPropagatableAnnotations removes well known annotations that are dangerous to propagate.
func RemoveNonPropagatableAnnotations(anno map[string]string) map[string]string {
res := make(map[string]string)

for k, v := range anno {
if k == common.LastPropagatedVMAnnotationsAnnotation || k == common.LastPropagatedVMLabelsAnnotation {
continue
}

if strings.HasPrefix(k, "kubectl.kubernetes.io") {
continue
}

res[k] = v
}
return res
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,16 @@ func NewController(
logger := log.With("controller", controllerName)
recorder := mgr.GetEventRecorderFor(controllerName)
mgrCache := mgr.GetCache()
client := mgr.GetClient()
handlers := []Handler{
internal.NewDeletionHandler(mgr.GetClient(), logger),
internal.NewCPUHandler(mgr.GetClient(), recorder, logger),
internal.NewIPAMHandler(ipam.New(), mgr.GetClient(), recorder, logger),
internal.NewBlockDeviceHandler(mgr.GetClient(), recorder, logger),
internal.NewProvisioningHandler(mgr.GetClient()),
internal.NewDeletionHandler(client, logger),
internal.NewCPUHandler(client, recorder, logger),
internal.NewIPAMHandler(ipam.New(), client, recorder, logger),
internal.NewBlockDeviceHandler(client, recorder, logger),
internal.NewProvisioningHandler(client),
internal.NewAgentHandler(),
internal.NewSyncKvvmHandler(dvcrSettings, mgr.GetClient(), recorder, logger),
internal.NewSyncKvvmHandler(dvcrSettings, client, recorder, logger),
internal.NewSyncMetadataHandler(client),
internal.NewLifeCycleHandler(mgr.GetClient(), recorder, logger),
}
r := NewReconciler(mgr.GetClient(), logger, handlers...)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (reco
var handlerErr error

for _, h := range r.handlers {
r.logger.Info("Run handler", slog.String("name", h.Name()))
r.logger.Debug("Run handler", slog.String("name", h.Name()))
res, err := h.Handle(ctx, s)
if err != nil {
r.logger.Error("The handler failed with an error", slog.String("name", h.Name()), log.SlogErr(err))
Expand Down

0 comments on commit c8660ac

Please sign in to comment.