diff --git a/controllers/cluster_controller.go b/controllers/cluster_controller.go index 0848083fc2..1556d4cf93 100644 --- a/controllers/cluster_controller.go +++ b/controllers/cluster_controller.go @@ -202,7 +202,10 @@ func (r *ClusterReconciler) reconcile(ctx context.Context, cluster *apiv1.Cluste } // Ensure we reconcile the orphan resources if present when we reconcile for the first time a cluster - if err := r.reconcileRestoredCluster(ctx, cluster); err != nil { + if res, err := r.reconcileRestoredCluster(ctx, cluster); res != nil || err != nil { + if res != nil { + return *res, nil + } return ctrl.Result{}, fmt.Errorf("cannot reconcile restored Cluster: %w", err) } diff --git a/controllers/cluster_restore.go b/controllers/cluster_restore.go index 0694cfafa0..2a77dcffed 100644 --- a/controllers/cluster_restore.go +++ b/controllers/cluster_restore.go @@ -18,8 +18,11 @@ package controllers import ( "context" + "fmt" corev1 "k8s.io/api/core/v1" + apierrs "k8s.io/apimachinery/pkg/api/errors" + ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" apiv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1" @@ -31,29 +34,39 @@ import ( // reconcileRestoredCluster ensures that we own again any orphan resources when cluster gets reconciled for // the first time -func (r *ClusterReconciler) reconcileRestoredCluster(ctx context.Context, cluster *apiv1.Cluster) error { +func (r *ClusterReconciler) reconcileRestoredCluster( + ctx context.Context, + cluster *apiv1.Cluster, +) (*ctrl.Result, error) { contextLogger := log.FromContext(ctx) // No need to check this on a cluster which has been already deployed if cluster.Status.LatestGeneratedNode != 0 { - return nil + return nil, nil } // Get the list of PVCs belonging to this cluster but not owned by it pvcs, err := getOrphanPVCs(ctx, r.Client, cluster) if err != nil { - return err + return nil, err } if len(pvcs) == 0 { contextLogger.Info("no orphan PVCs found, skipping the restored cluster reconciliation") - return nil + return nil, nil } - contextLogger.Info("found orphan pvcs, trying to restore the cluster", "pvcs", pvcs) + if res, err := ensureClusterRestoreCanStart(ctx, r.Client, cluster); res != nil || err != nil { + return res, err + } + + if err := ensureOrphanPodsAreDeleted(ctx, r.Client, cluster); err != nil { + return nil, fmt.Errorf("encountered an error while deleting an orphan pod: %w", err) + } + highestSerial, primarySerial, err := getNodeSerialsFromPVCs(pvcs) if err != nil { - return err + return nil, err } if primarySerial == 0 { @@ -63,16 +76,27 @@ func (r *ClusterReconciler) reconcileRestoredCluster(ctx context.Context, cluste contextLogger.Debug("proceeding to remove the fencing annotation if present") if err := ensureClusterIsNotFenced(ctx, r.Client, cluster); err != nil { - return err + return nil, err } contextLogger.Debug("proceeding to restore the cluster status") if err := restoreClusterStatus(ctx, r.Client, cluster, highestSerial, primarySerial); err != nil { - return err + return nil, err } contextLogger.Debug("restored the cluster status, proceeding to restore the orphan PVCS") - return restoreOrphanPVCs(ctx, r.Client, cluster, pvcs) + return nil, restoreOrphanPVCs(ctx, r.Client, cluster, pvcs) +} + +// ensureClusterRestoreCanStart is a function where the plugins can inject their custom logic to tell the +// restore process to wait before starting the process +// nolint: revive +func ensureClusterRestoreCanStart( + ctx context.Context, + c client.Client, + cluster *apiv1.Cluster, +) (*ctrl.Result, error) { + return nil, nil } func ensureClusterIsNotFenced( @@ -148,6 +172,49 @@ func getOrphanPVCs( return orphanPVCs, nil } +func ensureOrphanPodsAreDeleted(ctx context.Context, c client.Client, cluster *apiv1.Cluster) error { + contextLogger := log.FromContext(ctx).WithValues("orphan_pod_cleaner") + + var podList corev1.PodList + if err := c.List( + ctx, + &podList, + client.InNamespace(cluster.Namespace), + client.MatchingLabels{utils.ClusterLabelName: cluster.Name}, + ); err != nil { + return err + } + + orphanPodList := make([]corev1.Pod, 0, podList.Size()) + orphanPodNames := make([]string, 0, podList.Size()) + for idx := range podList.Items { + pod := podList.Items[idx] + if len(pod.OwnerReferences) == 0 { + orphanPodList = append(orphanPodList, pod) + orphanPodNames = append(orphanPodNames, pod.Name) + } + } + + if len(orphanPodList) == 0 { + return nil + } + + contextLogger.Info( + "Found one or more orphan pods, deleting them", + "orphanPodNames", orphanPodNames, + ) + + for idx := range orphanPodList { + pod := orphanPodList[idx] + contextLogger.Debug("Deleting orphan pod", "podName", pod.Name) + if err := c.Delete(ctx, &pod); err != nil && !apierrs.IsNotFound(err) { + return err + } + } + + return nil +} + // getNodeSerialsFromPVCs tries to obtain the highestSerial and the primary serial from a group of PVCs func getNodeSerialsFromPVCs( pvcs []corev1.PersistentVolumeClaim,