Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Imporve topic reconcile logic #225

Merged
merged 6 commits into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions api/v1alpha1/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package v1alpha1
const (
// ConditionReady indicates status condition ready
ConditionReady string = "Ready"
// ConditionTopicPolicyReady indicates the topic policy ready
ConditionTopicPolicyReady string = "PolicyReady"
// FinalizerName is the finalizer string that add to object
FinalizerName string = "cloud.streamnative.io/finalizer"

Expand Down
1 change: 1 addition & 0 deletions api/v1alpha1/pulsartopic_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ type PulsarTopicStatus struct {
//+kubebuilder:printcolumn:name="GENERATION",type=string,JSONPath=`.metadata.generation`
//+kubebuilder:printcolumn:name="OBSERVED_GENERATION",type=string,JSONPath=`.status.observedGeneration`
//+kubebuilder:printcolumn:name="READY",type=string,JSONPath=`.status.conditions[?(@.type=="Ready")].status`
//+kubebuilder:printcolumn:name="POLICY_READY",type=string,JSONPath=`.status.conditions[?(@.type=="PolicyReady")].status`

// PulsarTopic is the Schema for the pulsartopics API
type PulsarTopic struct {
Expand Down
3 changes: 3 additions & 0 deletions config/crd/bases/resource.streamnative.io_pulsartopics.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ spec:
- jsonPath: .status.conditions[?(@.type=="Ready")].status
name: READY
type: string
- jsonPath: .status.conditions[?(@.type=="PolicyReady")].status
name: POLICY_READY
type: string
name: v1alpha1
schema:
openAPIV3Schema:
Expand Down
4 changes: 2 additions & 2 deletions pkg/admin/dummy.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ func (d *DummyPulsarAdmin) SetNamespaceClusters(string, []string) error {
}

// ApplyTopic is a fake implements of ApplyTopic
func (d *DummyPulsarAdmin) ApplyTopic(string, *TopicParams) error {
return nil
func (d *DummyPulsarAdmin) ApplyTopic(string, *TopicParams) (error, error) {
return nil, nil
}

// DeleteTopic is a fake implements of DeleteTopic
Expand Down
12 changes: 6 additions & 6 deletions pkg/admin/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,32 +124,32 @@ func (p *PulsarAdminClient) SetNamespaceClusters(completeNSName string, clusters
}

// ApplyTopic creates a topic with policies
func (p *PulsarAdminClient) ApplyTopic(name string, params *TopicParams) error {
func (p *PulsarAdminClient) ApplyTopic(name string, params *TopicParams) (creationErr error, policyErr error) {
completeTopicName := makeCompleteTopicName(name, params.Persistent)
topicName, err := utils.GetTopicName(completeTopicName)
if err != nil {
return err
return err, nil
}
partitionNum := int(*params.Partitions)
err = p.adminClient.Topics().Create(*topicName, partitionNum)
if err != nil {
if !IsAlreadyExist(err) {
return err
return err, nil
}
if partitionNum > 0 {
// for partitioned topic, allow to change the partition number
if err = p.adminClient.Topics().Update(*topicName, partitionNum); err != nil {
return err
return nil, err
}
}
}

err = p.applyTopicPolicies(topicName, params)
if err != nil {
return err
return nil, err
}

return nil
return nil, nil
}

// DeleteTenant deletes a specific tenant
Expand Down
2 changes: 1 addition & 1 deletion pkg/admin/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ type PulsarAdmin interface {
SetNamespaceClusters(name string, clusters []string) error

// ApplyTopic creates a topic with parameters
ApplyTopic(name string, params *TopicParams) error
ApplyTopic(name string, params *TopicParams) (error, error)

// DeleteTopic delete a specific topic
DeleteTopic(name string) error
Expand Down
105 changes: 78 additions & 27 deletions pkg/connection/reconcile_topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/streamnative/pulsar-resources-operator/pkg/feature"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -61,7 +62,7 @@ func (r *PulsarTopicReconciler) Observe(ctx context.Context) error {

r.conn.topics = topicsList.Items
for i := range r.conn.topics {
if !resourcev1alpha1.IsPulsarResourceReady(&r.conn.topics[i]) {
if !isPulsarTopicResourceReady(&r.conn.topics[i]) {
r.conn.addUnreadyResource(&r.conn.topics[i])
}
}
Expand All @@ -75,7 +76,9 @@ func (r *PulsarTopicReconciler) Reconcile(ctx context.Context) error {
for i := range r.conn.topics {
topic := &r.conn.topics[i]
if err := r.ReconcileTopic(ctx, r.conn.pulsarAdmin, topic); err != nil {
return fmt.Errorf("reconcile topic [%w]", err)
// return error will stop the other reconcile process
r.log.Error(err, "Failed to reconcile topic", "topicName", topic.Spec.Name)
continue
}
}
return nil
Expand Down Expand Up @@ -134,21 +137,48 @@ func (r *PulsarTopicReconciler) ReconcileTopic(ctx context.Context, pulsarAdmin
}
}

if resourcev1alpha1.IsPulsarResourceReady(topic) &&
if isPulsarTopicResourceReady(topic) &&
!feature.DefaultFeatureGate.Enabled(feature.AlwaysUpdatePulsarResource) {
log.Info("Skip reconcile, topic resource is ready")
return nil
}

var policyErrs []error
var creationErr error
defer func() {
if creationErr != nil {
meta.SetStatusCondition(&topic.Status.Conditions,
NewTopicErrorCondition(topic.Generation, resourcev1alpha1.ConditionReady, creationErr.Error()))
} else {
meta.SetStatusCondition(&topic.Status.Conditions,
NewTopicReadyCondition(topic.Generation, resourcev1alpha1.ConditionReady))
}
if len(policyErrs) != 0 || creationErr != nil {
msg := ""
for _, err := range policyErrs {
msg += err.Error() + ";\n"
}
meta.SetStatusCondition(&topic.Status.Conditions,
NewTopicErrorCondition(topic.Generation, resourcev1alpha1.ConditionTopicPolicyReady, msg))
} else {
meta.SetStatusCondition(&topic.Status.Conditions,
NewTopicReadyCondition(topic.Generation, resourcev1alpha1.ConditionTopicPolicyReady))
}

if err := r.conn.client.Status().Update(ctx, topic); err != nil {
log.Error(err, "Failed to update status")
}
}()

params := createTopicParams(topic)

r.applyDefault(params)

if refs := topic.Spec.GeoReplicationRefs; len(refs) != 0 {
for _, ref := range refs {
if err := r.applyGeo(ctx, params, ref, topic); err != nil {
log.Error(err, "Failed to get destination connection for geo replication")
return err
log.Error(err, "Failed to get destination connection for geo replication "+ref.Name)
policyErrs = append(policyErrs, err)
}
}

Expand All @@ -161,16 +191,23 @@ func (r *PulsarTopicReconciler) ReconcileTopic(ctx context.Context, pulsarAdmin
topic.Status.GeoReplicationEnabled = false
}

if err := pulsarAdmin.ApplyTopic(topic.Spec.Name, params); err != nil {
meta.SetStatusCondition(&topic.Status.Conditions, *NewErrorCondition(topic.Generation, err.Error()))
log.Error(err, "Failed to apply topic")
if err := r.conn.client.Status().Update(ctx, topic); err != nil {
log.Error(err, "Failed to update the topic status")
return nil
}
return err
creationErr, policyErr := pulsarAdmin.ApplyTopic(topic.Spec.Name, params)
if policyErr != nil {
policyErrs = append(policyErrs, policyErr)
}
if creationErr != nil {
return creationErr
}

if err := applySchema(pulsarAdmin, topic, log); err != nil {
policyErrs = append(policyErrs, err)
}

topic.Status.ObservedGeneration = topic.Generation
return nil
}

func applySchema(pulsarAdmin admin.PulsarAdmin, topic *resourcev1alpha1.PulsarTopic, log logr.Logger) error {
schema, serr := pulsarAdmin.GetSchema(topic.Spec.Name)
if serr != nil && !admin.IsNotFound(serr) {
return serr
Expand All @@ -186,29 +223,16 @@ func (r *PulsarTopicReconciler) ReconcileTopic(ctx context.Context, pulsarAdmin
}
log.Info("Upload schema for the topic", "name", topic.Spec.Name, "type", info.Type, "schema", info.Schema, "properties", info.Properties)
if err := pulsarAdmin.UploadSchema(topic.Spec.Name, param); err != nil {
log.Error(err, "Failed to upload schema")
if err := r.conn.client.Status().Update(ctx, topic); err != nil {
log.Error(err, "Failed to upload schema for the topic")
return nil
}
return err
}
}
} else if schema != nil {
// Delete the schema when the schema exists and schema info is empty
log.Info("Deleting topic schema", "name", topic.Spec.Name)
err := pulsarAdmin.DeleteSchema(topic.Spec.Name)
if err != nil {
if err := pulsarAdmin.DeleteSchema(topic.Spec.Name); err != nil {
return err
}
}

topic.Status.ObservedGeneration = topic.Generation
meta.SetStatusCondition(&topic.Status.Conditions, *NewReadyCondition(topic.Generation))
if err := r.conn.client.Status().Update(ctx, topic); err != nil {
log.Error(err, "Failed to update the topic status")
return err
}
return nil
}

Expand Down Expand Up @@ -263,3 +287,30 @@ func (r *PulsarTopicReconciler) applyGeo(ctx context.Context, params *admin.Topi
params.ReplicationClusters = append(params.ReplicationClusters, r.conn.connection.Spec.ClusterName)
return nil
}

func isPulsarTopicResourceReady(topic *resourcev1alpha1.PulsarTopic) bool {
condition := meta.FindStatusCondition(topic.Status.Conditions, resourcev1alpha1.ConditionTopicPolicyReady)
return resourcev1alpha1.IsPulsarResourceReady(topic) && condition != nil && condition.Status == metav1.ConditionTrue
}

// NewTopicReadyCondition make condition with ready info
func NewTopicReadyCondition(generation int64, conditionType string) metav1.Condition {
return metav1.Condition{
Type: conditionType,
Status: metav1.ConditionTrue,
ObservedGeneration: generation,
Reason: "Reconciled",
Message: "",
}
}

// NewTopicErrorCondition make condition with ready info
func NewTopicErrorCondition(generation int64, conditionType, msg string) metav1.Condition {
return metav1.Condition{
Type: conditionType,
Status: metav1.ConditionFalse,
ObservedGeneration: generation,
Reason: "ReconcileError",
Message: msg,
}
}
Loading