Skip to content

Commit

Permalink
Fix upgrade detection during parallel recovery (#789)
Browse files Browse the repository at this point in the history
### Description
Fixes a bug where parallel recovery did not engage if the cluster had
gone through a version upgrade beforehand.
The reason was that the upgrade logic added the status for each nodepool
twice, leading the recovery logic to incorrectly detect an upgrade as
still being in progress.
Also did a small refactoring of names and constants and fixed an IDE
warning about unused parameters.

No changes to the CRDs or functionality, just internal logic fixes.

### Issues Resolved
Fixes #730 

### Check List
- [x] Commits are signed per the DCO using --signoff 
- [x] Unit tests added for the new/changed functionality and all unit
tests are successful
- [x] Customer-visible features documented
- [x] No linter warnings (`make lint`)

If CRDs are changed:
- [-] CRD YAMLs updated (`make manifests`) and also copied into the helm
chart
- [-] Changes to CRDs documented

By submitting this pull request, I confirm that my contribution is made
under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and
signing off your commits, please check
[here](https://github.com/opensearch-project/OpenSearch/blob/main/CONTRIBUTING.md#developer-certificate-of-origin).

Signed-off-by: Sebastian Woehrl <[email protected]>
  • Loading branch information
swoehrl-mw authored May 21, 2024
1 parent 5a2d2ae commit 4f59766
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 49 deletions.
2 changes: 1 addition & 1 deletion opensearch-operator/pkg/builders/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -777,7 +777,7 @@ var _ = Describe("Builders", func() {
Expect(result.Spec.Template.Spec.Containers[0].ReadinessProbe.FailureThreshold).To(Equal(int32(9)))
})
})

When("Configuring InitHelper Resources", func() {
It("should propagate Resources to all init containers", func() {
clusterObject := ClusterDescWithVersion("2.2.1")
Expand Down
36 changes: 31 additions & 5 deletions opensearch-operator/pkg/helpers/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import (
"encoding/json"
"errors"
"fmt"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"reflect"
"sort"
"time"

apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"

policyv1 "k8s.io/api/policy/v1"

opsterv1 "github.com/Opster/opensearch-k8s-operator/opensearch-operator/api/v1"
Expand Down Expand Up @@ -79,6 +80,22 @@ func FindFirstPartial(
return item, false
}

// FindAllPartial collects every match that predicator reports while walking
// arr from front to back.
//
// predicator is invoked as predicator(element, item) for each element of arr.
// Whenever its boolean result is true, the ComponentStatus it returned (which
// is whatever the predicator chooses to hand back) is appended to the result.
// The returned slice preserves the order of arr and is nil when nothing
// matched.
func FindAllPartial(
	arr []opsterv1.ComponentStatus,
	item opsterv1.ComponentStatus,
	predicator func(opsterv1.ComponentStatus, opsterv1.ComponentStatus) (opsterv1.ComponentStatus, bool),
) []opsterv1.ComponentStatus {
	var matches []opsterv1.ComponentStatus
	for _, candidate := range arr {
		if matched, ok := predicator(candidate, item); ok {
			matches = append(matches, matched)
		}
	}
	return matches
}

func FindByPath(obj interface{}, keys []string) (interface{}, bool) {
mobj, ok := obj.(map[string]interface{})
if !ok {
Expand Down Expand Up @@ -116,7 +133,7 @@ func UsernameAndPassword(k8sClient k8s.K8sClient, cr *opsterv1.OpenSearchCluster
}
}

func GetByDescriptionAndGroup(left opsterv1.ComponentStatus, right opsterv1.ComponentStatus) (opsterv1.ComponentStatus, bool) {
func GetByDescriptionAndComponent(left opsterv1.ComponentStatus, right opsterv1.ComponentStatus) (opsterv1.ComponentStatus, bool) {
if left.Description == right.Description && left.Component == right.Component {
return left, true
}
Expand Down Expand Up @@ -430,12 +447,21 @@ func CalculateJvmHeapSize(nodePool *opsterv1.NodePool) string {
return nodePool.Jvm
}

func UpgradeInProgress(status opsterv1.ClusterStatus) bool {
func IsUpgradeInProgress(status opsterv1.ClusterStatus) bool {
componentStatus := opsterv1.ComponentStatus{
Component: "Upgrader",
}
_, found := FindFirstPartial(status.ComponentsStatus, componentStatus, GetByComponent)
return found
foundStatus := FindAllPartial(status.ComponentsStatus, componentStatus, GetByComponent)
inProgress := false

// check all statuses if any of the nodepools are still in progress or pending
for i := 0; i < len(foundStatus); i++ {
if foundStatus[i].Status != "Upgraded" && foundStatus[i].Status != "Finished" {
inProgress = true
}
}

return inProgress
}

func ReplicaHostName(currentSts appsv1.StatefulSet, repNum int32) string {
Expand Down
4 changes: 2 additions & 2 deletions opensearch-operator/pkg/reconcilers/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func (r *ClusterReconciler) reconcileNodeStatefulSet(nodePool opsterv1.NodePool,
} else {
// A failure is assumed if n PVCs exist but less than n-1 pods (one missing pod is allowed for rolling restart purposes)
// We can assume the cluster is in a failure state and cannot recover on its own
if !helpers.UpgradeInProgress(r.instance.Status) &&
if !helpers.IsUpgradeInProgress(r.instance.Status) &&
pvcCount >= int(nodePool.Replicas) && existing.Status.ReadyReplicas < nodePool.Replicas-1 {
r.logger.Info(fmt.Sprintf("Detected recovery situation for nodepool %s: PVC count: %d, replicas: %d. Recreating STS with parallel mode", nodePool.Component, pvcCount, existing.Status.Replicas))
if existing.Spec.PodManagementPolicy != appsv1.ParallelPodManagement {
Expand Down Expand Up @@ -301,7 +301,7 @@ func (r *ClusterReconciler) checkForEmptyDirRecovery() (*ctrl.Result, error) {
Description: nodePool.Component,
}
comp := r.instance.Status.ComponentsStatus
_, found := helpers.FindFirstPartial(comp, componentStatus, helpers.GetByDescriptionAndGroup)
_, found := helpers.FindFirstPartial(comp, componentStatus, helpers.GetByDescriptionAndComponent)

if found {
return &ctrl.Result{}, nil
Expand Down
5 changes: 0 additions & 5 deletions opensearch-operator/pkg/reconcilers/configuration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,16 @@ import (
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"

"github.com/cisco-open/operator-tools/pkg/reconciler"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/stretchr/testify/mock"
)

func newConfigurationReconciler(
client *k8s.MockK8sClient,
ctx context.Context,
recorder record.EventRecorder,
reconcilerContext *ReconcilerContext,
instance *opsterv1.OpenSearchCluster,
opts ...reconciler.ResourceReconcilerOption,
) *ConfigurationReconciler {
return &ConfigurationReconciler{
client: client,
Expand Down Expand Up @@ -69,7 +66,6 @@ var _ = Describe("Configuration Controller", func() {

underTest := newConfigurationReconciler(
mockClient,
context.Background(),
&helpers.MockEventRecorder{},
&reconcilerContext,
&spec,
Expand Down Expand Up @@ -116,7 +112,6 @@ var _ = Describe("Configuration Controller", func() {

underTest := newConfigurationReconciler(
mockClient,
context.Background(),
&helpers.MockEventRecorder{},
&reconcilerContext,
&spec,
Expand Down
3 changes: 2 additions & 1 deletion opensearch-operator/pkg/reconcilers/rollingRestart.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func (r *RollingRestartReconciler) Reconcile() (ctrl.Result, error) {
lg.V(1).Info("Restart complete. Reactivating shard allocation")
return ctrl.Result{Requeue: true}, err
}
r.recorder.AnnotatedEventf(r.instance, map[string]string{"cluster-name": r.instance.GetName()}, "Normal", "RollingRestart", "Rolling restart completed")
if err = r.updateStatus(statusFinished); err != nil {
return ctrl.Result{Requeue: true}, err
}
Expand All @@ -129,7 +130,7 @@ func (r *RollingRestartReconciler) Reconcile() (ctrl.Result, error) {
if err := r.updateStatus(statusInProgress); err != nil {
return ctrl.Result{Requeue: true}, err
}
r.recorder.AnnotatedEventf(r.instance, map[string]string{"cluster-name": r.instance.GetName()}, "Normal", "RollingRestart", "Starting to rolling restart")
r.recorder.AnnotatedEventf(r.instance, map[string]string{"cluster-name": r.instance.GetName()}, "Normal", "RollingRestart", "Starting rolling restart")

// If there is work to do create an Opensearch Client
var err error
Expand Down
2 changes: 1 addition & 1 deletion opensearch-operator/pkg/reconcilers/scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (r *ScalerReconciler) reconcileNodePool(nodePool *opsterv1.NodePool) (bool,
Description: nodePool.Component,
}
comp := r.instance.Status.ComponentsStatus
currentStatus, found := helpers.FindFirstPartial(comp, componentStatus, helpers.GetByDescriptionAndGroup)
currentStatus, found := helpers.FindFirstPartial(comp, componentStatus, helpers.GetByDescriptionAndComponent)

desireReplicaDiff := *currentSts.Spec.Replicas - nodePool.Replicas
if desireReplicaDiff == 0 {
Expand Down
2 changes: 1 addition & 1 deletion opensearch-operator/pkg/reconcilers/securityconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (r *SecurityconfigReconciler) Reconcile() (ctrl.Result, error) {
}

r.logger.Info("Starting securityconfig update job")
r.recorder.AnnotatedEventf(r.instance, annotations, "Normal", "Security", "Starting to securityconfig update job")
r.recorder.AnnotatedEventf(r.instance, annotations, "Normal", "Security", "Starting securityconfig update job")

job = builders.NewSecurityconfigUpdateJob(
r.instance,
Expand Down
72 changes: 39 additions & 33 deletions opensearch-operator/pkg/reconcilers/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ var (
ErrUnexpectedStatus = errors.New("unexpected upgrade status")
)

// Values the upgrade reconciler writes into ComponentStatus entries.
const (
// componentNameUpgrader is the Component value used for every status entry
// the upgrade reconciler creates or looks up.
componentNameUpgrader = "Upgrader"
// upgradeStatusPending is the Status reported for a node pool that has been
// selected as the next one to upgrade but has no status entry yet; Reconcile
// reacts to it by recording the pool as upgradeStatusInProgress and requeueing.
upgradeStatusPending = "Pending"
// upgradeStatusInProgress is the Status of a node pool whose upgrade has been
// started; Reconcile reacts to it by calling doNodePoolUpgrade for that pool.
upgradeStatusInProgress = "Upgrading"
)

type UpgradeReconciler struct {
client k8s.K8sClient
ctx context.Context
Expand Down Expand Up @@ -93,18 +99,18 @@ func (r *UpgradeReconciler) Reconcile() (ctrl.Result, error) {

// Work on the current nodepool as appropriate
switch currentStatus.Status {
case "Pending":
case upgradeStatusPending:
// Set it to upgrading and requeue
err := r.client.UpdateOpenSearchClusterStatus(client.ObjectKeyFromObject(r.instance), func(instance *opsterv1.OpenSearchCluster) {
currentStatus.Status = "Upgrading"
currentStatus.Status = upgradeStatusInProgress
instance.Status.ComponentsStatus = append(instance.Status.ComponentsStatus, currentStatus)
})
r.recorder.AnnotatedEventf(r.instance, annotations, "Normal", "Upgrade", "Starting upgrade of node pool '%s'", currentStatus.Description)
return ctrl.Result{
Requeue: true,
RequeueAfter: 15 * time.Second,
}, err
case "Upgrading":
case upgradeStatusInProgress:
err := r.doNodePoolUpgrade(nodePool)
return ctrl.Result{
Requeue: true,
Expand All @@ -116,16 +122,16 @@ func (r *UpgradeReconciler) Reconcile() (ctrl.Result, error) {
instance.Status.Version = instance.Spec.General.Version
for _, pool := range instance.Spec.NodePools {
componentStatus := opsterv1.ComponentStatus{
Component: "Upgrader",
Component: componentNameUpgrader,
Description: pool.Component,
}
currentStatus, found := helpers.FindFirstPartial(instance.Status.ComponentsStatus, componentStatus, helpers.GetByDescriptionAndGroup)
currentStatus, found := helpers.FindFirstPartial(instance.Status.ComponentsStatus, componentStatus, helpers.GetByDescriptionAndComponent)
if found {
instance.Status.ComponentsStatus = helpers.RemoveIt(currentStatus, instance.Status.ComponentsStatus)
}
}
})
r.recorder.AnnotatedEventf(r.instance, annotations, "Normal", "Upgrade", "Finished upgrade - NewVersion: %s", r.instance.Status.Version)
r.recorder.AnnotatedEventf(r.instance, annotations, "Normal", "Upgrade", "Finished upgrade - NewVersion: %s", r.instance.Spec.General.Version)
return ctrl.Result{}, err
default:
// We should never get here so return an error
Expand Down Expand Up @@ -190,71 +196,71 @@ func (r *UpgradeReconciler) findNextNodePoolForUpgrade() (opsterv1.NodePool, ops
pool, found := r.findInProgress(dataNodes)
if found {
return pool, opsterv1.ComponentStatus{
Component: "Upgrader",
Component: componentNameUpgrader,
Description: pool.Component,
Status: "Upgrading",
Status: upgradeStatusInProgress,
}
}
// Pick the first unworked on node next
pool, found = r.findNextPool(dataNodes)
if found {
return pool, opsterv1.ComponentStatus{
Component: "Upgrader",
Component: componentNameUpgrader,
Description: pool.Component,
Status: "Pending",
Status: upgradeStatusPending,
}
}
// Next do the same for any nodes that are data and master
pool, found = r.findInProgress(dataAndMasterNodes)
if found {
return pool, opsterv1.ComponentStatus{
Component: "Upgrader",
Component: componentNameUpgrader,
Description: pool.Component,
Status: "Upgrading",
Status: upgradeStatusInProgress,
}
}
pool, found = r.findNextPool(dataAndMasterNodes)
if found {
return pool, opsterv1.ComponentStatus{
Component: "Upgrader",
Component: componentNameUpgrader,
Description: pool.Component,
Status: "Pending",
Status: upgradeStatusPending,
}
}

// Finally do the non data nodes
pool, found = r.findInProgress(otherNodes)
if found {
return pool, opsterv1.ComponentStatus{
Component: "Upgrader",
Component: componentNameUpgrader,
Description: pool.Component,
Status: "Upgrading",
Status: upgradeStatusInProgress,
}
}
pool, found = r.findNextPool(otherNodes)
if found {
return pool, opsterv1.ComponentStatus{
Component: "Upgrader",
Component: componentNameUpgrader,
Description: pool.Component,
Status: "Pending",
Status: upgradeStatusPending,
}
}

// If we get here all nodes should be upgraded
return opsterv1.NodePool{}, opsterv1.ComponentStatus{
Component: "Upgrade",
Component: componentNameUpgrader,
Status: "Finished",
}
}

func (r *UpgradeReconciler) findInProgress(pools []opsterv1.NodePool) (opsterv1.NodePool, bool) {
for _, nodePool := range pools {
componentStatus := opsterv1.ComponentStatus{
Component: "Upgrader",
Component: componentNameUpgrader,
Description: nodePool.Component,
}
currentStatus, found := helpers.FindFirstPartial(r.instance.Status.ComponentsStatus, componentStatus, helpers.GetByDescriptionAndGroup)
if found && currentStatus.Status == "Upgrading" {
currentStatus, found := helpers.FindFirstPartial(r.instance.Status.ComponentsStatus, componentStatus, helpers.GetByDescriptionAndComponent)
if found && currentStatus.Status == upgradeStatusInProgress {
return nodePool, true
}
}
Expand All @@ -264,10 +270,10 @@ func (r *UpgradeReconciler) findInProgress(pools []opsterv1.NodePool) (opsterv1.
func (r *UpgradeReconciler) findNextPool(pools []opsterv1.NodePool) (opsterv1.NodePool, bool) {
for _, nodePool := range pools {
componentStatus := opsterv1.ComponentStatus{
Component: "Upgrader",
Component: componentNameUpgrader,
Description: nodePool.Component,
}
_, found := helpers.FindFirstPartial(r.instance.Status.ComponentsStatus, componentStatus, helpers.GetByDescriptionAndGroup)
_, found := helpers.FindFirstPartial(r.instance.Status.ComponentsStatus, componentStatus, helpers.GetByDescriptionAndComponent)
if !found {
return nodePool, true
}
Expand Down Expand Up @@ -323,12 +329,12 @@ func (r *UpgradeReconciler) doNodePoolUpgrade(pool opsterv1.NodePool) error {

return r.client.UpdateOpenSearchClusterStatus(client.ObjectKeyFromObject(r.instance), func(instance *opsterv1.OpenSearchCluster) {
currentStatus := opsterv1.ComponentStatus{
Component: "Upgrader",
Status: "Upgrading",
Component: componentNameUpgrader,
Status: upgradeStatusInProgress,
Description: pool.Component,
}
componentStatus := opsterv1.ComponentStatus{
Component: "Upgrader",
Component: componentNameUpgrader,
Status: "Upgraded",
Description: pool.Component,
}
Expand Down Expand Up @@ -383,22 +389,22 @@ func (r *UpgradeReconciler) setComponentConditions(conditions []string, componen

err := r.client.UpdateOpenSearchClusterStatus(client.ObjectKeyFromObject(r.instance), func(instance *opsterv1.OpenSearchCluster) {
currentStatus := opsterv1.ComponentStatus{
Component: "Upgrader",
Status: "Upgrading",
Component: componentNameUpgrader,
Status: upgradeStatusInProgress,
Description: component,
}
componentStatus, found := helpers.FindFirstPartial(instance.Status.ComponentsStatus, currentStatus, helpers.GetByDescriptionAndGroup)
componentStatus, found := helpers.FindFirstPartial(instance.Status.ComponentsStatus, currentStatus, helpers.GetByDescriptionAndComponent)
newStatus := opsterv1.ComponentStatus{
Component: "Upgrader",
Status: "Upgrading",
Component: componentNameUpgrader,
Status: upgradeStatusInProgress,
Description: component,
Conditions: conditions,
}
if found {
conditions = append(componentStatus.Conditions, conditions...)
}

instance.Status.ComponentsStatus = helpers.Replace(currentStatus, newStatus, instance.Status.ComponentsStatus)
instance.Status.ComponentsStatus = helpers.Replace(componentStatus, newStatus, instance.Status.ComponentsStatus)
})
if err != nil {
r.logger.Error(err, "Could not update status")
Expand Down

0 comments on commit 4f59766

Please sign in to comment.