Updates to cert handling (#567)
This PR introduces two changes to cert handling.

The first, minor change allows certMgr-generated certs to be used as CAs for
autogenerating node certs. It simply normalizes the keys under which the PEM
data is stored.
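
To make that concrete, here is a minimal sketch of what such key normalization can look like. It assumes cert-manager-style secrets that store the issued certificate under `tls.crt`/`tls.key`; the helper name is hypothetical, not the operator's actual API:

```go
// Hypothetical helper: copy PEM data from cert-manager-style keys
// (tls.crt/tls.key) over to the keys the CA code expects (ca.crt/ca.key),
// so such a secret can also serve as the CA for autogenerating node certs.
func normalizeCASecretKeys(data map[string][]byte) map[string][]byte {
	normalized := make(map[string][]byte, len(data))
	for k, v := range data {
		normalized[k] = v
	}
	if _, ok := normalized["ca.crt"]; !ok {
		if crt, ok := normalized["tls.crt"]; ok {
			normalized["ca.crt"] = crt
		}
	}
	if _, ok := normalized["ca.key"]; !ok {
		if key, ok := normalized["tls.key"]; ok {
			normalized["ca.key"] = key
		}
	}
	return normalized
}
```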

The second deals with a behaviour change in OpenSearch. Prior to 2.0,
OpenSearch used the transport CA to verify the admin cert; from 2.0 onwards it
uses the HTTP CA. If both CAs are the same this is not a problem, but it
surfaces when different CAs are used for transport and HTTP.

To solve this, the cert-handling code has been refactored and split apart to
accommodate the logic updates.
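
For illustration only (this is not the PR's refactored code, and the function name is made up), the version-dependent choice described above can be sketched with the hashicorp/go-version module already imported in the helpers package:

```go
package main

import (
	"fmt"

	"github.com/hashicorp/go-version"
)

// adminCAName returns which CA the admin cert should be verified against
// for a given OpenSearch version: the transport CA before 2.0, the HTTP CA
// from 2.0 onwards.
func adminCAName(osVersion string) (string, error) {
	v, err := version.NewVersion(osVersion)
	if err != nil {
		return "", err
	}
	if v.LessThan(version.Must(version.NewVersion("2.0.0"))) {
		return "transport", nil
	}
	return "http", nil
}

func main() {
	for _, osv := range []string{"1.3.10", "2.9.0"} {
		ca, _ := adminCAName(osv)
		fmt.Printf("OpenSearch %s: verify admin cert against the %s CA\n", osv, ca)
	}
}
```
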
swoehrl-mw authored Jul 28, 2023
2 parents 33d9c53 + 4e793b3 commit 181c86a
Showing 18 changed files with 681 additions and 116 deletions.

7 changes: 4 additions & 3 deletions opensearch-operator/api/v1/opensearch_types.go
@@ -296,9 +296,10 @@ type OpenSearchCluster struct {
}

type ComponentStatus struct {
Component string `json:"component,omitempty"`
Status string `json:"status,omitempty"`
Description string `json:"description,omitempty"`
Component string `json:"component,omitempty"`
Status string `json:"status,omitempty"`
Description string `json:"description,omitempty"`
Conditions []string `json:"conditions,omitempty"`
}

// +kubebuilder:object:root=true
9 changes: 8 additions & 1 deletion opensearch-operator/api/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

Large diffs are not rendered by default.

@@ -9,6 +9,10 @@ spec:
httpPort: 9200
vendor: opensearch
serviceName: deploy-and-upgrade
additionalConfig:
cluster.routing.allocation.disk.watermark.low: 500m
cluster.routing.allocation.disk.watermark.high: 300m
cluster.routing.allocation.disk.watermark.flood_stage: 100m
confMgmt:
smartScaler: true
dashboards:
@@ -7,8 +7,10 @@ import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
opsterv1 "opensearch.opster.io/api/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)

@@ -69,8 +71,28 @@ var _ = Describe("DeployAndUpgrade", Ordered, func() {
Eventually(func() int32 {
err := k8sClient.Get(context.Background(), client.ObjectKey{Name: name + "-masters", Namespace: namespace}, &sts)
if err == nil {
GinkgoWriter.Printf("%+v\n", sts.Status)
pods := &corev1.PodList{}
err := k8sClient.List(context.Background(), pods, client.InNamespace(namespace))
if err == nil {
for _, pod := range pods.Items {
revision, ok := pod.Labels["controller-revision-hash"]
GinkgoWriter.Printf("Pod: %s\tPhase: %s", pod.Name, pod.Status.Phase)
if ok {
GinkgoWriter.Printf("\tRevision: %s\t Image: %s", revision, pod.Spec.Containers[0].Image)
}
GinkgoWriter.Println()
}
} else {
GinkgoWriter.Println(err)
}
cluster := &opsterv1.OpenSearchCluster{}
k8sClient.Get(context.Background(), client.ObjectKey{Name: name, Namespace: namespace}, cluster)
GinkgoWriter.Printf("Cluster: %+v\n", cluster.Status)

return sts.Status.UpdatedReplicas
}
GinkgoWriter.Println(err)
return 0
}, time.Minute*15, time.Second*5).Should(Equal(int32(3)))
})
11 changes: 10 additions & 1 deletion opensearch-operator/functionaltests/operatortests/main_test.go
@@ -5,7 +5,11 @@ import (

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/clientcmd"
opsterv1 "opensearch.opster.io/api/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)

@@ -18,7 +22,12 @@ func TestAPIs(t *testing.T) {
if err != nil {
panic(err.Error())
}
k8sClient, err = client.New(config, client.Options{})
scheme := runtime.NewScheme()
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(opsterv1.AddToScheme(scheme))
k8sClient, err = client.New(config, client.Options{
Scheme: scheme,
})
if err != nil {
panic(err.Error())
}
2 changes: 2 additions & 0 deletions opensearch-operator/go.mod
@@ -15,6 +15,7 @@ require (
github.com/opensearch-project/opensearch-go v1.1.0
github.com/phayes/freeport v0.0.0-20220201140144-74d24b5ae9f5
github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring v0.61.1
github.com/samber/lo v1.38.1
go.uber.org/zap v1.24.0
k8s.io/api v0.27.2
k8s.io/apiextensions-apiserver v0.27.2
@@ -71,6 +72,7 @@ require (
github.com/wayneashleyberry/terminal-dimensions v1.1.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/oauth2 v0.5.0 // indirect
golang.org/x/sys v0.8.0 // indirect
4 changes: 4 additions & 0 deletions opensearch-operator/go.sum
@@ -431,6 +431,8 @@ github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/samber/lo v1.38.1 h1:j2XEAqXKb09Am4ebOg31SpvzUTTs6EN3VfgeLUhPdXM=
github.com/samber/lo v1.38.1/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
@@ -523,6 +525,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0
golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 h1:3MTrJm4PyNL9NBqvYDSj3DHl46qQakyfqfWo4jgfaEM=
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
@@ -147,35 +147,35 @@ func createClusterSettingsAllocationEnable(enable ClusterSettingsAllocation) res
}}
}

func CheckClusterStatusForRestart(service *OsClusterClient, drainNodes bool) (bool, error) {
func CheckClusterStatusForRestart(service *OsClusterClient, drainNodes bool) (bool, string, error) {
health, err := service.GetHealth()
if err != nil {
return false, err
return false, "failed to fetch health", err
}

if health.Status == "green" {
return true, nil
return true, "", nil
}

if drainNodes {
return false, nil
return false, "cluster is not green and drain nodes is enabled", nil
}

flatSettings, err := service.GetFlatClusterSettings()
if err != nil {
return false, err
return false, "could not fetch cluster settings", err
}

if flatSettings.Transient.ClusterRoutingAllocationEnable == string(ClusterSettingsAllocationAll) {
return false, nil
return false, "waiting for health to be green", nil
}

// Set shard routing to all
if err := SetClusterShardAllocation(service, ClusterSettingsAllocationAll); err != nil {
return false, err
return false, "failed to set shard allocation", err
}

return false, nil
return false, "enabled shard allocation", nil
}

func ReactivateShardAllocation(service *OsClusterClient) error {
9 changes: 0 additions & 9 deletions opensearch-operator/pkg/builders/cluster.go
@@ -860,10 +860,6 @@ func StsName(cr *opsterv1.OpenSearchCluster, nodePool *opsterv1.NodePool) string
return cr.Name + "-" + nodePool.Component
}

func ReplicaHostName(currentSts appsv1.StatefulSet, repNum int32) string {
return fmt.Sprintf("%s-%d", currentSts.ObjectMeta.Name, repNum)
}

func DiscoveryServiceName(cr *opsterv1.OpenSearchCluster) string {
return fmt.Sprintf("%s-discovery", cr.Name)
}
@@ -872,11 +868,6 @@ func BootstrapPodName(cr *opsterv1.OpenSearchCluster) string {
return fmt.Sprintf("%s-bootstrap-0", cr.Name)
}

func WorkingPodForRollingRestart(sts *appsv1.StatefulSet) string {
ordinal := pointer.Int32Deref(sts.Spec.Replicas, 1) - 1 - sts.Status.UpdatedReplicas
return ReplicaHostName(*sts, ordinal)
}

func STSInNodePools(sts appsv1.StatefulSet, nodepools []opsterv1.NodePool) bool {
for _, nodepool := range nodepools {
if sts.Labels[helpers.NodePoolLabel] == nodepool.Component {
52 changes: 51 additions & 1 deletion opensearch-operator/pkg/helpers/helpers.go
@@ -12,6 +12,7 @@ import (
"k8s.io/apimachinery/pkg/types"

"github.com/hashicorp/go-version"
"github.com/samber/lo"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -24,6 +25,8 @@ const (
const (
stsUpdateWaitTime = 30
updateStepTime = 3

stsRevisionLabel = "controller-revision-hash"
)

func ContainsString(slice []string, s string) bool {
@@ -45,7 +48,7 @@ func GetField(v *appsv1.StatefulSetSpec, field string) interface{} {

func RemoveIt(ss opsterv1.ComponentStatus, ssSlice []opsterv1.ComponentStatus) []opsterv1.ComponentStatus {
for idx, v := range ssSlice {
if v == ss {
if ComponentStatusEqual(v, ss) {
return append(ssSlice[0:idx], ssSlice[idx+1:]...)
}
}
@@ -57,6 +60,10 @@ func Replace(remove opsterv1.ComponentStatus, add opsterv1.ComponentStatus, ssSl
return fullSliced
}

func ComponentStatusEqual(left opsterv1.ComponentStatus, right opsterv1.ComponentStatus) bool {
return left.Component == right.Component && left.Description == right.Description && left.Status == right.Status
}

func FindFirstPartial(
arr []opsterv1.ComponentStatus,
item opsterv1.ComponentStatus,
@@ -115,6 +122,13 @@ func GetByDescriptionAndGroup(left opsterv1.ComponentStatus, right opsterv1.Comp
return right, false
}

func GetByComponent(left opsterv1.ComponentStatus, right opsterv1.ComponentStatus) (opsterv1.ComponentStatus, bool) {
if left.Component == right.Component {
return left, true
}
return right, false
}

func MergeConfigs(left map[string]string, right map[string]string) map[string]string {
if left == nil {
return right
@@ -380,3 +394,39 @@ func CalculateJvmHeapSize(nodePool *opsterv1.NodePool) string {

return nodePool.Jvm
}

func UpgradeInProgress(status opsterv1.ClusterStatus) bool {
componentStatus := opsterv1.ComponentStatus{
Component: "Upgrader",
}
_, found := FindFirstPartial(status.ComponentsStatus, componentStatus, GetByComponent)
return found
}
func ReplicaHostName(currentSts appsv1.StatefulSet, repNum int32) string {
return fmt.Sprintf("%s-%d", currentSts.ObjectMeta.Name, repNum)
}

func WorkingPodForRollingRestart(ctx context.Context, k8sClient client.Client, sts *appsv1.StatefulSet) (string, error) {
replicas := lo.FromPtrOr(sts.Spec.Replicas, 1)
// Handle the simple case
if replicas == sts.Status.UpdatedReplicas+sts.Status.CurrentReplicas {
ordinal := replicas - 1 - sts.Status.UpdatedReplicas
return ReplicaHostName(*sts, ordinal), nil
}
// If there are potentially mixed revisions we need to check each pod
for i := replicas - 1; i >= 0; i-- {
podName := ReplicaHostName(*sts, i)
pod := &corev1.Pod{}
if err := k8sClient.Get(ctx, types.NamespacedName{Name: podName, Namespace: sts.Namespace}, pod); err != nil {
return "", err
}
podRevision, ok := pod.Labels[stsRevisionLabel]
if !ok {
return "", fmt.Errorf("pod %s has no revision label", podName)
}
if podRevision != sts.Status.UpdateRevision {
return podName, nil
}
}
return "", errors.New("unable to calculate the working pod for rolling restart")
}
6 changes: 4 additions & 2 deletions opensearch-operator/pkg/reconcilers/cluster.go
@@ -246,7 +246,8 @@ func (r *ClusterReconciler) reconcileNodeStatefulSet(nodePool opsterv1.NodePool,
}

// Detect cluster failure and initiate parallel recovery
if helpers.ParallelRecoveryMode() && (nodePool.Persistence == nil || nodePool.Persistence.PersistenceSource.PVC != nil) {
if helpers.ParallelRecoveryMode() &&
(nodePool.Persistence == nil || nodePool.Persistence.PersistenceSource.PVC != nil) {
// This logic only works if the STS uses PVCs
// First check if the STS already has a readable status (CurrentRevision == "" indicates the STS is newly created and the controller has not yet updated the status properly)
if existing.Status.CurrentRevision == "" {
@@ -262,7 +263,8 @@ } else {
} else {
// A failure is assumed if n PVCs exist but less than n-1 pods (one missing pod is allowed for rolling restart purposes)
// We can assume the cluster is in a failure state and cannot recover on its own
if pvcCount >= int(nodePool.Replicas) && existing.Status.ReadyReplicas < nodePool.Replicas-1 {
if !helpers.UpgradeInProgress(r.instance.Status) &&
pvcCount >= int(nodePool.Replicas) && existing.Status.ReadyReplicas < nodePool.Replicas-1 {
r.logger.Info(fmt.Sprintf("Detected recovery situation for nodepool %s: PVC count: %d, replicas: %d. Recreating STS with parallel mode", nodePool.Component, pvcCount, existing.Status.Replicas))
if existing.Spec.PodManagementPolicy != appsv1.ParallelPodManagement {
// Switch to Parallel to jumpstart the cluster
15 changes: 13 additions & 2 deletions opensearch-operator/pkg/reconcilers/rollingRestart.go
@@ -111,6 +111,14 @@ func (r *RollingRestartReconciler) Reconcile() (ctrl.Result, error) {
return ctrl.Result{}, nil
}

// Skip a rolling restart if the cluster hasn't finished initializing
if !r.instance.Status.Initialized {
return ctrl.Result{
Requeue: true,
RequeueAfter: 10 * time.Second,
}, nil
}

if err := r.updateStatus(statusInProgress); err != nil {
return ctrl.Result{Requeue: true}, err
}
@@ -155,7 +163,7 @@ func (r *RollingRestartReconciler) restartStatefulSetPod(sts *appsv1.StatefulSet
lg.Info("only 2 data nodes and drain is set, some shards may not drain")
}

ready, err := services.CheckClusterStatusForRestart(r.osClient, r.instance.Spec.General.DrainDataNodes)
ready, _, err := services.CheckClusterStatusForRestart(r.osClient, r.instance.Spec.General.DrainDataNodes)
if err != nil {
return ctrl.Result{}, err
}
Expand All @@ -166,7 +174,10 @@ func (r *RollingRestartReconciler) restartStatefulSetPod(sts *appsv1.StatefulSet
}, nil
}

workingPod := builders.WorkingPodForRollingRestart(sts)
workingPod, err := helpers.WorkingPodForRollingRestart(r.ctx, r.Client, sts)
if err != nil {
return ctrl.Result{}, err
}

ready, err = services.PreparePodForDelete(r.osClient, workingPod, r.instance.Spec.General.DrainDataNodes, dataCount)
if err != nil {