Skip to content

Commit

Permalink
move kafka.go to its own package
Browse files Browse the repository at this point in the history
  • Loading branch information
nasark committed Oct 26, 2023
1 parent 7b075a7 commit 24f0ffa
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 38 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package miqtools
package miqkafka

import (
miqv1alpha1 "github.com/ManageIQ/manageiq-pods/manageiq-operator/api/v1alpha1"
Expand Down
49 changes: 12 additions & 37 deletions manageiq-operator/internal/controller/manageiq_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (

miqv1alpha1 "github.com/ManageIQ/manageiq-pods/manageiq-operator/api/v1alpha1"
cr_migration "github.com/ManageIQ/manageiq-pods/manageiq-operator/api/v1alpha1/helpers/cr_migration"
miqkafka "github.com/ManageIQ/manageiq-pods/manageiq-operator/api/v1alpha1/helpers/miq-components/kafka"
miqtool "github.com/ManageIQ/manageiq-pods/manageiq-operator/api/v1alpha1/helpers/miq-components"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -524,61 +525,35 @@ func (r *ManageIQReconciler) generatePostgresqlResources(cr *miqv1alpha1.ManageI
}

func (r *ManageIQReconciler) generateKafkaResources(cr *miqv1alpha1.ManageIQ) error {
kafkaSubscription, mutateFunc := miqtool.KafkaInstall(cr, r.Scheme)
kafkaSubscription, mutateFunc := miqkafka.KafkaInstall(cr, r.Scheme)
if result, err := controllerutil.CreateOrUpdate(context.TODO(), r.Client, kafkaSubscription, mutateFunc); err != nil {
return err
} else if result != controllerutil.OperationResultNone {
logger.Info("Kafka Subscription has been reconciled", "result", result)
}

kafkaClusterCR, mutateFunc := miqtool.KafkaCluster(cr, r.Scheme)
kafkaClusterCR, mutateFunc := miqkafka.KafkaCluster(cr, r.Scheme)
if result, err := controllerutil.CreateOrUpdate(context.TODO(), r.Client, kafkaClusterCR, mutateFunc); err != nil {
return err
} else if result != controllerutil.OperationResultNone {
logger.Info("Kafka Cluster has been reconciled", "result", result)
}

kafkaUserCR, mutateFunc := miqtool.KafkaUser(cr, r.Scheme)
kafkaUserCR, mutateFunc := miqkafka.KafkaUser(cr, r.Scheme)
if result, err := controllerutil.CreateOrUpdate(context.TODO(), r.Client, kafkaUserCR, mutateFunc); err != nil {
return err
} else if result != controllerutil.OperationResultNone {
logger.Info("Kafka User has been reconciled", "result", result)
}

kafkaService, mutateFunc := miqtool.KafkaService(cr, r.Scheme)
if result, err := controllerutil.CreateOrUpdate(context.TODO(), r.Client, kafkaService, mutateFunc); err != nil {
return err
} else if result != controllerutil.OperationResultNone {
logger.Info("Service has been reconciled", "component", "kafka", "result", result)
}

zookeeperService, mutateFunc := miqtool.ZookeeperService(cr, r.Scheme)
if result, err := controllerutil.CreateOrUpdate(context.TODO(), r.Client, zookeeperService, mutateFunc); err != nil {
return err
} else if result != controllerutil.OperationResultNone {
logger.Info("Service has been reconciled", "component", "zookeeper", "result", result)
}

kafkaDeployment, mutateFunc, err := miqtool.KafkaDeployment(cr, r.Client, r.Scheme)
if err != nil {
return err
}

if result, err := controllerutil.CreateOrUpdate(context.TODO(), r.Client, kafkaDeployment, mutateFunc); err != nil {
return err
} else if result != controllerutil.OperationResultNone {
logger.Info("Deployment has been reconciled", "component", "kafka", "result", result)
}

zookeeperDeployment, mutateFunc, err := miqtool.ZookeeperDeployment(cr, r.Client, r.Scheme)
if err != nil {
return err
}

if result, err := controllerutil.CreateOrUpdate(context.TODO(), r.Client, zookeeperDeployment, mutateFunc); err != nil {
return err
} else if result != controllerutil.OperationResultNone {
logger.Info("Deployment has been reconciled", "component", "zookeeper", "result", result)
topics := []string{"messaging-health-check", "manageiq.ems", "manageiq.ems-events", "manageiq.ems-inventory", "manageiq.metrics"}
for i := 0; i < len(topics); i++ {
kafkaTopicCR, mutateFunc := miqkafka.KafkaTopic(cr, r.Scheme, topics[i])
if result, err := controllerutil.CreateOrUpdate(context.TODO(), r.Client, kafkaTopicCR, mutateFunc); err != nil {
return err
} else if result != controllerutil.OperationResultNone {
logger.Info(fmt.Sprintf("Kafka topic %s has been reconciled", topics[i]))
}
}

return nil
Expand Down

0 comments on commit 24f0ffa

Please sign in to comment.