Skip to content

Commit

Permalink
Merge branch 'opensearch-project:main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
dtaivpp authored May 24, 2024
2 parents 2434a88 + 4f59766 commit 3bd7e72
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 49 deletions.
2 changes: 1 addition & 1 deletion opensearch-operator/pkg/builders/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -777,7 +777,7 @@ var _ = Describe("Builders", func() {
Expect(result.Spec.Template.Spec.Containers[0].ReadinessProbe.FailureThreshold).To(Equal(int32(9)))
})
})

When("Configuring InitHelper Resources", func() {
It("should propagate Resources to all init containers", func() {
clusterObject := ClusterDescWithVersion("2.2.1")
Expand Down
36 changes: 31 additions & 5 deletions opensearch-operator/pkg/helpers/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import (
"encoding/json"
"errors"
"fmt"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"reflect"
"sort"
"time"

apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"

policyv1 "k8s.io/api/policy/v1"

opsterv1 "github.com/Opster/opensearch-k8s-operator/opensearch-operator/api/v1"
Expand Down Expand Up @@ -79,6 +80,22 @@ func FindFirstPartial(
return item, false
}

// FindAllPartial collects every entry of arr for which predicator reports a
// match against item, in the order the entries appear in arr. The value stored
// for each match is the one predicator returns. The result is nil when no
// entry matches.
func FindAllPartial(
	arr []opsterv1.ComponentStatus,
	item opsterv1.ComponentStatus,
	predicator func(opsterv1.ComponentStatus, opsterv1.ComponentStatus) (opsterv1.ComponentStatus, bool),
) []opsterv1.ComponentStatus {
	var matches []opsterv1.ComponentStatus

	for _, candidate := range arr {
		matched, ok := predicator(candidate, item)
		if !ok {
			continue
		}
		matches = append(matches, matched)
	}
	return matches
}

func FindByPath(obj interface{}, keys []string) (interface{}, bool) {
mobj, ok := obj.(map[string]interface{})
if !ok {
Expand Down Expand Up @@ -116,7 +133,7 @@ func UsernameAndPassword(k8sClient k8s.K8sClient, cr *opsterv1.OpenSearchCluster
}
}

func GetByDescriptionAndGroup(left opsterv1.ComponentStatus, right opsterv1.ComponentStatus) (opsterv1.ComponentStatus, bool) {
func GetByDescriptionAndComponent(left opsterv1.ComponentStatus, right opsterv1.ComponentStatus) (opsterv1.ComponentStatus, bool) {
if left.Description == right.Description && left.Component == right.Component {
return left, true
}
Expand Down Expand Up @@ -430,12 +447,21 @@ func CalculateJvmHeapSize(nodePool *opsterv1.NodePool) string {
return nodePool.Jvm
}

func UpgradeInProgress(status opsterv1.ClusterStatus) bool {
func IsUpgradeInProgress(status opsterv1.ClusterStatus) bool {
componentStatus := opsterv1.ComponentStatus{
Component: "Upgrader",
}
_, found := FindFirstPartial(status.ComponentsStatus, componentStatus, GetByComponent)
return found
foundStatus := FindAllPartial(status.ComponentsStatus, componentStatus, GetByComponent)
inProgress := false

// check all statuses if any of the nodepools are still in progress or pending
for i := 0; i < len(foundStatus); i++ {
if foundStatus[i].Status != "Upgraded" && foundStatus[i].Status != "Finished" {
inProgress = true
}
}

return inProgress
}

func ReplicaHostName(currentSts appsv1.StatefulSet, repNum int32) string {
Expand Down
4 changes: 2 additions & 2 deletions opensearch-operator/pkg/reconcilers/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func (r *ClusterReconciler) reconcileNodeStatefulSet(nodePool opsterv1.NodePool,
} else {
// A failure is assumed if at least n PVCs exist but fewer than n-1 pods are ready (one missing pod is allowed for rolling restart purposes)
// We can assume the cluster is in a failure state and cannot recover on its own
if !helpers.UpgradeInProgress(r.instance.Status) &&
if !helpers.IsUpgradeInProgress(r.instance.Status) &&
pvcCount >= int(nodePool.Replicas) && existing.Status.ReadyReplicas < nodePool.Replicas-1 {
r.logger.Info(fmt.Sprintf("Detected recovery situation for nodepool %s: PVC count: %d, replicas: %d. Recreating STS with parallel mode", nodePool.Component, pvcCount, existing.Status.Replicas))
if existing.Spec.PodManagementPolicy != appsv1.ParallelPodManagement {
Expand Down Expand Up @@ -301,7 +301,7 @@ func (r *ClusterReconciler) checkForEmptyDirRecovery() (*ctrl.Result, error) {
Description: nodePool.Component,
}
comp := r.instance.Status.ComponentsStatus
_, found := helpers.FindFirstPartial(comp, componentStatus, helpers.GetByDescriptionAndGroup)
_, found := helpers.FindFirstPartial(comp, componentStatus, helpers.GetByDescriptionAndComponent)

if found {
return &ctrl.Result{}, nil
Expand Down
5 changes: 0 additions & 5 deletions opensearch-operator/pkg/reconcilers/configuration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,16 @@ import (
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"

"github.com/cisco-open/operator-tools/pkg/reconciler"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/stretchr/testify/mock"
)

func newConfigurationReconciler(
client *k8s.MockK8sClient,
ctx context.Context,
recorder record.EventRecorder,
reconcilerContext *ReconcilerContext,
instance *opsterv1.OpenSearchCluster,
opts ...reconciler.ResourceReconcilerOption,
) *ConfigurationReconciler {
return &ConfigurationReconciler{
client: client,
Expand Down Expand Up @@ -69,7 +66,6 @@ var _ = Describe("Configuration Controller", func() {

underTest := newConfigurationReconciler(
mockClient,
context.Background(),
&helpers.MockEventRecorder{},
&reconcilerContext,
&spec,
Expand Down Expand Up @@ -116,7 +112,6 @@ var _ = Describe("Configuration Controller", func() {

underTest := newConfigurationReconciler(
mockClient,
context.Background(),
&helpers.MockEventRecorder{},
&reconcilerContext,
&spec,
Expand Down
3 changes: 2 additions & 1 deletion opensearch-operator/pkg/reconcilers/rollingRestart.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func (r *RollingRestartReconciler) Reconcile() (ctrl.Result, error) {
lg.V(1).Info("Restart complete. Reactivating shard allocation")
return ctrl.Result{Requeue: true}, err
}
r.recorder.AnnotatedEventf(r.instance, map[string]string{"cluster-name": r.instance.GetName()}, "Normal", "RollingRestart", "Rolling restart completed")
if err = r.updateStatus(statusFinished); err != nil {
return ctrl.Result{Requeue: true}, err
}
Expand All @@ -129,7 +130,7 @@ func (r *RollingRestartReconciler) Reconcile() (ctrl.Result, error) {
if err := r.updateStatus(statusInProgress); err != nil {
return ctrl.Result{Requeue: true}, err
}
r.recorder.AnnotatedEventf(r.instance, map[string]string{"cluster-name": r.instance.GetName()}, "Normal", "RollingRestart", "Starting to rolling restart")
r.recorder.AnnotatedEventf(r.instance, map[string]string{"cluster-name": r.instance.GetName()}, "Normal", "RollingRestart", "Starting rolling restart")

// If there is work to do create an Opensearch Client
var err error
Expand Down
2 changes: 1 addition & 1 deletion opensearch-operator/pkg/reconcilers/scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (r *ScalerReconciler) reconcileNodePool(nodePool *opsterv1.NodePool) (bool,
Description: nodePool.Component,
}
comp := r.instance.Status.ComponentsStatus
currentStatus, found := helpers.FindFirstPartial(comp, componentStatus, helpers.GetByDescriptionAndGroup)
currentStatus, found := helpers.FindFirstPartial(comp, componentStatus, helpers.GetByDescriptionAndComponent)

desireReplicaDiff := *currentSts.Spec.Replicas - nodePool.Replicas
if desireReplicaDiff == 0 {
Expand Down
2 changes: 1 addition & 1 deletion opensearch-operator/pkg/reconcilers/securityconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (r *SecurityconfigReconciler) Reconcile() (ctrl.Result, error) {
}

r.logger.Info("Starting securityconfig update job")
r.recorder.AnnotatedEventf(r.instance, annotations, "Normal", "Security", "Starting to securityconfig update job")
r.recorder.AnnotatedEventf(r.instance, annotations, "Normal", "Security", "Starting securityconfig update job")

job = builders.NewSecurityconfigUpdateJob(
r.instance,
Expand Down
72 changes: 39 additions & 33 deletions opensearch-operator/pkg/reconcilers/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ var (
ErrUnexpectedStatus = errors.New("unexpected upgrade status")
)

// Identifiers the upgrade reconciler writes into, and matches against, the
// component status entries of the cluster.
const (
	// componentNameUpgrader is the Component value of the status entries
	// that track node pool upgrades.
	componentNameUpgrader = "Upgrader"

	// upgradeStatusPending marks a node pool that has been selected for
	// upgrade but whose upgrade has not been started yet.
	upgradeStatusPending = "Pending"

	// upgradeStatusInProgress marks a node pool whose upgrade is currently
	// being carried out.
	upgradeStatusInProgress = "Upgrading"
)

type UpgradeReconciler struct {
client k8s.K8sClient
ctx context.Context
Expand Down Expand Up @@ -93,18 +99,18 @@ func (r *UpgradeReconciler) Reconcile() (ctrl.Result, error) {

// Work on the current nodepool as appropriate
switch currentStatus.Status {
case "Pending":
case upgradeStatusPending:
// Set it to upgrading and requeue
err := r.client.UpdateOpenSearchClusterStatus(client.ObjectKeyFromObject(r.instance), func(instance *opsterv1.OpenSearchCluster) {
currentStatus.Status = "Upgrading"
currentStatus.Status = upgradeStatusInProgress
instance.Status.ComponentsStatus = append(instance.Status.ComponentsStatus, currentStatus)
})
r.recorder.AnnotatedEventf(r.instance, annotations, "Normal", "Upgrade", "Starting upgrade of node pool '%s'", currentStatus.Description)
return ctrl.Result{
Requeue: true,
RequeueAfter: 15 * time.Second,
}, err
case "Upgrading":
case upgradeStatusInProgress:
err := r.doNodePoolUpgrade(nodePool)
return ctrl.Result{
Requeue: true,
Expand All @@ -116,16 +122,16 @@ func (r *UpgradeReconciler) Reconcile() (ctrl.Result, error) {
instance.Status.Version = instance.Spec.General.Version
for _, pool := range instance.Spec.NodePools {
componentStatus := opsterv1.ComponentStatus{
Component: "Upgrader",
Component: componentNameUpgrader,
Description: pool.Component,
}
currentStatus, found := helpers.FindFirstPartial(instance.Status.ComponentsStatus, componentStatus, helpers.GetByDescriptionAndGroup)
currentStatus, found := helpers.FindFirstPartial(instance.Status.ComponentsStatus, componentStatus, helpers.GetByDescriptionAndComponent)
if found {
instance.Status.ComponentsStatus = helpers.RemoveIt(currentStatus, instance.Status.ComponentsStatus)
}
}
})
r.recorder.AnnotatedEventf(r.instance, annotations, "Normal", "Upgrade", "Finished upgrade - NewVersion: %s", r.instance.Status.Version)
r.recorder.AnnotatedEventf(r.instance, annotations, "Normal", "Upgrade", "Finished upgrade - NewVersion: %s", r.instance.Spec.General.Version)
return ctrl.Result{}, err
default:
// We should never get here so return an error
Expand Down Expand Up @@ -190,71 +196,71 @@ func (r *UpgradeReconciler) findNextNodePoolForUpgrade() (opsterv1.NodePool, ops
pool, found := r.findInProgress(dataNodes)
if found {
return pool, opsterv1.ComponentStatus{
Component: "Upgrader",
Component: componentNameUpgrader,
Description: pool.Component,
Status: "Upgrading",
Status: upgradeStatusInProgress,
}
}
// Otherwise pick the first node pool that has not been worked on yet
pool, found = r.findNextPool(dataNodes)
if found {
return pool, opsterv1.ComponentStatus{
Component: "Upgrader",
Component: componentNameUpgrader,
Description: pool.Component,
Status: "Pending",
Status: upgradeStatusPending,
}
}
// Next do the same for any nodes that are data and master
pool, found = r.findInProgress(dataAndMasterNodes)
if found {
return pool, opsterv1.ComponentStatus{
Component: "Upgrader",
Component: componentNameUpgrader,
Description: pool.Component,
Status: "Upgrading",
Status: upgradeStatusInProgress,
}
}
pool, found = r.findNextPool(dataAndMasterNodes)
if found {
return pool, opsterv1.ComponentStatus{
Component: "Upgrader",
Component: componentNameUpgrader,
Description: pool.Component,
Status: "Pending",
Status: upgradeStatusPending,
}
}

// Finally do the non data nodes
pool, found = r.findInProgress(otherNodes)
if found {
return pool, opsterv1.ComponentStatus{
Component: "Upgrader",
Component: componentNameUpgrader,
Description: pool.Component,
Status: "Upgrading",
Status: upgradeStatusInProgress,
}
}
pool, found = r.findNextPool(otherNodes)
if found {
return pool, opsterv1.ComponentStatus{
Component: "Upgrader",
Component: componentNameUpgrader,
Description: pool.Component,
Status: "Pending",
Status: upgradeStatusPending,
}
}

// If we get here all nodes should be upgraded
return opsterv1.NodePool{}, opsterv1.ComponentStatus{
Component: "Upgrade",
Component: componentNameUpgrader,
Status: "Finished",
}
}

func (r *UpgradeReconciler) findInProgress(pools []opsterv1.NodePool) (opsterv1.NodePool, bool) {
for _, nodePool := range pools {
componentStatus := opsterv1.ComponentStatus{
Component: "Upgrader",
Component: componentNameUpgrader,
Description: nodePool.Component,
}
currentStatus, found := helpers.FindFirstPartial(r.instance.Status.ComponentsStatus, componentStatus, helpers.GetByDescriptionAndGroup)
if found && currentStatus.Status == "Upgrading" {
currentStatus, found := helpers.FindFirstPartial(r.instance.Status.ComponentsStatus, componentStatus, helpers.GetByDescriptionAndComponent)
if found && currentStatus.Status == upgradeStatusInProgress {
return nodePool, true
}
}
Expand All @@ -264,10 +270,10 @@ func (r *UpgradeReconciler) findInProgress(pools []opsterv1.NodePool) (opsterv1.
func (r *UpgradeReconciler) findNextPool(pools []opsterv1.NodePool) (opsterv1.NodePool, bool) {
for _, nodePool := range pools {
componentStatus := opsterv1.ComponentStatus{
Component: "Upgrader",
Component: componentNameUpgrader,
Description: nodePool.Component,
}
_, found := helpers.FindFirstPartial(r.instance.Status.ComponentsStatus, componentStatus, helpers.GetByDescriptionAndGroup)
_, found := helpers.FindFirstPartial(r.instance.Status.ComponentsStatus, componentStatus, helpers.GetByDescriptionAndComponent)
if !found {
return nodePool, true
}
Expand Down Expand Up @@ -323,12 +329,12 @@ func (r *UpgradeReconciler) doNodePoolUpgrade(pool opsterv1.NodePool) error {

return r.client.UpdateOpenSearchClusterStatus(client.ObjectKeyFromObject(r.instance), func(instance *opsterv1.OpenSearchCluster) {
currentStatus := opsterv1.ComponentStatus{
Component: "Upgrader",
Status: "Upgrading",
Component: componentNameUpgrader,
Status: upgradeStatusInProgress,
Description: pool.Component,
}
componentStatus := opsterv1.ComponentStatus{
Component: "Upgrader",
Component: componentNameUpgrader,
Status: "Upgraded",
Description: pool.Component,
}
Expand Down Expand Up @@ -383,22 +389,22 @@ func (r *UpgradeReconciler) setComponentConditions(conditions []string, componen

err := r.client.UpdateOpenSearchClusterStatus(client.ObjectKeyFromObject(r.instance), func(instance *opsterv1.OpenSearchCluster) {
currentStatus := opsterv1.ComponentStatus{
Component: "Upgrader",
Status: "Upgrading",
Component: componentNameUpgrader,
Status: upgradeStatusInProgress,
Description: component,
}
componentStatus, found := helpers.FindFirstPartial(instance.Status.ComponentsStatus, currentStatus, helpers.GetByDescriptionAndGroup)
componentStatus, found := helpers.FindFirstPartial(instance.Status.ComponentsStatus, currentStatus, helpers.GetByDescriptionAndComponent)
newStatus := opsterv1.ComponentStatus{
Component: "Upgrader",
Status: "Upgrading",
Component: componentNameUpgrader,
Status: upgradeStatusInProgress,
Description: component,
Conditions: conditions,
}
if found {
conditions = append(componentStatus.Conditions, conditions...)
}

instance.Status.ComponentsStatus = helpers.Replace(currentStatus, newStatus, instance.Status.ComponentsStatus)
instance.Status.ComponentsStatus = helpers.Replace(componentStatus, newStatus, instance.Status.ComponentsStatus)
})
if err != nil {
r.logger.Error(err, "Could not update status")
Expand Down

0 comments on commit 3bd7e72

Please sign in to comment.