From c8660ac171251d4f822f74c43f46decbff41d388 Mon Sep 17 00:00:00 2001 From: Yaroslav Borbat <86148689+yaroslavborbat@users.noreply.github.com> Date: Wed, 3 Jul 2024 19:54:46 +0300 Subject: [PATCH] fix(vm): add sync metadata handler (#176) Add sync metadata handler. --------- Signed-off-by: yaroslavborbat --- .../pkg/controller/vm/internal/agent.go | 21 +- .../controller/vm/internal/block_device.go | 3 +- .../pkg/controller/vm/internal/lifecycle.go | 2 - .../controller/vm/internal/provisioning.go | 1 - .../pkg/controller/vm/internal/sync_kvvm.go | 4 +- .../controller/vm/internal/sync_metadata.go | 233 ++++++++++++++++++ .../pkg/controller/vm/vm_controller.go | 14 +- .../pkg/controller/vm/vm_reconciler.go | 2 +- 8 files changed, 257 insertions(+), 23 deletions(-) create mode 100644 images/virtualization-artifact/pkg/controller/vm/internal/sync_metadata.go diff --git a/images/virtualization-artifact/pkg/controller/vm/internal/agent.go b/images/virtualization-artifact/pkg/controller/vm/internal/agent.go index 828a8876b..567d0259e 100644 --- a/images/virtualization-artifact/pkg/controller/vm/internal/agent.go +++ b/images/virtualization-artifact/pkg/controller/vm/internal/agent.go @@ -92,11 +92,12 @@ func (h *AgentHandler) syncAgentReady(vm *virtv2.VirtualMachine, kvvmi *virtv1.V for _, c := range kvvmi.Status.Conditions { // TODO: wrap kvvmi reasons if c.Type == virtv1.VirtualMachineInstanceAgentConnected { - mgr.Update(cb. - Status(metav1.ConditionStatus(c.Status)). - Reason(c.Reason). - Message(c.Message). 
- Condition()) + status := conditionStatus(string(c.Status)) + cb.Status(status).Reason(c.Reason) + if status != metav1.ConditionTrue { + cb.Message(c.Message) + } + mgr.Update(cb.Condition()) vm.Status.Conditions = mgr.Generate() return } @@ -116,7 +117,7 @@ func (h *AgentHandler) syncAgentVersionNotSupport(vm *virtv2.VirtualMachine, kvv cb := conditions.NewConditionBuilder2(vmcondition.TypeAgentVersionNotSupported).Generation(vm.GetGeneration()) if kvvmi == nil { - mgr.Update(cb.Status(metav1.ConditionUnknown). + mgr.Update(cb.Status(metav1.ConditionFalse). Reason2(vmcondition.ReasonAgentNotReady). Message("Failed to check version, because Vm is not running."). Condition()) @@ -125,8 +126,12 @@ func (h *AgentHandler) syncAgentVersionNotSupport(vm *virtv2.VirtualMachine, kvv } for _, c := range kvvmi.Status.Conditions { if c.Type == virtv1.VirtualMachineInstanceUnsupportedAgent { - mgr.Update(cb.Status(conditionStatus(string(c.Status))). - Reason(c.Reason).Message(c.Message).Condition()) + status := conditionStatus(string(c.Status)) + cb.Status(status).Reason(c.Reason) + if status != metav1.ConditionTrue { + cb.Message(c.Message) + } + mgr.Update(cb.Condition()) vm.Status.Conditions = mgr.Generate() return } diff --git a/images/virtualization-artifact/pkg/controller/vm/internal/block_device.go b/images/virtualization-artifact/pkg/controller/vm/internal/block_device.go index 25e8fb1fc..23e3022ae 100644 --- a/images/virtualization-artifact/pkg/controller/vm/internal/block_device.go +++ b/images/virtualization-artifact/pkg/controller/vm/internal/block_device.go @@ -132,12 +132,11 @@ func (h *BlockDeviceHandler) Handle(ctx context.Context, s state.VirtualMachineS Message(msg). Condition()) changed.Status.Conditions = mgr.Generate() - return reconcile.Result{RequeueAfter: 2 * time.Second}, nil + return reconcile.Result{RequeueAfter: 60 * time.Second}, nil } mgr.Update(cb.Status(metav1.ConditionTrue). Reason2(vmcondition.ReasonBlockDevicesAttachmentReady). 
- Message(fmt.Sprintf("All block devices are ready: %d/%d", countBD, countBD)). Condition()) changed.Status.Conditions = mgr.Generate() return reconcile.Result{}, nil diff --git a/images/virtualization-artifact/pkg/controller/vm/internal/lifecycle.go b/images/virtualization-artifact/pkg/controller/vm/internal/lifecycle.go index cb3c7ce04..054b0e731 100644 --- a/images/virtualization-artifact/pkg/controller/vm/internal/lifecycle.go +++ b/images/virtualization-artifact/pkg/controller/vm/internal/lifecycle.go @@ -135,7 +135,6 @@ func (h *LifeCycleHandler) syncMigrationState(vm *virtv2.VirtualMachine, kvvmi * vm.Status.MigrationState.EndTimestamp == nil { mgr.Update(cb. Reason2(vmcondition.ReasonVmIsMigrating). - Message(fmt.Sprintf("Migration started at %q", vm.Status.MigrationState.StartTimestamp)). Status(metav1.ConditionTrue). Condition()) vm.Status.Conditions = mgr.Generate() @@ -160,7 +159,6 @@ func (h *LifeCycleHandler) syncPodStarted(vm *virtv2.VirtualMachine, pod *corev1 mgr.Update(cb. Status(metav1.ConditionTrue). Reason2(vmcondition.ReasonPodStarted). - Message(fmt.Sprintf("Pod started at %q", pod.Status.StartTime.String())). Condition()) vm.Status.Conditions = mgr.Generate() return diff --git a/images/virtualization-artifact/pkg/controller/vm/internal/provisioning.go b/images/virtualization-artifact/pkg/controller/vm/internal/provisioning.go index 0ff9568dc..d20733288 100644 --- a/images/virtualization-artifact/pkg/controller/vm/internal/provisioning.go +++ b/images/virtualization-artifact/pkg/controller/vm/internal/provisioning.go @@ -66,7 +66,6 @@ func (h *ProvisioningHandler) Handle(ctx context.Context, s state.VirtualMachine if current.Spec.Provisioning == nil { mgr.Update(cb.Status(metav1.ConditionTrue). Reason2(vmcondition.ReasonProvisioningReady). - Message("Provisioning is not defined."). 
Condition()) changed.Status.Conditions = mgr.Generate() return reconcile.Result{}, nil diff --git a/images/virtualization-artifact/pkg/controller/vm/internal/sync_kvvm.go b/images/virtualization-artifact/pkg/controller/vm/internal/sync_kvvm.go index d1a2c06ad..6cec0dacd 100644 --- a/images/virtualization-artifact/pkg/controller/vm/internal/sync_kvvm.go +++ b/images/virtualization-artifact/pkg/controller/vm/internal/sync_kvvm.go @@ -154,7 +154,7 @@ func (h *SyncKvvmHandler) syncKVVM(ctx context.Context, s state.VirtualMachineSt Generation(current.GetGeneration()) err = h.createKVVM(ctx, s) if err != nil { - cb.Status(metav1.ConditionTrue). + cb.Status(metav1.ConditionFalse). Reason2(vmcondition.ReasonConfigurationNotApplied). Message(fmt.Sprintf("Failed to apply configuration: %s", err.Error())) } else { @@ -235,13 +235,11 @@ func (h *SyncKvvmHandler) syncKVVM(ctx context.Context, s state.VirtualMachineSt Generation(current.GetGeneration()). Status(metav1.ConditionTrue). Reason2(vmcondition.ReasonConfigurationApplied). - Message("No changes found to apply."). Condition()) mgr.Update(conditions.NewConditionBuilder2(vmcondition.TypeAwaitingRestartToApplyConfiguration). Generation(current.GetGeneration()). Status(metav1.ConditionFalse). Reason2(vmcondition.ReasonRestartNoNeed). - Message("No changes found to apply."). Condition()) } changed.Status.Conditions = mgr.Generate() diff --git a/images/virtualization-artifact/pkg/controller/vm/internal/sync_metadata.go b/images/virtualization-artifact/pkg/controller/vm/internal/sync_metadata.go new file mode 100644 index 000000000..eeddf8a13 --- /dev/null +++ b/images/virtualization-artifact/pkg/controller/vm/internal/sync_metadata.go @@ -0,0 +1,233 @@ +/* +Copyright 2024 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package internal + +import ( + "context" + "encoding/json" + "fmt" + "strings" + + corev1 "k8s.io/api/core/v1" + virtv1 "kubevirt.io/api/core/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + merger "github.com/deckhouse/virtualization-controller/pkg/common" + "github.com/deckhouse/virtualization-controller/pkg/controller/common" + "github.com/deckhouse/virtualization-controller/pkg/controller/vm/internal/state" + virtv2 "github.com/deckhouse/virtualization/api/core/v1alpha2" +) + +const nameSyncMetadataHandler = "SyncMetadataHandler" + +func NewSyncMetadataHandler(client client.Client) *SyncMetadataHandler { + return &SyncMetadataHandler{client: client} +} + +type SyncMetadataHandler struct { + client client.Client +} + +func (h *SyncMetadataHandler) Handle(ctx context.Context, s state.VirtualMachineState) (reconcile.Result, error) { + if isDeletion(s.VirtualMachine().Current()) { + return reconcile.Result{}, nil + } + + kvvm, err := s.KVVM(ctx) + if err != nil { + return reconcile.Result{}, err + } + if kvvm == nil { + return reconcile.Result{}, nil + } + + current := s.VirtualMachine().Current() + + // Propagate user specified labels and annotations from the d8 VM to kubevirt VM. 
+ metaUpdated, err := PropagateVMMetadata(current, kvvm, kvvm) + if err != nil { + return reconcile.Result{}, err + } + + if metaUpdated { + if err = h.client.Update(ctx, kvvm); err != nil { + return reconcile.Result{}, fmt.Errorf("failed to update metadata KubeVirt VM %q: %w", kvvm.GetName(), err) + } + } + + kvvmi, err := s.KVVMI(ctx) + if err != nil { + return reconcile.Result{}, err + } + // Propagate user specified labels and annotations from the d8 VM to the kubevirt VirtualMachineInstance. + if kvvmi != nil { + metaUpdated, err = PropagateVMMetadata(current, kvvm, kvvmi) + if err != nil { + return reconcile.Result{}, err + } + + if metaUpdated { + if err = h.client.Update(ctx, kvvmi); err != nil { + return reconcile.Result{}, fmt.Errorf("failed to update metadata KubeVirt VMI %q: %w", kvvmi.GetName(), err) + } + } + } + + pods, err := s.Pods(ctx) + if err != nil { + return reconcile.Result{}, err + } + + // Propagate user specified labels and annotations from the d8 VM to the kubevirt virtual machine Pods. + if pods != nil { + for _, pod := range pods.Items { + // Update only Running pods. 
+ if pod.Status.Phase != corev1.PodRunning { + continue + } + metaUpdated, err = PropagateVMMetadata(current, kvvm, &pod) + if err != nil { + return reconcile.Result{}, err + } + + if metaUpdated { + if err = h.client.Update(ctx, &pod); err != nil { + return reconcile.Result{}, fmt.Errorf("failed to update KubeVirt Pod %q: %w", pod.GetName(), err) + } + } + } + } + err = SetLastPropagatedLabels(kvvm, current) + if err != nil { + return reconcile.Result{}, fmt.Errorf("failed to set last propagated labels: %w", err) + } + + err = SetLastPropagatedAnnotations(kvvm, current) + if err != nil { + return reconcile.Result{}, fmt.Errorf("failed to set last propagated annotations: %w", err) + } + + return reconcile.Result{}, nil +} + +func (h *SyncMetadataHandler) Name() string { + return nameSyncMetadataHandler +} + +// PropagateVMMetadata merges labels and annotations from the input VM into destination object. +// Attach related labels and some dangerous annotations are not copied. +// Return true if destination object was changed. +func PropagateVMMetadata(vm *virtv2.VirtualMachine, kvvm *virtv1.VirtualMachine, destObj client.Object) (bool, error) { + // No changes if dest is nil. + if destObj == nil { + return false, nil + } + + // 1. Propagate labels. + lastPropagatedLabels, err := GetLastPropagatedLabels(kvvm) + if err != nil { + return false, err + } + + newLabels, labelsChanged := merger.ApplyMapChanges(destObj.GetLabels(), lastPropagatedLabels, vm.GetLabels()) + if labelsChanged { + destObj.SetLabels(newLabels) + } + + // 2. Propagate annotations. + lastPropagatedAnno, err := GetLastPropagatedAnnotations(kvvm) + if err != nil { + return false, err + } + + // Remove dangerous annotations. 
+ curAnno := RemoveNonPropagatableAnnotations(vm.GetAnnotations()) + + newAnno, annoChanged := merger.ApplyMapChanges(destObj.GetAnnotations(), lastPropagatedAnno, curAnno) + if annoChanged { + destObj.SetAnnotations(newAnno) + } + + return labelsChanged || annoChanged, nil +} + +func GetLastPropagatedLabels(kvvm *virtv1.VirtualMachine) (map[string]string, error) { + var lastPropagatedLabels map[string]string + + if kvvm.Annotations[common.LastPropagatedVMLabelsAnnotation] != "" { + err := json.Unmarshal([]byte(kvvm.Annotations[common.LastPropagatedVMLabelsAnnotation]), &lastPropagatedLabels) + if err != nil { + return nil, err + } + } + + return lastPropagatedLabels, nil +} + +func SetLastPropagatedLabels(kvvm *virtv1.VirtualMachine, vm *virtv2.VirtualMachine) error { + data, err := json.Marshal(vm.GetLabels()) + if err != nil { + return err + } + + common.AddAnnotation(kvvm, common.LastPropagatedVMLabelsAnnotation, string(data)) + + return nil +} + +func GetLastPropagatedAnnotations(kvvm *virtv1.VirtualMachine) (map[string]string, error) { + var lastPropagatedAnno map[string]string + + if kvvm.Annotations[common.LastPropagatedVMAnnotationsAnnotation] != "" { + err := json.Unmarshal([]byte(kvvm.Annotations[common.LastPropagatedVMAnnotationsAnnotation]), &lastPropagatedAnno) + if err != nil { + return nil, err + } + } + + return lastPropagatedAnno, nil +} + +func SetLastPropagatedAnnotations(kvvm *virtv1.VirtualMachine, vm *virtv2.VirtualMachine) error { + data, err := json.Marshal(RemoveNonPropagatableAnnotations(vm.GetAnnotations())) + if err != nil { + return err + } + + common.AddAnnotation(kvvm, common.LastPropagatedVMAnnotationsAnnotation, string(data)) + + return nil +} + +// RemoveNonPropagatableAnnotations removes well known annotations that are dangerous to propagate. 
+func RemoveNonPropagatableAnnotations(anno map[string]string) map[string]string { + res := make(map[string]string) + + for k, v := range anno { + if k == common.LastPropagatedVMAnnotationsAnnotation || k == common.LastPropagatedVMLabelsAnnotation { + continue + } + + if strings.HasPrefix(k, "kubectl.kubernetes.io") { + continue + } + + res[k] = v + } + return res +} diff --git a/images/virtualization-artifact/pkg/controller/vm/vm_controller.go b/images/virtualization-artifact/pkg/controller/vm/vm_controller.go index ff480c2c0..ae9a50986 100644 --- a/images/virtualization-artifact/pkg/controller/vm/vm_controller.go +++ b/images/virtualization-artifact/pkg/controller/vm/vm_controller.go @@ -49,14 +49,16 @@ func NewController( logger := log.With("controller", controllerName) recorder := mgr.GetEventRecorderFor(controllerName) mgrCache := mgr.GetCache() + client := mgr.GetClient() handlers := []Handler{ - internal.NewDeletionHandler(mgr.GetClient(), logger), - internal.NewCPUHandler(mgr.GetClient(), recorder, logger), - internal.NewIPAMHandler(ipam.New(), mgr.GetClient(), recorder, logger), - internal.NewBlockDeviceHandler(mgr.GetClient(), recorder, logger), - internal.NewProvisioningHandler(mgr.GetClient()), + internal.NewDeletionHandler(client, logger), + internal.NewCPUHandler(client, recorder, logger), + internal.NewIPAMHandler(ipam.New(), client, recorder, logger), + internal.NewBlockDeviceHandler(client, recorder, logger), + internal.NewProvisioningHandler(client), internal.NewAgentHandler(), - internal.NewSyncKvvmHandler(dvcrSettings, mgr.GetClient(), recorder, logger), + internal.NewSyncKvvmHandler(dvcrSettings, client, recorder, logger), + internal.NewSyncMetadataHandler(client), internal.NewLifeCycleHandler(mgr.GetClient(), recorder, logger), } r := NewReconciler(mgr.GetClient(), logger, handlers...) 
diff --git a/images/virtualization-artifact/pkg/controller/vm/vm_reconciler.go b/images/virtualization-artifact/pkg/controller/vm/vm_reconciler.go index 69a86996e..20edb8480 100644 --- a/images/virtualization-artifact/pkg/controller/vm/vm_reconciler.go +++ b/images/virtualization-artifact/pkg/controller/vm/vm_reconciler.go @@ -308,7 +308,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (reco var handlerErr error for _, h := range r.handlers { - r.logger.Info("Run handler", slog.String("name", h.Name())) + r.logger.Debug("Run handler", slog.String("name", h.Name())) res, err := h.Handle(ctx, s) if err != nil { r.logger.Error("The handler failed with an error", slog.String("name", h.Name()), log.SlogErr(err))