From a4c78c113558c673e86ccde683bbe8cc296d59a2 Mon Sep 17 00:00:00 2001 From: Dan Bason Date: Thu, 1 Dec 2022 14:49:07 +1300 Subject: [PATCH 01/10] Allow certmgr issued certs to be used Signed-off-by: Dan Bason --- opensearch-operator/pkg/reconcilers/tls.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/opensearch-operator/pkg/reconcilers/tls.go b/opensearch-operator/pkg/reconcilers/tls.go index 0b7cf104..88b5799f 100644 --- a/opensearch-operator/pkg/reconcilers/tls.go +++ b/opensearch-operator/pkg/reconcilers/tls.go @@ -443,7 +443,14 @@ func (r *TLSReconciler) providedCaCert(secretName string, namespace string) (tls if err := r.Get(r.ctx, client.ObjectKey{Name: secretName, Namespace: namespace}, &caSecret); err != nil { return ca, err } - ca = r.pki.CAFromSecret(caSecret.Data) + data := caSecret.Data + if _, ok := caSecret.Annotations["cert-manager.io/issuer-kind"]; ok { + data = map[string][]byte{ + "ca.crt": caSecret.Data["tls.crt"], + "ca.key": caSecret.Data["tls.key"], + } + } + ca = r.pki.CAFromSecret(data) return ca, nil } From 7467827c09fbf978018be0fb23bf44b560f9aa5f Mon Sep 17 00:00:00 2001 From: Dan Bason Date: Wed, 7 Dec 2022 10:08:56 +1300 Subject: [PATCH 02/10] Refactor admin certs for new version Signed-off-by: Dan Bason --- opensearch-operator/pkg/reconcilers/tls.go | 116 +++++++++++++++------ 1 file changed, 82 insertions(+), 34 deletions(-) diff --git a/opensearch-operator/pkg/reconcilers/tls.go b/opensearch-operator/pkg/reconcilers/tls.go index 88b5799f..91a90f94 100644 --- a/opensearch-operator/pkg/reconcilers/tls.go +++ b/opensearch-operator/pkg/reconcilers/tls.go @@ -6,12 +6,12 @@ import ( "fmt" "strings" - "k8s.io/client-go/tools/record" - + "github.com/Masterminds/semver" "github.com/cisco-open/operator-tools/pkg/reconciler" "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/record" opsterv1 "opensearch.opster.io/api/v1" "opensearch.opster.io/pkg/builders" "opensearch.opster.io/pkg/helpers" @@ -102,50 +102,98 @@ func (r *TLSReconciler) handleTransport() error { func (r *TLSReconciler) handleAdminCertificate() error { tlsConfig := r.instance.Spec.Security.Tls.Transport - namespace := r.instance.Namespace clusterName := r.instance.Name - adminSecretName := clusterName + "-admin-cert" if tlsConfig.Generate { - // Generate admin client certificate - var ca tls.Cert - var err error - if r.instance.Spec.Security.Tls.Transport.TlsCertificateConfig.CaSecret.Name != "" { - ca, err = r.providedCaCert(r.instance.Spec.Security.Tls.Transport.TlsCertificateConfig.CaSecret.Name, namespace) - } else { - ca, err = util.ReadOrGenerateCaCert(r.pki, r.Client, r.ctx, r.instance) - } + ca, err := r.getCACert() if err != nil { return err } - - adminSecret := corev1.Secret{} - if err := r.Get(r.ctx, client.ObjectKey{Name: adminSecretName, Namespace: namespace}, &adminSecret); err != nil { - adminCert, err := ca.CreateAndSignCertificate("admin", clusterName, nil) - if err != nil { - r.logger.Error(err, "Failed to create admin certificate", "interface", "transport") - r.recorder.AnnotatedEventf(r.instance, map[string]string{"cluster-name": r.instance.GetName()}, "Warning", "Security", "Failed to create admin certificate") - return err - } - adminSecret = corev1.Secret{ObjectMeta: metav1.ObjectMeta{Name: adminSecretName, Namespace: namespace}, Type: corev1.SecretTypeTLS, Data: adminCert.SecretData(ca)} - if err := ctrl.SetControllerReference(r.instance, &adminSecret, r.Client.Scheme()); err != 
nil { - return err - } - if err := r.Create(r.ctx, &adminSecret); err != nil { - r.logger.Error(err, "Failed to store admin certificate in secret", "interface", "transport") - return err - } + err = r.createAdminSecret(ca) + if err != nil { + return err } - // Add admin_dn to config r.reconcilerContext.AddConfig("plugins.security.authcz.admin_dn", fmt.Sprintf("[\"CN=admin,OU=%s\"]", clusterName)) - } else { - // Add provided admin_dn to config - adminDn := strings.Join(tlsConfig.AdminDn, "\",\"") - r.reconcilerContext.AddConfig("plugins.security.authcz.admin_dn", fmt.Sprintf("[\"%s\"]", adminDn)) + return nil } + + adminDn := strings.Join(tlsConfig.AdminDn, "\",\"") + r.reconcilerContext.AddConfig("plugins.security.authcz.admin_dn", fmt.Sprintf("[\"%s\"]", adminDn)) return nil } +func (r *TLSReconciler) securityChangeVersion() bool { + newVersionConstraint, err := semver.NewConstraint(">=2.0.0") + if err != nil { + panic(err) + } + + version, err := semver.NewVersion(r.instance.Spec.General.Version) + if err != nil { + r.logger.Error(err, "unable to parse version, assuming >= 2.0.0") + return true + } + return newVersionConstraint.Check(version) +} + +func (r *TLSReconciler) adminCAProvided() bool { + if r.securityChangeVersion() { + return r.instance.Spec.Security.Tls.Http.TlsCertificateConfig.CaSecret.Name != "" + } + return r.instance.Spec.Security.Tls.Transport.TlsCertificateConfig.CaSecret.Name != "" +} + +func (r *TLSReconciler) providedCAForAdminCert() (tls.Cert, error) { + if r.securityChangeVersion() { + return r.providedCaCert( + r.instance.Spec.Security.Tls.Http.TlsCertificateConfig.CaSecret.Name, + r.instance.Namespace, + ) + } + return r.providedCaCert( + r.instance.Spec.Security.Tls.Transport.TlsCertificateConfig.CaSecret.Name, + r.instance.Namespace, + ) +} + +func (r *TLSReconciler) getCACert() (tls.Cert, error) { + if r.adminCAProvided() { + return r.providedCAForAdminCert() + } + return util.ReadOrGenerateCaCert(r.pki, r.Client, r.ctx, r.instance) +} + +func (r *TLSReconciler) createAdminSecret(ca tls.Cert) error { + adminCert, err := ca.CreateAndSignCertificate("admin", r.instance.Name, nil) + if err != nil { + r.logger.Error(err, "Failed to create admin certificate", "interface", "transport") + r.recorder.AnnotatedEventf( + r.instance, + map[string]string{"cluster-name": r.instance.GetName()}, + "Warning", + "Security", + "Failed to create admin certificate", + ) + return err + } + adminSecret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: r.adminSecretName(), + Namespace: r.instance.Namespace, + }, + Type: corev1.SecretTypeTLS, + Data: adminCert.SecretData(ca), + } + if err := ctrl.SetControllerReference(r.instance, adminSecret, r.Client.Scheme()); err != nil { + return err + } + return client.IgnoreAlreadyExists(r.Create(r.ctx, adminSecret)) +} + +func (r *TLSReconciler) adminSecretName() string { + return r.instance.Name + "-admin-cert" +} + func (r *TLSReconciler) handleTransportGenerateGlobal() error { namespace := r.instance.Namespace clusterName := r.instance.Name From 9fd925c6d26d0e03dfdf49525bb3acbf96f9e69a Mon Sep 17 00:00:00 2001 From: Dan Bason Date: Fri, 21 Jul 2023 11:35:13 +1200 Subject: [PATCH 03/10] Add validity check for admin cert Signed-off-by: Dan Bason --- .../operatortests/deploy_and_upgrade_test.go | 2 +- opensearch-operator/go.mod | 2 + opensearch-operator/go.sum | 4 + opensearch-operator/pkg/reconcilers/tls.go | 113 ++++++++++++++---- opensearch-operator/pkg/tls/pki.go | 58 +++++++++ 5 files changed, 154 insertions(+), 25 
deletions(-) diff --git a/opensearch-operator/functionaltests/operatortests/deploy_and_upgrade_test.go b/opensearch-operator/functionaltests/operatortests/deploy_and_upgrade_test.go index d8ea330c..7a358792 100644 --- a/opensearch-operator/functionaltests/operatortests/deploy_and_upgrade_test.go +++ b/opensearch-operator/functionaltests/operatortests/deploy_and_upgrade_test.go @@ -72,7 +72,7 @@ var _ = Describe("DeployAndUpgrade", Ordered, func() { return sts.Status.UpdatedReplicas } return 0 - }, time.Minute*15, time.Second*5).Should(Equal(int32(3))) + }, time.Minute*30, time.Second*5).Should(Equal(int32(3))) }) It("should upgrade the dashboard pod", func() { diff --git a/opensearch-operator/go.mod b/opensearch-operator/go.mod index 78003564..b2c1b780 100644 --- a/opensearch-operator/go.mod +++ b/opensearch-operator/go.mod @@ -15,6 +15,7 @@ require ( github.com/opensearch-project/opensearch-go v1.1.0 github.com/phayes/freeport v0.0.0-20220201140144-74d24b5ae9f5 github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring v0.61.1 + github.com/samber/lo v1.38.1 go.uber.org/zap v1.24.0 k8s.io/api v0.27.2 k8s.io/apiextensions-apiserver v0.27.2 @@ -71,6 +72,7 @@ require ( github.com/wayneashleyberry/terminal-dimensions v1.1.0 // indirect go.uber.org/atomic v1.7.0 // indirect go.uber.org/multierr v1.6.0 // indirect + golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect golang.org/x/net v0.10.0 // indirect golang.org/x/oauth2 v0.5.0 // indirect golang.org/x/sys v0.8.0 // indirect diff --git a/opensearch-operator/go.sum b/opensearch-operator/go.sum index bad40bad..aeb0776b 100644 --- a/opensearch-operator/go.sum +++ b/opensearch-operator/go.sum @@ -431,6 +431,8 @@ github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= +github.com/samber/lo v1.38.1 h1:j2XEAqXKb09Am4ebOg31SpvzUTTs6EN3VfgeLUhPdXM= +github.com/samber/lo v1.38.1/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= @@ -523,6 +525,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= +golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 h1:3MTrJm4PyNL9NBqvYDSj3DHl46qQakyfqfWo4jgfaEM= +golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= diff --git 
a/opensearch-operator/pkg/reconcilers/tls.go b/opensearch-operator/pkg/reconcilers/tls.go index 91a90f94..f3a54706 100644 --- a/opensearch-operator/pkg/reconcilers/tls.go +++ b/opensearch-operator/pkg/reconcilers/tls.go @@ -5,12 +5,16 @@ import ( "errors" "fmt" "strings" + "time" "github.com/Masterminds/semver" "github.com/cisco-open/operator-tools/pkg/reconciler" "github.com/go-logr/logr" + "github.com/samber/lo" corev1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" opsterv1 "opensearch.opster.io/api/v1" "opensearch.opster.io/pkg/builders" @@ -75,6 +79,10 @@ func (r *TLSReconciler) Reconcile() (ctrl.Result, error) { return ctrl.Result{}, err } } + if r.reconcileAdminCert() { + res, err := r.handleAdminCertificate() + return lo.FromPtrOr(res, ctrl.Result{}), err + } return ctrl.Result{}, nil } @@ -96,30 +104,33 @@ func (r *TLSReconciler) handleTransport() error { return err } } - err := r.handleAdminCertificate() - return err + return nil } -func (r *TLSReconciler) handleAdminCertificate() error { +func (r *TLSReconciler) handleAdminCertificate() (*ctrl.Result, error) { + //TODO: This should be refactored in the API - https://github.com/Opster/opensearch-k8s-operator/issues/569 tlsConfig := r.instance.Spec.Security.Tls.Transport clusterName := r.instance.Name + var res *ctrl.Result + var certDN string if tlsConfig.Generate { ca, err := r.getCACert() if err != nil { - return err + return nil, err } - err = r.createAdminSecret(ca) + res, err = r.createAdminSecret(ca) if err != nil { - return err + return nil, err } - r.reconcilerContext.AddConfig("plugins.security.authcz.admin_dn", fmt.Sprintf("[\"CN=admin,OU=%s\"]", clusterName)) - return nil + certDN = fmt.Sprintf("CN=admin,OU=%s", clusterName) + + } else { + certDN = strings.Join(tlsConfig.AdminDn, "\",\"") } - adminDn := strings.Join(tlsConfig.AdminDn, "\",\"") - r.reconcilerContext.AddConfig("plugins.security.authcz.admin_dn", fmt.Sprintf("[\"%s\"]", adminDn)) - return nil + r.reconcilerContext.AddConfig("plugins.security.authcz.admin_dn", fmt.Sprintf("[\"%s\"]", certDN)) + return res, nil } func (r *TLSReconciler) securityChangeVersion() bool { @@ -136,22 +147,27 @@ func (r *TLSReconciler) securityChangeVersion() bool { return newVersionConstraint.Check(version) } -func (r *TLSReconciler) adminCAProvided() bool { +func (r *TLSReconciler) adminCAName() string { if r.securityChangeVersion() { - return r.instance.Spec.Security.Tls.Http.TlsCertificateConfig.CaSecret.Name != "" + return r.instance.Spec.Security.Tls.Http.TlsCertificateConfig.CaSecret.Name } - return r.instance.Spec.Security.Tls.Transport.TlsCertificateConfig.CaSecret.Name != "" + return r.instance.Spec.Security.Tls.Transport.TlsCertificateConfig.CaSecret.Name } -func (r *TLSReconciler) providedCAForAdminCert() (tls.Cert, error) { +func (r *TLSReconciler) reconcileAdminCert() bool { if r.securityChangeVersion() { - return r.providedCaCert( - r.instance.Spec.Security.Tls.Http.TlsCertificateConfig.CaSecret.Name, - r.instance.Namespace, - ) + return r.instance.Spec.Security.Tls.Http != nil && r.instance.Spec.Security.Tls.Transport != nil } + return r.instance.Spec.Security.Tls.Transport != nil +} + +func (r *TLSReconciler) adminCAProvided() bool { + return r.adminCAName() != "" +} + +func (r *TLSReconciler) providedCAForAdminCert() (tls.Cert, error) { return r.providedCaCert( - 
r.instance.Spec.Security.Tls.Transport.TlsCertificateConfig.CaSecret.Name, + r.adminCAName(), r.instance.Namespace, ) } @@ -163,7 +179,56 @@ func (r *TLSReconciler) getCACert() (tls.Cert, error) { return util.ReadOrGenerateCaCert(r.pki, r.Client, r.ctx, r.instance) } -func (r *TLSReconciler) createAdminSecret(ca tls.Cert) error { +func (r *TLSReconciler) shouldCreateAdminCert(ca tls.Cert) (bool, error) { + secret := &corev1.Secret{} + err := r.Get(r.ctx, types.NamespacedName{ + Name: r.adminSecretName(), + Namespace: r.instance.Namespace, + }, secret) + if err != nil { + if k8serrors.IsNotFound(err) { + r.logger.Info("admin cert does not exist, creating") + return true, nil + } + return false, err + } + + data, ok := secret.Data[corev1.TLSCertKey] + if !ok { + return true, nil + } + + validator, err := tls.NewCertValidater(data, tls.WithExpiryThreshold(5*24*time.Hour)) + if err != nil { + return false, err + } + + if validator.IsExpiringSoon() { + r.logger.Info("admin cert is expiring soon, recreating") + return true, nil + } + + verified, err := validator.IsSignedByCA(ca) + if err != nil { + return false, err + } + + if !verified { + r.logger.Info("admin cert is not signed by CA, recreating") + } + + return !verified, nil +} + +func (r *TLSReconciler) createAdminSecret(ca tls.Cert) (*ctrl.Result, error) { + createCert, err := r.shouldCreateAdminCert(ca) + if err != nil { + return nil, fmt.Errorf("failed to determine if admin cert should be created: %w", err) + } + if !createCert { + return nil, nil + } + adminCert, err := ca.CreateAndSignCertificate("admin", r.instance.Name, nil) if err != nil { r.logger.Error(err, "Failed to create admin certificate", "interface", "transport") @@ -174,7 +239,7 @@ func (r *TLSReconciler) createAdminSecret(ca tls.Cert) error { "Security", "Failed to create admin certificate", ) - return err + return nil, err } adminSecret := &corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ @@ -185,9 +250,9 @@ func (r *TLSReconciler) createAdminSecret(ca tls.Cert) error { Data: adminCert.SecretData(ca), } if err := ctrl.SetControllerReference(r.instance, adminSecret, r.Client.Scheme()); err != nil { - return err + return nil, err } - return client.IgnoreAlreadyExists(r.Create(r.ctx, adminSecret)) + return r.ReconcileResource(adminSecret, reconciler.StatePresent) } func (r *TLSReconciler) adminSecretName() string { diff --git a/opensearch-operator/pkg/tls/pki.go b/opensearch-operator/pkg/tls/pki.go index 5dc0c8b1..0a3cfca0 100644 --- a/opensearch-operator/pkg/tls/pki.go +++ b/opensearch-operator/pkg/tls/pki.go @@ -29,6 +29,11 @@ type Cert interface { CreateAndSignCertificate(commonName string, orgUnit string, dnsnames []string) (cert Cert, err error) } +type CertValidater interface { + IsExpiringSoon() bool + IsSignedByCA(ca Cert) (bool, error) +} + // Dummy struct so that PKI interface can be implemented for easier mocking in tests type PkiImpl struct { } @@ -211,3 +216,56 @@ func calculateExtension(commonName string, dnsNames []string) (pkix.Extension, e } return san, nil } + +type implCertValidater struct { + implCertValidaterOptions + cert *x509.Certificate +} + +type implCertValidaterOptions struct { + expiryThreshold time.Duration +} + +type ImplCertValidaterOption func(*implCertValidaterOptions) + +func (o *implCertValidaterOptions) apply(opts ...ImplCertValidaterOption) { + for _, opt := range opts { + opt(o) + } +} + +func WithExpiryThreshold(expiryThreshold time.Duration) ImplCertValidaterOption { + return func(o *implCertValidaterOptions) { + o.expiryThreshold = 
expiryThreshold + } +} + +func NewCertValidater(pemData []byte, opts ...ImplCertValidaterOption) (CertValidater, error) { + var o implCertValidaterOptions + o.apply(opts...) + + block, _ := pem.Decode(pemData) + + cert, err := x509.ParseCertificate(block.Bytes) + if err != nil { + return nil, err + } + return &implCertValidater{ + implCertValidaterOptions: o, + cert: cert, + }, nil +} + +func (i *implCertValidater) IsExpiringSoon() bool { + return time.Now().After(i.cert.NotAfter.Add(i.expiryThreshold * -1)) +} + +func (i *implCertValidater) IsSignedByCA(ca Cert) (bool, error) { + block, _ := pem.Decode(ca.CertData()) + caCert, err := x509.ParseCertificate(block.Bytes) + if err != nil { + return false, err + } + + return bytes.Equal(i.cert.RawIssuer, caCert.RawSubject), nil +} From 44aab4d8a93a22f719aed67e10c9dce396473a2c Mon Sep 17 00:00:00 2001 From: Dan Bason Date: Mon, 24 Jul 2023 09:49:48 +1200 Subject: [PATCH 04/10] Pre-pull opensearch images for tests Signed-off-by: Dan Bason --- .github/workflows/functional-tests.yaml | 6 +++++- opensearch-operator/functionaltests/execute_tests.sh | 6 ++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/.github/workflows/functional-tests.yaml b/.github/workflows/functional-tests.yaml index 4f6b17f9..b8f2e043 100644 --- a/.github/workflows/functional-tests.yaml +++ b/.github/workflows/functional-tests.yaml @@ -39,7 +39,7 @@ jobs: cd functionaltests ## Run tests - go test ./operatortests -timeout 30m + go test ./operatortests -timeout 45mm cluster-helm-chart: runs-on: ubuntu-latest @@ -69,9 +69,13 @@ jobs: ## Build controller docker image make docker-build + docker pull opensearchproject/opensearch:1.3.0 + docker pull opensearchproject/opensearch:2.3.0 ## Import controller docker image k3d image import -c $CLUSTER_NAME controller:latest + k3d image import -c $CLUSTER_NAME opensearchproject/opensearch:1.3.0 + k3d image import -c $CLUSTER_NAME opensearchproject/opensearch:2.3.0 ## Install helm chart helm install opensearch-operator ../charts/opensearch-operator --set manager.image.repository=controller --set manager.image.tag=latest --set manager.image.pullPolicy=IfNotPresent --namespace default --wait diff --git a/opensearch-operator/functionaltests/execute_tests.sh b/opensearch-operator/functionaltests/execute_tests.sh index c6c00b09..b0a708b2 100755 --- a/opensearch-operator/functionaltests/execute_tests.sh +++ b/opensearch-operator/functionaltests/execute_tests.sh @@ -6,12 +6,18 @@ k3d cluster create $CLUSTER_NAME --agents 2 --kubeconfig-switch-context=false -- k3d kubeconfig get $CLUSTER_NAME > kubeconfig export KUBECONFIG=$(pwd)/kubeconfig +## Pre-pull opensearch images +docker pull opensearchproject/opensearch:1.3.0 +docker pull opensearchproject/opensearch:2.3.0 + ## Build controller docker image cd .. 
make docker-build ## Import controller docker image k3d image import -c $CLUSTER_NAME controller:latest +k3d image import -c $CLUSTER_NAME opensearchproject/opensearch:1.3.0 +k3d image import -c $CLUSTER_NAME opensearchproject/opensearch:2.3.0 ## Install helm chart helm install opensearch-operator ../charts/opensearch-operator --set manager.image.repository=controller --set manager.image.tag=latest --set manager.image.pullPolicy=IfNotPresent --namespace default --wait From 53ce5ceac265dae44793cf34f2fdb44510512df1 Mon Sep 17 00:00:00 2001 From: Dan Bason Date: Mon, 24 Jul 2023 09:51:21 +1200 Subject: [PATCH 05/10] Don't use parallel recovery when upgrade is in progress Signed-off-by: Dan Bason --- .github/workflows/functional-tests.yaml | 2 +- .../functionaltests/execute_tests.sh | 2 +- .../operatortests/deploy_and_upgrade_test.go | 2 +- opensearch-operator/pkg/helpers/helpers.go | 15 +++++++++++++++ opensearch-operator/pkg/reconcilers/cluster.go | 4 +++- 5 files changed, 21 insertions(+), 4 deletions(-) diff --git a/.github/workflows/functional-tests.yaml b/.github/workflows/functional-tests.yaml index b8f2e043..b4c66b00 100644 --- a/.github/workflows/functional-tests.yaml +++ b/.github/workflows/functional-tests.yaml @@ -39,7 +39,7 @@ jobs: cd functionaltests ## Run tests - go test ./operatortests -timeout 45mm + go test ./operatortests -timeout 30m cluster-helm-chart: runs-on: ubuntu-latest diff --git a/opensearch-operator/functionaltests/execute_tests.sh b/opensearch-operator/functionaltests/execute_tests.sh index b0a708b2..a1174aa9 100755 --- a/opensearch-operator/functionaltests/execute_tests.sh +++ b/opensearch-operator/functionaltests/execute_tests.sh @@ -26,7 +26,7 @@ helm install opensearch-cluster ../charts/opensearch-cluster --set OpenSearchClu cd functionaltests ## Run tests -go test ./operatortests -timeout 30m +go test ./operatortests -timeout 45m go test ./helmtests -timeout 15m ## Delete k3d cluster k3d cluster delete $CLUSTER_NAME diff --git a/opensearch-operator/functionaltests/operatortests/deploy_and_upgrade_test.go b/opensearch-operator/functionaltests/operatortests/deploy_and_upgrade_test.go index 7a358792..ca32c4f6 100644 --- a/opensearch-operator/functionaltests/operatortests/deploy_and_upgrade_test.go +++ b/opensearch-operator/functionaltests/operatortests/deploy_and_upgrade_test.go @@ -72,7 +72,7 @@ var _ = Describe("DeployAndUpgrade", Ordered, func() { return sts.Status.UpdatedReplicas } return 0 - }, time.Minute*30, time.Second*5).Should(Equal(int32(3))) + }, time.Minute*20, time.Second*5).Should(Equal(int32(3))) }) It("should upgrade the dashboard pod", func() { diff --git a/opensearch-operator/pkg/helpers/helpers.go b/opensearch-operator/pkg/helpers/helpers.go index 4f098c0e..d4d9c5d6 100644 --- a/opensearch-operator/pkg/helpers/helpers.go +++ b/opensearch-operator/pkg/helpers/helpers.go @@ -115,6 +115,13 @@ func GetByDescriptionAndGroup(left opsterv1.ComponentStatus, right opsterv1.Comp return right, false } +func GetByComponent(left opsterv1.ComponentStatus, right opsterv1.ComponentStatus) (opsterv1.ComponentStatus, bool) { + if left.Component == right.Component { + return left, true + } + return right, false +} + func MergeConfigs(left map[string]string, right map[string]string) map[string]string { if left == nil { return right @@ -380,3 +387,11 @@ func CalculateJvmHeapSize(nodePool *opsterv1.NodePool) string { return nodePool.Jvm } + +func UpgradeInProgress(status opsterv1.ClusterStatus) bool { + componentStatus := opsterv1.ComponentStatus{ + Component: 
"Upgrader", + } + _, found := FindFirstPartial(status.ComponentsStatus, componentStatus, GetByComponent) + return found +} diff --git a/opensearch-operator/pkg/reconcilers/cluster.go b/opensearch-operator/pkg/reconcilers/cluster.go index 8bb1e582..7c7296d3 100644 --- a/opensearch-operator/pkg/reconcilers/cluster.go +++ b/opensearch-operator/pkg/reconcilers/cluster.go @@ -246,7 +246,9 @@ func (r *ClusterReconciler) reconcileNodeStatefulSet(nodePool opsterv1.NodePool, } // Detect cluster failure and initiate parallel recovery - if helpers.ParallelRecoveryMode() && (nodePool.Persistence == nil || nodePool.Persistence.PersistenceSource.PVC != nil) { + // Don't do this if the cluster is upgrading + if !helpers.UpgradeInProgress(r.instance.Status) && helpers.ParallelRecoveryMode() && + (nodePool.Persistence == nil || nodePool.Persistence.PersistenceSource.PVC != nil) { // This logic only works if the STS uses PVCs // First check if the STS already has a readable status (CurrentRevision == "" indicates the STS is newly created and the controller has not yet updated the status properly) if existing.Status.CurrentRevision == "" { From ee50856d4c0b6e7d2bb58dd202d2168d4788c2aa Mon Sep 17 00:00:00 2001 From: Dan Bason Date: Tue, 25 Jul 2023 08:56:57 +1200 Subject: [PATCH 06/10] Add ready pod check to upgrades Signed-off-by: Dan Bason --- .github/workflows/functional-tests.yaml | 6 ++++++ opensearch-operator/pkg/reconcilers/upgrade.go | 5 +++++ 2 files changed, 11 insertions(+) diff --git a/.github/workflows/functional-tests.yaml b/.github/workflows/functional-tests.yaml index b4c66b00..58b0fc4c 100644 --- a/.github/workflows/functional-tests.yaml +++ b/.github/workflows/functional-tests.yaml @@ -31,8 +31,14 @@ jobs: ## Build controller docker image make docker-build + ## Pre-pull opensearch images + docker pull opensearchproject/opensearch:1.3.0 + docker pull opensearchproject/opensearch:2.3.0 + ## Import controller docker image k3d image import -c $CLUSTER_NAME controller:latest + k3d image import -c $CLUSTER_NAME opensearchproject/opensearch:1.3.0 + k3d image import -c $CLUSTER_NAME opensearchproject/opensearch:2.3.0 ## Install helm chart helm install opensearch-operator ../charts/opensearch-operator --set manager.image.repository=controller --set manager.image.tag=latest --set manager.image.pullPolicy=IfNotPresent --namespace default --wait diff --git a/opensearch-operator/pkg/reconcilers/upgrade.go b/opensearch-operator/pkg/reconcilers/upgrade.go index 9fb20140..ac7fcee3 100644 --- a/opensearch-operator/pkg/reconcilers/upgrade.go +++ b/opensearch-operator/pkg/reconcilers/upgrade.go @@ -295,6 +295,11 @@ func (r *UpgradeReconciler) doNodePoolUpgrade(pool opsterv1.NodePool) error { r.logger.Info("only 2 data nodes and drain is set, some shards may not drain") } + if sts.Status.ReadyReplicas < sts.Status.Replicas { + r.logger.Info("Waiting for all pods to be ready") + return nil + } + ready, err := services.CheckClusterStatusForRestart(r.osClient, r.instance.Spec.General.DrainDataNodes) if err != nil { r.logger.Error(err, "Could not check opensearch cluster status") From 9993defd2ecc4d6cb5ec0a2a43c7d4429a1fde79 Mon Sep 17 00:00:00 2001 From: Dan Bason Date: Tue, 25 Jul 2023 09:42:40 +1200 Subject: [PATCH 07/10] Add logic for mixed revision sts Signed-off-by: Dan Bason --- ...ensearch.opster.io_opensearchclusters.yaml | 146 +++++++++++++++--- .../functionaltests/execute_tests.sh | 2 +- .../operatortests/deploy_and_upgrade_test.go | 4 +- opensearch-operator/pkg/builders/cluster.go | 9 -- 
opensearch-operator/pkg/helpers/helpers.go | 30 ++++ .../pkg/reconcilers/rollingRestart.go | 2 +- opensearch-operator/pkg/reconcilers/scaler.go | 10 +- .../pkg/reconcilers/upgrade.go | 5 +- 8 files changed, 168 insertions(+), 40 deletions(-) diff --git a/opensearch-operator/config/crd/bases/opensearch.opster.io_opensearchclusters.yaml b/opensearch-operator/config/crd/bases/opensearch.opster.io_opensearchclusters.yaml index 8df674bd..79471cd3 100644 --- a/opensearch-operator/config/crd/bases/opensearch.opster.io_opensearchclusters.yaml +++ b/opensearch-operator/config/crd/bases/opensearch.opster.io_opensearchclusters.yaml @@ -935,6 +935,28 @@ spec: description: ResourceRequirements describes the compute resource requirements. properties: + claims: + description: "Claims lists the names of resources, defined + in spec.resourceClaims, that are used by this container. + \n This is an alpha field and requires enabling the DynamicResourceAllocation + feature gate. \n This field is immutable. It can only be + set for containers." + items: + description: ResourceClaim references one entry in PodSpec.ResourceClaims. + properties: + name: + description: Name must match the name of one entry in + pod.spec.resourceClaims of the Pod where this field + is used. It makes that resource available inside a + container. + type: string + required: + - name + type: object + type: array + x-kubernetes-list-map-keys: + - name + x-kubernetes-list-type: map limits: additionalProperties: anyOf: @@ -955,7 +977,8 @@ spec: description: 'Requests describes the minimum amount of compute resources required. If Requests is omitted for a container, it defaults to Limits if that is explicitly specified, otherwise - to an implementation-defined value. More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/' + to an implementation-defined value. Requests cannot exceed + Limits. More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/' type: object type: object tolerations: @@ -2305,9 +2328,13 @@ spec: supplementalGroups: description: A list of groups applied to the first process run in each container, in addition to the container's primary - GID. If unspecified, no groups will be added to any container. - Note that this field cannot be set when spec.os.name is - windows. + GID, the fsGroup (if specified), and group memberships defined + in the container image for the uid of the container process. + If unspecified, no additional groups are added to any container. + Note that group memberships defined in the container image + for the uid of the container process are still effective, + even if they are not included in this list. Note that this + field cannot be set when spec.os.name is windows. items: format: int64 type: integer @@ -2378,6 +2405,28 @@ spec: description: ResourceRequirements describes the compute resource requirements. properties: + claims: + description: "Claims lists the names of resources, defined + in spec.resourceClaims, that are used by this container. + \n This is an alpha field and requires enabling the DynamicResourceAllocation + feature gate. \n This field is immutable. It can only be + set for containers." + items: + description: ResourceClaim references one entry in PodSpec.ResourceClaims. + properties: + name: + description: Name must match the name of one entry in + pod.spec.resourceClaims of the Pod where this field + is used. It makes that resource available inside a + container. 
+ type: string + required: + - name + type: object + type: array + x-kubernetes-list-map-keys: + - name + x-kubernetes-list-type: map limits: additionalProperties: anyOf: @@ -2398,7 +2447,8 @@ spec: description: 'Requests describes the minimum amount of compute resources required. If Requests is omitted for a container, it defaults to Limits if that is explicitly specified, otherwise - to an implementation-defined value. More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/' + to an implementation-defined value. Requests cannot exceed + Limits. More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/' type: object type: object securityContext: @@ -3003,9 +3053,13 @@ spec: supplementalGroups: description: A list of groups applied to the first process run in each container, in addition to the container's primary - GID. If unspecified, no groups will be added to any container. - Note that this field cannot be set when spec.os.name is - windows. + GID, the fsGroup (if specified), and group memberships defined + in the container image for the uid of the container process. + If unspecified, no additional groups are added to any container. + Note that group memberships defined in the container image + for the uid of the container process are still effective, + even if they are not included in this list. Note that this + field cannot be set when spec.os.name is windows. items: format: int64 type: integer @@ -3296,6 +3350,28 @@ spec: description: ResourceRequirements describes the compute resource requirements. properties: + claims: + description: "Claims lists the names of resources, defined + in spec.resourceClaims, that are used by this container. + \n This is an alpha field and requires enabling the DynamicResourceAllocation + feature gate. \n This field is immutable. It can only be + set for containers." + items: + description: ResourceClaim references one entry in PodSpec.ResourceClaims. + properties: + name: + description: Name must match the name of one entry in + pod.spec.resourceClaims of the Pod where this field + is used. It makes that resource available inside a + container. + type: string + required: + - name + type: object + type: array + x-kubernetes-list-map-keys: + - name + x-kubernetes-list-type: map limits: additionalProperties: anyOf: @@ -3316,7 +3392,8 @@ spec: description: 'Requests describes the minimum amount of compute resources required. If Requests is omitted for a container, it defaults to Limits if that is explicitly specified, otherwise - to an implementation-defined value. More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/' + to an implementation-defined value. Requests cannot exceed + Limits. More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/' type: object type: object version: @@ -4371,7 +4448,7 @@ spec: value between the SizeLimit specified here and the sum of memory limits of all containers in a pod. The default is nil which means that the limit is undefined. - More info: http://kubernetes.io/docs/user-guide/volumes#emptydir' + More info: https://kubernetes.io/docs/concepts/storage/volumes#emptydir' pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ x-kubernetes-int-or-string: true type: object @@ -4411,6 +4488,28 @@ spec: description: ResourceRequirements describes the compute resource requirements. 
properties: + claims: + description: "Claims lists the names of resources, defined + in spec.resourceClaims, that are used by this container. + \n This is an alpha field and requires enabling the DynamicResourceAllocation + feature gate. \n This field is immutable. It can only + be set for containers." + items: + description: ResourceClaim references one entry in PodSpec.ResourceClaims. + properties: + name: + description: Name must match the name of one entry + in pod.spec.resourceClaims of the Pod where this + field is used. It makes that resource available + inside a container. + type: string + required: + - name + type: object + type: array + x-kubernetes-list-map-keys: + - name + x-kubernetes-list-type: map limits: additionalProperties: anyOf: @@ -4431,8 +4530,8 @@ spec: description: 'Requests describes the minimum amount of compute resources required. If Requests is omitted for a container, it defaults to Limits if that is explicitly specified, - otherwise to an implementation-defined value. More info: - https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/' + otherwise to an implementation-defined value. Requests + cannot exceed Limits. More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/' type: object type: object roles: @@ -4534,14 +4633,19 @@ spec: type: object x-kubernetes-map-type: atomic matchLabelKeys: - description: MatchLabelKeys is a set of pod label keys + description: "MatchLabelKeys is a set of pod label keys to select the pods over which spreading will be calculated. The keys are used to lookup values from the incoming pod labels, those key-value labels are ANDed with labelSelector to select the group of existing pods over which spreading - will be calculated for the incoming pod. Keys that don't - exist in the incoming pod labels will be ignored. A - null or empty list means only match against labelSelector. + will be calculated for the incoming pod. The same key + is forbidden to exist in both MatchLabelKeys and LabelSelector. + MatchLabelKeys cannot be set when LabelSelector isn't + set. Keys that don't exist in the incoming pod labels + will be ignored. A null or empty list means only match + against labelSelector. \n This is a beta field and requires + the MatchLabelKeysInPodTopologySpread feature gate to + be enabled (enabled by default)." items: type: string type: array @@ -4602,9 +4706,9 @@ spec: in the calculations. - Ignore: nodeAffinity/nodeSelector are ignored. All nodes are included in the calculations. \n If this value is nil, the behavior is equivalent - to the Honor policy. This is a alpha-level feature enabled - by the NodeInclusionPolicyInPodTopologySpread feature - flag." + to the Honor policy. This is a beta-level feature default + enabled by the NodeInclusionPolicyInPodTopologySpread + feature flag." type: string nodeTaintsPolicy: description: "NodeTaintsPolicy indicates how we will treat @@ -4613,8 +4717,8 @@ spec: tainted nodes for which the incoming pod has a toleration, are included. - Ignore: node taints are ignored. All nodes are included. \n If this value is nil, the behavior - is equivalent to the Ignore policy. This is a alpha-level - feature enabled by the NodeInclusionPolicyInPodTopologySpread + is equivalent to the Ignore policy. This is a beta-level + feature default enabled by the NodeInclusionPolicyInPodTopologySpread feature flag." 
type: string topologyKey: diff --git a/opensearch-operator/functionaltests/execute_tests.sh b/opensearch-operator/functionaltests/execute_tests.sh index a1174aa9..b0a708b2 100755 --- a/opensearch-operator/functionaltests/execute_tests.sh +++ b/opensearch-operator/functionaltests/execute_tests.sh @@ -26,7 +26,7 @@ helm install opensearch-cluster ../charts/opensearch-cluster --set OpenSearchClu cd functionaltests ## Run tests -go test ./operatortests -timeout 45m +go test ./operatortests -timeout 30m go test ./helmtests -timeout 15m ## Delete k3d cluster k3d cluster delete $CLUSTER_NAME diff --git a/opensearch-operator/functionaltests/operatortests/deploy_and_upgrade_test.go b/opensearch-operator/functionaltests/operatortests/deploy_and_upgrade_test.go index ca32c4f6..2379b69c 100644 --- a/opensearch-operator/functionaltests/operatortests/deploy_and_upgrade_test.go +++ b/opensearch-operator/functionaltests/operatortests/deploy_and_upgrade_test.go @@ -69,10 +69,12 @@ var _ = Describe("DeployAndUpgrade", Ordered, func() { Eventually(func() int32 { err := k8sClient.Get(context.Background(), client.ObjectKey{Name: name + "-masters", Namespace: namespace}, &sts) if err == nil { + GinkgoWriter.Printf("%+v\n", sts.Status) return sts.Status.UpdatedReplicas } + GinkgoWriter.Println(err) return 0 - }, time.Minute*20, time.Second*5).Should(Equal(int32(3))) + }, time.Minute*15, time.Second*5).Should(Equal(int32(3))) }) It("should upgrade the dashboard pod", func() { diff --git a/opensearch-operator/pkg/builders/cluster.go b/opensearch-operator/pkg/builders/cluster.go index 8f9b15ad..44613e86 100644 --- a/opensearch-operator/pkg/builders/cluster.go +++ b/opensearch-operator/pkg/builders/cluster.go @@ -860,10 +860,6 @@ func StsName(cr *opsterv1.OpenSearchCluster, nodePool *opsterv1.NodePool) string return cr.Name + "-" + nodePool.Component } -func ReplicaHostName(currentSts appsv1.StatefulSet, repNum int32) string { - return fmt.Sprintf("%s-%d", currentSts.ObjectMeta.Name, repNum) -} - func DiscoveryServiceName(cr *opsterv1.OpenSearchCluster) string { return fmt.Sprintf("%s-discovery", cr.Name) } @@ -872,11 +868,6 @@ func BootstrapPodName(cr *opsterv1.OpenSearchCluster) string { return fmt.Sprintf("%s-bootstrap-0", cr.Name) } -func WorkingPodForRollingRestart(sts *appsv1.StatefulSet) string { - ordinal := pointer.Int32Deref(sts.Spec.Replicas, 1) - 1 - sts.Status.UpdatedReplicas - return ReplicaHostName(*sts, ordinal) -} - func STSInNodePools(sts appsv1.StatefulSet, nodepools []opsterv1.NodePool) bool { for _, nodepool := range nodepools { if sts.Labels[helpers.NodePoolLabel] == nodepool.Component { diff --git a/opensearch-operator/pkg/helpers/helpers.go b/opensearch-operator/pkg/helpers/helpers.go index d4d9c5d6..7eb2e8e2 100644 --- a/opensearch-operator/pkg/helpers/helpers.go +++ b/opensearch-operator/pkg/helpers/helpers.go @@ -12,6 +12,7 @@ import ( "k8s.io/apimachinery/pkg/types" "github.com/hashicorp/go-version" + "github.com/samber/lo" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -24,6 +25,8 @@ import ( const ( stsUpdateWaitTime = 30 updateStepTime = 3 + + stsRevisionLabel = "controller-revision-hash" ) func ContainsString(slice []string, s string) bool { @@ -395,3 +398,30 @@ func UpgradeInProgress(status opsterv1.ClusterStatus) bool { _, found := FindFirstPartial(status.ComponentsStatus, componentStatus, GetByComponent) return found } +func ReplicaHostName(currentSts appsv1.StatefulSet, repNum int32) string { + return fmt.Sprintf("%s-%d", 
currentSts.ObjectMeta.Name, repNum) +} + +func WorkingPodForRollingRestart(ctx context.Context, k8sClient client.Client, sts *appsv1.StatefulSet) string { + // Handle the simple case + if lo.FromPtrOr(sts.Spec.Replicas, 1) == sts.Status.UpdatedReplicas+sts.Status.CurrentReplicas { + ordinal := lo.FromPtrOr(sts.Spec.Replicas, 1) - 1 - sts.Status.UpdatedReplicas + return ReplicaHostName(*sts, ordinal) + } + // If there are potentially mixed revisions we need to check each pod + for i := lo.FromPtrOr(sts.Spec.Replicas, 1) - 1; i >= 0; i-- { + podName := ReplicaHostName(*sts, i) + pod := &corev1.Pod{} + if err := k8sClient.Get(ctx, types.NamespacedName{Name: podName, Namespace: sts.Namespace}, pod); err != nil { + continue + } + podRevision, ok := pod.Labels[stsRevisionLabel] + if !ok { + continue + } + if podRevision != sts.Status.UpdateRevision { + return podName + } + } + panic("bug: unable to calculate the working pod for rolling restart") +} diff --git a/opensearch-operator/pkg/reconcilers/rollingRestart.go b/opensearch-operator/pkg/reconcilers/rollingRestart.go index 33fb1648..2c7b95f6 100644 --- a/opensearch-operator/pkg/reconcilers/rollingRestart.go +++ b/opensearch-operator/pkg/reconcilers/rollingRestart.go @@ -166,7 +166,7 @@ func (r *RollingRestartReconciler) restartStatefulSetPod(sts *appsv1.StatefulSet }, nil } - workingPod := builders.WorkingPodForRollingRestart(sts) + workingPod := helpers.WorkingPodForRollingRestart(r.ctx, r.Client, sts) ready, err = services.PreparePodForDelete(r.osClient, workingPod, r.instance.Spec.General.DrainDataNodes, dataCount) if err != nil { diff --git a/opensearch-operator/pkg/reconcilers/scaler.go b/opensearch-operator/pkg/reconcilers/scaler.go index b34ac52e..2d4baa8b 100644 --- a/opensearch-operator/pkg/reconcilers/scaler.go +++ b/opensearch-operator/pkg/reconcilers/scaler.go @@ -151,7 +151,7 @@ func (r *ScalerReconciler) increaseOneNode(currentSts appsv1.StatefulSet, nodePo lg := log.FromContext(r.ctx) *currentSts.Spec.Replicas++ annotations := map[string]string{"cluster-name": r.instance.GetName()} - lastReplicaNodeName := builders.ReplicaHostName(currentSts, *currentSts.Spec.Replicas) + lastReplicaNodeName := helpers.ReplicaHostName(currentSts, *currentSts.Spec.Replicas) r.recorder.AnnotatedEventf(r.instance, annotations, "Normal", "Scaler", "Start increaseing node %s on %s ", lastReplicaNodeName, nodePoolGroupName) _, err := r.ReconcileResource(¤tSts, reconciler.StatePresent) if err != nil { @@ -167,7 +167,7 @@ func (r *ScalerReconciler) decreaseOneNode(currentStatus opsterv1.ComponentStatu lg := log.FromContext(r.ctx) *currentSts.Spec.Replicas-- annotations := map[string]string{"cluster-name": r.instance.GetName()} - lastReplicaNodeName := builders.ReplicaHostName(currentSts, *currentSts.Spec.Replicas) + lastReplicaNodeName := helpers.ReplicaHostName(currentSts, *currentSts.Spec.Replicas) r.recorder.AnnotatedEventf(r.instance, annotations, "Normal", "Scaler", "Start to decreaseing node %s on %s ", lastReplicaNodeName, nodePoolGroupName) _, err := r.ReconcileResource(¤tSts, reconciler.StatePresent) if err != nil { @@ -221,7 +221,7 @@ func (r *ScalerReconciler) excludeNode(currentStatus opsterv1.ComponentStatus, c return err } // ----- Now start remove node ------ - lastReplicaNodeName := builders.ReplicaHostName(currentSts, *currentSts.Spec.Replicas-1) + lastReplicaNodeName := helpers.ReplicaHostName(currentSts, *currentSts.Spec.Replicas-1) excluded, err := services.AppendExcludeNodeHost(clusterClient, lastReplicaNodeName) if err != nil { @@ 
-268,7 +268,7 @@ func (r *ScalerReconciler) excludeNode(currentStatus opsterv1.ComponentStatus, c func (r *ScalerReconciler) drainNode(currentStatus opsterv1.ComponentStatus, currentSts appsv1.StatefulSet, nodePoolGroupName string) error { lg := log.FromContext(r.ctx) annotations := map[string]string{"cluster-name": r.instance.GetName()} - lastReplicaNodeName := builders.ReplicaHostName(currentSts, *currentSts.Spec.Replicas-1) + lastReplicaNodeName := helpers.ReplicaHostName(currentSts, *currentSts.Spec.Replicas-1) username, password, err := helpers.UsernameAndPassword(r.ctx, r.Client, r.instance) if err != nil { return err @@ -346,7 +346,7 @@ func (r *ScalerReconciler) removeStatefulSet(sts appsv1.StatefulSet) (*ctrl.Resu r.recorder.AnnotatedEventf(r.instance, annotations, "Normal", "Scaler", "Finished os client for scaling ") workingOrdinal := pointer.Int32Deref(sts.Spec.Replicas, 1) - 1 - lastReplicaNodeName := builders.ReplicaHostName(sts, workingOrdinal) + lastReplicaNodeName := helpers.ReplicaHostName(sts, workingOrdinal) _, err = services.AppendExcludeNodeHost(clusterClient, lastReplicaNodeName) if err != nil { lg.Error(err, fmt.Sprintf("failed to exclude node %s", lastReplicaNodeName)) diff --git a/opensearch-operator/pkg/reconcilers/upgrade.go b/opensearch-operator/pkg/reconcilers/upgrade.go index ac7fcee3..f8aac877 100644 --- a/opensearch-operator/pkg/reconcilers/upgrade.go +++ b/opensearch-operator/pkg/reconcilers/upgrade.go @@ -9,6 +9,7 @@ import ( "github.com/Masterminds/semver" "github.com/cisco-open/operator-tools/pkg/reconciler" "github.com/go-logr/logr" + "github.com/samber/lo" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -295,7 +296,7 @@ func (r *UpgradeReconciler) doNodePoolUpgrade(pool opsterv1.NodePool) error { r.logger.Info("only 2 data nodes and drain is set, some shards may not drain") } - if sts.Status.ReadyReplicas < sts.Status.Replicas { + if sts.Status.ReadyReplicas < lo.FromPtrOr(sts.Spec.Replicas, 1) { r.logger.Info("Waiting for all pods to be ready") return nil } @@ -338,7 +339,7 @@ func (r *UpgradeReconciler) doNodePoolUpgrade(pool opsterv1.NodePool) error { }) } - workingPod := builders.WorkingPodForRollingRestart(sts) + workingPod := helpers.WorkingPodForRollingRestart(r.ctx, r.Client, sts) ready, err = services.PreparePodForDelete(r.osClient, workingPod, r.instance.Spec.General.DrainDataNodes, dataCount) if err != nil { From ba07d2c9088f6e91d728b77cc4293543699dc492 Mon Sep 17 00:00:00 2001 From: Dan Bason Date: Tue, 25 Jul 2023 15:51:39 +1200 Subject: [PATCH 08/10] Add conditions to component status for tracking Signed-off-by: Dan Bason --- .github/workflows/functional-tests.yaml | 8 - ...ensearch.opster.io_opensearchclusters.yaml | 150 +++++++++++++++--- .../api/v1/opensearch_types.go | 7 +- .../api/v1/zz_generated.deepcopy.go | 9 +- ...ensearch.opster.io_opensearchclusters.yaml | 4 + .../operatortests/deploy_and_upgrade_test.go | 22 ++- .../operatortests/main_test.go | 11 +- .../services/os_data_service.go | 16 +- opensearch-operator/pkg/helpers/helpers.go | 18 ++- .../pkg/reconcilers/cluster.go | 6 +- .../pkg/reconcilers/rollingRestart.go | 15 +- .../pkg/reconcilers/upgrade.go | 68 +++++++- 12 files changed, 276 insertions(+), 58 deletions(-) diff --git a/.github/workflows/functional-tests.yaml b/.github/workflows/functional-tests.yaml index 58b0fc4c..a5657bf1 100644 --- a/.github/workflows/functional-tests.yaml +++ b/.github/workflows/functional-tests.yaml @@ -32,13 +32,9 @@ jobs: make 
docker-build ## Pre-pull opensearch images - docker pull opensearchproject/opensearch:1.3.0 - docker pull opensearchproject/opensearch:2.3.0 ## Import controller docker image k3d image import -c $CLUSTER_NAME controller:latest - k3d image import -c $CLUSTER_NAME opensearchproject/opensearch:1.3.0 - k3d image import -c $CLUSTER_NAME opensearchproject/opensearch:2.3.0 ## Install helm chart helm install opensearch-operator ../charts/opensearch-operator --set manager.image.repository=controller --set manager.image.tag=latest --set manager.image.pullPolicy=IfNotPresent --namespace default --wait @@ -75,13 +71,9 @@ jobs: ## Build controller docker image make docker-build - docker pull opensearchproject/opensearch:1.3.0 - docker pull opensearchproject/opensearch:2.3.0 ## Import controller docker image k3d image import -c $CLUSTER_NAME controller:latest - k3d image import -c $CLUSTER_NAME opensearchproject/opensearch:1.3.0 - k3d image import -c $CLUSTER_NAME opensearchproject/opensearch:2.3.0 ## Install helm chart helm install opensearch-operator ../charts/opensearch-operator --set manager.image.repository=controller --set manager.image.tag=latest --set manager.image.pullPolicy=IfNotPresent --namespace default --wait diff --git a/charts/opensearch-operator/files/opensearch.opster.io_opensearchclusters.yaml b/charts/opensearch-operator/files/opensearch.opster.io_opensearchclusters.yaml index 8df674bd..40c42040 100644 --- a/charts/opensearch-operator/files/opensearch.opster.io_opensearchclusters.yaml +++ b/charts/opensearch-operator/files/opensearch.opster.io_opensearchclusters.yaml @@ -935,6 +935,28 @@ spec: description: ResourceRequirements describes the compute resource requirements. properties: + claims: + description: "Claims lists the names of resources, defined + in spec.resourceClaims, that are used by this container. + \n This is an alpha field and requires enabling the DynamicResourceAllocation + feature gate. \n This field is immutable. It can only be + set for containers." + items: + description: ResourceClaim references one entry in PodSpec.ResourceClaims. + properties: + name: + description: Name must match the name of one entry in + pod.spec.resourceClaims of the Pod where this field + is used. It makes that resource available inside a + container. + type: string + required: + - name + type: object + type: array + x-kubernetes-list-map-keys: + - name + x-kubernetes-list-type: map limits: additionalProperties: anyOf: @@ -955,7 +977,8 @@ spec: description: 'Requests describes the minimum amount of compute resources required. If Requests is omitted for a container, it defaults to Limits if that is explicitly specified, otherwise - to an implementation-defined value. More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/' + to an implementation-defined value. Requests cannot exceed + Limits. More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/' type: object type: object tolerations: @@ -2305,9 +2328,13 @@ spec: supplementalGroups: description: A list of groups applied to the first process run in each container, in addition to the container's primary - GID. If unspecified, no groups will be added to any container. - Note that this field cannot be set when spec.os.name is - windows. + GID, the fsGroup (if specified), and group memberships defined + in the container image for the uid of the container process. + If unspecified, no additional groups are added to any container. 
+ Note that group memberships defined in the container image + for the uid of the container process are still effective, + even if they are not included in this list. Note that this + field cannot be set when spec.os.name is windows. items: format: int64 type: integer @@ -2378,6 +2405,28 @@ spec: description: ResourceRequirements describes the compute resource requirements. properties: + claims: + description: "Claims lists the names of resources, defined + in spec.resourceClaims, that are used by this container. + \n This is an alpha field and requires enabling the DynamicResourceAllocation + feature gate. \n This field is immutable. It can only be + set for containers." + items: + description: ResourceClaim references one entry in PodSpec.ResourceClaims. + properties: + name: + description: Name must match the name of one entry in + pod.spec.resourceClaims of the Pod where this field + is used. It makes that resource available inside a + container. + type: string + required: + - name + type: object + type: array + x-kubernetes-list-map-keys: + - name + x-kubernetes-list-type: map limits: additionalProperties: anyOf: @@ -2398,7 +2447,8 @@ spec: description: 'Requests describes the minimum amount of compute resources required. If Requests is omitted for a container, it defaults to Limits if that is explicitly specified, otherwise - to an implementation-defined value. More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/' + to an implementation-defined value. Requests cannot exceed + Limits. More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/' type: object type: object securityContext: @@ -3003,9 +3053,13 @@ spec: supplementalGroups: description: A list of groups applied to the first process run in each container, in addition to the container's primary - GID. If unspecified, no groups will be added to any container. - Note that this field cannot be set when spec.os.name is - windows. + GID, the fsGroup (if specified), and group memberships defined + in the container image for the uid of the container process. + If unspecified, no additional groups are added to any container. + Note that group memberships defined in the container image + for the uid of the container process are still effective, + even if they are not included in this list. Note that this + field cannot be set when spec.os.name is windows. items: format: int64 type: integer @@ -3296,6 +3350,28 @@ spec: description: ResourceRequirements describes the compute resource requirements. properties: + claims: + description: "Claims lists the names of resources, defined + in spec.resourceClaims, that are used by this container. + \n This is an alpha field and requires enabling the DynamicResourceAllocation + feature gate. \n This field is immutable. It can only be + set for containers." + items: + description: ResourceClaim references one entry in PodSpec.ResourceClaims. + properties: + name: + description: Name must match the name of one entry in + pod.spec.resourceClaims of the Pod where this field + is used. It makes that resource available inside a + container. + type: string + required: + - name + type: object + type: array + x-kubernetes-list-map-keys: + - name + x-kubernetes-list-type: map limits: additionalProperties: anyOf: @@ -3316,7 +3392,8 @@ spec: description: 'Requests describes the minimum amount of compute resources required. 
If Requests is omitted for a container, it defaults to Limits if that is explicitly specified, otherwise - to an implementation-defined value. More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/' + to an implementation-defined value. Requests cannot exceed + Limits. More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/' type: object type: object version: @@ -4371,7 +4448,7 @@ spec: value between the SizeLimit specified here and the sum of memory limits of all containers in a pod. The default is nil which means that the limit is undefined. - More info: http://kubernetes.io/docs/user-guide/volumes#emptydir' + More info: https://kubernetes.io/docs/concepts/storage/volumes#emptydir' pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ x-kubernetes-int-or-string: true type: object @@ -4411,6 +4488,28 @@ spec: description: ResourceRequirements describes the compute resource requirements. properties: + claims: + description: "Claims lists the names of resources, defined + in spec.resourceClaims, that are used by this container. + \n This is an alpha field and requires enabling the DynamicResourceAllocation + feature gate. \n This field is immutable. It can only + be set for containers." + items: + description: ResourceClaim references one entry in PodSpec.ResourceClaims. + properties: + name: + description: Name must match the name of one entry + in pod.spec.resourceClaims of the Pod where this + field is used. It makes that resource available + inside a container. + type: string + required: + - name + type: object + type: array + x-kubernetes-list-map-keys: + - name + x-kubernetes-list-type: map limits: additionalProperties: anyOf: @@ -4431,8 +4530,8 @@ spec: description: 'Requests describes the minimum amount of compute resources required. If Requests is omitted for a container, it defaults to Limits if that is explicitly specified, - otherwise to an implementation-defined value. More info: - https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/' + otherwise to an implementation-defined value. Requests + cannot exceed Limits. More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/' type: object type: object roles: @@ -4534,14 +4633,19 @@ spec: type: object x-kubernetes-map-type: atomic matchLabelKeys: - description: MatchLabelKeys is a set of pod label keys + description: "MatchLabelKeys is a set of pod label keys to select the pods over which spreading will be calculated. The keys are used to lookup values from the incoming pod labels, those key-value labels are ANDed with labelSelector to select the group of existing pods over which spreading - will be calculated for the incoming pod. Keys that don't - exist in the incoming pod labels will be ignored. A - null or empty list means only match against labelSelector. + will be calculated for the incoming pod. The same key + is forbidden to exist in both MatchLabelKeys and LabelSelector. + MatchLabelKeys cannot be set when LabelSelector isn't + set. Keys that don't exist in the incoming pod labels + will be ignored. A null or empty list means only match + against labelSelector. \n This is a beta field and requires + the MatchLabelKeysInPodTopologySpread feature gate to + be enabled (enabled by default)." items: type: string type: array @@ -4602,9 +4706,9 @@ spec: in the calculations. - Ignore: nodeAffinity/nodeSelector are ignored. 
All nodes are included in the calculations. \n If this value is nil, the behavior is equivalent - to the Honor policy. This is a alpha-level feature enabled - by the NodeInclusionPolicyInPodTopologySpread feature - flag." + to the Honor policy. This is a beta-level feature default + enabled by the NodeInclusionPolicyInPodTopologySpread + feature flag." type: string nodeTaintsPolicy: description: "NodeTaintsPolicy indicates how we will treat @@ -4613,8 +4717,8 @@ spec: tainted nodes for which the incoming pod has a toleration, are included. - Ignore: node taints are ignored. All nodes are included. \n If this value is nil, the behavior - is equivalent to the Ignore policy. This is a alpha-level - feature enabled by the NodeInclusionPolicyInPodTopologySpread + is equivalent to the Ignore policy. This is a beta-level + feature default enabled by the NodeInclusionPolicyInPodTopologySpread feature flag." type: string topologyKey: @@ -4803,6 +4907,10 @@ spec: properties: component: type: string + conditions: + items: + type: string + type: array description: type: string status: diff --git a/opensearch-operator/api/v1/opensearch_types.go b/opensearch-operator/api/v1/opensearch_types.go index fe78776e..f3b07d41 100644 --- a/opensearch-operator/api/v1/opensearch_types.go +++ b/opensearch-operator/api/v1/opensearch_types.go @@ -296,9 +296,10 @@ type OpenSearchCluster struct { } type ComponentStatus struct { - Component string `json:"component,omitempty"` - Status string `json:"status,omitempty"` - Description string `json:"description,omitempty"` + Component string `json:"component,omitempty"` + Status string `json:"status,omitempty"` + Description string `json:"description,omitempty"` + Conditions []string `json:"conditions,omitempty"` } // +kubebuilder:object:root=true diff --git a/opensearch-operator/api/v1/zz_generated.deepcopy.go b/opensearch-operator/api/v1/zz_generated.deepcopy.go index 0bcea474..a2912738 100644 --- a/opensearch-operator/api/v1/zz_generated.deepcopy.go +++ b/opensearch-operator/api/v1/zz_generated.deepcopy.go @@ -132,7 +132,9 @@ func (in *ClusterStatus) DeepCopyInto(out *ClusterStatus) { if in.ComponentsStatus != nil { in, out := &in.ComponentsStatus, &out.ComponentsStatus *out = make([]ComponentStatus, len(*in)) - copy(*out, *in) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } } } @@ -149,6 +151,11 @@ func (in *ClusterStatus) DeepCopy() *ClusterStatus { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ComponentStatus) DeepCopyInto(out *ComponentStatus) { *out = *in + if in.Conditions != nil { + in, out := &in.Conditions, &out.Conditions + *out = make([]string, len(*in)) + copy(*out, *in) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ComponentStatus. 
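The opensearch_types.go and zz_generated.deepcopy.go changes above extend ComponentStatus with a Conditions string slice so the upgrade reconciler can record per-node-pool progress messages. A minimal sketch of how those conditions could be read back from the cluster status (the "Upgrader" component name, the Status/Description values and the field names are taken from this patch series; the helper function and the sample data are illustrative assumptions, not part of the patch):

    package main

    import (
    	"fmt"

    	opsterv1 "opensearch.opster.io/api/v1"
    )

    // upgraderConditions returns the Conditions recorded by the upgrade
    // reconciler ("Upgrader" component) for a given node pool. The Component,
    // Description and Conditions fields match ComponentStatus as extended in
    // this series; the lookup itself is only an example.
    func upgraderConditions(cluster *opsterv1.OpenSearchCluster, nodePool string) []string {
    	for _, cs := range cluster.Status.ComponentsStatus {
    		if cs.Component == "Upgrader" && cs.Description == nodePool {
    			return cs.Conditions
    		}
    	}
    	return nil
    }

    func main() {
    	// Sample status resembling what doNodePoolUpgrade would write.
    	cluster := &opsterv1.OpenSearchCluster{}
    	cluster.Status.ComponentsStatus = []opsterv1.ComponentStatus{
    		{
    			Component:   "Upgrader",
    			Status:      "Upgrading",
    			Description: "masters",
    			Conditions:  []string{"Waiting for all pods to be ready"},
    		},
    	}
    	fmt.Println(upgraderConditions(cluster, "masters"))
    }

Because Conditions is a slice, the regenerated DeepCopyInto above must copy it element by element, which is why ClusterStatus.DeepCopyInto switches from a flat copy to a per-element loop.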
diff --git a/opensearch-operator/config/crd/bases/opensearch.opster.io_opensearchclusters.yaml b/opensearch-operator/config/crd/bases/opensearch.opster.io_opensearchclusters.yaml index 79471cd3..40c42040 100644 --- a/opensearch-operator/config/crd/bases/opensearch.opster.io_opensearchclusters.yaml +++ b/opensearch-operator/config/crd/bases/opensearch.opster.io_opensearchclusters.yaml @@ -4907,6 +4907,10 @@ spec: properties: component: type: string + conditions: + items: + type: string + type: array description: type: string status: diff --git a/opensearch-operator/functionaltests/operatortests/deploy_and_upgrade_test.go b/opensearch-operator/functionaltests/operatortests/deploy_and_upgrade_test.go index 2379b69c..75b8552a 100644 --- a/opensearch-operator/functionaltests/operatortests/deploy_and_upgrade_test.go +++ b/opensearch-operator/functionaltests/operatortests/deploy_and_upgrade_test.go @@ -7,8 +7,10 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" + opsterv1 "opensearch.opster.io/api/v1" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -70,11 +72,29 @@ var _ = Describe("DeployAndUpgrade", Ordered, func() { err := k8sClient.Get(context.Background(), client.ObjectKey{Name: name + "-masters", Namespace: namespace}, &sts) if err == nil { GinkgoWriter.Printf("%+v\n", sts.Status) + pods := &corev1.PodList{} + err := k8sClient.List(context.Background(), pods, client.InNamespace(namespace)) + if err == nil { + for _, pod := range pods.Items { + revision, ok := pod.Labels["controller-revision-hash"] + GinkgoWriter.Printf("Pod: %s\tPhase: %s", pod.Name, pod.Status.Phase) + if ok { + GinkgoWriter.Printf("\tRevision: %s\t Image: %s", revision, pod.Spec.Containers[0].Image) + } + GinkgoWriter.Println() + } + } else { + GinkgoWriter.Println(err) + } + cluster := &opsterv1.OpenSearchCluster{} + k8sClient.Get(context.Background(), client.ObjectKey{Name: name, Namespace: namespace}, cluster) + GinkgoWriter.Printf("Cluster: %+v\n", cluster.Status) + return sts.Status.UpdatedReplicas } GinkgoWriter.Println(err) return 0 - }, time.Minute*15, time.Second*5).Should(Equal(int32(3))) + }, time.Minute*10, time.Second*5).Should(Equal(int32(3))) }) It("should upgrade the dashboard pod", func() { diff --git a/opensearch-operator/functionaltests/operatortests/main_test.go b/opensearch-operator/functionaltests/operatortests/main_test.go index 3f77263e..20ead9ec 100644 --- a/opensearch-operator/functionaltests/operatortests/main_test.go +++ b/opensearch-operator/functionaltests/operatortests/main_test.go @@ -5,7 +5,11 @@ import ( . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" + "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/clientcmd" + opsterv1 "opensearch.opster.io/api/v1" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -18,7 +22,12 @@ func TestAPIs(t *testing.T) { if err != nil { panic(err.Error()) } - k8sClient, err = client.New(config, client.Options{}) + scheme := runtime.NewScheme() + utilruntime.Must(clientgoscheme.AddToScheme(scheme)) + utilruntime.Must(opsterv1.AddToScheme(scheme)) + k8sClient, err = client.New(config, client.Options{ + Scheme: scheme, + }) if err != nil { panic(err.Error()) } diff --git a/opensearch-operator/opensearch-gateway/services/os_data_service.go b/opensearch-operator/opensearch-gateway/services/os_data_service.go index d2f6f96d..8e27a296 100644 --- a/opensearch-operator/opensearch-gateway/services/os_data_service.go +++ b/opensearch-operator/opensearch-gateway/services/os_data_service.go @@ -147,35 +147,35 @@ func createClusterSettingsAllocationEnable(enable ClusterSettingsAllocation) res }} } -func CheckClusterStatusForRestart(service *OsClusterClient, drainNodes bool) (bool, error) { +func CheckClusterStatusForRestart(service *OsClusterClient, drainNodes bool) (bool, string, error) { health, err := service.GetHealth() if err != nil { - return false, err + return false, "failed to fetch health", err } if health.Status == "green" { - return true, nil + return true, "", nil } if drainNodes { - return false, nil + return false, "cluster is not green and drain nodes is enabled", nil } flatSettings, err := service.GetFlatClusterSettings() if err != nil { - return false, err + return false, "could not fetch cluster settings", err } if flatSettings.Transient.ClusterRoutingAllocationEnable == string(ClusterSettingsAllocationAll) { - return false, nil + return false, "waiting for health to be green", nil } // Set shard routing to all if err := SetClusterShardAllocation(service, ClusterSettingsAllocationAll); err != nil { - return false, err + return false, "failed to set shard allocation", err } - return false, nil + return false, "enabled shard allocation", nil } func ReactivateShardAllocation(service *OsClusterClient) error { diff --git a/opensearch-operator/pkg/helpers/helpers.go b/opensearch-operator/pkg/helpers/helpers.go index 7eb2e8e2..117c3356 100644 --- a/opensearch-operator/pkg/helpers/helpers.go +++ b/opensearch-operator/pkg/helpers/helpers.go @@ -48,7 +48,7 @@ func GetField(v *appsv1.StatefulSetSpec, field string) interface{} { func RemoveIt(ss opsterv1.ComponentStatus, ssSlice []opsterv1.ComponentStatus) []opsterv1.ComponentStatus { for idx, v := range ssSlice { - if v == ss { + if ComponentStatusEqual(v, ss) { return append(ssSlice[0:idx], ssSlice[idx+1:]...) 
} } @@ -60,6 +60,10 @@ func Replace(remove opsterv1.ComponentStatus, add opsterv1.ComponentStatus, ssSl return fullSliced } +func ComponentStatusEqual(left opsterv1.ComponentStatus, right opsterv1.ComponentStatus) bool { + return left.Component == right.Component && left.Description == right.Description && left.Status == right.Status +} + func FindFirstPartial( arr []opsterv1.ComponentStatus, item opsterv1.ComponentStatus, @@ -402,26 +406,26 @@ func ReplicaHostName(currentSts appsv1.StatefulSet, repNum int32) string { return fmt.Sprintf("%s-%d", currentSts.ObjectMeta.Name, repNum) } -func WorkingPodForRollingRestart(ctx context.Context, k8sClient client.Client, sts *appsv1.StatefulSet) string { +func WorkingPodForRollingRestart(ctx context.Context, k8sClient client.Client, sts *appsv1.StatefulSet) (string, error) { // Handle the simple case if lo.FromPtrOr(sts.Spec.Replicas, 1) == sts.Status.UpdatedReplicas+sts.Status.CurrentReplicas { ordinal := lo.FromPtrOr(sts.Spec.Replicas, 1) - 1 - sts.Status.UpdatedReplicas - return ReplicaHostName(*sts, ordinal) + return ReplicaHostName(*sts, ordinal), nil } // If there are potentially mixed revisions we need to check each pod for i := lo.FromPtrOr(sts.Spec.Replicas, 1) - 1; i >= 0; i-- { podName := ReplicaHostName(*sts, i) pod := &corev1.Pod{} if err := k8sClient.Get(ctx, types.NamespacedName{Name: podName, Namespace: sts.Namespace}, pod); err != nil { - continue + return "", err } podRevision, ok := pod.Labels[stsRevisionLabel] if !ok { - continue + return "", fmt.Errorf("pod %s has no revision label", podName) } if podRevision != sts.Status.UpdateRevision { - return podName + return podName, nil } } - panic("bug: unable to calculate the working pod for rolling restart") + return "", errors.New("unable to calculate the working pod for rolling restart") } diff --git a/opensearch-operator/pkg/reconcilers/cluster.go b/opensearch-operator/pkg/reconcilers/cluster.go index 7c7296d3..d304454d 100644 --- a/opensearch-operator/pkg/reconcilers/cluster.go +++ b/opensearch-operator/pkg/reconcilers/cluster.go @@ -246,8 +246,7 @@ func (r *ClusterReconciler) reconcileNodeStatefulSet(nodePool opsterv1.NodePool, } // Detect cluster failure and initiate parallel recovery - // Don't do this if the cluster is upgrading - if !helpers.UpgradeInProgress(r.instance.Status) && helpers.ParallelRecoveryMode() && + if helpers.ParallelRecoveryMode() && (nodePool.Persistence == nil || nodePool.Persistence.PersistenceSource.PVC != nil) { // This logic only works if the STS uses PVCs // First check if the STS already has a readable status (CurrentRevision == "" indicates the STS is newly created and the controller has not yet updated the status properly) @@ -264,7 +263,8 @@ func (r *ClusterReconciler) reconcileNodeStatefulSet(nodePool opsterv1.NodePool, } else { // A failure is assumed if n PVCs exist but less than n-1 pods (one missing pod is allowed for rolling restart purposes) // We can assume the cluster is in a failure state and cannot recover on its own - if pvcCount >= int(nodePool.Replicas) && existing.Status.ReadyReplicas < nodePool.Replicas-1 { + if !helpers.UpgradeInProgress(r.instance.Status) && + pvcCount >= int(nodePool.Replicas) && existing.Status.ReadyReplicas < nodePool.Replicas-1 { r.logger.Info(fmt.Sprintf("Detected recovery situation for nodepool %s: PVC count: %d, replicas: %d. 
Recreating STS with parallel mode", nodePool.Component, pvcCount, existing.Status.Replicas)) if existing.Spec.PodManagementPolicy != appsv1.ParallelPodManagement { // Switch to Parallel to jumpstart the cluster diff --git a/opensearch-operator/pkg/reconcilers/rollingRestart.go b/opensearch-operator/pkg/reconcilers/rollingRestart.go index 2c7b95f6..0018c02a 100644 --- a/opensearch-operator/pkg/reconcilers/rollingRestart.go +++ b/opensearch-operator/pkg/reconcilers/rollingRestart.go @@ -111,6 +111,14 @@ func (r *RollingRestartReconciler) Reconcile() (ctrl.Result, error) { return ctrl.Result{}, nil } + // Skip a rolling restart if the cluster hasn't finished initializing + if !r.instance.Status.Initialized { + return ctrl.Result{ + Requeue: true, + RequeueAfter: 10 * time.Second, + }, nil + } + if err := r.updateStatus(statusInProgress); err != nil { return ctrl.Result{Requeue: true}, err } @@ -155,7 +163,7 @@ func (r *RollingRestartReconciler) restartStatefulSetPod(sts *appsv1.StatefulSet lg.Info("only 2 data nodes and drain is set, some shards may not drain") } - ready, err := services.CheckClusterStatusForRestart(r.osClient, r.instance.Spec.General.DrainDataNodes) + ready, _, err := services.CheckClusterStatusForRestart(r.osClient, r.instance.Spec.General.DrainDataNodes) if err != nil { return ctrl.Result{}, err } @@ -166,7 +174,10 @@ func (r *RollingRestartReconciler) restartStatefulSetPod(sts *appsv1.StatefulSet }, nil } - workingPod := helpers.WorkingPodForRollingRestart(r.ctx, r.Client, sts) + workingPod, err := helpers.WorkingPodForRollingRestart(r.ctx, r.Client, sts) + if err != nil { + return ctrl.Result{}, err + } ready, err = services.PreparePodForDelete(r.osClient, workingPod, r.instance.Spec.General.DrainDataNodes, dataCount) if err != nil { diff --git a/opensearch-operator/pkg/reconcilers/upgrade.go b/opensearch-operator/pkg/reconcilers/upgrade.go index f8aac877..42038c46 100644 --- a/opensearch-operator/pkg/reconcilers/upgrade.go +++ b/opensearch-operator/pkg/reconcilers/upgrade.go @@ -68,6 +68,15 @@ func (r *UpgradeReconciler) Reconcile() (ctrl.Result, error) { if r.instance.Spec.General.Version == r.instance.Status.Version { return ctrl.Result{}, nil } + + // Skip an upgrade if the cluster hasn't finished initializing + if !r.instance.Status.Initialized { + return ctrl.Result{ + Requeue: true, + RequeueAfter: 10 * time.Second, + }, nil + } + annotations := map[string]string{"cluster-name": r.instance.GetName()} // If version validation fails log a warning and do nothing @@ -280,6 +289,7 @@ func (r *UpgradeReconciler) findNextPool(pools []opsterv1.NodePool) (opsterv1.No } func (r *UpgradeReconciler) doNodePoolUpgrade(pool opsterv1.NodePool) error { + var conditions []string // Fetch the STS stsName := builders.StsName(r.instance, &pool) sts := &appsv1.StatefulSet{} @@ -298,22 +308,30 @@ func (r *UpgradeReconciler) doNodePoolUpgrade(pool opsterv1.NodePool) error { if sts.Status.ReadyReplicas < lo.FromPtrOr(sts.Spec.Replicas, 1) { r.logger.Info("Waiting for all pods to be ready") + conditions = append(conditions, "Waiting for all pods to be ready") + r.setComponentConditions(conditions, pool.Component) return nil } - ready, err := services.CheckClusterStatusForRestart(r.osClient, r.instance.Spec.General.DrainDataNodes) + ready, condition, err := services.CheckClusterStatusForRestart(r.osClient, r.instance.Spec.General.DrainDataNodes) if err != nil { r.logger.Error(err, "Could not check opensearch cluster status") + conditions = append(conditions, "Could not check opensearch 
cluster status") + r.setComponentConditions(conditions, pool.Component) return err } if !ready { r.logger.Info("Cluster is not ready for next pod to restart") + conditions = append(conditions, condition) + r.setComponentConditions(conditions, pool.Component) return nil } + conditions = append(conditions, "preparing for pod delete") + // Work around for https://github.com/kubernetes/kubernetes/issues/73492 // If upgrade on this node pool is complete update status and return - if sts.Status.UpdatedReplicas == sts.Status.Replicas { + if sts.Status.UpdatedReplicas == lo.FromPtrOr(sts.Spec.Replicas, 1) { if err = services.ReactivateShardAllocation(r.osClient); err != nil { return err } @@ -339,13 +357,22 @@ func (r *UpgradeReconciler) doNodePoolUpgrade(pool opsterv1.NodePool) error { }) } - workingPod := helpers.WorkingPodForRollingRestart(r.ctx, r.Client, sts) + workingPod, err := helpers.WorkingPodForRollingRestart(r.ctx, r.Client, sts) + if err != nil { + conditions = append(conditions, "Could not find working pod") + r.setComponentConditions(conditions, pool.Component) + return err + } ready, err = services.PreparePodForDelete(r.osClient, workingPod, r.instance.Spec.General.DrainDataNodes, dataCount) if err != nil { + conditions = append(conditions, "Could not prepare pod for delete") + r.setComponentConditions(conditions, pool.Component) return err } if !ready { + conditions = append(conditions, "Waiting for node to drain") + r.setComponentConditions(conditions, pool.Component) return nil } @@ -356,9 +383,14 @@ func (r *UpgradeReconciler) doNodePoolUpgrade(pool opsterv1.NodePool) error { }, }) if err != nil { + conditions = append(conditions, "Could not delete pod") + r.setComponentConditions(conditions, pool.Component) return err } + conditions = append(conditions, fmt.Sprintf("Deleted pod %s", workingPod)) + r.setComponentConditions(conditions, pool.Component) + // If we are draining nodes remove the exclusion after the pod is deleted if r.instance.Spec.General.DrainDataNodes { _, err = services.RemoveExcludeNodeHost(r.osClient, workingPod) @@ -367,3 +399,33 @@ func (r *UpgradeReconciler) doNodePoolUpgrade(pool opsterv1.NodePool) error { return nil } + +func (r *UpgradeReconciler) setComponentConditions(conditions []string, component string) { + err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + if err := r.Get(r.ctx, client.ObjectKeyFromObject(r.instance), r.instance); err != nil { + return err + } + currentStatus := opsterv1.ComponentStatus{ + Component: "Upgrader", + Status: "Upgrading", + Description: component, + } + componentStatus, found := helpers.FindFirstPartial(r.instance.Status.ComponentsStatus, currentStatus, helpers.GetByDescriptionAndGroup) + newStatus := opsterv1.ComponentStatus{ + Component: "Upgrader", + Status: "Upgrading", + Description: component, + Conditions: conditions, + } + if found { + conditions = append(componentStatus.Conditions, conditions...) 
+ } + + r.instance.Status.ComponentsStatus = helpers.Replace(currentStatus, newStatus, r.instance.Status.ComponentsStatus) + + return r.Status().Update(r.ctx, r.instance) + }) + if err != nil { + r.logger.Error(err, "Could not update status") + } +} From ac93df0a29e137259c5e140c48747479de15cb20 Mon Sep 17 00:00:00 2001 From: Dan Bason Date: Thu, 27 Jul 2023 09:12:28 +1200 Subject: [PATCH 09/10] Update watermark settings Signed-off-by: Dan Bason --- .../functionaltests/operatortests/deploy-and-upgrade.yaml | 4 ++++ .../functionaltests/operatortests/deploy_and_upgrade_test.go | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/opensearch-operator/functionaltests/operatortests/deploy-and-upgrade.yaml b/opensearch-operator/functionaltests/operatortests/deploy-and-upgrade.yaml index f1c111ea..7d1c64cc 100644 --- a/opensearch-operator/functionaltests/operatortests/deploy-and-upgrade.yaml +++ b/opensearch-operator/functionaltests/operatortests/deploy-and-upgrade.yaml @@ -9,6 +9,10 @@ spec: httpPort: 9200 vendor: opensearch serviceName: deploy-and-upgrade + additionalConfig: + cluster.routing.allocation.disk.watermark.low: 500m + cluster.routing.allocation.disk.watermark.high: 300m + cluster.routing.allocation.disk.watermark.flood_stage: 100m confMgmt: smartScaler: true dashboards: diff --git a/opensearch-operator/functionaltests/operatortests/deploy_and_upgrade_test.go b/opensearch-operator/functionaltests/operatortests/deploy_and_upgrade_test.go index 75b8552a..33da6fac 100644 --- a/opensearch-operator/functionaltests/operatortests/deploy_and_upgrade_test.go +++ b/opensearch-operator/functionaltests/operatortests/deploy_and_upgrade_test.go @@ -94,7 +94,7 @@ var _ = Describe("DeployAndUpgrade", Ordered, func() { } GinkgoWriter.Println(err) return 0 - }, time.Minute*10, time.Second*5).Should(Equal(int32(3))) + }, time.Minute*15, time.Second*5).Should(Equal(int32(3))) }) It("should upgrade the dashboard pod", func() { From 4e793b35f3c8420334aa2c25f1d4dfdefbe6dcee Mon Sep 17 00:00:00 2001 From: Dan Bason Date: Fri, 28 Jul 2023 09:24:55 +1200 Subject: [PATCH 10/10] Tidy up comments and variables Signed-off-by: Dan Bason --- .github/workflows/functional-tests.yaml | 2 -- opensearch-operator/functionaltests/execute_tests.sh | 6 ------ opensearch-operator/pkg/helpers/helpers.go | 7 ++++--- 3 files changed, 4 insertions(+), 11 deletions(-) diff --git a/.github/workflows/functional-tests.yaml b/.github/workflows/functional-tests.yaml index a5657bf1..4f6b17f9 100644 --- a/.github/workflows/functional-tests.yaml +++ b/.github/workflows/functional-tests.yaml @@ -31,8 +31,6 @@ jobs: ## Build controller docker image make docker-build - ## Pre-pull opensearch images - ## Import controller docker image k3d image import -c $CLUSTER_NAME controller:latest diff --git a/opensearch-operator/functionaltests/execute_tests.sh b/opensearch-operator/functionaltests/execute_tests.sh index b0a708b2..c6c00b09 100755 --- a/opensearch-operator/functionaltests/execute_tests.sh +++ b/opensearch-operator/functionaltests/execute_tests.sh @@ -6,18 +6,12 @@ k3d cluster create $CLUSTER_NAME --agents 2 --kubeconfig-switch-context=false -- k3d kubeconfig get $CLUSTER_NAME > kubeconfig export KUBECONFIG=$(pwd)/kubeconfig -## Pre-pull opensearch images -docker pull opensearchproject/opensearch:1.3.0 -docker pull opensearchproject/opensearch:2.3.0 - ## Build controller docker image cd .. 
make docker-build ## Import controller docker image k3d image import -c $CLUSTER_NAME controller:latest -k3d image import -c $CLUSTER_NAME opensearchproject/opensearch:1.3.0 -k3d image import -c $CLUSTER_NAME opensearchproject/opensearch:2.3.0 ## Install helm chart helm install opensearch-operator ../charts/opensearch-operator --set manager.image.repository=controller --set manager.image.tag=latest --set manager.image.pullPolicy=IfNotPresent --namespace default --wait diff --git a/opensearch-operator/pkg/helpers/helpers.go b/opensearch-operator/pkg/helpers/helpers.go index 117c3356..e129f0c0 100644 --- a/opensearch-operator/pkg/helpers/helpers.go +++ b/opensearch-operator/pkg/helpers/helpers.go @@ -407,13 +407,14 @@ func ReplicaHostName(currentSts appsv1.StatefulSet, repNum int32) string { } func WorkingPodForRollingRestart(ctx context.Context, k8sClient client.Client, sts *appsv1.StatefulSet) (string, error) { + replicas := lo.FromPtrOr(sts.Spec.Replicas, 1) // Handle the simple case - if lo.FromPtrOr(sts.Spec.Replicas, 1) == sts.Status.UpdatedReplicas+sts.Status.CurrentReplicas { - ordinal := lo.FromPtrOr(sts.Spec.Replicas, 1) - 1 - sts.Status.UpdatedReplicas + if replicas == sts.Status.UpdatedReplicas+sts.Status.CurrentReplicas { + ordinal := replicas - 1 - sts.Status.UpdatedReplicas return ReplicaHostName(*sts, ordinal), nil } // If there are potentially mixed revisions we need to check each pod - for i := lo.FromPtrOr(sts.Spec.Replicas, 1) - 1; i >= 0; i-- { + for i := replicas - 1; i >= 0; i-- { podName := ReplicaHostName(*sts, i) pod := &corev1.Pod{} if err := k8sClient.Get(ctx, types.NamespacedName{Name: podName, Namespace: sts.Namespace}, pod); err != nil {