From 691bb80248f2b9bcdf987b2231a27d7190a63e9e Mon Sep 17 00:00:00 2001 From: Phoeniix Zhao Date: Mon, 4 Dec 2023 15:30:40 +0800 Subject: [PATCH] feat(deploy): add config map to specify the xline bootup args Signed-off-by: Phoeniix Zhao --- api/v1alpha1/xlinecluster_types.go | 5 +++ api/v1alpha1/zz_generated.deepcopy.go | 7 ++++ ...e.kvstore.datenlord.com_xlineclusters.yaml | 5 +++ internal/reconciler/cluster_reconciler.go | 6 +++ internal/transformer/xlinecluster_resource.go | 42 +++++++++++++++++++ internal/util/kubeutil.go | 23 ++++++++++ internal/util/kubeutil_test.go | 15 +++++++ tests/e2e/cases/ci.sh | 2 +- tests/e2e/cases/manifests/cluster.yml | 5 ++- 9 files changed, 108 insertions(+), 2 deletions(-) diff --git a/api/v1alpha1/xlinecluster_types.go b/api/v1alpha1/xlinecluster_types.go index 820bd54e..fa5eb314 100644 --- a/api/v1alpha1/xlinecluster_types.go +++ b/api/v1alpha1/xlinecluster_types.go @@ -65,6 +65,10 @@ type XlineClusterSpec struct { // The replicas of xline nodes // +kubebuilder:validation:Minimum=3 Replicas int32 `json:"replicas"` + + // The config items of xline server + // +optional + Config map[string]string `json:"config"` } // XlineClusterStatus defines the observed state of XlineCluster @@ -78,6 +82,7 @@ type XlineClusterStatus struct { type XlineClusterOprStage string const ( + StageXlineConfigMap XlineClusterOprStage = "Xline/ConfigMap" StageXlineService XlineClusterOprStage = "Xline/Service" StageXlineStatefulSet XlineClusterOprStage = "Xline/Statefulset" StageComplete XlineClusterOprStage = "complete" diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 6a7cc92b..4b96d7df 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -128,6 +128,13 @@ func (in *XlineClusterSpec) DeepCopyInto(out *XlineClusterSpec) { *out = new(string) **out = **in } + if in.Config != nil { + in, out := &in.Config, &out.Config + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new XlineClusterSpec. diff --git a/config/crd/bases/xline.kvstore.datenlord.com_xlineclusters.yaml b/config/crd/bases/xline.kvstore.datenlord.com_xlineclusters.yaml index c105cb8e..8764c531 100644 --- a/config/crd/bases/xline.kvstore.datenlord.com_xlineclusters.yaml +++ b/config/crd/bases/xline.kvstore.datenlord.com_xlineclusters.yaml @@ -35,6 +35,11 @@ spec: spec: description: XlineClusterSpec defines the desired state of XlineCluster properties: + config: + additionalProperties: + type: string + description: The config items of xline server + type: object image: description: Xline cluster image type: string diff --git a/internal/reconciler/cluster_reconciler.go b/internal/reconciler/cluster_reconciler.go index f290d5d0..7b7ebc8a 100644 --- a/internal/reconciler/cluster_reconciler.go +++ b/internal/reconciler/cluster_reconciler.go @@ -68,6 +68,12 @@ func (r *ClusterStageRecResult) AsXlineClusterRecStatus() xapi.XlineClusterRecSt // reconcile xline cluster resources. func (r *XlineClusterReconciler) recXlineResources() ClusterStageRecResult { + // create an xline configmap + configMap := tran.MakeConfigMap(r.CR, r.Schema) + if err := r.CreateOrUpdate(configMap, &corev1.ConfigMap{}); err != nil { + return clusterStageFail(xapi.StageXlineConfigMap, err) + } + // create a xline service service := tran.MakeService(r.CR, r.Schema) if err := r.CreateOrUpdate(service, &corev1.Service{}); err != nil { diff --git a/internal/transformer/xlinecluster_resource.go b/internal/transformer/xlinecluster_resource.go index 8c6ce49a..7c4c79fe 100644 --- a/internal/transformer/xlinecluster_resource.go +++ b/internal/transformer/xlinecluster_resource.go @@ -5,6 +5,7 @@ import ( "strings" xapi "github.com/xline-kv/xline-operator/api/v1alpha1" + "github.com/xline-kv/xline-operator/internal/util" appv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -13,6 +14,13 @@ import ( "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" ) +func GetConfigMapKey(xlineClusterName types.NamespacedName) types.NamespacedName { + return types.NamespacedName{ + Namespace: xlineClusterName.Namespace, + Name: fmt.Sprintf("%s-config", xlineClusterName.Name), + } +} + func GetServiceKey(xlineClusterName types.NamespacedName) types.NamespacedName { return types.NamespacedName{ Namespace: xlineClusterName.Namespace, @@ -46,6 +54,33 @@ func GetMemberTopology(stsRef types.NamespacedName, svcName string, replicas int return strings.Join(members, ",") } +func defaultConfigMap(clusterName string) map[string]string { + return map[string]string{ + "init-leader": fmt.Sprintf("%s-sts-0", clusterName), + "log-level": "info", + "engine": "rocksdb", + "data-dir": "/usr/local/xline/data-dir", + "jwt-pubkey": "", + "jwt-prikey": "", + } +} + +func MakeConfigMap(cr *xapi.XlineCluster, scheme *runtime.Scheme) *corev1.ConfigMap { + defaultConf := defaultConfigMap(cr.ObjKey().Name) + data := util.IntersectAndMergeMaps(cr.Spec.Config, defaultConf) + configMapRef := GetConfigMapKey(cr.ObjKey()) + configMap := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: configMapRef.Name, + Namespace: configMapRef.Namespace, + Labels: GetXlineInstanceLabels(cr.ObjKey()), + }, + Data: data, + } + _ = controllerutil.SetOwnerReference(cr, configMap, scheme) + return configMap +} + func MakeService(cr *xapi.XlineCluster, scheme *runtime.Scheme) *corev1.Service { svcRef := GetServiceKey(cr.ObjKey()) svcLabel := GetXlineInstanceLabels(cr.ObjKey()) @@ -71,6 +106,7 @@ func MakeService(cr *xapi.XlineCluster, scheme *runtime.Scheme) *corev1.Service func MakeStatefulSet(cr *xapi.XlineCluster, scheme *runtime.Scheme) *appv1.StatefulSet { crName := types.NamespacedName{Namespace: cr.Namespace, Name: cr.Name} + configMapRef := GetConfigMapKey(cr.ObjKey()) stsRef := GetStatefulSetKey(crName) stsLabels := GetXlineInstanceLabels(crName) svcName := GetServiceKey(cr.ObjKey()).Name @@ -85,6 +121,12 @@ func MakeStatefulSet(cr *xapi.XlineCluster, scheme *runtime.Scheme) *appv1.State }, Env: []corev1.EnvVar{ {Name: "MEMBERS", Value: GetMemberTopology(stsRef, svcName, int(cr.Spec.Replicas))}, + {Name: "INIT_LEADER", ValueFrom: util.NewEnvVarConfigMapSource(configMapRef.Name, "init-leader")}, + {Name: "RUST_LOG", ValueFrom: util.NewEnvVarConfigMapSource(configMapRef.Name, "log-level")}, + {Name: "ENGINE", ValueFrom: util.NewEnvVarConfigMapSource(configMapRef.Name, "engine")}, + {Name: "DATA_DIR", ValueFrom: util.NewEnvVarConfigMapSource(configMapRef.Name, "data-dir")}, + {Name: "AUTH_PUBLIC_KEY", ValueFrom: util.NewEnvVarConfigMapSource(configMapRef.Name, "jwt-pubkey")}, + {Name: "AUTH_PRIVATE_KEY", ValueFrom: util.NewEnvVarConfigMapSource(configMapRef.Name, "jwt-prikey")}, }, } diff --git a/internal/util/kubeutil.go b/internal/util/kubeutil.go index fc4a04f1..314ac401 100644 --- a/internal/util/kubeutil.go +++ b/internal/util/kubeutil.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" ) @@ -34,3 +35,25 @@ func Md5HashOr(obj any, fallback string) string { } return hash } + +// IntersectAndMergeMaps patch the m1 on the m2 +func IntersectAndMergeMaps[K comparable, V any](m1, m2 map[K]V) map[K]V { + if len(m1) == 0 || m1 == nil || len(m2) == 0 || m2 == nil { + return m2 + } + for k := range m2 { + if value, ok := m1[k]; ok { + m2[k] = value + } + } + return m2 +} + +func NewEnvVarConfigMapSource(cmName string, key string) *corev1.EnvVarSource { + return &corev1.EnvVarSource{ + ConfigMapKeyRef: &corev1.ConfigMapKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{Name: cmName}, + Key: key, + }, + } +} diff --git a/internal/util/kubeutil_test.go b/internal/util/kubeutil_test.go index 1b076540..07c35b24 100644 --- a/internal/util/kubeutil_test.go +++ b/internal/util/kubeutil_test.go @@ -11,3 +11,18 @@ func TestK8sObjKeyStr(t *testing.T) { objkey := types.NamespacedName{Name: "xline", Namespace: "default"} assert.Equal(t, K8sObjKeyStr(objkey), "xline.default") } + +func TestIntersectAndMergeMaps(t *testing.T) { + m1 := map[string]string{} + m2 := map[string]string{"hello": "world"} + assert.Equal(t, IntersectAndMergeMaps(m1, m2), m2) + assert.Equal(t, IntersectAndMergeMaps(nil, m2), m2) + + m1 = map[string]string{"hello": "Sun"} + m2 = map[string]string{"hello": "Earth", "world": "Moon"} + assert.Equal(t, IntersectAndMergeMaps(m1, m2), map[string]string{"hello": "Sun", "world": "Moon"}) + + assert.Equal(t, IntersectAndMergeMaps(m1, nil), map[string]string(nil)) + assert.Equal(t, IntersectAndMergeMaps(m1, map[string]string{}), map[string]string{}) + +} diff --git a/tests/e2e/cases/ci.sh b/tests/e2e/cases/ci.sh index 74a724f5..29770def 100644 --- a/tests/e2e/cases/ci.sh +++ b/tests/e2e/cases/ci.sh @@ -101,7 +101,7 @@ function test::ci::_chaos() { done test::ci::_etcdctl_expect "$endpoints" "put B $i" "OK" || return $? test::ci::_etcdctl_expect "$endpoints" "get B" "B\n$i" || return $? - k8s::kubectl wait --for=jsonpath='{.status.readyReplicas}'="$size" sts/$_TEST_CI_CLUSTER_NAME --timeout=300s >/dev/null 2>&1 + k8s::kubectl wait --for=jsonpath='{.status.readyReplicas}'="$size" sts/$_TEST_CI_STS_NAME --timeout=300s >/dev/null 2>&1 log::info "wait for log synchronization" && sleep $_TEST_CI_LOG_SYNC_TIMEOUT done } diff --git a/tests/e2e/cases/manifests/cluster.yml b/tests/e2e/cases/manifests/cluster.yml index f7450c85..e0174e20 100644 --- a/tests/e2e/cases/manifests/cluster.yml +++ b/tests/e2e/cases/manifests/cluster.yml @@ -3,7 +3,10 @@ kind: XlineCluster metadata: name: my-xline-cluster spec: - version: v0.6.0 + # TODO: replace it with ghcr.io/xline-kv/xline:v0.6.1 when xline 0.6.1 is ready. + version: v0.6.1 image: phoenix500526/xline imagePullPolicy: IfNotPresent replicas: 3 + config: + init-leader: "hello"