From 7b075a77679685f5884e6e22e6a21f1deae4b30b Mon Sep 17 00:00:00 2001 From: Nasar Khan Date: Mon, 23 Oct 2023 16:35:22 -0400 Subject: [PATCH] replace bitnami with strimzi kafka --- .../v1alpha1/helpers/miq-components/kafka.go | 354 ++++++++++++++---- .../miq-components/network_policies.go | 2 +- .../helpers/miq-components/orchestrator.go | 26 +- manageiq-operator/config/rbac/role.yaml | 26 ++ .../controller/manageiq_controller.go | 26 +- 5 files changed, 327 insertions(+), 107 deletions(-) diff --git a/manageiq-operator/api/v1alpha1/helpers/miq-components/kafka.go b/manageiq-operator/api/v1alpha1/helpers/miq-components/kafka.go index 2fa01646b..8fdd5cb25 100644 --- a/manageiq-operator/api/v1alpha1/helpers/miq-components/kafka.go +++ b/manageiq-operator/api/v1alpha1/helpers/miq-components/kafka.go @@ -1,8 +1,6 @@ package miqtools import ( - "context" - miqv1alpha1 "github.com/ManageIQ/manageiq-pods/manageiq-operator/api/v1alpha1" miqutilsv1alpha1 "github.com/ManageIQ/manageiq-pods/manageiq-operator/api/v1alpha1/miqutils" appsv1 "k8s.io/api/apps/v1" @@ -10,9 +8,7 @@ import ( resource "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" - intstr "k8s.io/apimachinery/pkg/util/intstr" - "sigs.k8s.io/controller-runtime/pkg/client" + "k8s.io/apimachinery/pkg/runtime/schema" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" ) @@ -237,59 +233,191 @@ func KafkaDeployment(cr *miqv1alpha1.ManageIQ, client client.Client, scheme *run }, }, }, - corev1.EnvVar{ - Name: "KAFKA_BROKER_PASSWORD", - ValueFrom: &corev1.EnvVarSource{ - SecretKeyRef: &corev1.SecretKeySelector{ - LocalObjectReference: corev1.LocalObjectReference{Name: kafkaSecretName(cr)}, - Key: "password", + "config": map[string]interface{}{ + "offsets.topic.replication.factor": 1, + "transaction.state.log.replication.factor": 1, + "transaction.state.log.min.isr": 1, + "default.replication.factor": 1, + "min.insync.replicas": 1, + }, + "template": map[string]interface{}{ + "pod": map[string]interface{}{ + "securityContext": map[string]interface{}{ + "runAsNonRoot": true, + }, + }, + "kafkaContainer": map[string]interface{}{ + "securityContext": map[string]interface{}{ + "allowPrivilegeEscalation": false, + "capabilities": map[string]interface{}{ + "drop": []string{"ALL"}, + }, + "privileged": false, + "readOnlyRootFilesystem": false, + "runAsNonRoot": true, }, }, }, - corev1.EnvVar{ - Name: "KAFKA_ZOOKEEPER_CONNECT", - Value: "zookeeper:2181", + "storage": map[string]interface{}{ + "type": "persistent-claim", + "deleteClaim": true, }, - corev1.EnvVar{ - Name: "ALLOW_PLAINTEXT_LISTENER", - Value: "yes", + "authorization": map[string]interface{}{ + "type": "simple", }, - corev1.EnvVar{ - Name: "KAFKA_CFG_ADVERTISED_LISTENERS", - Value: "PLAINTEXT://kafka:9092", + "resources": map[string]interface{}{ + "requests": map[string]interface{}{}, + "limits": map[string]interface{}{}, }, }, - VolumeMounts: []corev1.VolumeMount{ - corev1.VolumeMount{Name: "kafka-data", MountPath: "/bitnami/kafka"}, - }, - } - - err := addResourceReqs(cr.Spec.KafkaMemoryLimit, cr.Spec.KafkaMemoryRequest, cr.Spec.KafkaCpuLimit, cr.Spec.KafkaCpuRequest, &container) - if err != nil { - return nil, nil, err - } - - deployment := &appsv1.Deployment{ - ObjectMeta: metav1.ObjectMeta{ - Name: "kafka", - Namespace: cr.ObjectMeta.Namespace, + "zookeeper": map[string]interface{}{ + "replicas": 1, + "template": map[string]interface{}{ + "pod": map[string]interface{}{ + "securityContext": map[string]interface{}{ + "runAsNonRoot": true, + }, + }, + "zookeeperContainer": map[string]interface{}{ + "securityContext": map[string]interface{}{ + "allowPrivilegeEscalation": false, + "capabilities": map[string]interface{}{ + "drop": []string{"ALL"}, + }, + "privileged": false, + "readOnlyRootFilesystem": false, + "runAsNonRoot": true, + }, + }, + }, + "storage": map[string]interface{}{ + "type": "persistent-claim", + "deleteClaim": true, + }, + "resources": map[string]interface{}{ + "requests": map[string]interface{}{}, + "limits": map[string]interface{}{}, + }, }, - Spec: appsv1.DeploymentSpec{ - Selector: &metav1.LabelSelector{ - MatchLabels: deploymentLabels, + "entityOperator": map[string]interface{}{ + "template": map[string]interface{}{ + "pod": map[string]interface{}{ + "securityContext": map[string]interface{}{ + "runAsNonRoot": true, + }, + }, + "topicOperatorContainer": map[string]interface{}{ + "securityContext": map[string]interface{}{ + "allowPrivilegeEscalation": false, + "capabilities": map[string]interface{}{ + "drop": []string{"ALL"}, + }, + "privileged": false, + "readOnlyRootFilesystem": false, + "runAsNonRoot": true, + }, + }, + "userOperatorContainer": map[string]interface{}{ + "securityContext": map[string]interface{}{ + "allowPrivilegeEscalation": false, + "capabilities": map[string]interface{}{ + "drop": []string{"ALL"}, + }, + "privileged": false, + "readOnlyRootFilesystem": false, + "runAsNonRoot": true, + }, + }, + "tlsSidecarContainer": map[string]interface{}{ + "securityContext": map[string]interface{}{ + "allowPrivilegeEscalation": false, + "capabilities": map[string]interface{}{ + "drop": []string{"ALL"}, + }, + "privileged": false, + "readOnlyRootFilesystem": false, + "runAsNonRoot": true, + }, + }, + }, + "tlsSidecar": map[string]interface{}{ + "resources": map[string]interface{}{ + "requests": map[string]interface{}{ + "cpu": "500m", + "memory": "128Mi", + }, + "limits": map[string]interface{}{ + "cpu": "500m", + "memory": "128Mi", + }, + }, + }, + "userOperator": map[string]interface{}{ + "resources": map[string]interface{}{ + "requests": map[string]interface{}{ + "cpu": 1, + "memory": "1Gi", + }, + "limits": map[string]interface{}{ + "cpu": 1, + "memory": "1Gi", + }, + }, }, - Template: corev1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Labels: deploymentLabels, - Name: "kafka", + "topicOperator": map[string]interface{}{ + "resources": map[string]interface{}{ + "requests": map[string]interface{}{ + "cpu": 1, + "memory": "1Gi", + }, + "limits": map[string]interface{}{ + "cpu": 1, + "memory": "1Gi", + }, }, - Spec: corev1.PodSpec{}, }, }, + // "clusterCa": map[string]interface{}{ + // "generateCertificateAuthority": false, + // }, + } +} + +func KafkaCluster(cr *miqv1alpha1.ManageIQ, scheme *runtime.Scheme) (*unstructured.Unstructured, controllerutil.MutateFn) { + kafkaClusterCR := &unstructured.Unstructured{} + kafkaClusterCR.SetGroupVersionKind(schema.GroupVersionKind{ + Group: "kafka.strimzi.io", + Kind: "Kafka", + Version: "v1beta2", + }) + kafkaClusterCR.SetName("manageiq") + kafkaClusterCR.SetNamespace(cr.Namespace) + + kafkaCRSpec := KafkaClusterSpec() + + if cr.Spec.StorageClassName != "" { + kafkaStorage := kafkaCRSpec["kafka"].(map[string]interface{})["storage"].(map[string]interface{}) + kafkaStorage["class"] = cr.Spec.StorageClassName + zookeeperStorage := kafkaCRSpec["zookeeper"].(map[string]interface{})["storage"].(map[string]interface{}) + zookeeperStorage["class"] = cr.Spec.StorageClassName } - f := func() error { - if err := controllerutil.SetControllerReference(cr, deployment, scheme); err != nil { + kafkaResourceRequests := kafkaCRSpec["kafka"].(map[string]interface{})["resources"].(map[string]interface{})["requests"].(map[string]interface{}) + kafkaResourceRequests["memory"] = "1Gi" + kafkaResourceRequests["cpu"] = "200m" + kafkaResourceLimits := kafkaCRSpec["kafka"].(map[string]interface{})["resources"].(map[string]interface{})["limits"].(map[string]interface{}) + kafkaResourceLimits["memory"] = "2Gi" + kafkaResourceLimits["cpu"] = "400m" + + zookeeperResourceRequests := kafkaCRSpec["zookeeper"].(map[string]interface{})["resources"].(map[string]interface{})["requests"].(map[string]interface{}) + zookeeperResourceRequests["memory"] = "256Mi" + zookeeperResourceRequests["cpu"] = "150m" + zookeeperResourceLimits := kafkaCRSpec["zookeeper"].(map[string]interface{})["resources"].(map[string]interface{})["limits"].(map[string]interface{}) + zookeeperResourceLimits["memory"] = "512Mi" + zookeeperResourceLimits["cpu"] = "250m" + + mutateFunc := func() error { + if err := controllerutil.SetControllerReference(cr, kafkaClusterCR, scheme); err != nil { return err } addAppLabel(cr.Spec.AppName, &deployment.ObjectMeta) @@ -321,7 +449,7 @@ func KafkaDeployment(cr *miqv1alpha1.ManageIQ, client client.Client, scheme *run return nil } - return deployment, f, nil + return kafkaClusterCR, mutateFunc } func ZookeeperDeployment(cr *miqv1alpha1.ManageIQ, client client.Client, scheme *runtime.Scheme) (*appsv1.Deployment, controllerutil.MutateFn, error) { @@ -339,43 +467,48 @@ func ZookeeperDeployment(cr *miqv1alpha1.ManageIQ, client client.Client, scheme ContainerPort: 2181, }, }, - Env: []corev1.EnvVar{ - corev1.EnvVar{ - Name: "ALLOW_ANONYMOUS_LOGIN", - Value: "yes", + "authorization": map[string]interface{}{ + "type": "simple", + "acls": []map[string]interface{}{ + map[string]interface{}{ + "resource": map[string]interface{}{ + "type": "topic", + "name": "*", + "patternType": "literal", + }, + "operations": []string{"All"}, + "host": "*", + }, + map[string]interface{}{ + "resource": map[string]interface{}{ + "type": "group", + "name": "*", + "patternType": "literal", + }, + "operations": []string{"All"}, + "host": "*", + }, }, }, - VolumeMounts: []corev1.VolumeMount{ - corev1.VolumeMount{Name: "zookeeper-data", MountPath: "/bitnami/zookeeper"}, - }, } +} - err := addResourceReqs(cr.Spec.ZookeeperMemoryLimit, cr.Spec.ZookeeperMemoryRequest, cr.Spec.ZookeeperCpuLimit, cr.Spec.ZookeeperCpuRequest, &container) - if err != nil { - return nil, nil, err - } +func KafkaUser(cr *miqv1alpha1.ManageIQ, scheme *runtime.Scheme) (*unstructured.Unstructured, controllerutil.MutateFn) { + kafkaUserCR := &unstructured.Unstructured{} - deployment := &appsv1.Deployment{ - ObjectMeta: metav1.ObjectMeta{ - Name: "zookeeper", - Namespace: cr.ObjectMeta.Namespace, - }, - Spec: appsv1.DeploymentSpec{ - Selector: &metav1.LabelSelector{ - MatchLabels: deploymentLabels, - }, - Template: corev1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Labels: deploymentLabels, - Name: "zookeeper", - }, - Spec: corev1.PodSpec{}, - }, - }, - } + kafkaUserCR.SetGroupVersionKind(schema.GroupVersionKind{ + Group: "kafka.strimzi.io", + Kind: "KafkaUser", + Version: "v1beta2", + }) + kafkaUserCR.SetName("manageiq-user") + kafkaUserCR.SetNamespace(cr.Namespace) + kafkaUserCR.SetLabels(map[string]string{"strimzi.io/cluster": "manageiq"}) - f := func() error { - if err := controllerutil.SetControllerReference(cr, deployment, scheme); err != nil { + kafkaUserSpec := KafkaUserSpec() + + mutateFunc := func() error { + if err := controllerutil.SetControllerReference(cr, kafkaUserCR, scheme); err != nil { return err } addAppLabel(cr.Spec.AppName, &deployment.ObjectMeta) @@ -406,5 +539,72 @@ func ZookeeperDeployment(cr *miqv1alpha1.ManageIQ, client client.Client, scheme return nil } - return deployment, f, nil + return kafkaUserCR, mutateFunc +} + +func KafkaTopicSpec() (map[string]interface{}) { + return map[string]interface{}{ + "partitions": 1, + "config": map[string]interface{}{ + "retention.ms": 7200000, + "segment.bytes": 1073741824, + }, + } +} + +func KafkaTopic(cr *miqv1alpha1.ManageIQ, scheme *runtime.Scheme, topicName string) (*unstructured.Unstructured, controllerutil.MutateFn) { + kafkaTopicCR := &unstructured.Unstructured{} + + kafkaTopicCR.SetGroupVersionKind(schema.GroupVersionKind{ + Group: "kafka.strimzi.io", + Kind: "KafkaTopic", + Version: "v1beta2", + }) + kafkaTopicCR.SetName(topicName) + kafkaTopicCR.SetNamespace(cr.Namespace) + kafkaTopicCR.SetLabels(map[string]string{"strimzi.io/cluster": "manageiq"}) + + kafkaTopicSpec := KafkaTopicSpec() + + mutateFunc := func() error { + if err := controllerutil.SetControllerReference(cr, kafkaTopicCR, scheme); err != nil { + return err + } + + kafkaTopicCR.UnstructuredContent()["spec"] = kafkaTopicSpec + + return nil + } + + return kafkaTopicCR, mutateFunc +} + +func KafkaInstall(cr *miqv1alpha1.ManageIQ, scheme *runtime.Scheme) (*unstructured.Unstructured, controllerutil.MutateFn) { + kafkaSubscription := &unstructured.Unstructured{} + kafkaSubscription.SetGroupVersionKind(schema.GroupVersionKind{ + Group: "operators.coreos.com", + Kind: "Subscription", + Version: "v1alpha1", + }) + kafkaSubscription.SetName("strimzi-kafka-operator") + kafkaSubscription.SetNamespace(cr.Namespace) + + kafkaSubscriptionSpec := map[string]interface{}{ + "channel": "strimzi-0.35.x", + "name": "strimzi-kafka-operator", + "source": "community-operators", + "sourceNamespace": "openshift-marketplace", + } + + mutateFunc := func() error { + if err := controllerutil.SetControllerReference(cr, kafkaSubscription, scheme); err != nil { + return err + } + + kafkaSubscription.UnstructuredContent()["spec"] = kafkaSubscriptionSpec + + return nil + } + + return kafkaSubscription, mutateFunc } diff --git a/manageiq-operator/api/v1alpha1/helpers/miq-components/network_policies.go b/manageiq-operator/api/v1alpha1/helpers/miq-components/network_policies.go index 44aba3488..48e555cd1 100644 --- a/manageiq-operator/api/v1alpha1/helpers/miq-components/network_policies.go +++ b/manageiq-operator/api/v1alpha1/helpers/miq-components/network_policies.go @@ -227,7 +227,7 @@ func NetworkPolicyAllowKafka(cr *miqv1alpha1.ManageIQ, scheme *runtime.Scheme, c addAppLabel(cr.Spec.AppName, &networkPolicy.ObjectMeta) setIngressPolicyType(networkPolicy) - networkPolicy.Spec.PodSelector.MatchLabels = map[string]string{"name": "kafka"} + networkPolicy.Spec.PodSelector.MatchLabels = map[string]string{"strimzi.io/pod-name": "manageiq-kafka-0"} pod := orchestratorPod(*c) if pod == nil { diff --git a/manageiq-operator/api/v1alpha1/helpers/miq-components/orchestrator.go b/manageiq-operator/api/v1alpha1/helpers/miq-components/orchestrator.go index 7fc124acb..37a838b21 100644 --- a/manageiq-operator/api/v1alpha1/helpers/miq-components/orchestrator.go +++ b/manageiq-operator/api/v1alpha1/helpers/miq-components/orchestrator.go @@ -119,25 +119,20 @@ func addMessagingEnv(cr *miqv1alpha1.ManageIQ, c *corev1.Container) { messagingEnv := []corev1.EnvVar{ corev1.EnvVar{ Name: "MESSAGING_HOSTNAME", - ValueFrom: &corev1.EnvVarSource{ - SecretKeyRef: &corev1.SecretKeySelector{ - LocalObjectReference: corev1.LocalObjectReference{Name: kafkaSecretName(cr)}, - Key: "hostname", - }, - }, + Value: "manageiq-kafka-bootstrap", }, corev1.EnvVar{ Name: "MESSAGING_PASSWORD", ValueFrom: &corev1.EnvVarSource{ SecretKeyRef: &corev1.SecretKeySelector{ - LocalObjectReference: corev1.LocalObjectReference{Name: kafkaSecretName(cr)}, + LocalObjectReference: corev1.LocalObjectReference{Name: "manageiq-user"}, Key: "password", }, }, }, corev1.EnvVar{ Name: "MESSAGING_PORT", - Value: "9092", + Value: "9093", }, corev1.EnvVar{ Name: "MESSAGING_TYPE", @@ -145,12 +140,15 @@ func addMessagingEnv(cr *miqv1alpha1.ManageIQ, c *corev1.Container) { }, corev1.EnvVar{ Name: "MESSAGING_USERNAME", - ValueFrom: &corev1.EnvVarSource{ - SecretKeyRef: &corev1.SecretKeySelector{ - LocalObjectReference: corev1.LocalObjectReference{Name: kafkaSecretName(cr)}, - Key: "username", - }, - }, + Value: "manageiq-user", + }, + corev1.EnvVar{ + Name: "MESSAGING_SASL_MECHANISM", + Value: "SCRAM-SHA-512", + }, + corev1.EnvVar{ + Name: "MESSAGING_SSL_CA", + Value: "/etc/pki/ca-trust/source/anchors/root.crt", }, } diff --git a/manageiq-operator/config/rbac/role.yaml b/manageiq-operator/config/rbac/role.yaml index 67c9af75d..5dd47c48c 100644 --- a/manageiq-operator/config/rbac/role.yaml +++ b/manageiq-operator/config/rbac/role.yaml @@ -76,6 +76,32 @@ rules: - patch - update - watch +- apiGroups: + - kafka.strimzi.io + resources: + - kafkas + - kafkatopics + - kafkausers + verbs: + - create + - delete + - get + - list + - patch + - update + - watch +- apiGroups: + - operators.coreos.com + resources: + - subscriptions + verbs: + - create + - delete + - get + - list + - patch + - update + - watch - apiGroups: - manageiq.org resources: diff --git a/manageiq-operator/internal/controller/manageiq_controller.go b/manageiq-operator/internal/controller/manageiq_controller.go index 9bf358328..13dadaa07 100644 --- a/manageiq-operator/internal/controller/manageiq_controller.go +++ b/manageiq-operator/internal/controller/manageiq_controller.go @@ -52,11 +52,13 @@ type ManageIQReconciler struct { //+kubebuilder:rbac:namespace=changeme,groups=apps,resources=deployments/finalizers,resourceNames=manageiq-operator,verbs=update //+kubebuilder:rbac:namespace=changeme,groups=coordination.k8s.io,resources=leases,verbs=get;list;create;update;delete //+kubebuilder:rbac:namespace=changeme,groups=extensions,resources=deployments;deployments/scale;networkpolicies,verbs=get;list;watch;create;update;patch;delete +//+kubebuilder:rbac:namespace=changeme,groups=kafka.strimzi.io,resources=kafkas;kafkausers;kafkatopics,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:namespace=changeme,groups=manageiq.org,resources=manageiqs,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:namespace=changeme,groups=manageiq.org,resources=manageiqs/finalizers,verbs=update //+kubebuilder:rbac:namespace=changeme,groups=manageiq.org,resources=manageiqs/status,verbs=get;update;patch //+kubebuilder:rbac:namespace=changeme,groups=monitoring.coreos.com,resources=servicemonitors,verbs=get;create //+kubebuilder:rbac:namespace=changeme,groups=networking.k8s.io,resources=ingresses;networkpolicies,verbs=get;list;watch;create;update;patch;delete +//+kubebuilder:rbac:namespace=changeme,groups=operators.coreos.com,resources=subscriptions,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:namespace=changeme,groups=rbac.authorization.k8s.io,resources=rolebindings;roles,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:namespace=changeme,groups=route.openshift.io,resources=routes;routes/custom-host,verbs=get;list;watch;create;update;patch;delete @@ -522,31 +524,25 @@ func (r *ManageIQReconciler) generatePostgresqlResources(cr *miqv1alpha1.ManageI } func (r *ManageIQReconciler) generateKafkaResources(cr *miqv1alpha1.ManageIQ) error { - secret, mutateFunc := miqtool.ManageKafkaSecret(cr, r.Client, r.Scheme) - if result, err := controllerutil.CreateOrUpdate(context.TODO(), r.Client, secret, mutateFunc); err != nil { + kafkaSubscription, mutateFunc := miqtool.KafkaInstall(cr, r.Scheme) + if result, err := controllerutil.CreateOrUpdate(context.TODO(), r.Client, kafkaSubscription, mutateFunc); err != nil { return err } else if result != controllerutil.OperationResultNone { - logger.Info("Secret has been reconciled", "component", "kafka", "result", result) - } - - hostName := getSecretKeyValue(r.Client, cr.Namespace, cr.Spec.KafkaSecret, "hostname") - if hostName != "" { - logger.Info("External Kafka Messaging Service selected, skipping kafka and zookeeper service reconciliation", "hostname", hostName) - return nil + logger.Info("Kafka Subscription has been reconciled", "result", result) } - kafkaPVC, mutateFunc := miqtool.KafkaPVC(cr, r.Scheme) - if result, err := controllerutil.CreateOrUpdate(context.TODO(), r.Client, kafkaPVC, mutateFunc); err != nil { + kafkaClusterCR, mutateFunc := miqtool.KafkaCluster(cr, r.Scheme) + if result, err := controllerutil.CreateOrUpdate(context.TODO(), r.Client, kafkaClusterCR, mutateFunc); err != nil { return err } else if result != controllerutil.OperationResultNone { - logger.Info("PVC has been reconciled", "component", "kafka", "result", result) + logger.Info("Kafka Cluster has been reconciled", "result", result) } - zookeeperPVC, mutateFunc := miqtool.ZookeeperPVC(cr, r.Scheme) - if result, err := controllerutil.CreateOrUpdate(context.TODO(), r.Client, zookeeperPVC, mutateFunc); err != nil { + kafkaUserCR, mutateFunc := miqtool.KafkaUser(cr, r.Scheme) + if result, err := controllerutil.CreateOrUpdate(context.TODO(), r.Client, kafkaUserCR, mutateFunc); err != nil { return err } else if result != controllerutil.OperationResultNone { - logger.Info("PVC has been reconciled", "component", "zookeeper", "result", result) + logger.Info("Kafka User has been reconciled", "result", result) } kafkaService, mutateFunc := miqtool.KafkaService(cr, r.Scheme)