Skip to content

Commit

Permalink
Merge pull request #591 from Danil-Grigorev/agent-deployed-based-on-r…
Browse files Browse the repository at this point in the history
…eady

Check agent deployment based on `Ready` condition
  • Loading branch information
alexander-demicev authored Jul 9, 2024
2 parents b37d451 + dffc6f2 commit 0795c35
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 22 deletions.
71 changes: 71 additions & 0 deletions internal/controllers/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"net/http"
"time"

batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -179,6 +180,41 @@ func createImportManifest(ctx context.Context, remoteClient client.Client, in io
return nil
}

func validateImportReadiness(ctx context.Context, remoteClient client.Client, in io.Reader) (bool, error) {
log := log.FromContext(ctx)

jobs := &batchv1.JobList{}
if err := remoteClient.List(ctx, jobs, client.MatchingLabels(map[string]string{"cattle.io/creator": "norman"})); err != nil {
return false, fmt.Errorf("error looking for cleanup job: %w", err)
}

for _, job := range jobs.Items {
if job.GenerateName == "cattle-cleanup-" {
log.Info("cleanup job is being performed, waiting...", "gvk", job.GroupVersionKind(), "name", job.GetName(), "namespace", job.GetNamespace())
return true, nil
}
}

reader := yamlDecoder.NewYAMLReader(bufio.NewReaderSize(in, 4096))

for {
raw, err := reader.Read()
if errors.Is(err, io.EOF) {
break
}

if err != nil {
return false, err
}

if requeue, err := verifyRawManifest(ctx, remoteClient, raw); err != nil || requeue {
return requeue, err
}
}

return false, nil
}

func createRawManifest(ctx context.Context, remoteClient client.Client, bytes []byte) error {
items, err := utilyaml.ToUnstructured(bytes)
if err != nil {
Expand All @@ -194,6 +230,41 @@ func createRawManifest(ctx context.Context, remoteClient client.Client, bytes []
return nil
}

func verifyRawManifest(ctx context.Context, remoteClient client.Client, bytes []byte) (bool, error) {
items, err := utilyaml.ToUnstructured(bytes)
if err != nil {
return false, fmt.Errorf("error unmarshalling bytes or empty object passed: %w", err)
}

for _, obj := range items {
if requeue, err := checkDeletion(ctx, remoteClient, obj.DeepCopy()); err != nil || requeue {
return requeue, err
}
}

return false, nil
}

func checkDeletion(ctx context.Context, c client.Client, obj client.Object) (bool, error) {
log := log.FromContext(ctx)
gvk := obj.GetObjectKind().GroupVersionKind()

err := c.Get(ctx, client.ObjectKeyFromObject(obj), obj)
if apierrors.IsNotFound(err) {
log.V(4).Info("object is missing, ready to be created", "gvk", gvk, "name", obj.GetName(), "namespace", obj.GetNamespace())
return false, nil
} else if err != nil {
return false, fmt.Errorf("checking object in remote cluster: %w", err)
}

if obj.GetDeletionTimestamp() != nil {
log.Info("object is being deleted, waiting", "gvk", gvk, "name", obj.GetName(), "namespace", obj.GetNamespace())
return true, nil
}

return false, nil
}

func createObject(ctx context.Context, c client.Client, obj client.Object) error {
log := log.FromContext(ctx)
gvk := obj.GetObjectKind().GroupVersionKind()
Expand Down
9 changes: 8 additions & 1 deletion internal/controllers/import_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ func (r *CAPIImportReconciler) reconcileNormal(ctx context.Context, capiCluster
log.Info("found cluster name", "name", rancherCluster.Status.ClusterName)

if rancherCluster.Status.AgentDeployed {
log.Info("agent already deployed, no action needed")
log.Info("agent is deployed, no action needed")
return ctrl.Result{}, nil
}

Expand All @@ -273,6 +273,13 @@ func (r *CAPIImportReconciler) reconcileNormal(ctx context.Context, capiCluster
return ctrl.Result{}, fmt.Errorf("getting remote cluster client: %w", err)
}

if requeue, err := validateImportReadiness(ctx, remoteClient, strings.NewReader(manifest)); err != nil {
return ctrl.Result{}, fmt.Errorf("verifying import manifest: %w", err)
} else if requeue {
log.Info("Import manifests are being deleted, not ready to be applied yet, requeue")
return ctrl.Result{RequeueAfter: defaultRequeueDuration}, nil
}

if err := createImportManifest(ctx, remoteClient, strings.NewReader(manifest)); err != nil {
return ctrl.Result{}, fmt.Errorf("creating import manifest: %w", err)
}
Expand Down
11 changes: 9 additions & 2 deletions internal/controllers/import_controller_v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,8 +314,8 @@ func (r *CAPIImportManagementV3Reconciler) reconcileNormal(ctx context.Context,
log.Info("Successfully propagated labels to Rancher cluster")
}

if conditions.IsTrue(rancherCluster, managementv3.ClusterConditionAgentDeployed) {
log.Info("agent already deployed, no action needed")
if conditions.IsTrue(rancherCluster, managementv3.ClusterConditionReady) {
log.Info("agent is ready, no action needed")
return ctrl.Result{}, nil
}

Expand All @@ -337,6 +337,13 @@ func (r *CAPIImportManagementV3Reconciler) reconcileNormal(ctx context.Context,
return ctrl.Result{}, fmt.Errorf("getting remote cluster client: %w", err)
}

if requeue, err := validateImportReadiness(ctx, remoteClient, strings.NewReader(manifest)); err != nil {
return ctrl.Result{}, fmt.Errorf("verifying import manifest: %w", err)
} else if requeue {
log.Info("Import manifests are being deleted, not ready to be applied yet, requeue")
return ctrl.Result{RequeueAfter: defaultRequeueDuration}, nil
}

if err := createImportManifest(ctx, remoteClient, strings.NewReader(manifest)); err != nil {
return ctrl.Result{}, fmt.Errorf("creating import manifest: %w", err)
}
Expand Down
8 changes: 4 additions & 4 deletions internal/controllers/import_controller_v3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,8 +332,8 @@ var _ = Describe("reconcile CAPI Cluster", func() {
cluster := rancherClusters.Items[0]
Expect(cluster.Name).To(ContainSubstring("c-"))

conditions.Set(&cluster, conditions.TrueCondition(managementv3.ClusterConditionAgentDeployed))
Expect(conditions.IsTrue(&cluster, managementv3.ClusterConditionAgentDeployed)).To(BeTrue())
conditions.Set(&cluster, conditions.TrueCondition(managementv3.ClusterConditionReady))
Expect(conditions.IsTrue(&cluster, managementv3.ClusterConditionReady)).To(BeTrue())
Expect(cl.Status().Update(ctx, &cluster)).To(Succeed())

_, err := r.Reconcile(ctx, reconcile.Request{
Expand Down Expand Up @@ -477,8 +477,8 @@ var _ = Describe("reconcile CAPI Cluster", func() {

Eventually(ctx, func(g Gomega) {
g.Expect(cl.Get(ctx, client.ObjectKeyFromObject(rancherCluster), rancherCluster)).To(Succeed())
conditions.Set(rancherCluster, conditions.TrueCondition(managementv3.ClusterConditionAgentDeployed))
g.Expect(conditions.IsTrue(rancherCluster, managementv3.ClusterConditionAgentDeployed)).To(BeTrue())
conditions.Set(rancherCluster, conditions.TrueCondition(managementv3.ClusterConditionReady))
g.Expect(conditions.IsTrue(rancherCluster, managementv3.ClusterConditionReady)).To(BeTrue())
g.Expect(cl.Status().Update(ctx, rancherCluster)).To(Succeed())
}).Should(Succeed())

Expand Down
14 changes: 14 additions & 0 deletions internal/rancher/provisioning/v1/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package v1

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
)

// Cluster is the struct representing a Rancher Cluster.
Expand All @@ -41,6 +43,8 @@ type ClusterStatus struct {
ClusterName string `json:"clusterName,omitempty"`
AgentDeployed bool `json:"agentDeployed,omitempty"`
Ready bool `json:"ready,omitempty"`

Conditions clusterv1.Conditions `json:"conditions,omitempty"`
}

// ClusterList contains a list of ClusterList.
Expand All @@ -51,6 +55,16 @@ type ClusterList struct {
Items []Cluster `json:"items"`
}

// GetConditions method to implement capi conditions getter interface.
func (c *Cluster) GetConditions() clusterv1.Conditions {
return c.Status.Conditions
}

// SetConditions method to implement capi conditions setter interface.
func (c *Cluster) SetConditions(conditions clusterv1.Conditions) {
c.Status.Conditions = conditions
}

func init() {
SchemeBuilder.Register(&Cluster{}, &ClusterList{})
}
10 changes: 9 additions & 1 deletion internal/rancher/provisioning/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 0 additions & 14 deletions test/e2e/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,20 +60,6 @@ func CreateRepoName(specName string) string {
}

func DumpSpecResourcesAndCleanup(ctx context.Context, specName string, clusterProxy framework.ClusterProxy, artifactFolder string, namespace *corev1.Namespace, cancelWatches context.CancelFunc, capiCluster *types.NamespacedName, intervalsGetter func(spec, key string) []interface{}, skipCleanup bool) {
turtlesframework.Byf("Dumping logs from the %q workload cluster", capiCluster.Name)

// Dump all the logs from the workload cluster before deleting them.
clusterProxy.CollectWorkloadClusterLogs(ctx, capiCluster.Namespace, capiCluster.Name, filepath.Join(artifactFolder, "clusters", capiCluster.Name))

turtlesframework.Byf("Dumping all the Cluster API resources in the %q namespace", namespace.Name)

// Dump all Cluster API related resources to artifacts before deleting them.
framework.DumpAllResources(ctx, framework.DumpAllResourcesInput{
Lister: clusterProxy.GetClient(),
Namespace: namespace.Name,
LogPath: filepath.Join(artifactFolder, "clusters", clusterProxy.GetName(), "resources"),
})

if !skipCleanup {
turtlesframework.Byf("Deleting cluster %s", capiCluster)
// While https://github.com/kubernetes-sigs/cluster-api/issues/2955 is addressed in future iterations, there is a chance
Expand Down

0 comments on commit 0795c35

Please sign in to comment.