Commit

PWX-25414: Parallel/smart upgrades with minimum app downtime (#1629)
* [Cherry-pick into feature]: Cherry-pick of all tickets related to Smart and parallel upgrades merged into master (#1578)

* adding k8snode name label to px pods

* correcting go fmt error

* making requested changes

* Creating node PDB

* go fmt error

* Checking node version instead of cluster

* making requested changes

Conflicts:
	drivers/storage/portworx/component/disruption_budget.go

* aggregating errors

* adding logic to delete node PDB

* correcting go fmt errors

* made requested changes

* correcting after merge conflict

* [cherry-pick] PWX-36509 : StorageCluster schema changes to support parallel portworx upgrades (#1576)

* StorageCluster schema changes to support parallel portworx upgrades

Signed-off-by: hitesh-wani-px <[email protected]>

* change DisruptionSpec to Disruption and its description

Signed-off-by: hitesh-wani-px <[email protected]>

---------

Signed-off-by: hitesh-wani-px <[email protected]>

* PWX-36477 : Updating Node PDB to allow k8s upgrades (#1580)

* vendoring openstorage with nooverlappingnodes api

* adding logic to update node PDB

* Adding UTs and correcting logic

* go fmt error

* adding logic to delete cluster pdb after 3.1.2

* addressing comments requested

* skip checking node version if version is empty

* passing nodeIds instead of name to API

* making requested changes and adding UTs

* changing log to info

* PWX-36510: No disruption of volumes for portworx upgrades (#1612)

* non disruptive portworx upgrades

* adding new method to driver interface

* correcting UT

* goimport fmt

* gofmt file

* update PX version in test

* correcting new testcases

---------

Signed-off-by: hitesh-wani-px <[email protected]>
Co-authored-by: Swarupa Vijaykumar <[email protected]>
Co-authored-by: hitesh-wani-px <[email protected]>
Co-authored-by: svijaykumar-px <[email protected]>
4 people authored Aug 12, 2024
1 parent 1a1964b commit a8fb389
Showing 48 changed files with 25,355 additions and 42,493 deletions.
10 changes: 10 additions & 0 deletions deploy/crds/core_v1_storagecluster_crd.yaml
@@ -154,6 +154,16 @@ spec:
their place. Once the new pods are available, it then proceeds onto other
StorageCluster pods, thus ensuring that at least 70% of original number of
StorageCluster pods are available at all times during the update.
disruption:
type: object
description: >-
The default behavior is non-disruptive upgrades. This setting disables the default
non-disruptive behavior and reverts to the previous behavior of upgrading nodes in
parallel without regard for disruption.
properties:
allow:
type: boolean
description: Flag that indicates whether updates are non-disruptive or disruptive.
deleteStrategy:
type: object
description: Delete strategy to uninstall and wipe the storage cluster.
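
For orientation, here is a minimal Go sketch of the spec type behind the new disruption object above. The type name Disruption comes from the commit notes ("change DisruptionSpec to Disruption"); the exact field layout and JSON tags are assumptions, not the operator's authoritative definition.

// Disruption controls whether StorageCluster updates may disrupt Portworx on a node.
// Sketch only — the Allow field mirrors the boolean allow property in the CRD above.
type Disruption struct {
	// Allow indicates whether updates are non-disruptive or disruptive.
	Allow *bool `json:"allow,omitempty"`
}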
257 changes: 250 additions & 7 deletions drivers/storage/portworx/component/disruption_budget.go
@@ -1,16 +1,20 @@
package component

import (
"context"
"fmt"
"math"
"strconv"
"strings"

"github.com/hashicorp/go-version"
"github.com/libopenstorage/openstorage/api"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
policyv1 "k8s.io/api/policy/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -77,9 +81,59 @@ func (c *disruptionBudget) Reconcile(cluster *corev1.StorageCluster) error {
if err := c.createKVDBPodDisruptionBudget(cluster, ownerRef); err != nil {
return err
}
if err := c.createPortworxPodDisruptionBudget(cluster, ownerRef); err != nil {
// Create node PDB only if parallel upgrade is supported
var err error
c.sdkConn, err = pxutil.GetPortworxConn(c.sdkConn, c.k8sClient, cluster.Namespace)
if err != nil {
return err
}

// Get list of portworx storage nodes
nodeClient := api.NewOpenStorageNodeClient(c.sdkConn)
ctx, err := pxutil.SetupContextWithToken(context.Background(), cluster, c.k8sClient, false)
if err != nil {
return err
}
nodeEnumerateResponse, err := nodeClient.EnumerateWithFilters(
ctx,
&api.SdkNodeEnumerateWithFiltersRequest{},
)
if err != nil {
return fmt.Errorf("failed to enumerate nodes: %v", err)
}
if len(nodeEnumerateResponse.Nodes) == 0 {
logrus.Warnf("Cannot create/update storage PodDisruptionBudget as there are no storage nodes")
return nil
}

if pxutil.ClusterSupportsParallelUpgrade(nodeEnumerateResponse.Nodes) {
// Get the list of k8s nodes that are part of the current cluster
k8sNodeList := &v1.NodeList{}
err = c.k8sClient.List(context.TODO(), k8sNodeList)
if err != nil {
return err
}
if err := c.createPortworxNodePodDisruptionBudget(cluster, ownerRef, nodeEnumerateResponse, k8sNodeList); err != nil {
return err
}
if err := c.deleteClusterPodDisruptionBudget(cluster, ownerRef); err != nil {
logrus.Warnf("failed to delete cluster poddisruptionbudget if exists: %v", err)
}
if err := c.deletePortworxNodePodDisruptionBudget(cluster, ownerRef, nodeEnumerateResponse, k8sNodeList); err != nil {
return err
}
if err := c.updateMinAvailableForNodePDB(cluster, ownerRef, nodeEnumerateResponse, k8sNodeList); err != nil {
return err
}
} else {
if err := c.createPortworxPodDisruptionBudget(cluster, ownerRef, nodeEnumerateResponse); err != nil {
return err
}
if err := c.deleteAllNodePodDisruptionBudgets(cluster, ownerRef); err != nil {
logrus.Warnf("failed to delete node poddisruptionbudgets if exist: %v", err)
}
}

return nil
}
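
A rough sketch of the gating decision above, for readers who want the shape of the check: the real logic lives in pxutil.ClusterSupportsParallelUpgrade, which inspects the PX version reported by each enumerated node. The 3.1.2 minimum below is an assumption taken from the "delete cluster pdb after 3.1.2" commit note, and the helper works on plain version strings rather than the SDK node type.

// allNodesAtLeast reports whether every node version string satisfies the given
// minimum, using the hashicorp/go-version parser already imported in this file.
// Hypothetical helper for illustration only — not the operator's implementation.
func allNodesAtLeast(nodeVersions []string, minimum string) bool {
	minVer, err := version.NewVersion(minimum)
	if err != nil {
		return false
	}
	for _, v := range nodeVersions {
		nodeVer, err := version.NewVersion(v)
		if err != nil || nodeVer.LessThan(minVer) {
			return false
		}
	}
	return true
}

// Example: allNodesAtLeast([]string{"3.1.2", "3.2.0"}, "3.1.2") == true,
// while a single 3.1.1 node would keep the cluster on the old cluster-wide PDB path.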

@@ -106,6 +160,7 @@ func (c *disruptionBudget) MarkDeleted() {}
func (c *disruptionBudget) createPortworxPodDisruptionBudget(
cluster *corev1.StorageCluster,
ownerRef *metav1.OwnerReference,
nodeEnumerateResponse *api.SdkNodeEnumerateWithFiltersResponse,
) error {
userProvidedMinValue, err := pxutil.MinAvailableForStoragePDB(cluster)
if err != nil {
@@ -114,12 +169,8 @@ func (c *disruptionBudget) createPortworxPodDisruptionBudget(
}

var minAvailable int
c.sdkConn, err = pxutil.GetPortworxConn(c.sdkConn, c.k8sClient, cluster.Namespace)
if err != nil {
return err
}

storageNodesCount, err := pxutil.CountStorageNodes(cluster, c.sdkConn, c.k8sClient)
storageNodesCount, err := pxutil.CountStorageNodes(cluster, c.sdkConn, c.k8sClient, nodeEnumerateResponse)
if err != nil {
c.closeSdkConn()
return err
@@ -178,6 +229,73 @@ func (c *disruptionBudget) createPortworxPodDisruptionBudget(
return err
}

func (c *disruptionBudget) createPortworxNodePodDisruptionBudget(
cluster *corev1.StorageCluster,
ownerRef *metav1.OwnerReference,
nodeEnumerateResponse *api.SdkNodeEnumerateWithFiltersResponse,
k8sNodeList *v1.NodeList,
) error {
nodesNeedingPDB, err := pxutil.NodesNeedingPDB(c.k8sClient, nodeEnumerateResponse, k8sNodeList)
if err != nil {
return err
}
errors := []error{}
for _, node := range nodesNeedingPDB {
minAvailable := intstr.FromInt(1)
pdbName := "px-" + node
pdb := &policyv1.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{
Name: pdbName,
Namespace: cluster.Namespace,
OwnerReferences: []metav1.OwnerReference{*ownerRef},
},
Spec: policyv1.PodDisruptionBudgetSpec{
MinAvailable: &minAvailable,
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
constants.LabelKeyClusterName: cluster.Name,
constants.OperatorLabelNodeNameKey: node,
},
},
},
}
err = k8sutil.CreateOrUpdatePodDisruptionBudget(c.k8sClient, pdb, ownerRef)
if err != nil {
logrus.Warnf("Failed to create PDB for node %s: %v", node, err)
errors = append(errors, err)
}
}
return utilerrors.NewAggregate(errors)

}
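
The MatchLabels pair above only works because an earlier change in this series adds the k8s node name as a label on each PX pod, so each per-node PDB selects exactly one pod. A small sketch of that pairing (the constants are the ones referenced above; the function is illustrative, not part of the operator):

// nodePDBSelectorLabels returns the labels a per-node PDB matches on; the PX pod
// scheduled to the same k8s node is expected to carry the identical pair.
func nodePDBSelectorLabels(clusterName, k8sNodeName string) map[string]string {
	return map[string]string{
		constants.LabelKeyClusterName:      clusterName,
		constants.OperatorLabelNodeNameKey: k8sNodeName,
	}
}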

// Delete the node pod disruption budget when the kubernetes node is no longer part of the cluster or portworx does not run on it
func (c *disruptionBudget) deletePortworxNodePodDisruptionBudget(
cluster *corev1.StorageCluster,
ownerRef *metav1.OwnerReference,
nodeEnumerateResponse *api.SdkNodeEnumerateWithFiltersResponse,
k8sNodeList *v1.NodeList,
) error {
nodesToDeletePDB, err := pxutil.NodesToDeletePDB(c.k8sClient, nodeEnumerateResponse, k8sNodeList)
if err != nil {
return err
}
errors := []error{}

for _, node := range nodesToDeletePDB {
pdbName := "px-" + node
err = k8sutil.DeletePodDisruptionBudget(
c.k8sClient, pdbName,
cluster.Namespace, *ownerRef,
)
if err != nil {
logrus.Warnf("Failed to delete PDB for node %s: %v", node, err)
errors = append(errors, err)
}
}
return utilerrors.NewAggregate(errors)
}

func (c *disruptionBudget) createKVDBPodDisruptionBudget(
cluster *corev1.StorageCluster,
ownerRef *metav1.OwnerReference,
@@ -186,7 +304,6 @@ func (c *disruptionBudget) createKVDBPodDisruptionBudget(
if cluster.Spec.Kvdb != nil && !cluster.Spec.Kvdb.Internal {
return nil
}

clusterSize := kvdbClusterSize(cluster)
minAvailable := intstr.FromInt(clusterSize - 1)
pdb := &policyv1.PodDisruptionBudget{
@@ -262,3 +379,129 @@ func RegisterDisruptionBudgetComponent() {
func init() {
RegisterDisruptionBudgetComponent()
}

func (c *disruptionBudget) updateMinAvailableForNodePDB(cluster *corev1.StorageCluster,
ownerRef *metav1.OwnerReference,
nodeEnumerateResponse *api.SdkNodeEnumerateWithFiltersResponse,
k8sNodeList *v1.NodeList,
) error {

// GetNodesToUpgrade returns the list of nodes that can be upgraded in parallel, a map of px node to k8s node name, and the number of nodes with PDB 0 plus down nodes.
// When non-disruptive upgrade is disabled, it returns all cordoned nodes that aren't already selected for the upgrade.
nodesToUpgrade, cordonedPxNodesMap, numPxNodesDown, err := pxutil.GetNodesToUpgrade(cluster, nodeEnumerateResponse, k8sNodeList, c.k8sClient, c.sdkConn)
if err != nil {
return err
}
// Return if there are no nodes to upgrade
if len(nodesToUpgrade) == 0 {
return nil
}

// If the user has specified a minimum number of nodes that must remain running, honor that value
storageNodesCount, err := pxutil.CountStorageNodes(cluster, c.sdkConn, c.k8sClient, nodeEnumerateResponse)
if err != nil {
c.closeSdkConn()
return err
}
userProvidedMinValue, err := pxutil.MinAvailableForStoragePDB(cluster)
if err != nil {
logrus.Warnf("Invalid value for annotation %s: %v", pxutil.AnnotationStoragePodDisruptionBudget, err)
userProvidedMinValue = -2
}

// Calculate the minimum number of nodes that should be available
quorumValue := int(math.Floor(float64(storageNodesCount)/2) + 1)
calculatedMinAvailable := quorumValue
// When non-disruptive upgrades are disabled
if cluster.Annotations != nil && cluster.Annotations[pxutil.AnnotationsDisableNonDisruptiveUpgrade] == "true" {
if cluster.Annotations[pxutil.AnnotationStoragePodDisruptionBudget] == "" {
calculatedMinAvailable = storageNodesCount - 1
} else if userProvidedMinValue < quorumValue || userProvidedMinValue >= storageNodesCount {
calculatedMinAvailable = storageNodesCount - 1
// Log an error only if this is the first time we encounter an invalid value
if userProvidedMinValue != c.annotatedMinAvailable {
errmsg := fmt.Sprintf("Invalid minAvailable annotation value for storage pod disruption budget. Using default value: %d", calculatedMinAvailable)
c.recorder.Event(cluster, v1.EventTypeWarning, util.InvalidMinAvailable, errmsg)
}
} else if userProvidedMinValue >= quorumValue && userProvidedMinValue < storageNodesCount {
calculatedMinAvailable = userProvidedMinValue
}
} else {
// When non-disruptive upgrades are enabled
if userProvidedMinValue >= storageNodesCount && userProvidedMinValue != c.annotatedMinAvailable {
errmsg := fmt.Sprintf("Invalid minAvailable annotation value for storage pod disruption budget. Using default value: %d", calculatedMinAvailable)
c.recorder.Event(cluster, v1.EventTypeWarning, util.InvalidMinAvailable, errmsg)
} else if userProvidedMinValue >= quorumValue && userProvidedMinValue < storageNodesCount {
calculatedMinAvailable = userProvidedMinValue
}
}
c.annotatedMinAvailable = userProvidedMinValue
downNodesCount := numPxNodesDown
// Update minAvailable to 0 for nodes in nodesToUpgrade
for _, node := range nodesToUpgrade {
// Ensure that portworx quorum, or the valid minAvailable provided by the user, is always maintained
maxUnavailable := storageNodesCount - calculatedMinAvailable
if downNodesCount >= maxUnavailable {
logrus.Infof("Number of down PX nodes: %d, is equal to or exceeds allowed maximum: %d. Total storage nodes: %d", downNodesCount, maxUnavailable, storageNodesCount)
break
}
k8sNode := cordonedPxNodesMap[node]
pdbName := "px-" + k8sNode
minAvailable := intstr.FromInt(0)
pdb := &policyv1.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{
Name: pdbName,
Namespace: cluster.Namespace,
OwnerReferences: []metav1.OwnerReference{*ownerRef},
},
Spec: policyv1.PodDisruptionBudgetSpec{
MinAvailable: &minAvailable,
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
constants.LabelKeyClusterName: cluster.Name,
constants.OperatorLabelNodeNameKey: k8sNode,
},
},
},
}
err = k8sutil.CreateOrUpdatePodDisruptionBudget(c.k8sClient, pdb, ownerRef)
if err != nil {
logrus.Warnf("Failed to update PDB for node %s: %v", node, err)
} else {
downNodesCount++
}
}
return nil

}
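
To make the quorum arithmetic above concrete, a standalone worked example, assuming a hypothetical cluster of 6 storage nodes with no minAvailable annotation set: quorum is 4, so at most two nodes (including nodes already down) can have their PDB relaxed to 0 at a time.

package main

import (
	"fmt"
	"math"
)

// Worked example of the quorum math in updateMinAvailableForNodePDB.
func main() {
	storageNodesCount := 6
	quorumValue := int(math.Floor(float64(storageNodesCount)/2) + 1) // 4
	calculatedMinAvailable := quorumValue                            // default when no valid annotation is set
	maxUnavailable := storageNodesCount - calculatedMinAvailable     // 2
	fmt.Printf("quorum=%d minAvailable=%d maxUnavailable=%d\n",
		quorumValue, calculatedMinAvailable, maxUnavailable)
}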

func (c *disruptionBudget) deleteClusterPodDisruptionBudget(cluster *corev1.StorageCluster, ownerRef *metav1.OwnerReference) error {

err := k8sutil.DeletePodDisruptionBudget(c.k8sClient, StoragePodDisruptionBudgetName, cluster.Namespace, *ownerRef)
if err != nil {
logrus.Warnf("Failed to delete cluster PDB %s: %v", StoragePodDisruptionBudgetName, err)
return err
}

return nil
}

func (c *disruptionBudget) deleteAllNodePodDisruptionBudgets(cluster *corev1.StorageCluster, ownerRef *metav1.OwnerReference) error {
errors := []error{}
// Get the list of poddisruptionbudgets
pdbList := &policyv1.PodDisruptionBudgetList{}
err := c.k8sClient.List(context.TODO(), pdbList, client.InNamespace(cluster.Namespace))
if err != nil {
logrus.Warnf("failed to list poddisruptionbudgets: %v", err)
}
for _, pdb := range pdbList.Items {
if strings.HasPrefix(pdb.Name, "px") && pdb.Name != StoragePodDisruptionBudgetName && pdb.Name != "px-kvdb" {
err := k8sutil.DeletePodDisruptionBudget(c.k8sClient, pdb.Name, cluster.Namespace, *ownerRef)
if err != nil {
logrus.Warnf("Failed to delete node PDB %s: %v", pdb.Name, err)
errors = append(errors, err)
}
}
}
return utilerrors.NewAggregate(errors)
}