From 4f59766a066fd9f35a686d419b9360275ad7852b Mon Sep 17 00:00:00 2001
From: Sebastian Woehrl
Date: Tue, 21 May 2024 08:46:17 +0200
Subject: [PATCH] Fix upgrade detection during parallel recovery (#789)

### Description
Fixes a bug where parallel recovery did not engage if the cluster had gone through a version upgrade beforehand. The reason was that the upgrade logic added the status for each nodepool twice, leading the recovery logic to incorrectly detect an upgrade as still in progress. Also did a small refactoring of names and constants and fixed an IDE warning about unused parameters.

No changes to the CRDs or functionality, just internal logic fixes.
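
To illustrate the corrected behaviour, here is a minimal test-style sketch (not the unit test added by this PR): leftover duplicate `Upgraded` entries no longer count as an upgrade in progress, while a `Pending` entry still does. The nodepool names and the `helpers` import path are assumptions for illustration only.

```go
// Sketch only: assumes the module path used elsewhere in this repo;
// the nodepool names ("masters", "data") are made up for the example.
package helpers_test

import (
	"testing"

	opsterv1 "github.com/Opster/opensearch-k8s-operator/opensearch-operator/api/v1"
	"github.com/Opster/opensearch-k8s-operator/opensearch-operator/pkg/helpers"
)

func TestIsUpgradeInProgressAfterFinishedUpgrade(t *testing.T) {
	// Duplicate "Upgraded" entries, as left behind by the pre-fix upgrade logic.
	status := opsterv1.ClusterStatus{
		ComponentsStatus: []opsterv1.ComponentStatus{
			{Component: "Upgrader", Description: "masters", Status: "Upgraded"},
			{Component: "Upgrader", Description: "masters", Status: "Upgraded"},
		},
	}
	if helpers.IsUpgradeInProgress(status) {
		t.Error("expected no upgrade in progress when all nodepool statuses are Upgraded")
	}

	// A pending nodepool must still be detected as an upgrade in progress.
	status.ComponentsStatus = append(status.ComponentsStatus,
		opsterv1.ComponentStatus{Component: "Upgrader", Description: "data", Status: "Pending"})
	if !helpers.IsUpgradeInProgress(status) {
		t.Error("expected upgrade in progress while a nodepool is still Pending")
	}
}
```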

### Issues Resolved
Fixes #730

### Check List
- [x] Commits are signed per the DCO using --signoff
- [x] Unittest added for the new/changed functionality and all unit tests are successful
- [x] Customer-visible features documented
- [x] No linter warnings (`make lint`)

If CRDs are changed:
- [-] CRD YAMLs updated (`make manifests`) and also copied into the helm chart
- [-] Changes to CRDs documented

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check [here](https://github.com/opensearch-project/OpenSearch/blob/main/CONTRIBUTING.md#developer-certificate-of-origin).

Signed-off-by: Sebastian Woehrl
---
 .../pkg/builders/cluster_test.go              |  2 +-
 opensearch-operator/pkg/helpers/helpers.go    | 36 ++++++++--
 .../pkg/reconcilers/cluster.go                |  4 +-
 .../pkg/reconcilers/configuration_test.go     |  5 --
 .../pkg/reconcilers/rollingRestart.go         |  3 +-
 opensearch-operator/pkg/reconcilers/scaler.go |  2 +-
 .../pkg/reconcilers/securityconfig.go         |  2 +-
 .../pkg/reconcilers/upgrade.go                | 72 ++++++++++---------
 8 files changed, 77 insertions(+), 49 deletions(-)

diff --git a/opensearch-operator/pkg/builders/cluster_test.go b/opensearch-operator/pkg/builders/cluster_test.go
index c7f22c53..7b0e2580 100644
--- a/opensearch-operator/pkg/builders/cluster_test.go
+++ b/opensearch-operator/pkg/builders/cluster_test.go
@@ -777,7 +777,7 @@ var _ = Describe("Builders", func() {
 			Expect(result.Spec.Template.Spec.Containers[0].ReadinessProbe.FailureThreshold).To(Equal(int32(9)))
 		})
 	})
-
+
 	When("Configuring InitHelper Resources", func() {
 		It("should propagate Resources to all init containers", func() {
 			clusterObject := ClusterDescWithVersion("2.2.1")
diff --git a/opensearch-operator/pkg/helpers/helpers.go b/opensearch-operator/pkg/helpers/helpers.go
index 7795f260..aa6de1d5 100644
--- a/opensearch-operator/pkg/helpers/helpers.go
+++ b/opensearch-operator/pkg/helpers/helpers.go
@@ -4,11 +4,12 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
-	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
 	"reflect"
 	"sort"
 	"time"
 
+	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
+
 	policyv1 "k8s.io/api/policy/v1"
 
 	opsterv1 "github.com/Opster/opensearch-k8s-operator/opensearch-operator/api/v1"
@@ -79,6 +80,22 @@ func FindFirstPartial(
 	return item, false
 }
 
+func FindAllPartial(
+	arr []opsterv1.ComponentStatus,
+	item opsterv1.ComponentStatus,
+	predicator func(opsterv1.ComponentStatus, opsterv1.ComponentStatus) (opsterv1.ComponentStatus, bool),
+) []opsterv1.ComponentStatus {
+	var result []opsterv1.ComponentStatus
+
+	for i := 0; i < len(arr); i++ {
+		itemInArr, found := predicator(arr[i], item)
+		if found {
+			result = append(result, itemInArr)
+		}
+	}
+	return result
+}
+
 func FindByPath(obj interface{}, keys []string) (interface{}, bool) {
 	mobj, ok := obj.(map[string]interface{})
 	if !ok {
@@ -116,7 +133,7 @@ func UsernameAndPassword(k8sClient k8s.K8sClient, cr *opsterv1.OpenSearchCluster
 	}
 }
 
-func GetByDescriptionAndGroup(left opsterv1.ComponentStatus, right opsterv1.ComponentStatus) (opsterv1.ComponentStatus, bool) {
+func GetByDescriptionAndComponent(left opsterv1.ComponentStatus, right opsterv1.ComponentStatus) (opsterv1.ComponentStatus, bool) {
 	if left.Description == right.Description && left.Component == right.Component {
 		return left, true
 	}
@@ -430,12 +447,21 @@ func CalculateJvmHeapSize(nodePool *opsterv1.NodePool) string {
 	return nodePool.Jvm
 }
 
-func UpgradeInProgress(status opsterv1.ClusterStatus) bool {
+func IsUpgradeInProgress(status opsterv1.ClusterStatus) bool {
 	componentStatus := opsterv1.ComponentStatus{
 		Component: "Upgrader",
 	}
-	_, found := FindFirstPartial(status.ComponentsStatus, componentStatus, GetByComponent)
-	return found
+	foundStatus := FindAllPartial(status.ComponentsStatus, componentStatus, GetByComponent)
+	inProgress := false
+
+	// check all statuses if any of the nodepools are still in progress or pending
+	for i := 0; i < len(foundStatus); i++ {
+		if foundStatus[i].Status != "Upgraded" && foundStatus[i].Status != "Finished" {
+			inProgress = true
+		}
+	}
+
+	return inProgress
 }
 
 func ReplicaHostName(currentSts appsv1.StatefulSet, repNum int32) string {
diff --git a/opensearch-operator/pkg/reconcilers/cluster.go b/opensearch-operator/pkg/reconcilers/cluster.go
index 6cea13bf..0a4a3249 100644
--- a/opensearch-operator/pkg/reconcilers/cluster.go
+++ b/opensearch-operator/pkg/reconcilers/cluster.go
@@ -200,7 +200,7 @@ func (r *ClusterReconciler) reconcileNodeStatefulSet(nodePool opsterv1.NodePool,
 	} else {
 		// A failure is assumed if n PVCs exist but less than n-1 pods (one missing pod is allowed for rolling restart purposes)
 		// We can assume the cluster is in a failure state and cannot recover on its own
-		if !helpers.UpgradeInProgress(r.instance.Status) &&
+		if !helpers.IsUpgradeInProgress(r.instance.Status) &&
 			pvcCount >= int(nodePool.Replicas) && existing.Status.ReadyReplicas < nodePool.Replicas-1 {
 			r.logger.Info(fmt.Sprintf("Detected recovery situation for nodepool %s: PVC count: %d, replicas: %d. Recreating STS with parallel mode", nodePool.Component, pvcCount, existing.Status.Replicas))
 			if existing.Spec.PodManagementPolicy != appsv1.ParallelPodManagement {
@@ -301,7 +301,7 @@ func (r *ClusterReconciler) checkForEmptyDirRecovery() (*ctrl.Result, error) {
 				Description: nodePool.Component,
 			}
 			comp := r.instance.Status.ComponentsStatus
-			_, found := helpers.FindFirstPartial(comp, componentStatus, helpers.GetByDescriptionAndGroup)
+			_, found := helpers.FindFirstPartial(comp, componentStatus, helpers.GetByDescriptionAndComponent)
 
 			if found {
 				return &ctrl.Result{}, nil
diff --git a/opensearch-operator/pkg/reconcilers/configuration_test.go b/opensearch-operator/pkg/reconcilers/configuration_test.go
index df697962..efab81bf 100644
--- a/opensearch-operator/pkg/reconcilers/configuration_test.go
+++ b/opensearch-operator/pkg/reconcilers/configuration_test.go
@@ -13,7 +13,6 @@ import (
 	"k8s.io/client-go/tools/record"
 	ctrl "sigs.k8s.io/controller-runtime"
 
-	"github.com/cisco-open/operator-tools/pkg/reconciler"
 	. "github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega" "github.com/stretchr/testify/mock" @@ -21,11 +20,9 @@ import ( func newConfigurationReconciler( client *k8s.MockK8sClient, - ctx context.Context, recorder record.EventRecorder, reconcilerContext *ReconcilerContext, instance *opsterv1.OpenSearchCluster, - opts ...reconciler.ResourceReconcilerOption, ) *ConfigurationReconciler { return &ConfigurationReconciler{ client: client, @@ -69,7 +66,6 @@ var _ = Describe("Configuration Controller", func() { underTest := newConfigurationReconciler( mockClient, - context.Background(), &helpers.MockEventRecorder{}, &reconcilerContext, &spec, @@ -116,7 +112,6 @@ var _ = Describe("Configuration Controller", func() { underTest := newConfigurationReconciler( mockClient, - context.Background(), &helpers.MockEventRecorder{}, &reconcilerContext, &spec, diff --git a/opensearch-operator/pkg/reconcilers/rollingRestart.go b/opensearch-operator/pkg/reconcilers/rollingRestart.go index 748c06d8..483654a4 100644 --- a/opensearch-operator/pkg/reconcilers/rollingRestart.go +++ b/opensearch-operator/pkg/reconcilers/rollingRestart.go @@ -110,6 +110,7 @@ func (r *RollingRestartReconciler) Reconcile() (ctrl.Result, error) { lg.V(1).Info("Restart complete. Reactivating shard allocation") return ctrl.Result{Requeue: true}, err } + r.recorder.AnnotatedEventf(r.instance, map[string]string{"cluster-name": r.instance.GetName()}, "Normal", "RollingRestart", "Rolling restart completed") if err = r.updateStatus(statusFinished); err != nil { return ctrl.Result{Requeue: true}, err } @@ -129,7 +130,7 @@ func (r *RollingRestartReconciler) Reconcile() (ctrl.Result, error) { if err := r.updateStatus(statusInProgress); err != nil { return ctrl.Result{Requeue: true}, err } - r.recorder.AnnotatedEventf(r.instance, map[string]string{"cluster-name": r.instance.GetName()}, "Normal", "RollingRestart", "Starting to rolling restart") + r.recorder.AnnotatedEventf(r.instance, map[string]string{"cluster-name": r.instance.GetName()}, "Normal", "RollingRestart", "Starting rolling restart") // If there is work to do create an Opensearch Client var err error diff --git a/opensearch-operator/pkg/reconcilers/scaler.go b/opensearch-operator/pkg/reconcilers/scaler.go index 8e5d5bde..297e53d0 100644 --- a/opensearch-operator/pkg/reconcilers/scaler.go +++ b/opensearch-operator/pkg/reconcilers/scaler.go @@ -78,7 +78,7 @@ func (r *ScalerReconciler) reconcileNodePool(nodePool *opsterv1.NodePool) (bool, Description: nodePool.Component, } comp := r.instance.Status.ComponentsStatus - currentStatus, found := helpers.FindFirstPartial(comp, componentStatus, helpers.GetByDescriptionAndGroup) + currentStatus, found := helpers.FindFirstPartial(comp, componentStatus, helpers.GetByDescriptionAndComponent) desireReplicaDiff := *currentSts.Spec.Replicas - nodePool.Replicas if desireReplicaDiff == 0 { diff --git a/opensearch-operator/pkg/reconcilers/securityconfig.go b/opensearch-operator/pkg/reconcilers/securityconfig.go index 36e5cca2..bc042ee4 100644 --- a/opensearch-operator/pkg/reconcilers/securityconfig.go +++ b/opensearch-operator/pkg/reconcilers/securityconfig.go @@ -162,7 +162,7 @@ func (r *SecurityconfigReconciler) Reconcile() (ctrl.Result, error) { } r.logger.Info("Starting securityconfig update job") - r.recorder.AnnotatedEventf(r.instance, annotations, "Normal", "Security", "Starting to securityconfig update job") + r.recorder.AnnotatedEventf(r.instance, annotations, "Normal", "Security", "Starting securityconfig update job") job = builders.NewSecurityconfigUpdateJob( r.instance, diff --git 
diff --git a/opensearch-operator/pkg/reconcilers/upgrade.go b/opensearch-operator/pkg/reconcilers/upgrade.go
index 9b9d8599..73a3acfc 100644
--- a/opensearch-operator/pkg/reconcilers/upgrade.go
+++ b/opensearch-operator/pkg/reconcilers/upgrade.go
@@ -30,6 +30,12 @@ var (
 	ErrUnexpectedStatus = errors.New("unexpected upgrade status")
 )
 
+const (
+	componentNameUpgrader   = "Upgrader"
+	upgradeStatusPending    = "Pending"
+	upgradeStatusInProgress = "Upgrading"
+)
+
 type UpgradeReconciler struct {
 	client k8s.K8sClient
 	ctx    context.Context
@@ -93,10 +99,10 @@ func (r *UpgradeReconciler) Reconcile() (ctrl.Result, error) {
 
 	// Work on the current nodepool as appropriate
 	switch currentStatus.Status {
-	case "Pending":
+	case upgradeStatusPending:
 		// Set it to upgrading and requeue
 		err := r.client.UpdateOpenSearchClusterStatus(client.ObjectKeyFromObject(r.instance), func(instance *opsterv1.OpenSearchCluster) {
-			currentStatus.Status = "Upgrading"
+			currentStatus.Status = upgradeStatusInProgress
 			instance.Status.ComponentsStatus = append(instance.Status.ComponentsStatus, currentStatus)
 		})
 		r.recorder.AnnotatedEventf(r.instance, annotations, "Normal", "Upgrade", "Starting upgrade of node pool '%s'", currentStatus.Description)
@@ -104,7 +110,7 @@ func (r *UpgradeReconciler) Reconcile() (ctrl.Result, error) {
 			Requeue:      true,
 			RequeueAfter: 15 * time.Second,
 		}, err
-	case "Upgrading":
+	case upgradeStatusInProgress:
 		err := r.doNodePoolUpgrade(nodePool)
 		return ctrl.Result{
 			Requeue:      true,
@@ -116,16 +122,16 @@ func (r *UpgradeReconciler) Reconcile() (ctrl.Result, error) {
 			instance.Status.Version = instance.Spec.General.Version
 			for _, pool := range instance.Spec.NodePools {
 				componentStatus := opsterv1.ComponentStatus{
-					Component:   "Upgrader",
+					Component:   componentNameUpgrader,
 					Description: pool.Component,
 				}
-				currentStatus, found := helpers.FindFirstPartial(instance.Status.ComponentsStatus, componentStatus, helpers.GetByDescriptionAndGroup)
+				currentStatus, found := helpers.FindFirstPartial(instance.Status.ComponentsStatus, componentStatus, helpers.GetByDescriptionAndComponent)
 				if found {
 					instance.Status.ComponentsStatus = helpers.RemoveIt(currentStatus, instance.Status.ComponentsStatus)
 				}
 			}
 		})
-		r.recorder.AnnotatedEventf(r.instance, annotations, "Normal", "Upgrade", "Finished upgrade - NewVersion: %s", r.instance.Status.Version)
+		r.recorder.AnnotatedEventf(r.instance, annotations, "Normal", "Upgrade", "Finished upgrade - NewVersion: %s", r.instance.Spec.General.Version)
 		return ctrl.Result{}, err
 	default:
 		// We should never get here so return an error
@@ -190,35 +196,35 @@ func (r *UpgradeReconciler) findNextNodePoolForUpgrade() (opsterv1.NodePool, ops
 	pool, found := r.findInProgress(dataNodes)
 	if found {
 		return pool, opsterv1.ComponentStatus{
-			Component:   "Upgrader",
+			Component:   componentNameUpgrader,
 			Description: pool.Component,
-			Status:      "Upgrading",
+			Status:      upgradeStatusInProgress,
 		}
 	}
 	// Pick the first unworked on node next
 	pool, found = r.findNextPool(dataNodes)
 	if found {
 		return pool, opsterv1.ComponentStatus{
-			Component:   "Upgrader",
+			Component:   componentNameUpgrader,
 			Description: pool.Component,
-			Status:      "Pending",
+			Status:      upgradeStatusPending,
 		}
 	}
 	// Next do the same for any nodes that are data and master
 	pool, found = r.findInProgress(dataAndMasterNodes)
 	if found {
 		return pool, opsterv1.ComponentStatus{
-			Component:   "Upgrader",
+			Component:   componentNameUpgrader,
 			Description: pool.Component,
-			Status:      "Upgrading",
+			Status:      upgradeStatusInProgress,
 		}
 	}
 	pool, found = r.findNextPool(dataAndMasterNodes)
 	if found {
 		return pool, opsterv1.ComponentStatus{
-			Component:   "Upgrader",
+			Component:   componentNameUpgrader,
 			Description: pool.Component,
-			Status:      "Pending",
+			Status:      upgradeStatusPending,
 		}
 	}
 
@@ -226,23 +232,23 @@ func (r *UpgradeReconciler) findNextNodePoolForUpgrade() (opsterv1.NodePool, ops
 	pool, found = r.findInProgress(otherNodes)
 	if found {
 		return pool, opsterv1.ComponentStatus{
-			Component:   "Upgrader",
+			Component:   componentNameUpgrader,
 			Description: pool.Component,
-			Status:      "Upgrading",
+			Status:      upgradeStatusInProgress,
 		}
 	}
 	pool, found = r.findNextPool(otherNodes)
 	if found {
 		return pool, opsterv1.ComponentStatus{
-			Component:   "Upgrader",
+			Component:   componentNameUpgrader,
 			Description: pool.Component,
-			Status:      "Pending",
+			Status:      upgradeStatusPending,
 		}
 	}
 	// If we get here all nodes should be upgraded
 	return opsterv1.NodePool{}, opsterv1.ComponentStatus{
-		Component: "Upgrade",
+		Component: componentNameUpgrader,
 		Status:    "Finished",
 	}
 }
@@ -250,11 +256,11 @@ func (r *UpgradeReconciler) findNextNodePoolForUpgrade() (opsterv1.NodePool, ops
 func (r *UpgradeReconciler) findInProgress(pools []opsterv1.NodePool) (opsterv1.NodePool, bool) {
 	for _, nodePool := range pools {
 		componentStatus := opsterv1.ComponentStatus{
-			Component:   "Upgrader",
+			Component:   componentNameUpgrader,
 			Description: nodePool.Component,
 		}
-		currentStatus, found := helpers.FindFirstPartial(r.instance.Status.ComponentsStatus, componentStatus, helpers.GetByDescriptionAndGroup)
-		if found && currentStatus.Status == "Upgrading" {
+		currentStatus, found := helpers.FindFirstPartial(r.instance.Status.ComponentsStatus, componentStatus, helpers.GetByDescriptionAndComponent)
+		if found && currentStatus.Status == upgradeStatusInProgress {
 			return nodePool, true
 		}
 	}
@@ -264,10 +270,10 @@ func (r *UpgradeReconciler) findInProgress(pools []opsterv1.
 func (r *UpgradeReconciler) findNextPool(pools []opsterv1.NodePool) (opsterv1.NodePool, bool) {
 	for _, nodePool := range pools {
 		componentStatus := opsterv1.ComponentStatus{
-			Component:   "Upgrader",
+			Component:   componentNameUpgrader,
 			Description: nodePool.Component,
 		}
-		_, found := helpers.FindFirstPartial(r.instance.Status.ComponentsStatus, componentStatus, helpers.GetByDescriptionAndGroup)
+		_, found := helpers.FindFirstPartial(r.instance.Status.ComponentsStatus, componentStatus, helpers.GetByDescriptionAndComponent)
 		if !found {
 			return nodePool, true
 		}
@@ -323,12 +329,12 @@ func (r *UpgradeReconciler) doNodePoolUpgrade(pool opsterv1.NodePool) error {
 
 		return r.client.UpdateOpenSearchClusterStatus(client.ObjectKeyFromObject(r.instance), func(instance *opsterv1.OpenSearchCluster) {
 			currentStatus := opsterv1.ComponentStatus{
-				Component:   "Upgrader",
-				Status:      "Upgrading",
+				Component:   componentNameUpgrader,
+				Status:      upgradeStatusInProgress,
 				Description: pool.Component,
 			}
 			componentStatus := opsterv1.ComponentStatus{
-				Component:   "Upgrader",
+				Component:   componentNameUpgrader,
 				Status:      "Upgraded",
 				Description: pool.Component,
 			}
@@ -383,14 +389,14 @@ func (r *UpgradeReconciler) setComponentConditions(conditions []string, componen
 	err := r.client.UpdateOpenSearchClusterStatus(client.ObjectKeyFromObject(r.instance), func(instance *opsterv1.OpenSearchCluster) {
 		currentStatus := opsterv1.ComponentStatus{
-			Component:   "Upgrader",
-			Status:      "Upgrading",
+			Component:   componentNameUpgrader,
+			Status:      upgradeStatusInProgress,
 			Description: component,
 		}
-		componentStatus, found := helpers.FindFirstPartial(instance.Status.ComponentsStatus, currentStatus, helpers.GetByDescriptionAndGroup)
+		componentStatus, found := helpers.FindFirstPartial(instance.Status.ComponentsStatus, currentStatus, helpers.GetByDescriptionAndComponent)
 		newStatus := opsterv1.ComponentStatus{
-			Component:   "Upgrader",
-			Status:      "Upgrading",
+			Component:   componentNameUpgrader,
+			Status:      upgradeStatusInProgress,
 			Description: component,
 			Conditions:  conditions,
 		}
@@ -398,7 +404,7 @@ func (r *UpgradeReconciler) setComponentConditions(conditions []string, componen
 			conditions = append(componentStatus.Conditions, conditions...)
 		}
 
-		instance.Status.ComponentsStatus = helpers.Replace(currentStatus, newStatus, instance.Status.ComponentsStatus)
+		instance.Status.ComponentsStatus = helpers.Replace(componentStatus, newStatus, instance.Status.ComponentsStatus)
 	})
 	if err != nil {
 		r.logger.Error(err, "Could not update status")