From dffc6f21ecb47af8eb30919a427209d60a7de17b Mon Sep 17 00:00:00 2001
From: Danil-Grigorev
Date: Mon, 8 Jul 2024 15:09:34 +0200
Subject: [PATCH] Delay cluster import until manifests are deleted

- Prevent executing the cluster import until the child cluster manifests
  are fully removed.
- e2e: Rely on crust-gather for log collection
- Check for the cleanup job before import

Signed-off-by: Danil-Grigorev
---
 internal/controllers/helpers.go               | 71 +++++++++++++++++++
 internal/controllers/import_controller.go     |  9 ++-
 internal/controllers/import_controller_v3.go  | 11 ++-
 .../controllers/import_controller_v3_test.go  |  8 +--
 internal/rancher/provisioning/v1/cluster.go   | 14 ++++
 .../provisioning/v1/zz_generated.deepcopy.go  | 10 ++-
 test/e2e/helpers.go                           | 14 ----
 7 files changed, 115 insertions(+), 22 deletions(-)
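
In short: before the Rancher import manifest is applied to the downstream cluster, both
reconcilers now check whether anything from a previous import is still being torn down,
either a pending cattle-cleanup- job (labelled cattle.io/creator: norman) or manifest
objects that still carry a deletion timestamp, and requeue instead of re-applying. A
rough sketch of the flow this patch introduces (applyImportManifest is a hypothetical
wrapper used only for illustration; validateImportReadiness, createImportManifest and
defaultRequeueDuration are the package-level names that appear in the diffs below):

	// applyImportManifest sketches the gate added by this patch: wait while a cleanup
	// job is running or old manifest objects are still terminating, then (re)apply.
	func applyImportManifest(ctx context.Context, remoteClient client.Client, manifest string) (ctrl.Result, error) {
		if requeue, err := validateImportReadiness(ctx, remoteClient, strings.NewReader(manifest)); err != nil {
			return ctrl.Result{}, fmt.Errorf("verifying import manifest: %w", err)
		} else if requeue {
			// Cleanup still in progress on the downstream cluster: try again later.
			return ctrl.Result{RequeueAfter: defaultRequeueDuration}, nil
		}

		// Nothing left to wait for: create the import manifest objects downstream.
		if err := createImportManifest(ctx, remoteClient, strings.NewReader(manifest)); err != nil {
			return ctrl.Result{}, fmt.Errorf("creating import manifest: %w", err)
		}

		return ctrl.Result{}, nil
	}
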
log.Info("object is being deleted, waiting", "gvk", gvk, "name", obj.GetName(), "namespace", obj.GetNamespace()) + return true, nil + } + + return false, nil +} + func createObject(ctx context.Context, c client.Client, obj client.Object) error { log := log.FromContext(ctx) gvk := obj.GetObjectKind().GroupVersionKind() diff --git a/internal/controllers/import_controller.go b/internal/controllers/import_controller.go index 5d4280af..ee909f7b 100644 --- a/internal/controllers/import_controller.go +++ b/internal/controllers/import_controller.go @@ -251,7 +251,7 @@ func (r *CAPIImportReconciler) reconcileNormal(ctx context.Context, capiCluster log.Info("found cluster name", "name", rancherCluster.Status.ClusterName) if rancherCluster.Status.AgentDeployed { - log.Info("agent already deployed, no action needed") + log.Info("agent is deployed, no action needed") return ctrl.Result{}, nil } @@ -273,6 +273,13 @@ func (r *CAPIImportReconciler) reconcileNormal(ctx context.Context, capiCluster return ctrl.Result{}, fmt.Errorf("getting remote cluster client: %w", err) } + if requeue, err := validateImportReadiness(ctx, remoteClient, strings.NewReader(manifest)); err != nil { + return ctrl.Result{}, fmt.Errorf("verifying import manifest: %w", err) + } else if requeue { + log.Info("Import manifests are being deleted, not ready to be applied yet, requeue") + return ctrl.Result{RequeueAfter: defaultRequeueDuration}, nil + } + if err := createImportManifest(ctx, remoteClient, strings.NewReader(manifest)); err != nil { return ctrl.Result{}, fmt.Errorf("creating import manifest: %w", err) } diff --git a/internal/controllers/import_controller_v3.go b/internal/controllers/import_controller_v3.go index 3fa92000..d6106cdd 100644 --- a/internal/controllers/import_controller_v3.go +++ b/internal/controllers/import_controller_v3.go @@ -314,8 +314,8 @@ func (r *CAPIImportManagementV3Reconciler) reconcileNormal(ctx context.Context, log.Info("Successfully propagated labels to Rancher cluster") } - if conditions.IsTrue(rancherCluster, managementv3.ClusterConditionAgentDeployed) { - log.Info("agent already deployed, no action needed") + if conditions.IsTrue(rancherCluster, managementv3.ClusterConditionReady) { + log.Info("agent is ready, no action needed") return ctrl.Result{}, nil } @@ -337,6 +337,13 @@ func (r *CAPIImportManagementV3Reconciler) reconcileNormal(ctx context.Context, return ctrl.Result{}, fmt.Errorf("getting remote cluster client: %w", err) } + if requeue, err := validateImportReadiness(ctx, remoteClient, strings.NewReader(manifest)); err != nil { + return ctrl.Result{}, fmt.Errorf("verifying import manifest: %w", err) + } else if requeue { + log.Info("Import manifests are being deleted, not ready to be applied yet, requeue") + return ctrl.Result{RequeueAfter: defaultRequeueDuration}, nil + } + if err := createImportManifest(ctx, remoteClient, strings.NewReader(manifest)); err != nil { return ctrl.Result{}, fmt.Errorf("creating import manifest: %w", err) } diff --git a/internal/controllers/import_controller_v3_test.go b/internal/controllers/import_controller_v3_test.go index 53010ac0..5eca51b5 100644 --- a/internal/controllers/import_controller_v3_test.go +++ b/internal/controllers/import_controller_v3_test.go @@ -332,8 +332,8 @@ var _ = Describe("reconcile CAPI Cluster", func() { cluster := rancherClusters.Items[0] Expect(cluster.Name).To(ContainSubstring("c-")) - conditions.Set(&cluster, conditions.TrueCondition(managementv3.ClusterConditionAgentDeployed)) - Expect(conditions.IsTrue(&cluster, 
diff --git a/internal/controllers/import_controller_v3_test.go b/internal/controllers/import_controller_v3_test.go
index 53010ac0..5eca51b5 100644
--- a/internal/controllers/import_controller_v3_test.go
+++ b/internal/controllers/import_controller_v3_test.go
@@ -332,8 +332,8 @@ var _ = Describe("reconcile CAPI Cluster", func() {
 			cluster := rancherClusters.Items[0]
 			Expect(cluster.Name).To(ContainSubstring("c-"))
 
-			conditions.Set(&cluster, conditions.TrueCondition(managementv3.ClusterConditionAgentDeployed))
-			Expect(conditions.IsTrue(&cluster, managementv3.ClusterConditionAgentDeployed)).To(BeTrue())
+			conditions.Set(&cluster, conditions.TrueCondition(managementv3.ClusterConditionReady))
+			Expect(conditions.IsTrue(&cluster, managementv3.ClusterConditionReady)).To(BeTrue())
 			Expect(cl.Status().Update(ctx, &cluster)).To(Succeed())
 
 			_, err := r.Reconcile(ctx, reconcile.Request{
@@ -477,8 +477,8 @@
 
 		Eventually(ctx, func(g Gomega) {
 			g.Expect(cl.Get(ctx, client.ObjectKeyFromObject(rancherCluster), rancherCluster)).To(Succeed())
-			conditions.Set(rancherCluster, conditions.TrueCondition(managementv3.ClusterConditionAgentDeployed))
-			g.Expect(conditions.IsTrue(rancherCluster, managementv3.ClusterConditionAgentDeployed)).To(BeTrue())
+			conditions.Set(rancherCluster, conditions.TrueCondition(managementv3.ClusterConditionReady))
+			g.Expect(conditions.IsTrue(rancherCluster, managementv3.ClusterConditionReady)).To(BeTrue())
 			g.Expect(cl.Status().Update(ctx, rancherCluster)).To(Succeed())
 		}).Should(Succeed())
diff --git a/internal/rancher/provisioning/v1/cluster.go b/internal/rancher/provisioning/v1/cluster.go
index 5453b002..f229c08e 100644
--- a/internal/rancher/provisioning/v1/cluster.go
+++ b/internal/rancher/provisioning/v1/cluster.go
@@ -18,6 +18,8 @@ package v1
 
 import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+	clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
 )
 
 // Cluster is the struct representing a Rancher Cluster.
@@ -41,6 +43,8 @@ type ClusterStatus struct {
 	ClusterName   string `json:"clusterName,omitempty"`
 	AgentDeployed bool   `json:"agentDeployed,omitempty"`
 	Ready         bool   `json:"ready,omitempty"`
+
+	Conditions clusterv1.Conditions `json:"conditions,omitempty"`
 }
 
 // ClusterList contains a list of ClusterList.
@@ -51,6 +55,16 @@ type ClusterList struct {
 	Items []Cluster `json:"items"`
 }
 
+// GetConditions returns the conditions of the Cluster, implementing the CAPI conditions Getter interface.
+func (c *Cluster) GetConditions() clusterv1.Conditions {
+	return c.Status.Conditions
+}
+
+// SetConditions sets the conditions on the Cluster, implementing the CAPI conditions Setter interface.
+func (c *Cluster) SetConditions(conditions clusterv1.Conditions) {
+	c.Status.Conditions = conditions
+}
+
 func init() {
 	SchemeBuilder.Register(&Cluster{}, &ClusterList{})
 }
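
Note on the cluster.go change above: with the Conditions field and the GetConditions/SetConditions
pair, the provisioning v1 Cluster satisfies the Getter and Setter interfaces from
sigs.k8s.io/cluster-api/util/conditions, so the same condition helpers used on the management v3
cluster in the tests also work against this type. A minimal sketch (the markImported helper, the
"Imported" condition type, and the import path of the provisioning v1 package are illustrative
assumptions, not part of this patch):

	import (
		clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
		"sigs.k8s.io/cluster-api/util/conditions"

		provisioningv1 "github.com/rancher/turtles/internal/rancher/provisioning/v1" // assumed path
	)

	// markImported sets an illustrative condition via the CAPI helpers and reads it back,
	// the same way the tests above use conditions.Set and conditions.IsTrue.
	func markImported(cluster *provisioningv1.Cluster) bool {
		conditions.Set(cluster, conditions.TrueCondition(clusterv1.ConditionType("Imported")))
		return conditions.IsTrue(cluster, clusterv1.ConditionType("Imported"))
	}
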
diff --git a/internal/rancher/provisioning/v1/zz_generated.deepcopy.go b/internal/rancher/provisioning/v1/zz_generated.deepcopy.go
index f70e6fb9..971c78fb 100644
--- a/internal/rancher/provisioning/v1/zz_generated.deepcopy.go
+++ b/internal/rancher/provisioning/v1/zz_generated.deepcopy.go
@@ -23,6 +23,7 @@ package v1
 import (
 	corev1 "k8s.io/api/core/v1"
 	runtime "k8s.io/apimachinery/pkg/runtime"
+	"sigs.k8s.io/cluster-api/api/v1beta1"
 )
 
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
@@ -31,7 +32,7 @@ func (in *Cluster) DeepCopyInto(out *Cluster) {
 	out.TypeMeta = in.TypeMeta
 	in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
 	in.Spec.DeepCopyInto(&out.Spec)
-	out.Status = in.Status
+	in.Status.DeepCopyInto(&out.Status)
 }
 
 // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Cluster.
@@ -107,6 +108,13 @@ func (in *ClusterSpec) DeepCopy() *ClusterSpec {
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 func (in *ClusterStatus) DeepCopyInto(out *ClusterStatus) {
 	*out = *in
+	if in.Conditions != nil {
+		in, out := &in.Conditions, &out.Conditions
+		*out = make(v1beta1.Conditions, len(*in))
+		for i := range *in {
+			(*in)[i].DeepCopyInto(&(*out)[i])
+		}
+	}
 }
 
 // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClusterStatus.
diff --git a/test/e2e/helpers.go b/test/e2e/helpers.go
index 21cb1eee..49c2208f 100644
--- a/test/e2e/helpers.go
+++ b/test/e2e/helpers.go
@@ -60,20 +60,6 @@ func CreateRepoName(specName string) string {
 }
 
 func DumpSpecResourcesAndCleanup(ctx context.Context, specName string, clusterProxy framework.ClusterProxy, artifactFolder string, namespace *corev1.Namespace, cancelWatches context.CancelFunc, capiCluster *types.NamespacedName, intervalsGetter func(spec, key string) []interface{}, skipCleanup bool) {
-	turtlesframework.Byf("Dumping logs from the %q workload cluster", capiCluster.Name)
-
-	// Dump all the logs from the workload cluster before deleting them.
-	clusterProxy.CollectWorkloadClusterLogs(ctx, capiCluster.Namespace, capiCluster.Name, filepath.Join(artifactFolder, "clusters", capiCluster.Name))
-
-	turtlesframework.Byf("Dumping all the Cluster API resources in the %q namespace", namespace.Name)
-
-	// Dump all Cluster API related resources to artifacts before deleting them.
-	framework.DumpAllResources(ctx, framework.DumpAllResourcesInput{
-		Lister:    clusterProxy.GetClient(),
-		Namespace: namespace.Name,
-		LogPath:   filepath.Join(artifactFolder, "clusters", clusterProxy.GetName(), "resources"),
-	})
-
 	if !skipCleanup {
 		turtlesframework.Byf("Deleting cluster %s", capiCluster)
 		// While https://github.com/kubernetes-sigs/cluster-api/issues/2955 is addressed in future iterations, there is a chance