Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
IshaGirdhar committed Oct 10, 2023
1 parent 2b98982 commit d76b701
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 88 deletions.
4 changes: 2 additions & 2 deletions docs/userguide/main.md
Original file line number Diff line number Diff line change
Expand Up @@ -1210,7 +1210,7 @@ It is possible to manage OpenSearch ISM policies in Kubernetes with the operator
```yaml
apiVersion: opensearch.opster.io/v1
kind: ISMPolicy
kind: OpensearchISMPolicy
metadata:
name: sample-policy
spec:
Expand Down Expand Up @@ -1241,7 +1241,7 @@ spec:
- delete: {}
```
The namespace of the `ISMPolicy` must be the namespace the OpenSearch cluster itself is deployed in. `policyId` is an optional field, and if not provided `metadata.name` is used as the default.
The namespace of the `OpensearchISMPolicy` must be the namespace the OpenSearch cluster itself is deployed in. `policyId` is an optional field, and if not provided `metadata.name` is used as the default.
```

## Managing index and component templates
Expand Down
11 changes: 6 additions & 5 deletions opensearch-operator/api/v1/opensearchism_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type OpensearchISMPolicyStatus struct {
Reason string `json:"reason,omitempty"`
ExistingISMPolicy *bool `json:"existingISMPolicy,omitempty"`
ManagedCluster *types.UID `json:"managedCluster,omitempty"`
PolicyId string `json:"policyId,omitempty"`
}

// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
Expand All @@ -29,15 +30,15 @@ type OpensearchISMPolicyStatus struct {
// +kubebuilder:object:root=true
// +kubebuilder:resource:shortName=ismp;ismpolicy
// +kubebuilder:subresource:status
type ISMPolicy struct {
type OpenSearchISMPolicy struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec ISMPolicySpec `json:"spec,omitempty"`
Spec OpenSearchISMPolicySpec `json:"spec,omitempty"`
Status OpensearchISMPolicyStatus `json:"status,omitempty"`
}

// ISMPolicySpec is the specification for the ISM policy for OS.
type ISMPolicySpec struct {
type OpenSearchISMPolicySpec struct {
OpensearchRef corev1.LocalObjectReference `json:"opensearchCluster,omitempty"`
// The default starting state for each index that uses this policy.
DefaultState string `json:"defaultState"`
Expand Down Expand Up @@ -257,9 +258,9 @@ type Cron struct {
type ISMPolicyList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []ISMPolicy `json:"items"`
Items []OpenSearchISMPolicy `json:"items"`
}

func init() {
SchemeBuilder.Register(&ISMPolicy{}, &ISMPolicyList{})
SchemeBuilder.Register(&OpenSearchISMPolicy{}, &ISMPolicyList{})
}
122 changes: 61 additions & 61 deletions opensearch-operator/api/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions opensearch-operator/controllers/opensearchism_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type OpensearchISMPolicyReconciler struct {
client.Client
Scheme *runtime.Scheme
Recorder record.EventRecorder
Instance *opsterv1.ISMPolicy
Instance *opsterv1.OpenSearchISMPolicy
logr.Logger
}

Expand All @@ -31,7 +31,7 @@ type OpensearchISMPolicyReconciler struct {
func (r *OpensearchISMPolicyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
r.Logger = log.FromContext(ctx).WithValues("tenant", req.NamespacedName)
r.Logger.Info("Reconciling OpensearchISMPolicy")
r.Instance = &opsterv1.ISMPolicy{}
r.Instance = &opsterv1.OpenSearchISMPolicy{}
err := r.Get(ctx, req.NamespacedName, r.Instance)
if err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
Expand Down Expand Up @@ -66,7 +66,7 @@ func (r *OpensearchISMPolicyReconciler) Reconcile(ctx context.Context, req ctrl.
// SetupWithManager sets up the controller with the Manager.
func (r *OpensearchISMPolicyReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&opsterv1.ISMPolicy{}).
For(&opsterv1.OpenSearchISMPolicy{}).
Owns(&opsterv1.OpenSearchCluster{}). // Get notified when opensearch clusters change
Complete(r)
}
55 changes: 41 additions & 14 deletions opensearch-operator/pkg/reconcilers/ismpolicy.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type IsmPolicyReconciler struct {
ctx context.Context
osClient *services.OsClusterClient
recorder record.EventRecorder
instance *opsterv1.ISMPolicy
instance *opsterv1.OpenSearchISMPolicy
cluster *opsterv1.OpenSearchCluster
logger logr.Logger
}
Expand All @@ -38,7 +38,7 @@ func NewIsmReconciler(
ctx context.Context,
client client.Client,
recorder record.EventRecorder,
instance *opsterv1.ISMPolicy,
instance *opsterv1.OpenSearchISMPolicy,
opts ...ReconcilerOption,
) *IsmPolicyReconciler {
options := ReconcilerOptions{}
Expand All @@ -55,6 +55,7 @@ func NewIsmReconciler(

func (r *IsmPolicyReconciler) Reconcile() (retResult ctrl.Result, retErr error) {
var reason string
var policyId string
defer func() {
if !pointer.BoolDeref(r.updateStatus, true) {
return
Expand All @@ -76,6 +77,7 @@ func (r *IsmPolicyReconciler) Reconcile() (retResult ctrl.Result, retErr error)
// Requeue is after 30 seconds for normal reconciliation after creation/update
if retErr == nil && retResult.RequeueAfter == 30*time.Second {
r.instance.Status.State = opsterv1.OpensearchISMPolicyCreated
r.instance.Status.PolicyId = policyId
}
if reason == ismPolicyExists {
r.instance.Status.State = opsterv1.OpensearchISMPolicyIgnored
Expand Down Expand Up @@ -117,6 +119,21 @@ func (r *IsmPolicyReconciler) Reconcile() (retResult ctrl.Result, retErr error)
r.recorder.Event(r.instance, "Warning", opensearchRefMismatch, reason)
return
}
} else {
if pointer.BoolDeref(r.updateStatus, true) {
retErr = retry.RetryOnConflict(retry.DefaultRetry, func() error {
if err := r.Get(r.ctx, client.ObjectKeyFromObject(r.instance), r.instance); err != nil {
return err
}
r.instance.Status.ManagedCluster = &r.cluster.UID
return r.Status().Update(r.ctx, r.instance)
})
if retErr != nil {
reason = fmt.Sprintf("failed to update status: %s", retErr)
r.recorder.Event(r.instance, "Warning", statusError, reason)
return
}
}
}
// Check cluster is ready
if r.cluster.Status.Phase != opsterv1.PhaseRunning {
Expand All @@ -135,14 +152,16 @@ func (r *IsmPolicyReconciler) Reconcile() (retResult ctrl.Result, retErr error)
reason := "error creating opensearch client"
r.recorder.Event(r.instance, "Warning", opensearchError, reason)
}

// If PolicyID not provided explicitly, use metadata.name by default
policyId = r.instance.Spec.PolicyID
if r.instance.Spec.PolicyID == "" {
r.instance.Spec.PolicyID = r.instance.Name
policyId = r.instance.Name
}
// Check ism policy state to make sure we don't touch preexisting ism policy
if r.instance.Status.ExistingISMPolicy == nil {
var exists bool
exists, retErr = services.PolicyExists(r.ctx, r.osClient, r.instance.Spec.PolicyID)
exists, retErr = services.PolicyExists(r.ctx, r.osClient, policyId)
if retErr != nil {
reason = "failed to get policy status from Opensearch API"
r.logger.Error(retErr, reason)
Expand Down Expand Up @@ -174,6 +193,7 @@ func (r *IsmPolicyReconciler) Reconcile() (retResult ctrl.Result, retErr error)
reason = ismPolicyExists
return
}

ismpolicy, retErr := r.CreateISMPolicyRequest()
if retErr != nil {
reason = "failed to get create the ism policy request"
Expand All @@ -182,7 +202,7 @@ func (r *IsmPolicyReconciler) Reconcile() (retResult ctrl.Result, retErr error)
return
}

ismResponse, retErr := services.GetPolicy(r.ctx, r.osClient, r.instance.Spec.PolicyID)
existingPolicy, retErr := services.GetPolicy(r.ctx, r.osClient, policyId)
if retErr != nil && retErr != services.ErrNotFound {
reason = "failed to get policy from Opensearch API"
r.logger.Error(retErr, reason)
Expand All @@ -191,7 +211,7 @@ func (r *IsmPolicyReconciler) Reconcile() (retResult ctrl.Result, retErr error)
}
if errors.Is(retErr, services.ErrNotFound) {
r.logger.V(1).Info(fmt.Sprintf("policy %s not found, creating.", r.instance.Spec.PolicyID))
retErr = services.CreateISMPolicy(r.ctx, r.osClient, *ismpolicy, r.instance.Spec.PolicyID)
retErr = services.CreateISMPolicy(r.ctx, r.osClient, *ismpolicy, policyId)
if retErr != nil {
reason = "failed to create ism policy"
r.logger.Error(retErr, reason)
Expand All @@ -204,12 +224,12 @@ func (r *IsmPolicyReconciler) Reconcile() (retResult ctrl.Result, retErr error)
if err != nil {
return
}
priterm := ismResponse.PrimaryTerm
seqno := ismResponse.SequenceNumber
priterm := existingPolicy.PrimaryTerm
seqno := existingPolicy.SequenceNumber
// Reset
ismResponse.PrimaryTerm = nil
ismResponse.SequenceNumber = nil
shouldUpdate, retErr := services.ShouldUpdateISMPolicy(r.ctx, *ismpolicy, *ismResponse)
existingPolicy.PrimaryTerm = nil
existingPolicy.SequenceNumber = nil
shouldUpdate, retErr := services.ShouldUpdateISMPolicy(r.ctx, *ismpolicy, *existingPolicy)
if retErr != nil {
reason = "failed to compare the policies"
r.logger.Error(retErr, reason)
Expand All @@ -222,12 +242,14 @@ func (r *IsmPolicyReconciler) Reconcile() (retResult ctrl.Result, retErr error)
return ctrl.Result{Requeue: true, RequeueAfter: 30 * time.Second}, retErr
}

if r.instance.Spec.PolicyID != ismResponse.PolicyID {
// the policyId is immutable, so check the old name (r.instance.Status.PolicyId) against the new
if r.instance.Status.PolicyId != "" && policyId != r.instance.Status.PolicyId {
reason = "can't change PolicyID"
r.recorder.Event(r.instance, "Warning", opensearchError, reason)
return

}
retErr = services.UpdateISMPolicy(r.ctx, r.osClient, *ismpolicy, seqno, priterm, r.instance.Spec.PolicyID)
retErr = services.UpdateISMPolicy(r.ctx, r.osClient, *ismpolicy, seqno, priterm, policyId)
if retErr != nil {
reason = "failed to update ism policy with Opensearch API"
r.logger.Error(retErr, reason)
Expand Down Expand Up @@ -523,7 +545,12 @@ func (r *IsmPolicyReconciler) Delete() error {
if err != nil {
return err
}
err = services.DeleteISMPolicy(r.ctx, r.osClient, r.instance.Spec.PolicyID)
// If PolicyID not provided explicitly, use metadata.name by default
policyId := r.instance.Spec.PolicyID
if r.instance.Spec.PolicyID == "" {
policyId = r.instance.Name
}
err = services.DeleteISMPolicy(r.ctx, r.osClient, policyId)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit d76b701

Please sign in to comment.