From 56c9c8f16e79647e0a9b627f188f8486854a086e Mon Sep 17 00:00:00 2001 From: Sebastian Woehrl Date: Thu, 13 Jun 2024 08:54:33 +0200 Subject: [PATCH 1/7] Disable http client connection reuse to prevent memory leak (#842) ### Description The operator pod is suffering from memory leaks. After some analysis I think I have narrowed it down to connections for the http client being kept for reuse but never being used due to a new client being created in every reconcile run. This PR disables the connection keepalive/reuse and (at least in my experiments) prevents the memory leak. ### Issues Resolved Fixes #700 ### Check List - [x] Commits are signed per the DCO using --signoff - [-] Unittest added for the new/changed functionality and all unit tests are successful - [-] Customer-visible features documented - [x] No linter warnings (`make lint`) If CRDs are changed: - [-] CRD YAMLs updated (`make manifests`) and also copied into the helm chart - [-] Changes to CRDs documented By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license. For more information on following Developer Certificate of Origin and signing off your commits, please check [here](https://github.com/opensearch-project/OpenSearch/blob/main/CONTRIBUTING.md#developer-certificate-of-origin). Signed-off-by: Sebastian Woehrl --- .../api/v1/zz_generated.deepcopy.go | 22 ++++++++++++- .../opensearch-gateway/services/os_client.go | 4 +++ opensearch-operator/pkg/reconcilers/scaler.go | 33 +++++++------------ 3 files changed, 36 insertions(+), 23 deletions(-) diff --git a/opensearch-operator/api/v1/zz_generated.deepcopy.go b/opensearch-operator/api/v1/zz_generated.deepcopy.go index da195040..055616a0 100644 --- a/opensearch-operator/api/v1/zz_generated.deepcopy.go +++ b/opensearch-operator/api/v1/zz_generated.deepcopy.go @@ -402,7 +402,7 @@ func (in *Condition) DeepCopyInto(out *Condition) { if in.Cron != nil { in, out := &in.Cron, &out.Cron *out = new(Cron) - **out = **in + (*in).DeepCopyInto(*out) } if in.MinDocCount != nil { in, out := &in.MinDocCount, &out.MinDocCount @@ -454,6 +454,11 @@ func (in *ConfMgmt) DeepCopy() *ConfMgmt { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Cron) DeepCopyInto(out *Cron) { *out = *in + if in.CronDetails != nil { + in, out := &in.CronDetails, &out.CronDetails + *out = new(CronDetails) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Cron. @@ -466,6 +471,21 @@ func (in *Cron) DeepCopy() *Cron { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *CronDetails) DeepCopyInto(out *CronDetails) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new CronDetails. +func (in *CronDetails) DeepCopy() *CronDetails { + if in == nil { + return nil + } + out := new(CronDetails) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *DashboardsConfig) DeepCopyInto(out *DashboardsConfig) { *out = *in diff --git a/opensearch-operator/opensearch-gateway/services/os_client.go b/opensearch-operator/opensearch-gateway/services/os_client.go index b12729f9..081c6fa2 100644 --- a/opensearch-operator/opensearch-gateway/services/os_client.go +++ b/opensearch-operator/opensearch-gateway/services/os_client.go @@ -73,6 +73,10 @@ func NewOsClusterClient(clusterUrl string, username string, password string, opt } return &http.Transport{ TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + // These options are needed as otherwise connections would be kept and leak memory + // Connection reuse is not really possible due to each reconcile run being independent + DisableKeepAlives: true, + MaxIdleConns: 1, } }(), Addresses: []string{clusterUrl}, diff --git a/opensearch-operator/pkg/reconcilers/scaler.go b/opensearch-operator/pkg/reconcilers/scaler.go index 297e53d0..3223a80a 100644 --- a/opensearch-operator/pkg/reconcilers/scaler.go +++ b/opensearch-operator/pkg/reconcilers/scaler.go @@ -10,6 +10,7 @@ import ( "github.com/Opster/opensearch-k8s-operator/opensearch-operator/pkg/builders" "github.com/Opster/opensearch-k8s-operator/opensearch-operator/pkg/helpers" "github.com/Opster/opensearch-k8s-operator/opensearch-operator/pkg/reconcilers/k8s" + "github.com/Opster/opensearch-k8s-operator/opensearch-operator/pkg/reconcilers/util" "github.com/cisco-open/operator-tools/pkg/reconciler" appsv1 "k8s.io/api/apps/v1" "k8s.io/client-go/tools/record" @@ -25,6 +26,7 @@ type ScalerReconciler struct { recorder record.EventRecorder reconcilerContext *ReconcilerContext instance *opsterv1.OpenSearchCluster + ReconcilerOptions } func NewScalerReconciler( @@ -33,14 +35,17 @@ func NewScalerReconciler( recorder record.EventRecorder, reconcilerContext *ReconcilerContext, instance *opsterv1.OpenSearchCluster, - opts ...reconciler.ResourceReconcilerOption, + opts ...ReconcilerOption, ) *ScalerReconciler { + options := ReconcilerOptions{} + options.apply(opts...) return &ScalerReconciler{ - client: k8s.NewK8sClient(client, ctx, append(opts, reconciler.WithLog(log.FromContext(ctx).WithValues("reconciler", "scaler")))...), + client: k8s.NewK8sClient(client, ctx, reconciler.WithLog(log.FromContext(ctx).WithValues("reconciler", "scaler"))), ctx: ctx, recorder: recorder, reconcilerContext: reconcilerContext, instance: instance, + ReconcilerOptions: options, } } @@ -187,11 +192,7 @@ func (r *ScalerReconciler) decreaseOneNode(currentStatus opsterv1.ComponentStatu if !smartDecrease { return false, err } - username, password, err := helpers.UsernameAndPassword(r.client, r.instance) - if err != nil { - return true, err - } - clusterClient, err := services.NewOsClusterClient(builders.URLForCluster(r.instance), username, password) + clusterClient, err := util.CreateClientForCluster(r.client, r.ctx, r.instance, r.osClientTransport) if err != nil { lg.Error(err, "failed to create os client") r.recorder.AnnotatedEventf(r.instance, annotations, "WARN", "failed to remove node exclude", "Group-%s . 
failed to remove node exclude %s", nodePoolGroupName, lastReplicaNodeName) @@ -209,13 +210,9 @@ func (r *ScalerReconciler) decreaseOneNode(currentStatus opsterv1.ComponentStatu func (r *ScalerReconciler) excludeNode(currentStatus opsterv1.ComponentStatus, currentSts appsv1.StatefulSet, nodePoolGroupName string) error { lg := log.FromContext(r.ctx) - username, password, err := helpers.UsernameAndPassword(r.client, r.instance) annotations := map[string]string{"cluster-name": r.instance.GetName()} - if err != nil { - return err - } - clusterClient, err := services.NewOsClusterClient(builders.URLForCluster(r.instance), username, password) + clusterClient, err := util.CreateClientForCluster(r.client, r.ctx, r.instance, r.osClientTransport) if err != nil { lg.Error(err, "failed to create os client") r.recorder.AnnotatedEventf(r.instance, annotations, "Warning", "Scaler", "Failed to create os client for scaling") @@ -272,12 +269,8 @@ func (r *ScalerReconciler) drainNode(currentStatus opsterv1.ComponentStatus, cur lg := log.FromContext(r.ctx) annotations := map[string]string{"cluster-name": r.instance.GetName()} lastReplicaNodeName := helpers.ReplicaHostName(currentSts, *currentSts.Spec.Replicas-1) - username, password, err := helpers.UsernameAndPassword(r.client, r.instance) - if err != nil { - return err - } - clusterClient, err := services.NewOsClusterClient(builders.URLForCluster(r.instance), username, password) + clusterClient, err := util.CreateClientForCluster(r.client, r.ctx, r.instance, r.osClientTransport) if err != nil { return err } @@ -328,12 +321,8 @@ func (r *ScalerReconciler) removeStatefulSet(sts appsv1.StatefulSet) (*ctrl.Resu } // Gracefully remove nodes - username, password, err := helpers.UsernameAndPassword(r.client, r.instance) - if err != nil { - return nil, err - } annotations := map[string]string{"cluster-name": r.instance.GetName()} - clusterClient, err := services.NewOsClusterClient(builders.URLForCluster(r.instance), username, password) + clusterClient, err := util.CreateClientForCluster(r.client, r.ctx, r.instance, r.osClientTransport) if err != nil { lg.Error(err, "failed to create os client") r.recorder.AnnotatedEventf(r.instance, annotations, "Warning", "Scaler", "Failed to create os client") From ce25fccd734cddcf8b1441632cc80caac329057e Mon Sep 17 00:00:00 2001 From: Casper Thygesen Date: Fri, 2 Aug 2024 12:50:05 +0200 Subject: [PATCH 2/7] Missing securityContext attributes on container kube-rbac-proxy (#848) ### Description Allows the following setup ```yaml kubeRbacProxy: securityContext: allowPrivilegeEscalation: false readOnlyRootFilesystem: true capabilities: drop: - all ``` ### Issues Resolved Fixes [#745](https://github.com/opensearch-project/opensearch-k8s-operator/issues/745) ### Check List - [x] Commits are signed per the DCO using --signoff - [ ] Unittest added for the new/changed functionality and all unit tests are successful - [ ] Customer-visible features documented - [ ] No linter warnings (`make lint`) If CRDs are changed: - [ ] CRD YAMLs updated (`make manifests`) and also copied into the helm chart - [ ] Changes to CRDs documented Please refer to the [PR guidelines](https://github.com/opensearch-project/opensearch-k8s-operator/blob/main/docs/developing.md#submitting-a-pr) before submitting this pull request. By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license. 
For more information on following Developer Certificate of Origin and signing off your commits, please check [here](https://github.com/opensearch-project/OpenSearch/blob/main/CONTRIBUTING.md#developer-certificate-of-origin). Signed-off-by: Casper Thygesen --- charts/opensearch-operator/values.yaml | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/charts/opensearch-operator/values.yaml b/charts/opensearch-operator/values.yaml index 96bdc268..0629f261 100644 --- a/charts/opensearch-operator/values.yaml +++ b/charts/opensearch-operator/values.yaml @@ -74,7 +74,11 @@ serviceAccount: kubeRbacProxy: enable: true securityContext: - # allowPrivilegeEscalation: false + allowPrivilegeEscalation: false + readOnlyRootFilesystem: true + capabilities: + drop: + - ALL resources: limits: cpu: 50m From 231075ce87a7ccac999c66083b071f4ec6094d94 Mon Sep 17 00:00:00 2001 From: Alex <49783005+OlegVanHorst@users.noreply.github.com> Date: Wed, 7 Aug 2024 14:05:38 +0200 Subject: [PATCH 3/7] Fix rollingRestart reconcile for multiple same-named clusters (#836) ### Description Fixes a bug where reconciliation breaks if there are multiple same-named OpensearchClusters in different namespaces. The rollingRestart reconciler selects pods in all namespaces when evaluating whether the pods are healthy and matches them against the desired replicas. Only pods in the cluster's own namespace should be checked. By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license. For more information on following Developer Certificate of Origin and signing off your commits, please check [here](https://github.com/opensearch-project/OpenSearch/blob/main/CONTRIBUTING.md#developer-certificate-of-origin). --------- Signed-off-by: Alex Engel --- opensearch-operator/pkg/helpers/helpers.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/opensearch-operator/pkg/helpers/helpers.go b/opensearch-operator/pkg/helpers/helpers.go index aa6de1d5..82672731 100644 --- a/opensearch-operator/pkg/helpers/helpers.go +++ b/opensearch-operator/pkg/helpers/helpers.go @@ -247,7 +247,7 @@ func CountRunningPodsForNodePool(k8sClient k8s.K8sClient, cr *opsterv1.OpenSearc selector := labels.NewSelector() selector = selector.Add(*clusterReq, *componentReq) // List pods matching selector - list, err := k8sClient.ListPods(&client.ListOptions{LabelSelector: selector}) + list, err := k8sClient.ListPods(&client.ListOptions{Namespace: cr.Namespace, LabelSelector: selector}) if err != nil { return 0, err } @@ -281,7 +281,7 @@ func CountPVCsForNodePool(k8sClient k8s.K8sClient, cr *opsterv1.OpenSearchCluste } selector := labels.NewSelector() selector = selector.Add(*clusterReq, *componentReq) - list, err := k8sClient.ListPVCs(&client.ListOptions{LabelSelector: selector}) + list, err := k8sClient.ListPVCs(&client.ListOptions{Namespace: cr.Namespace, LabelSelector: selector}) if err != nil { return 0, err } From 610222aed69e483c14e9cf9e93205547079df35f Mon Sep 17 00:00:00 2001 From: Guilherme Oki Date: Tue, 13 Aug 2024 10:21:21 -0300 Subject: [PATCH 4/7] feat(helm): add option to set resources for initHelper (#865) ### Description This PR adds an option to define resources for the initHelper container.
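For illustration, a minimal sketch of what the new chart configuration could look like with this option (the request/limit values below are placeholders, not recommendations):

```yaml
# charts/opensearch-cluster values.yaml -- illustrative values only
opensearchCluster:
  initHelper:
    resources:
      requests:
        memory: "128Mi"
        cpu: "100m"
      limits:
        memory: "128Mi"
        cpu: "100m"
```

The template renders the `resources` block only when the value is set, so existing installations that do not define it are unaffected.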
### Issues Resolved Closes https://github.com/opensearch-project/opensearch-k8s-operator/issues/866 ### Check List - [X] Commits are signed per the DCO using --signoff - [X] Unittest added for the new/changed functionality and all unit tests are successful - [X] Customer-visible features documented - [X] No linter warnings (`make lint`) By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license. For more information on following Developer Certificate of Origin and signing off your commits, please check [here](https://github.com/opensearch-project/OpenSearch/blob/main/CONTRIBUTING.md#developer-certificate-of-origin). --------- Signed-off-by: Guilherme Oki --- .../templates/opensearch-cluster-cr.yaml | 4 ++++ charts/opensearch-cluster/values.yaml | 11 +++++++++++ 2 files changed, 15 insertions(+) diff --git a/charts/opensearch-cluster/templates/opensearch-cluster-cr.yaml b/charts/opensearch-cluster/templates/opensearch-cluster-cr.yaml index e08132ca..79f3b70b 100644 --- a/charts/opensearch-cluster/templates/opensearch-cluster-cr.yaml +++ b/charts/opensearch-cluster/templates/opensearch-cluster-cr.yaml @@ -27,6 +27,10 @@ spec: imagePullSecrets: {{ toYaml .Values.opensearchCluster.initHelper.imagePullSecrets | nindent 6 }} {{- end }} + {{- if .Values.opensearchCluster.initHelper.resources }} + resources: + {{- toYaml .Values.opensearchCluster.initHelper.resources | nindent 6 }} + {{- end }} {{- end }} general: {{- if .Values.opensearchCluster.general.version }} diff --git a/charts/opensearch-cluster/values.yaml b/charts/opensearch-cluster/values.yaml index c0cc622f..2841ea81 100644 --- a/charts/opensearch-cluster/values.yaml +++ b/charts/opensearch-cluster/values.yaml @@ -27,6 +27,17 @@ opensearchCluster: limits: memory: "1Gi" cpu: "500m" + initHelper: + imagePullSecrets: [] + # - registryKeySecretName + imagePullPolicy: IfNotPresent + resources: {} + # requests: + # memory: "1Gi" + # cpu: "500m" + # limits: + # memory: "1Gi" + # cpu: "500m" nodePools: - component: masters diskSize: "30Gi" From ec6428d4310da13087ac0546885c7a31e29a1356 Mon Sep 17 00:00:00 2001 From: rkthtrifork <131661717+rkthtrifork@users.noreply.github.com> Date: Thu, 22 Aug 2024 15:18:17 +0200 Subject: [PATCH 5/7] Rewrote ISM Policy reconciler (#846) ### Description The ISM Policy reconciler was constantly trying to update the ISM Policy, and it was not handling reconciliation requeue in some cases. There were possibly other issues as well. Below I have described what caused the different issues I encountered: - The ISM Policy request was different from the response, but both were modeled with the same struct. This caused the reconciler to always see the existing ISM Policy and the ISM Policy from the CR as different and try to update it. I have created a separate struct model for each to separate the logic, and in the code I now compare the existing policy with the policy from the CR by comparing both the policy IDs and the policy spec - There were some very complex cases in the code that were difficult to understand, so I have attempted to make the code more concise and easier to read and understand - I have added reconciliation requeuing to all cases, so the operator no longer silently stops reconciling the ISM Policy One thing I am wondering about is why we would want to create a CR without specifying the cluster ID, have the operator automatically link it to that cluster ID, and then have it break if the OpenSearch CR is deleted.
Is this intended and why? I'm talking about the section with the comment "Check cluster ref has not changed" Tested cases: - A new ISM Policy is created through a CR and the operator creates it in the OpenSearch Cluster - The CR for an ISM Policy that is created by the operator is removed and the operator removes it in the OpenSearch Cluster - An ISM Policy that already exists in the OpenSearch Cluster is created through a CR and the operator ignores it and marks it as existing - The CR for an ISM Policy that was pre-existing and therefore was not created by the operator is removed and the operator does not remove the ISM Policy from the OpenSearch Cluster - An ISM Policy that already exists in the OpenSearch Cluster is created through a CR and the operator ignores it and marks it as existing. The ISM Policy is then manually removed from the OpenSearch Cluster and the operator now applies the ISM Policy from the CR The test for ISM Policies is currently failing miserably, but I decided to create the PR to get feedback before I dive into fixing it. ### Issues Resolved https://github.com/opensearch-project/opensearch-k8s-operator/issues/833 https://github.com/opensearch-project/opensearch-k8s-operator/issues/732 Possibly other issues ### Check List - [x] Commits are signed per the DCO using --signoff - [x] Unittest added for the new/changed functionality and all unit tests are successful - [x] Customer-visible features documented - [x] No linter warnings (`make lint`) If CRDs are changed: - [ ] CRD YAMLs updated (`make manifests`) and also copied into the helm chart - [ ] Changes to CRDs documented Please refer to the [PR guidelines](https://github.com/opensearch-project/opensearch-k8s-operator/blob/main/docs/developing.md#submitting-a-pr) before submitting this pull request. By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license. For more information on following Developer Certificate of Origin and signing off your commits, please check [here](https://github.com/opensearch-project/OpenSearch/blob/main/CONTRIBUTING.md#developer-certificate-of-origin). Signed-off-by: rkthtrifork --- .../opensearch-gateway/requests/IsmPolicy.go | 9 +- .../responses/ISMPolicyResponse.go | 5 - .../opensearch-gateway/responses/IsmPolicy.go | 10 + .../services/os_ism_service.go | 33 +- .../pkg/reconcilers/ismpolicy.go | 285 +++++++------ .../pkg/reconcilers/ismpolicy_test.go | 398 +++++++++--------- .../pkg/reconcilers/reconcilers.go | 16 +- 7 files changed, 383 insertions(+), 373 deletions(-) delete mode 100644 opensearch-operator/opensearch-gateway/responses/ISMPolicyResponse.go create mode 100644 opensearch-operator/opensearch-gateway/responses/IsmPolicy.go diff --git a/opensearch-operator/opensearch-gateway/requests/IsmPolicy.go b/opensearch-operator/opensearch-gateway/requests/IsmPolicy.go index 2f820a0a..023fc79e 100644 --- a/opensearch-operator/opensearch-gateway/requests/IsmPolicy.go +++ b/opensearch-operator/opensearch-gateway/requests/IsmPolicy.go @@ -1,14 +1,11 @@ package requests -type Policy struct { - PolicyID string `json:"_id,omitempty"` - PrimaryTerm *int `json:"_primary_term,omitempty"` - SequenceNumber *int `json:"_seq_no,omitempty"` - Policy ISMPolicy `json:"policy"` +type ISMPolicy struct { + Policy ISMPolicySpec `json:"policy"` } // ISMPolicySpec is the specification for the ISM policy for OS. -type ISMPolicy struct { +type ISMPolicySpec struct { // The default starting state for each index that uses this policy. 
DefaultState string `json:"default_state"` // A human-readable description of the policy. diff --git a/opensearch-operator/opensearch-gateway/responses/ISMPolicyResponse.go b/opensearch-operator/opensearch-gateway/responses/ISMPolicyResponse.go deleted file mode 100644 index 753314cc..00000000 --- a/opensearch-operator/opensearch-gateway/responses/ISMPolicyResponse.go +++ /dev/null @@ -1,5 +0,0 @@ -package responses - -import "github.com/Opster/opensearch-k8s-operator/opensearch-operator/opensearch-gateway/requests" - -type GetISMPoliciesResponse requests.Policy diff --git a/opensearch-operator/opensearch-gateway/responses/IsmPolicy.go b/opensearch-operator/opensearch-gateway/responses/IsmPolicy.go new file mode 100644 index 00000000..29174466 --- /dev/null +++ b/opensearch-operator/opensearch-gateway/responses/IsmPolicy.go @@ -0,0 +1,10 @@ +package responses + +import "github.com/Opster/opensearch-k8s-operator/opensearch-operator/opensearch-gateway/requests" + +type GetISMPolicyResponse struct { + PolicyID string `json:"_id"` + PrimaryTerm int `json:"_primary_term"` + SequenceNumber int `json:"_seq_no"` + Policy requests.ISMPolicySpec +} diff --git a/opensearch-operator/opensearch-gateway/services/os_ism_service.go b/opensearch-operator/opensearch-gateway/services/os_ism_service.go index d82c0e90..040e2e26 100644 --- a/opensearch-operator/opensearch-gateway/services/os_ism_service.go +++ b/opensearch-operator/opensearch-gateway/services/os_ism_service.go @@ -7,6 +7,7 @@ import ( "fmt" "github.com/Opster/opensearch-k8s-operator/opensearch-operator/opensearch-gateway/requests" + "github.com/Opster/opensearch-k8s-operator/opensearch-operator/opensearch-gateway/responses" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "github.com/opensearch-project/opensearch-go/opensearchutil" @@ -16,7 +17,7 @@ import ( var ErrNotFound = errors.New("policy not found") // ShouldUpdateISMPolicy checks if the passed policy is same as existing or needs update -func ShouldUpdateISMPolicy(ctx context.Context, newPolicy, existingPolicy requests.Policy) (bool, error) { +func ShouldUpdateISMPolicy(ctx context.Context, newPolicy, existingPolicy requests.ISMPolicy) (bool, error) { if cmp.Equal(newPolicy, existingPolicy, cmpopts.EquateEmpty()) { return false, nil } @@ -27,23 +28,8 @@ func ShouldUpdateISMPolicy(ctx context.Context, newPolicy, existingPolicy reques return true, nil } -// PolicyExists checks if the passed policy already exists or not -func PolicyExists(ctx context.Context, service *OsClusterClient, policyName string) (bool, error) { - resp, err := service.GetISMConfig(ctx, policyName) - if err != nil { - return false, err - } - defer resp.Body.Close() - if resp.StatusCode == 404 { - return false, nil - } else if resp.IsError() { - return false, fmt.Errorf("response from API is %s", resp.Status()) - } - return true, nil -} - // GetPolicy fetches the passed policy -func GetPolicy(ctx context.Context, service *OsClusterClient, policyName string) (*requests.Policy, error) { +func GetPolicy(ctx context.Context, service *OsClusterClient, policyName string) (*responses.GetISMPolicyResponse, error) { resp, err := service.GetISMConfig(ctx, policyName) if err != nil { return nil, err @@ -51,10 +37,11 @@ func GetPolicy(ctx context.Context, service *OsClusterClient, policyName string) defer resp.Body.Close() if resp.StatusCode == 404 { return nil, ErrNotFound - } else if resp.IsError() { + } + if resp.IsError() { return nil, fmt.Errorf("response from API is %s", resp.Status()) } - ismResponse := 
requests.Policy{} + ismResponse := responses.GetISMPolicyResponse{} if resp != nil && resp.Body != nil { err := json.NewDecoder(resp.Body).Decode(&ismResponse) if err != nil { @@ -66,7 +53,7 @@ func GetPolicy(ctx context.Context, service *OsClusterClient, policyName string) } // CreateISMPolicy creates the passed policy -func CreateISMPolicy(ctx context.Context, service *OsClusterClient, ismpolicy requests.Policy, policyId string) error { +func CreateISMPolicy(ctx context.Context, service *OsClusterClient, ismpolicy requests.ISMPolicy, policyId string) error { spec := opensearchutil.NewJSONReader(ismpolicy) resp, err := service.PutISMConfig(ctx, policyId, spec) if err != nil { @@ -80,15 +67,15 @@ func CreateISMPolicy(ctx context.Context, service *OsClusterClient, ismpolicy re } // UpdateISMPolicy updates the given policy -func UpdateISMPolicy(ctx context.Context, service *OsClusterClient, ismpolicy requests.Policy, seqno, primterm *int, policyName string) error { +func UpdateISMPolicy(ctx context.Context, service *OsClusterClient, ismpolicy requests.ISMPolicy, seqno, primterm *int, policyId string) error { spec := opensearchutil.NewJSONReader(ismpolicy) - resp, err := service.UpdateISMConfig(ctx, policyName, *seqno, *primterm, spec) + resp, err := service.UpdateISMConfig(ctx, policyId, *seqno, *primterm, spec) if err != nil { return err } defer resp.Body.Close() if resp.IsError() { - return fmt.Errorf("failed to create ism policy: %s", resp.String()) + return fmt.Errorf("failed to update ism policy: %s", resp.String()) } return nil } diff --git a/opensearch-operator/pkg/reconcilers/ismpolicy.go b/opensearch-operator/pkg/reconcilers/ismpolicy.go index f2954d45..8b757422 100644 --- a/opensearch-operator/pkg/reconcilers/ismpolicy.go +++ b/opensearch-operator/pkg/reconcilers/ismpolicy.go @@ -13,6 +13,8 @@ import ( "github.com/Opster/opensearch-k8s-operator/opensearch-operator/pkg/reconcilers/util" "github.com/cisco-open/operator-tools/pkg/reconciler" "github.com/go-logr/logr" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" "k8s.io/utils/pointer" @@ -22,7 +24,10 @@ import ( ) const ( - ismPolicyExists = "ism policy already exists in Opensearch" + opensearchIsmPolicyExists = "ISM Policy already exists in Opensearch" + opensearchIsmPolicyNameMismatch = "OpensearchISMPolicyNameMismatch" + opensearchClusterRequeueAfter = 10 * time.Second + defaultRequeueAfter = 30 * time.Second ) type IsmPolicyReconciler struct { @@ -58,6 +63,7 @@ func NewIsmReconciler( func (r *IsmPolicyReconciler) Reconcile() (retResult ctrl.Result, retErr error) { var reason string var policyId string + defer func() { if !pointer.BoolDeref(r.updateStatus, true) { return @@ -71,24 +77,23 @@ func (r *IsmPolicyReconciler) Reconcile() (retResult ctrl.Result, retErr error) instance.Status.State = opsterv1.OpensearchISMPolicyError } // Requeue after is 10 seconds if waiting for OpenSearch cluster - if retResult.Requeue && retResult.RequeueAfter == 10*time.Second { + if retResult.Requeue && retResult.RequeueAfter == opensearchClusterRequeueAfter { instance.Status.State = opsterv1.OpensearchISMPolicyPending } - // Requeue is after 30 seconds for normal reconciliation after creation/update - if retErr == nil && retResult.RequeueAfter == 30*time.Second { + if retErr == nil && retResult.Requeue { instance.Status.State = opsterv1.OpensearchISMPolicyCreated instance.Status.PolicyId = policyId } - if reason == ismPolicyExists { + if reason == 
opensearchIsmPolicyExists { instance.Status.State = opsterv1.OpensearchISMPolicyIgnored } }) + if err != nil { r.logger.Error(err, "failed to update status") } }() - var err error r.cluster, retErr = util.FetchOpensearchCluster(r.client, r.ctx, types.NamespacedName{ Name: r.instance.Spec.OpensearchRef.Name, Namespace: r.instance.Namespace, @@ -97,167 +102,184 @@ func (r *IsmPolicyReconciler) Reconcile() (retResult ctrl.Result, retErr error) reason = "error fetching opensearch cluster" r.logger.Error(retErr, "failed to fetch opensearch cluster") r.recorder.Event(r.instance, "Warning", opensearchError, reason) - return + return ctrl.Result{ + Requeue: true, + RequeueAfter: opensearchClusterRequeueAfter, + }, retErr } if r.cluster == nil { r.logger.Info("opensearch cluster does not exist, requeueing") reason = "waiting for opensearch cluster to exist" r.recorder.Event(r.instance, "Normal", opensearchPending, reason) - retResult = ctrl.Result{ + return ctrl.Result{ Requeue: true, - RequeueAfter: 10 * time.Second, - } - return + RequeueAfter: opensearchClusterRequeueAfter, + }, nil } + // Check cluster ref has not changed - if r.instance.Status.ManagedCluster != nil { - if *r.instance.Status.ManagedCluster != r.cluster.UID { - reason = "cannot change the cluster a role refers to" - retErr = fmt.Errorf("%s", reason) - r.recorder.Event(r.instance, "Warning", opensearchRefMismatch, reason) - return - } - } else { - if pointer.BoolDeref(r.updateStatus, true) { - retErr = r.client.UdateObjectStatus(r.instance, func(object client.Object) { - instance := object.(*opsterv1.OpenSearchISMPolicy) - instance.Status.ManagedCluster = &r.cluster.UID - }) - if retErr != nil { - reason = fmt.Sprintf("failed to update status: %s", retErr) - r.recorder.Event(r.instance, "Warning", statusError, reason) - return - } + managedCluster := r.instance.Status.ManagedCluster + if managedCluster != nil && *managedCluster != r.cluster.UID { + reason = "cannot change the cluster a resource refers to" + retErr = fmt.Errorf("%s", reason) + r.recorder.Event(r.instance, "Warning", opensearchRefMismatch, reason) + return ctrl.Result{ + Requeue: false, + }, retErr + } + + if pointer.BoolDeref(r.updateStatus, true) { + retErr = r.client.UdateObjectStatus(r.instance, func(object client.Object) { + object.(*opsterv1.OpenSearchISMPolicy).Status.ManagedCluster = &r.cluster.UID + }) + if retErr != nil { + reason = fmt.Sprintf("failed to update status: %s", retErr) + r.recorder.Event(r.instance, "Warning", statusError, reason) + return ctrl.Result{ + Requeue: true, + RequeueAfter: opensearchClusterRequeueAfter, + }, retErr } } + // Check cluster is ready if r.cluster.Status.Phase != opsterv1.PhaseRunning { r.logger.Info("opensearch cluster is not running, requeueing") reason = "waiting for opensearch cluster status to be running" r.recorder.Event(r.instance, "Normal", opensearchPending, reason) - retResult = ctrl.Result{ + return ctrl.Result{ Requeue: true, - RequeueAfter: 10 * time.Second, - } - return + RequeueAfter: opensearchClusterRequeueAfter, + }, nil } - r.osClient, err = util.CreateClientForCluster(r.client, r.ctx, r.cluster, r.osClientTransport) - if err != nil { - reason := "error creating opensearch client" + r.osClient, retErr = util.CreateClientForCluster(r.client, r.ctx, r.cluster, r.osClientTransport) + if retErr != nil { + reason = "error creating opensearch client" r.recorder.Event(r.instance, "Warning", opensearchError, reason) - retResult = ctrl.Result{ + return ctrl.Result{ Requeue: true, - RequeueAfter: 30 * 
time.Second, - } - retErr = err - return - } - - // If PolicyID not provided explicitly, use metadata.name by default - policyId = r.instance.Spec.PolicyID - if r.instance.Spec.PolicyID == "" { - policyId = r.instance.Name - } - // Check ism policy state to make sure we don't touch preexisting ism policy - if r.instance.Status.ExistingISMPolicy == nil { - var exists bool - exists, retErr = services.PolicyExists(r.ctx, r.osClient, policyId) - if retErr != nil { - reason = "failed to get policy status from Opensearch API" - r.logger.Error(retErr, reason) - r.recorder.Event(r.instance, "Warning", opensearchAPIError, reason) - return - } - if pointer.BoolDeref(r.updateStatus, true) { - retErr = r.client.UdateObjectStatus(r.instance, func(object client.Object) { - instance := object.(*opsterv1.OpenSearchISMPolicy) - instance.Status.ExistingISMPolicy = &exists - }) - if retErr != nil { - reason = fmt.Sprintf("failed to update status: %s", retErr) - r.recorder.Event(r.instance, "Warning", statusError, reason) - return - } - } else { - // Emit an event for unit testing assertion - r.recorder.Event(r.instance, "Normal", "UnitTest", fmt.Sprintf("exists is %t", exists)) - return - } + RequeueAfter: opensearchClusterRequeueAfter, + }, retErr } - // If ism policy is existing do nothing - if *r.instance.Status.ExistingISMPolicy { - reason = ismPolicyExists - return + // If PolicyID is not provided explicitly, use metadata.name by default + policyId = r.instance.Name + if r.instance.Spec.PolicyID != "" { + policyId = r.instance.Spec.PolicyID } - ismpolicy, retErr := r.CreateISMPolicyRequest() + newPolicy, retErr := r.CreateISMPolicy() if retErr != nil { - reason = "failed to get create the ism policy request" r.logger.Error(retErr, reason) - r.recorder.Event(r.instance, "Warning", opensearchAPIError, reason) - return + return ctrl.Result{ + Requeue: true, + RequeueAfter: defaultRequeueAfter, + }, retErr } existingPolicy, retErr := services.GetPolicy(r.ctx, r.osClient, policyId) - if retErr != nil && retErr != services.ErrNotFound { - reason = "failed to get policy from Opensearch API" - r.logger.Error(retErr, reason) - r.recorder.Event(r.instance, "Warning", opensearchAPIError, reason) - return - } + // If not exists, create if errors.Is(retErr, services.ErrNotFound) { - r.logger.V(1).Info(fmt.Sprintf("policy %s not found, creating.", r.instance.Spec.PolicyID)) - retErr = services.CreateISMPolicy(r.ctx, r.osClient, *ismpolicy, policyId) + request := requests.ISMPolicy{ + Policy: *newPolicy, + } + retErr = services.CreateISMPolicy(r.ctx, r.osClient, request, policyId) if retErr != nil { reason = "failed to create ism policy" r.logger.Error(retErr, reason) r.recorder.Event(r.instance, "Warning", opensearchAPIError, reason) - return + return ctrl.Result{ + Requeue: true, + RequeueAfter: defaultRequeueAfter, + }, retErr } - r.recorder.Event(r.instance, "Normal", opensearchAPIUpdated, "policy created in opensearch") - return ctrl.Result{Requeue: true, RequeueAfter: 30 * time.Second}, retErr + // Mark the ISM Policy as not pre-existing (created by the operator) + retErr = r.client.UdateObjectStatus(r.instance, func(object client.Object) { + object.(*opsterv1.OpenSearchISMPolicy).Status.ExistingISMPolicy = pointer.Bool(false) + }) + if retErr != nil { + reason = "failed to update custom resource object" + r.logger.Error(retErr, reason) + return ctrl.Result{ + Requeue: true, + RequeueAfter: defaultRequeueAfter, + }, retErr + } + + r.recorder.Event(r.instance, "Normal", opensearchAPIUpdated, "policy successfully 
created in OpenSearch Cluster") + return ctrl.Result{ + Requeue: true, + RequeueAfter: defaultRequeueAfter, + }, nil } - priterm := existingPolicy.PrimaryTerm - seqno := existingPolicy.SequenceNumber - // Reset - existingPolicy.PrimaryTerm = nil - existingPolicy.SequenceNumber = nil - shouldUpdate, retErr := services.ShouldUpdateISMPolicy(r.ctx, *ismpolicy, *existingPolicy) + + // If other error, report if retErr != nil { - reason = "failed to compare the policies" + reason = "failed to get the ism policy from Opensearch API" r.logger.Error(retErr, reason) r.recorder.Event(r.instance, "Warning", opensearchAPIError, reason) - return + return ctrl.Result{ + Requeue: true, + RequeueAfter: defaultRequeueAfter, + }, retErr } - if !shouldUpdate { - r.logger.V(1).Info(fmt.Sprintf("policy %s is in sync", r.instance.Spec.PolicyID)) - return ctrl.Result{Requeue: true, RequeueAfter: 30 * time.Second}, retErr + // If the ISM policy exists in OpenSearch cluster and was not created by the operator, update the status and return + if r.instance.Status.ExistingISMPolicy == nil || *r.instance.Status.ExistingISMPolicy { + retErr = r.client.UdateObjectStatus(r.instance, func(object client.Object) { + object.(*opsterv1.OpenSearchISMPolicy).Status.ExistingISMPolicy = pointer.Bool(true) + }) + if retErr != nil { + reason = "failed to update custom resource object" + r.logger.Error(retErr, reason) + return ctrl.Result{ + Requeue: true, + RequeueAfter: defaultRequeueAfter, + }, retErr + } + reason = "the ISM policy already exists in the OpenSearch cluster" + r.logger.Error(errors.New(opensearchIsmPolicyExists), reason) + r.recorder.Event(r.instance, "Warning", opensearchIsmPolicyExists, reason) + return ctrl.Result{ + Requeue: true, + RequeueAfter: defaultRequeueAfter, + }, nil } - // the policyId is immutable, so check the old name (r.instance.Status.PolicyId) against the new - if r.instance.Status.PolicyId != "" && policyId != r.instance.Status.PolicyId { - reason = "can't change PolicyID" - r.recorder.Event(r.instance, "Warning", opensearchError, reason) - return + // Return if there are no changes + if r.instance.Spec.PolicyID == existingPolicy.PolicyID && cmp.Equal(*newPolicy, existingPolicy.Policy, cmpopts.EquateEmpty()) { + r.logger.V(1).Info(fmt.Sprintf("user %s is in sync", r.instance.Name)) + r.recorder.Event(r.instance, "Normal", opensearchAPIUnchanged, "policy is in sync") + return ctrl.Result{ + Requeue: true, + RequeueAfter: defaultRequeueAfter, + }, nil + } + request := requests.ISMPolicy{ + Policy: *newPolicy, } - retErr = services.UpdateISMPolicy(r.ctx, r.osClient, *ismpolicy, seqno, priterm, policyId) + retErr = services.UpdateISMPolicy(r.ctx, r.osClient, request, &existingPolicy.SequenceNumber, &existingPolicy.PrimaryTerm, existingPolicy.PolicyID) if retErr != nil { reason = "failed to update ism policy with Opensearch API" r.logger.Error(retErr, reason) r.recorder.Event(r.instance, "Warning", opensearchAPIError, reason) + return ctrl.Result{ + Requeue: true, + RequeueAfter: defaultRequeueAfter, + }, retErr } r.recorder.Event(r.instance, "Normal", opensearchAPIUpdated, "policy updated in opensearch") - - return ctrl.Result{Requeue: true, RequeueAfter: 30 * time.Second}, retErr + return ctrl.Result{ + Requeue: true, + RequeueAfter: defaultRequeueAfter, + }, nil } -func (r *IsmPolicyReconciler) CreateISMPolicyRequest() (*requests.Policy, error) { - policy := requests.ISMPolicy{ +func (r *IsmPolicyReconciler) CreateISMPolicy() (*requests.ISMPolicySpec, error) { + policy := requests.ISMPolicySpec{ 
DefaultState: r.instance.Spec.DefaultState, Description: r.instance.Spec.Description, } @@ -378,35 +400,35 @@ func (r *IsmPolicyReconciler) CreateISMPolicyRequest() (*requests.Policy, error) shrink.ForceUnsafe = action.Shrink.ForceUnsafe } if action.Shrink.MaxShardSize == nil && action.Shrink.NumNewShards == nil && action.Shrink.PercentageOfSourceShards == nil { - reason := "Either of MaxShardSize or NumNewShards or PercentageOfSourceShards is required" - r.recorder.Event(r.instance, "Error", opensearchError, reason) - return nil, nil + reason := "either of MaxShardSize or NumNewShards or PercentageOfSourceShards is required" + r.recorder.Event(r.instance, "Error", opensearchCustomResourceError, reason) + return nil, errors.New(reason) } if action.Shrink.MaxShardSize != nil { if action.Shrink.NumNewShards == nil && action.Shrink.PercentageOfSourceShards == nil { shrink.MaxShardSize = action.Shrink.MaxShardSize } else { - reason := "MaxShardSize can't exist with NumNewShards or PercentageOfSourceShards. Keep one of these." - r.recorder.Event(r.instance, "Error", opensearchError, reason) - return nil, nil + reason := "maxShardSize can't exist with NumNewShards or PercentageOfSourceShards. Keep one of these" + r.recorder.Event(r.instance, "Error", opensearchCustomResourceError, reason) + return nil, errors.New(reason) } if action.Shrink.NumNewShards != nil { if action.Shrink.MaxShardSize == nil && action.Shrink.PercentageOfSourceShards == nil { shrink.NumNewShards = action.Shrink.NumNewShards } else { - reason := "NumNewShards can't exist with MaxShardSize or PercentageOfSourceShards. Keep one of these." - r.recorder.Event(r.instance, "Error", opensearchError, reason) - return nil, nil + reason := "numNewShards can't exist with MaxShardSize or PercentageOfSourceShards. Keep one of these" + r.recorder.Event(r.instance, "Error", opensearchCustomResourceError, reason) + return nil, errors.New(reason) } } if action.Shrink.PercentageOfSourceShards != nil { if action.Shrink.NumNewShards == nil && action.Shrink.MaxShardSize == nil { shrink.PercentageOfSourceShards = action.Shrink.PercentageOfSourceShards } else { - reason := "PercentageOfSourceShards can't exist with MaxShardSize or NumNewShards. Keep one of these." - r.recorder.Event(r.instance, "Error", opensearchError, reason) - return nil, nil + reason := "percentageOfSourceShards can't exist with MaxShardSize or NumNewShards. 
Keep one of these" + r.recorder.Event(r.instance, "Error", opensearchCustomResourceError, reason) + return nil, errors.New(reason) } } if action.Shrink.TargetIndexNameTemplate != nil { @@ -515,10 +537,8 @@ func (r *IsmPolicyReconciler) CreateISMPolicyRequest() (*requests.Policy, error) policy.States = append(policy.States, requests.State{Actions: actions, Name: state.Name, Transitions: transitions}) } } - ismPolicy := requests.Policy{ - Policy: policy, - } - return &ismPolicy, nil + + return &policy, nil } // Delete ISM policy from the OS cluster @@ -527,10 +547,12 @@ func (r *IsmPolicyReconciler) Delete() error { if r.instance.Status.ExistingISMPolicy == nil { return nil } + if *r.instance.Status.ExistingISMPolicy { r.logger.Info("policy was pre-existing; not deleting") return nil } + var err error r.cluster, err = util.FetchOpensearchCluster(r.client, r.ctx, types.NamespacedName{ Name: r.instance.Spec.OpensearchRef.Name, @@ -544,15 +566,18 @@ func (r *IsmPolicyReconciler) Delete() error { // If the opensearch cluster doesn't exist, we don't need to delete anything return nil } + r.osClient, err = util.CreateClientForCluster(r.client, r.ctx, r.cluster, r.osClientTransport) if err != nil { return err } + // If PolicyID not provided explicitly, use metadata.name by default policyId := r.instance.Spec.PolicyID - if r.instance.Spec.PolicyID == "" { + if policyId == "" { policyId = r.instance.Name } + err = services.DeleteISMPolicy(r.ctx, r.osClient, policyId) if err != nil { return err diff --git a/opensearch-operator/pkg/reconcilers/ismpolicy_test.go b/opensearch-operator/pkg/reconcilers/ismpolicy_test.go index 22351613..a7b9d290 100644 --- a/opensearch-operator/pkg/reconcilers/ismpolicy_test.go +++ b/opensearch-operator/pkg/reconcilers/ismpolicy_test.go @@ -21,7 +21,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log" ) -var seqno *int = new(int) var _ = Describe("ism policy reconciler", func() { var ( transport *httpmock.MockTransport @@ -117,7 +116,7 @@ var _ = Describe("ism policy reconciler", func() { recorder = record.NewFakeRecorder(1) mockClient.EXPECT().GetOpenSearchCluster(mock.Anything, mock.Anything).Return(*cluster, nil) }) - It("should wait for the cluster to be running", func() { + It("should emit a unit test event and requeue", func() { go func() { defer GinkgoRecover() defer close(recorder.Events) @@ -140,6 +139,7 @@ var _ = Describe("ism policy reconciler", func() { cluster.Status.Phase = opsterv1.PhaseRunning cluster.Status.ComponentsStatus = []opsterv1.ComponentStatus{} mockClient.EXPECT().GetOpenSearchCluster(mock.Anything, mock.Anything).Return(*cluster, nil) + recorder = record.NewFakeRecorder(1) transport.RegisterResponder( http.MethodGet, @@ -162,44 +162,73 @@ var _ = Describe("ism policy reconciler", func() { ) }) - When("existing status is true", func() { + When("cluster reference mismatch", func() { BeforeEach(func() { - instance.Status.ExistingISMPolicy = pointer.Bool(true) + managedCluster := types.UID("different-uid") + instance.Status.ManagedCluster = &managedCluster }) - It("should do nothing", func() { - _, err := reconciler.Reconcile() - Expect(err).ToNot(HaveOccurred()) + It("should emit a unit test event and not requeue", func() { + go func() { + defer GinkgoRecover() + defer close(recorder.Events) + result, err := reconciler.Reconcile() + Expect(err).To(HaveOccurred()) + Expect(result.Requeue).To(BeFalse()) + }() + var events []string + for msg := range recorder.Events { + events = append(events, msg) + } + Expect(len(events)).To(Equal(1)) + 
Expect(events[0]).To(Equal(fmt.Sprintf("Warning %s cannot change the cluster a resource refers to", opensearchRefMismatch))) }) }) - When("existing status is nil", func() { - var localExtraCalls = 4 + When("policy does not exist in opensearch", func() { BeforeEach(func() { - policyRequest := requests.ISMPolicy{ - DefaultState: "abc", - Description: "test", - } + mockClient.EXPECT().UdateObjectStatus(mock.Anything, mock.Anything).Return(nil) - recorder = record.NewFakeRecorder(1) transport.RegisterResponder( http.MethodGet, fmt.Sprintf( - "https://%s.%s.svc.cluster.local:9200/", + "https://%s.%s.svc.cluster.local:9200/_plugins/_ism/policies/%s", cluster.Spec.General.ServiceName, cluster.Namespace, + instance.Name, ), - httpmock.NewStringResponder(200, "OK").Times(4, failMessage), + httpmock.NewStringResponder(404, "Not Found").Once(), ) transport.RegisterResponder( - http.MethodHead, + http.MethodPut, fmt.Sprintf( - "https://%s.%s.svc.cluster.local:9200/", + "https://%s.%s.svc.cluster.local:9200/_plugins/_ism/policies/%s", cluster.Spec.General.ServiceName, cluster.Namespace, + instance.Name, ), - httpmock.NewStringResponder(200, "OK").Times(2, failMessage), + httpmock.NewStringResponder(200, "OK").Once(), ) + }) + It("should create the policy, emit a unit test event, and requeue", func() { + go func() { + defer GinkgoRecover() + defer close(recorder.Events) + result, err := reconciler.Reconcile() + Expect(err).ToNot(HaveOccurred()) + Expect(result.Requeue).To(BeTrue()) + }() + var events []string + for msg := range recorder.Events { + events = append(events, msg) + } + Expect(len(events)).To(Equal(1)) + Expect(events[0]).To(Equal(fmt.Sprintf("Normal %s policy successfully created in OpenSearch Cluster", opensearchAPIUpdated))) + }) + }) + + When("failed to get policy from opensearch api", func() { + BeforeEach(func() { transport.RegisterResponder( http.MethodGet, fmt.Sprintf( @@ -208,118 +237,61 @@ var _ = Describe("ism policy reconciler", func() { cluster.Namespace, instance.Name, ), - httpmock.NewJsonResponderOrPanic(200, responses.GetISMPoliciesResponse{ - Policy: policyRequest, - }).Then( - httpmock.NewStringResponder(404, "does not exist"), - ).Then( - httpmock.NewNotFoundResponder(failMessage), - ), + httpmock.NewErrorResponder(fmt.Errorf("failed to get policy")).Once(), ) }) - - It("should do nothing and emit a unit test event", func() { + It("should emit a unit test event, requeue, and return an error", func() { go func() { defer GinkgoRecover() defer close(recorder.Events) - _, err := reconciler.Reconcile() - Expect(err).ToNot(HaveOccurred()) - _, err = reconciler.Reconcile() - Expect(err).ToNot(HaveOccurred()) - // Confirm all responders have been called - Expect(transport.GetTotalCallCount()).To(Equal(transport.NumResponders() + extraContextCalls + localExtraCalls)) + result, err := reconciler.Reconcile() + Expect(err).To(HaveOccurred()) + Expect(result.Requeue).To(BeTrue()) }() var events []string for msg := range recorder.Events { events = append(events, msg) } - Expect(len(events)).To(Equal(2)) - Expect(events[0]).To(Equal("Normal UnitTest exists is true")) - Expect(events[1]).To(Equal("Normal UnitTest exists is false")) + Expect(len(events)).To(Equal(1)) + Expect(events[0]).To(Equal(fmt.Sprintf("Warning %s failed to get the ism policy from Opensearch API", opensearchAPIError))) }) }) - When("existing status is true", func() { + Context("policy exists in opensearch", func() { BeforeEach(func() { - instance.Status.ExistingISMPolicy = pointer.Bool(true) - }) - It("should do 
nothing", func() { - _, err := reconciler.Reconcile() - Expect(err).ToNot(HaveOccurred()) - }) - }) + instance.Spec.PolicyID = "test-policy-id" - When("existing status is false", func() { - BeforeEach(func() { - instance.Status.ExistingISMPolicy = pointer.Bool(false) + transport.RegisterResponder( + http.MethodGet, + fmt.Sprintf( + "https://%s.%s.svc.cluster.local:9200/_plugins/_ism/policies/%s", + cluster.Spec.General.ServiceName, + cluster.Namespace, + instance.Spec.PolicyID, + ), + httpmock.NewJsonResponderOrPanic(200, responses.GetISMPolicyResponse{ + PolicyID: "test-policy-id", + Policy: requests.ISMPolicySpec{ + DefaultState: "test-state", + Description: "test-policy", + }, + }).Once(), + ) }) - When("policy exists in opensearch and is the same", func() { + When("existing status is nil", func() { BeforeEach(func() { - policyRequest := requests.ISMPolicy{ - DefaultState: "", - Description: "", - } - transport.RegisterResponder( - http.MethodGet, - fmt.Sprintf( - "https://%s.%s.svc.cluster.local:9200/_plugins/_ism/policies/%s", - cluster.Spec.General.ServiceName, - cluster.Namespace, - instance.Name, - ), - httpmock.NewJsonResponderOrPanic(200, responses.GetISMPoliciesResponse{ - Policy: policyRequest, - SequenceNumber: seqno, - PrimaryTerm: seqno, - }).Once(failMessage), - ) - }) - It("should do nothing", func() { - _, err := reconciler.Reconcile() - Expect(err).ToNot(HaveOccurred()) - Expect(transport.GetTotalCallCount()).To(Equal(transport.NumResponders() + extraContextCalls)) + mockClient.EXPECT().UdateObjectStatus(mock.Anything, mock.Anything).Return(nil) + instance.Status.ExistingISMPolicy = nil }) - }) - When("policy exists in opensearch and is not the same", func() { - BeforeEach(func() { - recorder = record.NewFakeRecorder(1) - policyRequest := requests.ISMPolicy{ - DefaultState: "policy", - Description: "test-policy", - } - transport.RegisterResponder( - http.MethodGet, - fmt.Sprintf( - "https://%s.%s.svc.cluster.local:9200/_plugins/_ism/policies/%s", - cluster.Spec.General.ServiceName, - cluster.Namespace, - instance.Name, - ), - httpmock.NewJsonResponderOrPanic(200, responses.GetISMPoliciesResponse{ - Policy: policyRequest, - SequenceNumber: seqno, - PrimaryTerm: seqno, - PolicyID: "test-policy", - }).Once(failMessage), - ) - transport.RegisterResponder( - http.MethodPut, - fmt.Sprintf( - "https://%s.%s.svc.cluster.local:9200/_plugins/_ism/policies/%s?if_seq_no=0&if_primary_term=0", - cluster.Spec.General.ServiceName, - cluster.Namespace, - instance.Name, - ), - httpmock.NewStringResponder(200, "OK").Once(failMessage), - ) - }) - It("should update the policy", func() { + + It("should emit a unit test event and requeue", func() { go func() { defer GinkgoRecover() defer close(recorder.Events) - _, err := reconciler.Reconcile() + result, err := reconciler.Reconcile() Expect(err).ToNot(HaveOccurred()) + Expect(result.Requeue).To(BeTrue()) // Confirm all responders have been called Expect(transport.GetTotalCallCount()).To(Equal(transport.NumResponders() + extraContextCalls)) }() @@ -328,39 +300,23 @@ var _ = Describe("ism policy reconciler", func() { events = append(events, msg) } Expect(len(events)).To(Equal(1)) - Expect(events[0]).To(Equal(fmt.Sprintf("Normal %s policy updated in opensearch", opensearchAPIUpdated))) + Expect(events[0]).To(Equal(fmt.Sprintf("Warning %s the ISM policy already exists in the OpenSearch cluster", opensearchIsmPolicyExists))) }) }) - When("policy doesn't exist in opensearch", func() { + + When("existing status is true", func() { BeforeEach(func() { 
- recorder = record.NewFakeRecorder(1) - transport.RegisterResponder( - http.MethodGet, - fmt.Sprintf( - "https://%s.%s.svc.cluster.local:9200/_plugins/_ism/policies/%s", - cluster.Spec.General.ServiceName, - cluster.Namespace, - instance.Name, - ), - httpmock.NewStringResponder(404, "OK").Once(failMessage), - ) - transport.RegisterResponder( - http.MethodPut, - fmt.Sprintf( - "https://%s.%s.svc.cluster.local:9200/_plugins/_ism/policies/%s", - cluster.Spec.General.ServiceName, - cluster.Namespace, - instance.Name, - ), - httpmock.NewStringResponder(200, "OK").Once(failMessage), - ) + mockClient.EXPECT().UdateObjectStatus(mock.Anything, mock.Anything).Return(nil) + instance.Status.ExistingISMPolicy = pointer.Bool(true) }) - It("should create the policy", func() { + + It("should emit a unit test event and requeue", func() { go func() { defer GinkgoRecover() defer close(recorder.Events) - _, err := reconciler.Reconcile() + result, err := reconciler.Reconcile() Expect(err).ToNot(HaveOccurred()) + Expect(result.Requeue).To(BeTrue()) // Confirm all responders have been called Expect(transport.GetTotalCallCount()).To(Equal(transport.NumResponders() + extraContextCalls)) }() @@ -369,7 +325,72 @@ var _ = Describe("ism policy reconciler", func() { events = append(events, msg) } Expect(len(events)).To(Equal(1)) - Expect(events[0]).To(Equal(fmt.Sprintf("Normal %s policy created in opensearch", opensearchAPIUpdated))) + Expect(events[0]).To(Equal(fmt.Sprintf("Warning %s the ISM policy already exists in the OpenSearch cluster", opensearchIsmPolicyExists))) + }) + }) + + Context("existing status is false", func() { + BeforeEach(func() { + instance.Status.ExistingISMPolicy = pointer.Bool(false) + }) + + When("policy is the same", func() { + BeforeEach(func() { + instance.Spec.DefaultState = "test-state" + instance.Spec.Description = "test-policy" + }) + It("should emit a unit test event and requeue", func() { + go func() { + defer GinkgoRecover() + defer close(recorder.Events) + result, err := reconciler.Reconcile() + Expect(err).ToNot(HaveOccurred()) + Expect(result.Requeue).To(BeTrue()) + // Confirm all responders have been called + Expect(transport.GetTotalCallCount()).To(Equal(transport.NumResponders() + extraContextCalls)) + }() + var events []string + for msg := range recorder.Events { + events = append(events, msg) + } + Expect(len(events)).To(Equal(1)) + Expect(events[0]).To(Equal(fmt.Sprintf("Normal %s policy is in sync", opensearchAPIUnchanged))) + }) + }) + + When("policy is not the same", func() { + BeforeEach(func() { + instance.Spec.DefaultState = "test-state2" + instance.Spec.Description = "test-policy2" + + transport.RegisterResponder( + http.MethodPut, + fmt.Sprintf( + "https://%s.%s.svc.cluster.local:9200/_plugins/_ism/policies/%s", + cluster.Spec.General.ServiceName, + cluster.Namespace, + instance.Spec.PolicyID, + ), + httpmock.NewStringResponder(200, "OK").Once(), + ) + }) + It("should update ism policy, emit a unit test event, and requeue", func() { + go func() { + defer GinkgoRecover() + defer close(recorder.Events) + result, err := reconciler.Reconcile() + Expect(err).ToNot(HaveOccurred()) + Expect(result.Requeue).To(BeTrue()) + // Confirm all responders have been called + Expect(transport.GetTotalCallCount()).To(Equal(transport.NumResponders() + extraContextCalls)) + }() + var events []string + for msg := range recorder.Events { + events = append(events, msg) + } + Expect(len(events)).To(Equal(1)) + Expect(events[0]).To(Equal(fmt.Sprintf("Normal %s policy updated in opensearch", 
opensearchAPIUpdated))) + }) }) }) }) @@ -406,9 +427,14 @@ var _ = Describe("ism policy reconciler", func() { }) }) - When("policy does not exist", func() { + Context("cluster is ready", func() { + // extraContextCalls := 1 BeforeEach(func() { + cluster.Status.Phase = opsterv1.PhaseRunning + cluster.Status.ComponentsStatus = []opsterv1.ComponentStatus{} mockClient.EXPECT().GetOpenSearchCluster(mock.Anything, mock.Anything).Return(*cluster, nil) + recorder = record.NewFakeRecorder(1) + transport.RegisterResponder( http.MethodGet, fmt.Sprintf( @@ -418,6 +444,7 @@ var _ = Describe("ism policy reconciler", func() { ), httpmock.NewStringResponder(200, "OK").Times(2, failMessage), ) + transport.RegisterResponder( http.MethodHead, fmt.Sprintf( @@ -427,75 +454,42 @@ var _ = Describe("ism policy reconciler", func() { ), httpmock.NewStringResponder(200, "OK").Once(failMessage), ) - transport.RegisterResponder( - http.MethodGet, - fmt.Sprintf( - "https://%s.%s.svc.cluster.local:9200/_plugins/_ism/policies/%s", - cluster.Spec.General.ServiceName, - cluster.Namespace, - instance.Name, - ), - httpmock.NewStringResponder(404, "does not exist").Once(failMessage), - ) - transport.RegisterResponder( - http.MethodDelete, - fmt.Sprintf( - "https://%s.%s.svc.cluster.local:9200/_plugins/_ism/policies/%s", - cluster.Spec.General.ServiceName, - cluster.Namespace, - instance.Name, - ), - httpmock.NewStringResponder(200, "OK").Once(failMessage), - ) }) - It("should do nothing and exit", func() { - Expect(reconciler.Delete()).To(Succeed()) - }) - }) - When("policy does exist", func() { - BeforeEach(func() { - mockClient.EXPECT().GetOpenSearchCluster(mock.Anything, mock.Anything).Return(*cluster, nil) - transport.RegisterResponder( - http.MethodGet, - fmt.Sprintf( - "https://%s.%s.svc.cluster.local:9200/", - cluster.Spec.General.ServiceName, - cluster.Namespace, - ), - httpmock.NewStringResponder(200, "OK").Times(2, failMessage), - ) - transport.RegisterResponder( - http.MethodHead, - fmt.Sprintf( - "https://%s.%s.svc.cluster.local:9200/", - cluster.Spec.General.ServiceName, - cluster.Namespace, - ), - httpmock.NewStringResponder(200, "OK").Once(failMessage), - ) - transport.RegisterResponder( - http.MethodGet, - fmt.Sprintf( - "https://%s.%s.svc.cluster.local:9200/_plugins/_ism/policies/%s", - cluster.Spec.General.ServiceName, - cluster.Namespace, - instance.Name, - ), - httpmock.NewStringResponder(200, "OK").Once(failMessage), - ) - transport.RegisterResponder( - http.MethodDelete, - fmt.Sprintf( - "https://%s.%s.svc.cluster.local:9200/_plugins/_ism/policies/%s", - cluster.Spec.General.ServiceName, - cluster.Namespace, - instance.Name, - ), - httpmock.NewStringResponder(200, "OK").Once(failMessage), - ) + + When("policy does not exist", func() { + BeforeEach(func() { + transport.RegisterResponder( + http.MethodDelete, + fmt.Sprintf( + "https://%s.%s.svc.cluster.local:9200/_plugins/_ism/policies/%s", + cluster.Spec.General.ServiceName, + cluster.Namespace, + instance.Name, + ), + httpmock.NewStringResponder(404, "does not exist").Once(failMessage), + ) + }) + It("should do nothing and exit", func() { + Expect(reconciler.Delete()).NotTo(Succeed()) + }) }) - It("should delete the policy", func() { - Expect(reconciler.Delete()).To(Succeed()) + + When("policy does exist", func() { + BeforeEach(func() { + transport.RegisterResponder( + http.MethodDelete, + fmt.Sprintf( + "https://%s.%s.svc.cluster.local:9200/_plugins/_ism/policies/%s", + cluster.Spec.General.ServiceName, + cluster.Namespace, + instance.Name, + ), + 
httpmock.NewStringResponder(200, "OK").Once(failMessage), + ) + }) + It("should delete the policy", func() { + Expect(reconciler.Delete()).To(Succeed()) + }) }) }) }) diff --git a/opensearch-operator/pkg/reconcilers/reconcilers.go b/opensearch-operator/pkg/reconcilers/reconcilers.go index 6531bc6f..c0e644ee 100644 --- a/opensearch-operator/pkg/reconcilers/reconcilers.go +++ b/opensearch-operator/pkg/reconcilers/reconcilers.go @@ -14,13 +14,15 @@ import ( ) const ( - opensearchPending = "OpensearchPending" - opensearchError = "OpensearchError" - opensearchAPIError = "OpensearchAPIError" - opensearchRefMismatch = "OpensearchRefMismatch" - opensearchAPIUpdated = "OpensearchAPIUpdated" - passwordError = "PasswordError" - statusError = "StatusUpdateError" + opensearchPending = "OpensearchPending" + opensearchError = "OpensearchError" + opensearchAPIError = "OpensearchAPIError" + opensearchRefMismatch = "OpensearchRefMismatch" + opensearchAPIUpdated = "OpensearchAPIUpdated" + opensearchAPIUnchanged = "OpensearchAPIUnchanged" + opensearchCustomResourceError = "OpensearchCustomResourceError" + passwordError = "PasswordError" + statusError = "StatusUpdateError" ) type ComponentReconciler func() (reconcile.Result, error) From fb3812d58795838607bbfaeefd6670d3762a71f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gergely=20Szab=C3=B3?= Date: Thu, 19 Sep 2024 15:12:59 +0200 Subject: [PATCH 6/7] Adding pluginsList and keystore fields to BootstrapConfig (#862) ### Description This change adds the `bootstrap.keystore` and `bootstrap.pluginsList` fields to the OpenSearchCluster custom resource. This changes how the bootstrap pod is generated. The behavior is the same as with the `general.keystore` and `general.pluginsList` fields. ### Issues Resolved Closes https://github.com/opensearch-project/opensearch-k8s-operator/issues/430 and https://github.com/opensearch-project/opensearch-k8s-operator/issues/639. ### Check List - [x] Commits are signed per the DCO using --signoff - [x] Unittest added for the new/changed functionality and all unit tests are successful - [x] Customer-visible features documented - [x] No linter warnings (`make lint`) If CRDs are changed: - [x] CRD YAMLs updated (`make manifests`) and also copied into the helm chart - [x] Changes to CRDs documented By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license. 
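For illustration, a minimal, abridged manifest sketch that combines the two new fields (the plugin and secret names are placeholders; `general` and `nodePools` are omitted for brevity, and the layout mirrors the userguide examples added in this patch):

```yaml
apiVersion: opensearch.opster.io/v1
kind: OpenSearchCluster
metadata:
  name: my-cluster
spec:
  bootstrap:
    # Plugins installed in the bootstrap pod at startup
    pluginsList: ["repository-s3"]
    # Secrets whose key/value pairs populate the bootstrap pod's opensearch keystore
    keystore:
      - secret:
          name: credentials
```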
--------- Signed-off-by: Gergely Szabo --- ...ensearch.opster.io_opensearchclusters.yaml | 25 +++++ docs/designs/crd.md | 24 +++-- docs/userguide/main.md | 34 +++++-- .../api/v1/opensearch_types.go | 2 + .../api/v1/zz_generated.deepcopy.go | 12 +++ ...ensearch.opster.io_opensearchclusters.yaml | 25 +++++ opensearch-operator/pkg/builders/cluster.go | 93 +++++++++++++++++++ .../pkg/builders/cluster_test.go | 88 ++++++++++++++++++ 8 files changed, 290 insertions(+), 13 deletions(-) diff --git a/charts/opensearch-operator/files/opensearch.opster.io_opensearchclusters.yaml b/charts/opensearch-operator/files/opensearch.opster.io_opensearchclusters.yaml index 1ea3615c..b329b1bc 100644 --- a/charts/opensearch-operator/files/opensearch.opster.io_opensearchclusters.yaml +++ b/charts/opensearch-operator/files/opensearch.opster.io_opensearchclusters.yaml @@ -839,10 +839,35 @@ spec: type: object jvm: type: string + keystore: + items: + properties: + keyMappings: + additionalProperties: + type: string + description: Key mappings from secret to keystore keys + type: object + secret: + description: Secret containing key value pairs + properties: + name: + description: |- + Name of the referent. + More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + TODO: Add other useful fields. apiVersion, kind, uid? + type: string + type: object + x-kubernetes-map-type: atomic + type: object + type: array nodeSelector: additionalProperties: type: string type: object + pluginsList: + items: + type: string + type: array resources: description: ResourceRequirements describes the compute resource requirements. diff --git a/docs/designs/crd.md b/docs/designs/crd.md index ceb49c0e..52bb82b1 100644 --- a/docs/designs/crd.md +++ b/docs/designs/crd.md @@ -6,7 +6,7 @@ A resource is an endpoint in the Kubernetes API that stores a collection of API A [Custom Resource](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/) is an extension of the Kubernetes API, many core Kubernetes functions are now built using custom resources, making Kubernetes more modular. Cluster admins can update custom resources independently of the cluster itself. Once a custom resource is installed, users can create and access its objects using kubectl, just as they do for built-in resources like Pods. -The CustomResourceDefinition API resource allows you to define custom resources. Defining a CRD object creates a new custom resource with a name and schema that you specify. The Kubernetes API serves and handles the storage of your custom resource. Every resource is build from `KGV` that stands for Group Version Resource and this is what drives the Kubernetes API Server structure. +The CustomResourceDefinition API resource allows you to define custom resources. Defining a CRD object creates a new custom resource with a name and schema that you specify. The Kubernetes API serves and handles the storage of your custom resource. Every resource is build from `KGV` that stands for Group Version Resource and this is what drives the Kubernetes API Server structure. The `OpensearchCLuster` CRD is representing an Opensearch cluster. @@ -115,7 +115,7 @@ ClusterSpec defines the desired state of OpensearchCluster GeneralConfig -GeneralConfig defines global Opensearch cluster configuration +GeneralConfig defines global Opensearch cluster configuration @@ -290,6 +290,18 @@ Bootstrap defines Opensearch bootstrap pod configuration + + + + + + + + + + + +
        <tr>
          <td><b>additionalConfig</b></td>
          <td>map[string]string</td>
          <td>Added extra items to opensearch.yml in the bootstrap pod</td>
          <td>false</td>
          <td>general.additionalConfig</td>
        </tr>
        <tr>
          <td><b>keystore</b></td>
          <td>[]opsterv1.KeystoreValue</td>
          <td>List of objects that define secret values that will populate the opensearch keystore in the bootstrap pod</td>
          <td>false</td>
          <td>-</td>
        </tr>
        <tr>
          <td><b>pluginsList</b></td>
          <td>[]string</td>
          <td>List of plugins that should be installed for OpenSearch at startup in the bootstrap pod</td>
          <td>false</td>
          <td>[]</td>
        </tr>
@@ -432,7 +444,7 @@ Dashboards defines Opensearch-Dashboard configuration and deployment NodePools -Every NodePool is defining different Opensearch Nodes StatefulSet +Every NodePool is defining different Opensearch Nodes StatefulSet @@ -581,8 +593,8 @@ InitHelperConfig defines global Opensearch InitHelper image configuration - - + +
        <td>string</td>
        <td>Version of InitHelper (busybox) image to deploy</td>
        <td>false</td>
-       <td>1.27.2-buildx </td>
+       <td>1.27.2-buildx</td>

@@ -676,7 +688,7 @@ Monitoring TLS configuration options Keystore

-Every Keystore Value defines a secret to pull secrets from. +Every Keystore Value defines a secret to pull secrets from. diff --git a/docs/userguide/main.md b/docs/userguide/main.md index 7f55780e..227c3f71 100644 --- a/docs/userguide/main.md +++ b/docs/userguide/main.md @@ -124,7 +124,7 @@ spec: nodePools: - component: masters replicas: 3 # The number of replicas - diskSize: "30Gi" # The disk size to use + diskSize: "30Gi" # The disk size to use resources: # The resource requests and limits for that nodepool requests: memory: "2Gi" @@ -221,7 +221,7 @@ If you provide your own node certificates you must also provide an admin cert th spec: security: config: - adminSecret: + adminSecret: name: my-first-cluster-admin-cert # The secret must have keys tls.crt and tls.key ``` @@ -278,6 +278,14 @@ To install a plugin for opensearch dashboards add it to the list under `dashboar - sample-plugin-name ``` +To install a plugin for the bootstrap pod add it to the list under `bootstrap.pluginsList`: + +```yaml + bootstrap: + pluginsList: ["repository-s3"] +``` + + Please note: * [Bundled plugins](https://opensearch.org/docs/latest/install-and-configure/install-opensearch/plugins/#bundled-plugins) do not have to be added to the list, they are installed automatically @@ -323,6 +331,18 @@ If you only want to load some keys from a secret or rename the existing keys, yo Note that only provided keys will be loaded from the secret! Any keys not specified will be ignored. +To populate the keystore of the bootstrap pod add the secrets under the `bootstrap.keystore` section: + +```yaml + bootstrap: + # ... + keystore: + - secret: + name: credentials + - secret: + name: some-other-secret +``` + ### SmartScaler What is SmartScaler? @@ -382,7 +402,7 @@ You can configure the snapshot repositories for the OpenSearch cluster through t ```yaml spec: general: - snapshotRepositories: + snapshotRepositories: - name: my_s3_repository_1 type: s3 settings: @@ -737,7 +757,7 @@ spec: projected: sources: serviceAccountToken: - path: "token" + path: "token" dashboards: additionalVolumes: - name: example-secret @@ -775,7 +795,7 @@ spec: env: - name: MY_ENV_VAR value: "myvalue" - # the other options are supported here as well + # the other options are supported here as well ``` ### Custom cluster domain name @@ -793,7 +813,7 @@ manager: During cluster initialization the operator uses init containers as helpers. For these containers a busybox image is used ( specifically `docker.io/busybox:latest`). In case you are working in an offline environment and the cluster cannot access the registry or you want to customize the image, you can override the image used by specifying the `initHelper` image in your cluster spec: ```yaml - spec: + spec: initHelper: # You can either only specify the version version: "1.27.2-buildcustom" @@ -1393,7 +1413,7 @@ metadata: spec: opensearchCluster: name: my-first-cluster - + name: logs_template # name of the index template - defaults to metadata.name.
Can't be updated in-place indexPatterns: # required index patterns diff --git a/opensearch-operator/api/v1/opensearch_types.go b/opensearch-operator/api/v1/opensearch_types.go index 75e822ab..7da1e55a 100644 --- a/opensearch-operator/api/v1/opensearch_types.go +++ b/opensearch-operator/api/v1/opensearch_types.go @@ -171,6 +171,8 @@ type BootstrapConfig struct { Jvm string `json:"jvm,omitempty"` // Extra items to add to the opensearch.yml, defaults to General.AdditionalConfig AdditionalConfig map[string]string `json:"additionalConfig,omitempty"` + PluginsList []string `json:"pluginsList,omitempty"` + Keystore []KeystoreValue `json:"keystore,omitempty"` } type DashboardsServiceSpec struct { diff --git a/opensearch-operator/api/v1/zz_generated.deepcopy.go b/opensearch-operator/api/v1/zz_generated.deepcopy.go index 055616a0..fb0068d7 100644 --- a/opensearch-operator/api/v1/zz_generated.deepcopy.go +++ b/opensearch-operator/api/v1/zz_generated.deepcopy.go @@ -295,6 +295,18 @@ func (in *BootstrapConfig) DeepCopyInto(out *BootstrapConfig) { (*out)[key] = val } } + if in.PluginsList != nil { + in, out := &in.PluginsList, &out.PluginsList + *out = make([]string, len(*in)) + copy(*out, *in) + } + if in.Keystore != nil { + in, out := &in.Keystore, &out.Keystore + *out = make([]KeystoreValue, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BootstrapConfig. diff --git a/opensearch-operator/config/crd/bases/opensearch.opster.io_opensearchclusters.yaml b/opensearch-operator/config/crd/bases/opensearch.opster.io_opensearchclusters.yaml index 1ea3615c..b329b1bc 100644 --- a/opensearch-operator/config/crd/bases/opensearch.opster.io_opensearchclusters.yaml +++ b/opensearch-operator/config/crd/bases/opensearch.opster.io_opensearchclusters.yaml @@ -839,10 +839,35 @@ spec: type: object jvm: type: string + keystore: + items: + properties: + keyMappings: + additionalProperties: + type: string + description: Key mappings from secret to keystore keys + type: object + secret: + description: Secret containing key value pairs + properties: + name: + description: |- + Name of the referent. + More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + TODO: Add other useful fields. apiVersion, kind, uid? + type: string + type: object + x-kubernetes-map-type: atomic + type: object + type: array nodeSelector: additionalProperties: type: string type: object + pluginsList: + items: + type: string + type: array resources: description: ResourceRequirements describes the compute resource requirements. 
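The `keyMappings` property in the schema above allows individual secret keys to be renamed before they are written to the keystore, mirroring the existing `general.keystore` behavior where only the mapped keys are loaded. A sketch with hypothetical secret and key names:

```yaml
spec:
  bootstrap:
    keystore:
      - secret:
          name: credentials
        keyMappings:
          # <key in secret>: <key in the opensearch keystore>; unmapped keys are ignored
          s3.access: s3.client.default.access_key
```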
diff --git a/opensearch-operator/pkg/builders/cluster.go b/opensearch-operator/pkg/builders/cluster.go index 60938e9f..0c5c2f76 100644 --- a/opensearch-operator/pkg/builders/cluster.go +++ b/opensearch-operator/pkg/builders/cluster.go @@ -870,6 +870,98 @@ func NewBootstrapPod( }) } + // If Keystore Values are set in OpenSearchCluster manifest + if cr.Spec.Bootstrap.Keystore != nil && len(cr.Spec.Bootstrap.Keystore) > 0 { + + // Add volume and volume mount for keystore + volumes = append(volumes, corev1.Volume{ + Name: "keystore", + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, + }, + }) + + volumeMounts = append(volumeMounts, corev1.VolumeMount{ + Name: "keystore", + MountPath: "/usr/share/opensearch/config/opensearch.keystore", + SubPath: "opensearch.keystore", + }) + + initContainerVolumeMounts := []corev1.VolumeMount{ + { + Name: "keystore", + MountPath: "/tmp/keystore", + }, + } + + // Add volumes and volume mounts for keystore secrets + for _, keystoreValue := range cr.Spec.Bootstrap.Keystore { + volumes = append(volumes, corev1.Volume{ + Name: "keystore-" + keystoreValue.Secret.Name, + VolumeSource: corev1.VolumeSource{ + Secret: &corev1.SecretVolumeSource{ + SecretName: keystoreValue.Secret.Name, + }, + }, + }) + + if keystoreValue.KeyMappings == nil || len(keystoreValue.KeyMappings) == 0 { + // If no renames are necessary, mount secret key-value pairs directly + initContainerVolumeMounts = append(initContainerVolumeMounts, corev1.VolumeMount{ + Name: "keystore-" + keystoreValue.Secret.Name, + MountPath: "/tmp/keystoreSecrets/" + keystoreValue.Secret.Name, + }) + } else { + keys := helpers.SortedKeys(keystoreValue.KeyMappings) + for _, oldKey := range keys { + initContainerVolumeMounts = append(initContainerVolumeMounts, corev1.VolumeMount{ + Name: "keystore-" + keystoreValue.Secret.Name, + MountPath: "/tmp/keystoreSecrets/" + keystoreValue.Secret.Name + "/" + keystoreValue.KeyMappings[oldKey], + SubPath: oldKey, + }) + } + } + } + + keystoreInitContainer := corev1.Container{ + Name: "keystore", + Image: image.GetImage(), + ImagePullPolicy: image.GetImagePullPolicy(), + Resources: resources, + Command: []string{ + "sh", + "-c", + ` + #!/usr/bin/env bash + set -euo pipefail + + /usr/share/opensearch/bin/opensearch-keystore create + for i in /tmp/keystoreSecrets/*/*; do + key=$(basename $i) + echo "Adding file $i to keystore key $key" + /usr/share/opensearch/bin/opensearch-keystore add-file "$key" "$i" + done + + # Add the bootstrap password since otherwise the opensearch entrypoint tries to do this on startup + if [ ! 
-z ${PASSWORD+x} ]; then + echo 'Adding env $PASSWORD to keystore as key bootstrap.password' + echo "$PASSWORD" | /usr/share/opensearch/bin/opensearch-keystore add -x bootstrap.password + fi + + cp -a /usr/share/opensearch/config/opensearch.keystore /tmp/keystore/ + `, + }, + VolumeMounts: initContainerVolumeMounts, + SecurityContext: securityContext, + } + + initContainers = append(initContainers, keystoreInitContainer) + } + + startUpCommand := "./opensearch-docker-entrypoint.sh" + + pluginslist := helpers.RemoveDuplicateStrings(cr.Spec.Bootstrap.PluginsList) + mainCommand := helpers.BuildMainCommand("./bin/opensearch-plugin", pluginslist, true, startUpCommand) pod := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: BootstrapPodName(cr), @@ -881,6 +973,7 @@ func NewBootstrapPod( { Env: env, Name: "opensearch", + Command: mainCommand, Image: image.GetImage(), ImagePullPolicy: image.GetImagePullPolicy(), Resources: resources, diff --git a/opensearch-operator/pkg/builders/cluster_test.go b/opensearch-operator/pkg/builders/cluster_test.go index 7b0e2580..bd6f8824 100644 --- a/opensearch-operator/pkg/builders/cluster_test.go +++ b/opensearch-operator/pkg/builders/cluster_test.go @@ -42,6 +42,23 @@ func ClusterDescWithKeystoreSecret(secretName string, keyMappings map[string]str } } +func ClusterDescWithBootstrapKeystoreSecret(secretName string, keyMappings map[string]string) opsterv1.OpenSearchCluster { + return opsterv1.OpenSearchCluster{ + Spec: opsterv1.ClusterSpec{ + Bootstrap: opsterv1.BootstrapConfig{ + Keystore: []opsterv1.KeystoreValue{ + { + Secret: corev1.LocalObjectReference{ + Name: secretName, + }, + KeyMappings: keyMappings, + }, + }, + }, + }, + } +} + func ClusterDescWithAdditionalConfigs(addtitionalConfig map[string]string, bootstrapAdditionalConfig map[string]string) opsterv1.OpenSearchCluster { return opsterv1.OpenSearchCluster{ Spec: opsterv1.ClusterSpec{ @@ -449,6 +466,77 @@ var _ = Describe("Builders", func() { Value: mockBootstrapConfig[mockKey2], })) }) + It("should properly setup the main command when installing plugins", func() { + clusterObject := ClusterDescWithVersion("2.2.1") + pluginA := "some-plugin" + pluginB := "another-plugin" + + clusterObject.Spec.Bootstrap.PluginsList = []string{pluginA, pluginB} + result := NewBootstrapPod(&clusterObject, nil, nil) + + installCmd := fmt.Sprintf( + "./bin/opensearch-plugin install --batch '%s' '%s' && ./opensearch-docker-entrypoint.sh", + pluginA, + pluginB, + ) + + expected := []string{ + "/bin/bash", + "-c", + installCmd, + } + + actual := result.Spec.Containers[0].Command + + Expect(expected).To(Equal(actual)) + }) + }) + + When("Constructing a bootstrap pod with Keystore Values", func() { + It("should create a proper initContainer", func() { + mockSecretName := "some-secret" + clusterObject := ClusterDescWithBootstrapKeystoreSecret(mockSecretName, nil) + + result := NewBootstrapPod(&clusterObject, nil, nil) + Expect(result.Spec.InitContainers[1].VolumeMounts).To(ContainElements([]corev1.VolumeMount{ + { + Name: "keystore", + MountPath: "/tmp/keystore", + }, + { + Name: "keystore-" + mockSecretName, + MountPath: "/tmp/keystoreSecrets/" + mockSecretName, + }, + })) + }) + + It("should mount the prefilled keystore into the opensearch container", func() { + mockSecretName := "some-secret" + clusterObject := ClusterDescWithBootstrapKeystoreSecret(mockSecretName, nil) + result := NewBootstrapPod(&clusterObject, nil, nil) + Expect(result.Spec.Containers[0].VolumeMounts).To(ContainElement(corev1.VolumeMount{ + Name: 
"keystore", + MountPath: "/usr/share/opensearch/config/opensearch.keystore", + SubPath: "opensearch.keystore", + })) + }) + + It("should properly rename secret keys when key mappings are given", func() { + mockSecretName := "some-secret" + oldKey := "old-key" + newKey := "new-key" + + keyMappings := map[string]string{ + oldKey: newKey, + } + clusterObject := ClusterDescWithBootstrapKeystoreSecret(mockSecretName, keyMappings) + result := NewBootstrapPod(&clusterObject, nil, nil) + Expect(result.Spec.InitContainers[1].VolumeMounts).To(ContainElement(corev1.VolumeMount{ + Name: "keystore-" + mockSecretName, + MountPath: "/tmp/keystoreSecrets/" + mockSecretName + "/" + newKey, + SubPath: oldKey, + })) + }) }) When("Constructing a STS for a NodePool with Keystore Values", func() { From d3bf10cf1f0642137980588e9f4a546847f8ca18 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20S=C3=A1nchez=20Rojas?= <100773517+isrojas1@users.noreply.github.com> Date: Fri, 11 Oct 2024 15:19:44 +0200 Subject: [PATCH 7/7] Fix typo in userguide (#879) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### Description Trivial typo in docs/userguide/main.md. ### Issues Resolved ### Check List - [x] Commits are signed per the DCO using --signoff - [ ] Unittest added for the new/changed functionality and all unit tests are successful - [ ] Customer-visible features documented - [ ] No linter warnings (`make lint`) If CRDs are changed: - [ ] CRD YAMLs updated (`make manifests`) and also copied into the helm chart - [ ] Changes to CRDs documented Please refer to the [PR guidelines](https://github.com/opensearch-project/opensearch-k8s-operator/blob/main/docs/developing.md#submitting-a-pr) before submitting this pull request. By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license. For more information on following Developer Certificate of Origin and signing off your commits, please check [here](https://github.com/opensearch-project/OpenSearch/blob/main/CONTRIBUTING.md#developer-certificate-of-origin). Signed-off-by: Iván Sánchez Rojas <100773517+isrojas1@users.noreply.github.com> --- docs/userguide/main.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/userguide/main.md b/docs/userguide/main.md index 227c3f71..07f83ef6 100644 --- a/docs/userguide/main.md +++ b/docs/userguide/main.md @@ -255,7 +255,7 @@ Directly exposing the node HTTP port outside the Kubernetes cluster is not recom ### Adding plugins -You can extend the functionality of OpenSearch via [plugins](https://opensearch.org/docs/latest/install-and-configure/install-opensearch/plugins/#available-plugins). Commonly used ones are snapshot repository plugins for external backups (e.g. to AWS S3 or Azure Blog Storage). The operator has support to automatically install such plugins during setup. +You can extend the functionality of OpenSearch via [plugins](https://opensearch.org/docs/latest/install-and-configure/install-opensearch/plugins/#available-plugins). Commonly used ones are snapshot repository plugins for external backups (e.g. to AWS S3 or Azure Blob Storage). The operator has support to automatically install such plugins during setup. To install a plugin for opensearch add it to the list under `general.pluginsList`: