From 93cd8023c7c557ad637f1d6ef6e72049c605cee3 Mon Sep 17 00:00:00 2001 From: iGxnon Date: Mon, 11 Sep 2023 21:34:45 +0800 Subject: [PATCH] refactor: refactor controller with factory Signed-off-by: iGxnon --- .github/workflows/e2e.yml | 2 +- build/xline-operator.Dockerfile | 5 - operator-k8s/src/consts.rs | 4 +- operator-k8s/src/controller/cluster/mod.rs | 2 - .../src/controller/cluster/v1alpha.rs | 412 --------------- .../src/controller/cluster/v1alpha1.rs | 490 ++---------------- operator-k8s/src/crd/mod.rs | 7 - operator-k8s/src/crd/v1alpha/cluster.rs | 91 ---- operator-k8s/src/crd/v1alpha/mod.rs | 3 - operator-k8s/src/crd/v1alpha1/mod.rs | 4 +- operator-k8s/src/manager/cluster.rs | 180 +++++-- operator-k8s/src/manager/mod.rs | 2 +- tests/e2e/cases/ci.sh | 19 +- tests/e2e/cases/manifests/cluster.yml | 4 +- tests/e2e/cases/manifests/operators.yml | 7 +- tests/e2e/testenv/testenv.sh | 4 +- 16 files changed, 206 insertions(+), 1030 deletions(-) delete mode 100644 build/xline-operator.Dockerfile delete mode 100644 operator-k8s/src/controller/cluster/v1alpha.rs delete mode 100644 operator-k8s/src/crd/v1alpha/cluster.rs delete mode 100644 operator-k8s/src/crd/v1alpha/mod.rs diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml index 78e499cd..0e5d6196 100644 --- a/.github/workflows/e2e.yml +++ b/.github/workflows/e2e.yml @@ -39,7 +39,7 @@ jobs: cargo build --release cd build cp ../target/release/xline-operator . - docker build . -t datenlord/xline-operator:latest -f operator.Dockerfile + docker build . -t xline-kv/xline-operator:latest -f operator.Dockerfile - name: 'E2E CI' env: KIND_CLUSTER_IMAGE: kindest/node:${{ matrix.k8s }} diff --git a/build/xline-operator.Dockerfile b/build/xline-operator.Dockerfile deleted file mode 100644 index e100dcdb..00000000 --- a/build/xline-operator.Dockerfile +++ /dev/null @@ -1,5 +0,0 @@ -FROM ubuntu:latest - -COPY xline-operator /usr/local/bin - -CMD ["/usr/local/bin/xline-operator"] diff --git a/operator-k8s/src/consts.rs b/operator-k8s/src/consts.rs index 1403144f..607e86e8 100644 --- a/operator-k8s/src/consts.rs +++ b/operator-k8s/src/consts.rs @@ -1,11 +1,11 @@ +#![allow(unused)] // TODO remove + use std::time::Duration; /// The default requeue duration to achieve eventual consistency pub(crate) const DEFAULT_REQUEUE_DURATION: Duration = Duration::from_secs(600); /// The field manager identifier of xline operator pub(crate) const FIELD_MANAGER: &str = "xlineoperator.datenlord.io"; -/// The emptyDir volume name of each pod if there is no data pvc specified -pub(crate) const DATA_EMPTY_DIR_NAME: &str = "xline-data-empty-dir"; /// The image used for cronjob to trigger backup /// The following command line tool should be available in this image /// 1. curl diff --git a/operator-k8s/src/controller/cluster/mod.rs b/operator-k8s/src/controller/cluster/mod.rs index 765e4854..2a4b8058 100644 --- a/operator-k8s/src/controller/cluster/mod.rs +++ b/operator-k8s/src/controller/cluster/mod.rs @@ -1,5 +1,3 @@ -/// Controller v1alpha -mod v1alpha; /// Controller v1alpha1 mod v1alpha1; diff --git a/operator-k8s/src/controller/cluster/v1alpha.rs b/operator-k8s/src/controller/cluster/v1alpha.rs deleted file mode 100644 index 49e76300..00000000 --- a/operator-k8s/src/controller/cluster/v1alpha.rs +++ /dev/null @@ -1,412 +0,0 @@ -use std::collections::BTreeMap; -use std::sync::Arc; - -use async_trait::async_trait; -use k8s_openapi::api::apps::v1::{ - RollingUpdateStatefulSetStrategy, StatefulSet, StatefulSetSpec, StatefulSetUpdateStrategy, -}; -use k8s_openapi::api::core::v1::{ - Container, ContainerPort, EmptyDirVolumeSource, EnvVar, EnvVarSource, ObjectFieldSelector, - PersistentVolumeClaim, PodSpec, PodTemplateSpec, Service, ServicePort, ServiceSpec, Volume, - VolumeMount, -}; -use k8s_openapi::apimachinery::pkg::apis::meta::v1::{LabelSelector, ObjectMeta, OwnerReference}; -use k8s_openapi::apimachinery::pkg::util::intstr::IntOrString; -use kube::api::{Patch, PatchParams}; -use kube::{Api, Client, Resource, ResourceExt}; -use tracing::{debug, error}; -use utils::consts::{DEFAULT_BACKUP_DIR, DEFAULT_DATA_DIR}; - -use crate::consts::{ - DATA_EMPTY_DIR_NAME, DEFAULT_XLINE_PORT, FIELD_MANAGER, XLINE_POD_NAME_ENV, XLINE_PORT_NAME, -}; -use crate::controller::cluster::ClusterMetrics; -use crate::controller::{Controller, MetricsLabeled}; -use crate::crd::v1alpha::Cluster; - -/// CRD `XlineCluster` controller -pub(crate) struct ClusterController { - /// Kubernetes client - pub(crate) kube_client: Client, - /// The kubernetes cluster dns suffix - pub(crate) cluster_suffix: String, - /// Cluster metrics - pub(crate) metrics: ClusterMetrics, -} - -/// All possible errors -#[derive(thiserror::Error, Debug)] -pub(crate) enum Error { - /// Missing an object in cluster - #[error("Missing object key {0} in cluster")] - MissingObject(&'static str), - /// Kube error - #[error("Kubernetes api error")] - Kube(#[from] kube::Error), - /// Backup PV mount path is already mounted - #[error("The path {0} is internally used in the xline operator and cannot be mounted.")] - CannotMount(&'static str), - /// Volume(PVC) name conflict with `DATA_EMPTY_DIR_NAME` - #[error("The {0} is conflict with the name internally used in the xline operator")] - InvalidVolumeName(&'static str), -} - -impl MetricsLabeled for Error { - fn labels(&self) -> Vec<&str> { - match *self { - Self::MissingObject(_) => vec!["missing_object"], - Self::Kube(_) => vec!["kube"], - Self::CannotMount(_) => vec!["cannot_mount"], - Self::InvalidVolumeName(_) => vec!["invalid_volume_name"], - } - } -} - -/// Controller result -type Result = std::result::Result; - -impl ClusterController { - /// Extract ports - fn extract_ports(cluster: &Arc) -> (ContainerPort, Vec) { - // expose all the container's ports - let mut xline_port = None; - let container_ports = cluster.spec.container.ports.clone().unwrap_or_default(); - let mut service_ports: Vec<_> = container_ports - .into_iter() - .map(|port| { - // the port with name `xline` is considered to be the port of xline - if matches!(port.name.as_deref(), Some(XLINE_PORT_NAME)) { - xline_port = Some(port.clone()); - } - ServicePort { - name: port.name.clone(), - port: port.container_port, - ..ServicePort::default() - } - }) - .collect(); - if xline_port.is_none() { - // add default xline port 2379 to service port if xline port is not specified - service_ports.push(ServicePort { - name: Some(XLINE_PORT_NAME.to_owned()), - port: DEFAULT_XLINE_PORT, - ..ServicePort::default() - }); - } - // if it is not specified, 2379 is used as xline port - let xline_port = xline_port.unwrap_or(ContainerPort { - name: Some(XLINE_PORT_NAME.to_owned()), - container_port: DEFAULT_XLINE_PORT, - ..ContainerPort::default() - }); - (xline_port, service_ports) - } - - /// Extract persistent volume claims - fn extract_pvcs(cluster: &Arc) -> Result> { - let mut pvcs = Vec::new(); - // check if the data pvc if specified, add the pvc to pvcs - if let Some(pvc) = cluster.spec.data.as_ref() { - pvcs.push(pvc.clone()); - } - // extend the user defined pvcs - if let Some(spec_pvcs) = cluster.spec.pvcs.clone() { - if spec_pvcs - .iter() - .any(|pvc| pvc.name_any() == DATA_EMPTY_DIR_NAME) - { - return Err(Error::InvalidVolumeName(".spec.pvcs[].metadata.name")); - } - pvcs.extend(spec_pvcs); - } - Ok(pvcs) - } - - /// Extract owner reference - fn extract_owner_ref(cluster: &Arc) -> OwnerReference { - // unwrap controller_owner_ref is always safe - let Some(owner_ref) = cluster.controller_owner_ref(&()) else { unreachable!("kube-runtime has undergone some changes.") }; - owner_ref - } - - /// Extract name, namespace - fn extract_id(cluster: &Arc) -> Result<(&str, &str)> { - let namespace = cluster - .metadata - .namespace - .as_deref() - .ok_or(Error::MissingObject(".metadata.namespace"))?; - let name = cluster - .metadata - .name - .as_deref() - .ok_or(Error::MissingObject(".metadata.name"))?; - Ok((namespace, name)) - } - - /// Build the metadata which shares between all subresources - fn build_metadata(namespace: &str, name: &str, owner_ref: OwnerReference) -> ObjectMeta { - let mut labels: BTreeMap = BTreeMap::new(); - let _: Option<_> = labels.insert("app".to_owned(), name.to_owned()); - ObjectMeta { - labels: Some(labels.clone()), // it is used in selector - name: Some(name.to_owned()), // all subresources share the same name - namespace: Some(namespace.to_owned()), // all subresources share the same namespace - owner_references: Some(vec![owner_ref]), // allow k8s GC to automatically clean up itself - ..ObjectMeta::default() - } - } - - /// Apply headless service - async fn apply_headless_service( - &self, - namespace: &str, - name: &str, - metadata: &ObjectMeta, - service_ports: Vec, - ) -> Result<()> { - let api: Api = Api::namespaced(self.kube_client.clone(), namespace); - let _: Service = api - .patch( - name, - &PatchParams::apply(FIELD_MANAGER), - &Patch::Apply(Service { - metadata: metadata.clone(), - spec: Some(ServiceSpec { - cluster_ip: None, - ports: Some(service_ports), - selector: metadata.labels.clone(), - ..ServiceSpec::default() - }), - ..Service::default() - }), - ) - .await?; - Ok(()) - } - - /// Prepare container volume - fn prepare_container_volume( - cluster: &Arc, - mut container: Container, - ) -> Result<(Container, Option>)> { - let data = cluster.spec.data.clone(); - let mut volumes = None; - // mount data volume to `DEFAULT_DATA_DIR` in container - let data_mount = if let Some(pvc) = data { - let name = pvc - .metadata - .name - .ok_or(Error::MissingObject(".spec.data.metadata.name"))?; - if name == DATA_EMPTY_DIR_NAME { - return Err(Error::InvalidVolumeName(".spec.data.metadata.name")); - } - Some(VolumeMount { - mount_path: DEFAULT_DATA_DIR.to_owned(), - name, - ..VolumeMount::default() - }) - } else { - None - }; - let mut mounts = Vec::new(); - // check if the container has specified volume_mounts before - if let Some(spec_mounts) = container.volume_mounts { - // if the container mount the dir used in operator, return error - if spec_mounts - .iter() - .any(|mount| mount.mount_path.starts_with(DEFAULT_BACKUP_DIR)) - { - return Err(Error::CannotMount(DEFAULT_BACKUP_DIR)); - } - if spec_mounts - .iter() - .any(|mount| mount.mount_path.starts_with(DEFAULT_DATA_DIR)) - { - return Err(Error::CannotMount(DEFAULT_DATA_DIR)); - } - if spec_mounts - .iter() - .any(|mount| mount.name == DATA_EMPTY_DIR_NAME) - { - return Err(Error::InvalidVolumeName( - ".spec.container.volume_mounts[].name", - )); - } - // extend the mounts - mounts.extend(spec_mounts); - } - if let Some(mount) = data_mount { - mounts.push(mount); - } else { - // if data pv is not provided, then use emptyDir as volume - volumes = Some(vec![Volume { - name: DATA_EMPTY_DIR_NAME.to_owned(), - empty_dir: Some(EmptyDirVolumeSource::default()), - ..Volume::default() - }]); - mounts.push(VolumeMount { - mount_path: DEFAULT_DATA_DIR.to_owned(), - name: DATA_EMPTY_DIR_NAME.to_owned(), - ..VolumeMount::default() - }); - } - // override the container volume_mounts - container.volume_mounts = Some(mounts); - Ok((container, volumes)) - } - - /// Prepare container environment - fn prepare_container_env(mut container: Container) -> Container { - // to get pod unique name - let mut env = container.env.unwrap_or_default(); - env.push(EnvVar { - name: XLINE_POD_NAME_ENV.to_owned(), - value_from: Some(EnvVarSource { - field_ref: Some(ObjectFieldSelector { - field_path: "metadata.name".to_owned(), - ..ObjectFieldSelector::default() - }), - ..EnvVarSource::default() - }), - ..EnvVar::default() - }); - // override the pod environments - container.env = Some(env); - container - } - - /// Prepare container command - fn prepare_container_command( - &self, - mut container: Container, - namespace: &str, - name: &str, - size: i32, - xline_port: &ContainerPort, - ) -> Container { - // generate the members and setup xline in command line - let mut members = vec![]; - for i in 0..size { - members.push(format!( - "{name}-{i}={name}-{i}.{name}.{namespace}.svc.{}:{}", - self.cluster_suffix, xline_port.container_port - )); - } - // $(XLINE_POD_NAME_ENV) will read the pod name from environment - container.command = Some( - format!("xline --name $({XLINE_POD_NAME_ENV}) --storage-engine rocksdb --data-dir {DEFAULT_DATA_DIR} --members {}", members.join(",")) - .split_whitespace() - .map(ToOwned::to_owned) - .collect(), - ); - container - } - - /// Prepare the xline container provided by user - fn prepare_container( - &self, - namespace: &str, - name: &str, - cluster: &Arc, - xline_port: &ContainerPort, - ) -> Result<(Container, Option>)> { - let container = cluster.spec.container.clone(); - let (container, volumes) = Self::prepare_container_volume(cluster, container)?; - let container = Self::prepare_container_env(container); - let container = self.prepare_container_command( - container, - namespace, - name, - cluster.spec.size, - xline_port, - ); - Ok((container, volumes)) - } - - /// Apply the statefulset in k8s to reconcile cluster - async fn apply_statefulset( - &self, - namespace: &str, - name: &str, - cluster: &Arc, - xline_port: &ContainerPort, - pvcs: Vec, - metadata: &ObjectMeta, - ) -> Result<()> { - let api: Api = Api::namespaced(self.kube_client.clone(), namespace); - let (container, volumes) = self.prepare_container(namespace, name, cluster, xline_port)?; - let _: StatefulSet = api - .patch( - name, - &PatchParams::apply(FIELD_MANAGER), - &Patch::Apply(StatefulSet { - metadata: metadata.clone(), - spec: Some(StatefulSetSpec { - replicas: Some(cluster.spec.size), - selector: LabelSelector { - match_expressions: None, - match_labels: metadata.labels.clone(), - }, - service_name: name.to_owned(), - volume_claim_templates: Some(pvcs), - update_strategy: Some(StatefulSetUpdateStrategy { - rolling_update: Some(RollingUpdateStatefulSetStrategy { - max_unavailable: Some(IntOrString::String("50%".to_owned())), // allow a maximum of half the cluster quorum shutdown when performing a rolling update - partition: None, - }), - ..StatefulSetUpdateStrategy::default() - }), - template: PodTemplateSpec { - metadata: Some(ObjectMeta { - labels: metadata.labels.clone(), - ..ObjectMeta::default() - }), - spec: Some(PodSpec { - containers: vec![container], - volumes, - affinity: cluster.spec.affinity.clone(), - ..PodSpec::default() - }), - }, - ..StatefulSetSpec::default() - }), - ..StatefulSet::default() - }), - ) - .await?; - Ok(()) - } -} - -#[async_trait] -impl Controller for ClusterController { - type Error = Error; - type Metrics = ClusterMetrics; - - fn metrics(&self) -> &Self::Metrics { - &self.metrics - } - - async fn reconcile_once(&self, cluster: &Arc) -> Result<()> { - debug!( - "Reconciling cluster: \n{}", - serde_json::to_string_pretty(cluster.as_ref()).unwrap_or_default() - ); - let (namespace, name) = Self::extract_id(cluster)?; - let owner_ref = Self::extract_owner_ref(cluster); - let pvcs = Self::extract_pvcs(cluster)?; - let (xline_port, service_ports) = Self::extract_ports(cluster); - let metadata = Self::build_metadata(namespace, name, owner_ref); - - self.apply_headless_service(namespace, name, &metadata, service_ports) - .await?; - self.apply_statefulset(namespace, name, cluster, &xline_port, pvcs, &metadata) - .await?; - Ok(()) - } - - fn handle_error(&self, resource: &Arc, err: &Self::Error) { - error!("{:?} reconciliation error: {}", resource.metadata.name, err); - } -} diff --git a/operator-k8s/src/controller/cluster/v1alpha1.rs b/operator-k8s/src/controller/cluster/v1alpha1.rs index 7458937e..4765f8cc 100644 --- a/operator-k8s/src/controller/cluster/v1alpha1.rs +++ b/operator-k8s/src/controller/cluster/v1alpha1.rs @@ -1,30 +1,19 @@ -use std::collections::BTreeMap; +use std::fmt::Debug; use std::sync::Arc; use async_trait::async_trait; -use k8s_openapi::api::apps::v1::{ - RollingUpdateStatefulSetStrategy, StatefulSet, StatefulSetSpec, StatefulSetUpdateStrategy, -}; -use k8s_openapi::api::batch::v1::{CronJob, CronJobSpec, JobSpec, JobTemplateSpec}; -use k8s_openapi::api::core::v1::{ - Container, ContainerPort, EmptyDirVolumeSource, EnvVar, EnvVarSource, ObjectFieldSelector, - PersistentVolumeClaim, PodSpec, PodTemplateSpec, Service, ServicePort, ServiceSpec, Volume, - VolumeMount, -}; -use k8s_openapi::apimachinery::pkg::apis::meta::v1::{LabelSelector, ObjectMeta, OwnerReference}; -use k8s_openapi::apimachinery::pkg::util::intstr::IntOrString; +use k8s_openapi::NamespaceResourceScope; use kube::api::{Patch, PatchParams}; -use kube::{Api, Client, Resource, ResourceExt}; +use kube::{Api, Client, Resource}; +use serde::de::DeserializeOwned; +use serde::Serialize; use tracing::{debug, error}; -use utils::consts::{DEFAULT_BACKUP_DIR, DEFAULT_DATA_DIR}; -use crate::consts::{ - CRONJOB_IMAGE, DATA_EMPTY_DIR_NAME, DEFAULT_SIDECAR_PORT, DEFAULT_XLINE_PORT, FIELD_MANAGER, - SIDECAR_PORT_NAME, XLINE_POD_NAME_ENV, XLINE_PORT_NAME, -}; +use crate::consts::FIELD_MANAGER; use crate::controller::cluster::ClusterMetrics; use crate::controller::{Controller, MetricsLabeled}; -use crate::crd::v1alpha1::{Cluster, StorageSpec}; +use crate::crd::v1alpha1::Cluster; +use crate::manager::cluster::Factory; /// CRD `XlineCluster` controller pub(crate) struct ClusterController { @@ -36,430 +25,43 @@ pub(crate) struct ClusterController { pub(crate) metrics: ClusterMetrics, } -/// All possible errors -#[derive(thiserror::Error, Debug)] -pub(crate) enum Error { - /// Missing an object in cluster - #[error("Missing object key {0} in cluster")] - MissingObject(&'static str), - /// Kube error - #[error("Kubernetes api error")] - Kube(#[from] kube::Error), - /// Backup PV mount path is already mounted - #[error("The path {0} is internally used in the xline operator and cannot be mounted.")] - CannotMount(&'static str), - /// Volume(PVC) name conflict with `DATA_EMPTY_DIR_NAME` - #[error("The {0} is conflict with the name internally used in the xline operator")] - InvalidVolumeName(&'static str), -} - -impl MetricsLabeled for Error { +impl MetricsLabeled for kube::Error { fn labels(&self) -> Vec<&str> { + #[allow(clippy::wildcard_enum_match_arm)] // the reason is enough match *self { - Self::MissingObject(_) => vec!["missing_object"], - Self::Kube(_) => vec!["kube"], - Self::CannotMount(_) => vec!["cannot_mount"], - Self::InvalidVolumeName(_) => vec!["invalid_volume_name"], + Self::Api(_) => vec!["api error"], + Self::Service(_) => vec!["service error"], + Self::FromUtf8(_) | Self::SerdeError(_) => vec!["encode/decode error"], + Self::Auth(_) => vec!["authorization error"], + Self::OpensslTls(_) => vec!["tls error"], + Self::HyperError(_) | Self::HttpError(_) => vec!["http error"], + _ => vec!["unknown"], } } } /// Controller result -type Result = std::result::Result; +type Result = std::result::Result; impl ClusterController { - /// Extract ports - fn extract_ports(cluster: &Arc) -> (ContainerPort, ContainerPort, Vec) { - // expose all the container's ports - let mut xline_port = None; - let mut sidecar_port = None; - let container_ports = cluster.spec.container.ports.clone().unwrap_or_default(); - let mut service_ports: Vec<_> = container_ports - .into_iter() - .map(|port| { - // the port with name `xline` is considered to be the port of xline - if matches!(port.name.as_deref(), Some(XLINE_PORT_NAME)) { - xline_port = Some(port.clone()); - } - // the port with name `sidecar` is considered to be the port of xline - if matches!(port.name.as_deref(), Some(SIDECAR_PORT_NAME)) { - sidecar_port = Some(port.clone()); - } - ServicePort { - name: port.name.clone(), - port: port.container_port, - ..ServicePort::default() - } - }) - .collect(); - if xline_port.is_none() { - // add default xline port 2379 to service port if xline port is not specified - service_ports.push(ServicePort { - name: Some(XLINE_PORT_NAME.to_owned()), - port: DEFAULT_XLINE_PORT, - ..ServicePort::default() - }); - } - if sidecar_port.is_none() { - // add default sidecar port 2380 to service port if sidecar port is not specified - service_ports.push(ServicePort { - name: Some(SIDECAR_PORT_NAME.to_owned()), - port: DEFAULT_SIDECAR_PORT, - ..ServicePort::default() - }); - } - // if it is not specified, 2379 is used as xline port - let xline_port = xline_port.unwrap_or(ContainerPort { - name: Some(XLINE_PORT_NAME.to_owned()), - container_port: DEFAULT_XLINE_PORT, - ..ContainerPort::default() - }); - // if it is not specified, 2380 is used as sidecar port - let sidecar_port = sidecar_port.unwrap_or(ContainerPort { - name: Some(SIDECAR_PORT_NAME.to_owned()), - container_port: DEFAULT_SIDECAR_PORT, - ..ContainerPort::default() - }); - (xline_port, sidecar_port, service_ports) - } - - /// Extract persistent volume claims - fn extract_pvcs(cluster: &Arc) -> Result> { - let mut pvcs = Vec::new(); - // check if the backup type is PV, add the pvc to pvcs - if let Some(spec) = cluster.spec.backup.as_ref() { - if let StorageSpec::Pvc { pvc } = spec.storage.clone() { - pvcs.push(pvc); - } - } - // check if the data pvc if specified, add the pvc to pvcs - if let Some(pvc) = cluster.spec.data.as_ref() { - pvcs.push(pvc.clone()); - } - // extend the user defined pvcs - if let Some(spec_pvcs) = cluster.spec.pvcs.clone() { - if spec_pvcs - .iter() - .any(|pvc| pvc.name_any() == DATA_EMPTY_DIR_NAME) - { - return Err(Error::InvalidVolumeName(".spec.pvcs[].metadata.name")); - } - pvcs.extend(spec_pvcs); - } - Ok(pvcs) - } - - /// Extract owner reference - fn extract_owner_ref(cluster: &Arc) -> OwnerReference { - // unwrap controller_owner_ref is always safe - let Some(owner_ref) = cluster.controller_owner_ref(&()) else { unreachable!("kube-runtime has undergone some changes.") }; - owner_ref - } - - /// Extract name, namespace - fn extract_id(cluster: &Arc) -> Result<(&str, &str)> { - let namespace = cluster - .metadata - .namespace - .as_deref() - .ok_or(Error::MissingObject(".metadata.namespace"))?; - let name = cluster - .metadata - .name - .as_deref() - .ok_or(Error::MissingObject(".metadata.name"))?; - Ok((namespace, name)) - } - - /// Build the metadata which shares between all subresources - fn build_metadata(namespace: &str, name: &str, owner_ref: OwnerReference) -> ObjectMeta { - let mut labels: BTreeMap = BTreeMap::new(); - let _: Option<_> = labels.insert("app".to_owned(), name.to_owned()); - ObjectMeta { - labels: Some(labels.clone()), // it is used in selector - name: Some(name.to_owned()), // all subresources share the same name - namespace: Some(namespace.to_owned()), // all subresources share the same namespace - owner_references: Some(vec![owner_ref]), // allow k8s GC to automatically clean up itself - ..ObjectMeta::default() - } - } - - /// Apply headless service - async fn apply_headless_service( - &self, - namespace: &str, - name: &str, - metadata: &ObjectMeta, - service_ports: Vec, - ) -> Result<()> { - let api: Api = Api::namespaced(self.kube_client.clone(), namespace); - let _: Service = api - .patch( - name, - &PatchParams::apply(FIELD_MANAGER), - &Patch::Apply(Service { - metadata: metadata.clone(), - spec: Some(ServiceSpec { - cluster_ip: None, - ports: Some(service_ports), - selector: metadata.labels.clone(), - ..ServiceSpec::default() - }), - ..Service::default() - }), - ) - .await?; - Ok(()) - } - - /// Prepare container volume - fn prepare_container_volume( - cluster: &Arc, - mut container: Container, - ) -> Result<(Container, Option>)> { - let backup = cluster.spec.backup.clone(); - let data = cluster.spec.data.clone(); - let mut volumes = None; - // mount backup volume to `DEFAULT_BACKUP_PV_MOUNT_PATH` in container - let backup_mount = if let Some(spec) = backup { - let backup_pvc_name = match spec.storage { - StorageSpec::S3 { .. } => None, - StorageSpec::Pvc { pvc } => Some( - pvc.metadata - .name - .ok_or(Error::MissingObject(".spec.backup.pvc.metadata.name"))?, - ), - }; - if let Some(pvc_name) = backup_pvc_name { - if pvc_name == DATA_EMPTY_DIR_NAME { - return Err(Error::InvalidVolumeName(".spec.backup.metadata.name")); - } - Some(VolumeMount { - mount_path: DEFAULT_BACKUP_DIR.to_owned(), - name: pvc_name, - ..VolumeMount::default() - }) - } else { - None - } - } else { - None - }; - // mount data volume to `DEFAULT_DATA_DIR` in container - let data_mount = if let Some(pvc) = data { - let name = pvc - .metadata - .name - .ok_or(Error::MissingObject(".spec.data.metadata.name"))?; - if name == DATA_EMPTY_DIR_NAME { - return Err(Error::InvalidVolumeName(".spec.data.metadata.name")); - } - Some(VolumeMount { - mount_path: DEFAULT_DATA_DIR.to_owned(), - name, - ..VolumeMount::default() - }) - } else { - None - }; - let mut mounts = Vec::new(); - // check if the container has specified volume_mounts before - if let Some(spec_mounts) = container.volume_mounts { - // if the container mount the dir used in operator, return error - if spec_mounts - .iter() - .any(|mount| mount.mount_path.starts_with(DEFAULT_BACKUP_DIR)) - { - return Err(Error::CannotMount(DEFAULT_BACKUP_DIR)); - } - if spec_mounts - .iter() - .any(|mount| mount.mount_path.starts_with(DEFAULT_DATA_DIR)) - { - return Err(Error::CannotMount(DEFAULT_DATA_DIR)); - } - if spec_mounts - .iter() - .any(|mount| mount.name == DATA_EMPTY_DIR_NAME) - { - return Err(Error::InvalidVolumeName( - ".spec.container.volume_mounts[].name", - )); - } - // extend the mounts - mounts.extend(spec_mounts); - } - if let Some(mount) = backup_mount { - mounts.push(mount); - } - if let Some(mount) = data_mount { - mounts.push(mount); - } else { - // if data pv is not provided, then use emptyDir as volume - volumes = Some(vec![Volume { - name: DATA_EMPTY_DIR_NAME.to_owned(), - empty_dir: Some(EmptyDirVolumeSource::default()), - ..Volume::default() - }]); - mounts.push(VolumeMount { - mount_path: DEFAULT_DATA_DIR.to_owned(), - name: DATA_EMPTY_DIR_NAME.to_owned(), - ..VolumeMount::default() - }); - } - // override the container volume_mounts - container.volume_mounts = Some(mounts); - Ok((container, volumes)) - } - - /// Prepare container environment - fn prepare_container_env(mut container: Container) -> Container { - // to get pod unique name - let mut env = container.env.unwrap_or_default(); - env.push(EnvVar { - name: XLINE_POD_NAME_ENV.to_owned(), - value_from: Some(EnvVarSource { - field_ref: Some(ObjectFieldSelector { - field_path: "metadata.name".to_owned(), - ..ObjectFieldSelector::default() - }), - ..EnvVarSource::default() - }), - ..EnvVar::default() - }); - // override the pod environments - container.env = Some(env); - container - } - - /// Prepare container command - fn prepare_container_command(mut container: Container) -> Container { - // the main command should wait forever so that the sidecar could always contact the xline container - // so we use `tail -F /dev/null` here - container.command = Some( - "tail -F /dev/null" - .split_whitespace() - .map(ToOwned::to_owned) - .collect(), - ); - container - } - - /// Prepare the xline container provided by user - fn prepare_container(cluster: &Arc) -> Result<(Container, Option>)> { - let container = cluster.spec.container.clone(); - let (container, volumes) = Self::prepare_container_volume(cluster, container)?; - let container = Self::prepare_container_env(container); - let container = Self::prepare_container_command(container); - Ok((container, volumes)) - } - - /// Apply the statefulset in k8s to reconcile cluster - async fn apply_statefulset( + /// Apply resource + #[allow(clippy::expect_used)] // use expect rather than unwrap_or_else(|| unreachable()) + async fn apply_resource>( &self, - namespace: &str, - name: &str, - cluster: &Arc, - pvcs: Vec, - metadata: &ObjectMeta, - ) -> Result<()> { - let api: Api = Api::namespaced(self.kube_client.clone(), namespace); - let (container, volumes) = Self::prepare_container(cluster)?; - let _: StatefulSet = api + res: R, + ) -> Result<()> + where + R: Clone + DeserializeOwned + Debug + Serialize, + R::DynamicType: Default, + { + let namespace = res.meta().namespace.as_deref().expect("require namespace"); + let name = res.meta().name.clone().expect("require name"); + let api: Api = Api::namespaced(self.kube_client.clone(), namespace); + _ = api .patch( - name, + &name, &PatchParams::apply(FIELD_MANAGER), - &Patch::Apply(StatefulSet { - metadata: metadata.clone(), - spec: Some(StatefulSetSpec { - replicas: Some(cluster.spec.size), - selector: LabelSelector { - match_expressions: None, - match_labels: metadata.labels.clone(), - }, - service_name: name.to_owned(), - volume_claim_templates: Some(pvcs), - update_strategy: Some(StatefulSetUpdateStrategy { - rolling_update: Some(RollingUpdateStatefulSetStrategy { - max_unavailable: Some(IntOrString::String("50%".to_owned())), // allow a maximum of half the cluster quorum shutdown when performing a rolling update - partition: None, - }), - ..StatefulSetUpdateStrategy::default() - }), - template: PodTemplateSpec { - metadata: Some(ObjectMeta { - labels: metadata.labels.clone(), - ..ObjectMeta::default() - }), - spec: Some(PodSpec { - affinity: cluster.spec.affinity.clone(), - init_containers: Some(vec![]), // TODO publish sidecar operator to registry - containers: vec![container], // TODO inject the sidecar operator container here - volumes, - ..PodSpec::default() - }), - }, - ..StatefulSetSpec::default() - }), - ..StatefulSet::default() - }), - ) - .await?; - Ok(()) - } - - /// Apply the cron job to trigger backup - async fn apply_backup_cron_job( - &self, - namespace: &str, - name: &str, - size: i32, - cron: &str, - sidecar_port: &ContainerPort, - metadata: &ObjectMeta, - ) -> Result<()> { - let api: Api = Api::namespaced(self.kube_client.clone(), namespace); - let trigger_cmd = vec![ - "/bin/sh".to_owned(), - "-ecx".to_owned(), - format!( - "curl {name}-$((RANDOM % {size})).{name}.{namespace}.svc.{}:{}/backup", - self.cluster_suffix, sidecar_port.container_port - ), // choose a node randomly - ]; - let _: CronJob = api - .patch( - name, - &PatchParams::apply(FIELD_MANAGER), - &Patch::Apply(CronJob { - metadata: metadata.clone(), - spec: Some(CronJobSpec { - concurrency_policy: Some("Forbid".to_owned()), // A backup cron job cannot run concurrently - schedule: cron.to_owned(), - job_template: JobTemplateSpec { - spec: Some(JobSpec { - template: PodTemplateSpec { - spec: Some(PodSpec { - containers: vec![Container { - name: format!("{name}-backup-cronjob"), - image_pull_policy: Some("IfNotPresent".to_owned()), - image: Some(CRONJOB_IMAGE.to_owned()), - command: Some(trigger_cmd), - ..Container::default() - }], - restart_policy: Some("OnFailure".to_owned()), - ..PodSpec::default() - }), - ..PodTemplateSpec::default() - }, - ..JobSpec::default() - }), - ..JobTemplateSpec::default() - }, - ..CronJobSpec::default() - }), - ..CronJob::default() - }), + &Patch::Apply(res), ) .await?; Ok(()) @@ -468,7 +70,7 @@ impl ClusterController { #[async_trait] impl Controller for ClusterController { - type Error = Error; + type Error = kube::Error; type Metrics = ClusterMetrics; fn metrics(&self) -> &Self::Metrics { @@ -480,28 +82,12 @@ impl Controller for ClusterController { "Reconciling cluster: \n{}", serde_json::to_string_pretty(cluster.as_ref()).unwrap_or_default() ); - let (namespace, name) = Self::extract_id(cluster)?; - let owner_ref = Self::extract_owner_ref(cluster); - let pvcs = Self::extract_pvcs(cluster)?; - let (_xline_port, sidecar_port, service_ports) = Self::extract_ports(cluster); - let metadata = Self::build_metadata(namespace, name, owner_ref); + let factory = Factory::new(Arc::clone(cluster), &self.cluster_suffix); - self.apply_headless_service(namespace, name, &metadata, service_ports) - .await?; - self.apply_statefulset(namespace, name, cluster, pvcs, &metadata) - .await?; + self.apply_resource(factory.node_service()).await?; + // TODO wait service ready + self.apply_resource(factory.sts()).await?; - if let Some(spec) = cluster.spec.backup.as_ref() { - Box::pin(self.apply_backup_cron_job( - namespace, - name, - cluster.spec.size, - spec.cron.as_str(), - &sidecar_port, - &metadata, - )) - .await?; - } Ok(()) } diff --git a/operator-k8s/src/crd/mod.rs b/operator-k8s/src/crd/mod.rs index 87910eb4..4a403a89 100644 --- a/operator-k8s/src/crd/mod.rs +++ b/operator-k8s/src/crd/mod.rs @@ -1,10 +1,3 @@ -/// v1alpha -/// Features: -/// 1. Basic deployment -/// 2. Scale cluster -/// 3. Xline data PV -pub(crate) mod v1alpha; - /// v1alpha1 /// Features: /// 1. Xline sidecar diff --git a/operator-k8s/src/crd/v1alpha/cluster.rs b/operator-k8s/src/crd/v1alpha/cluster.rs deleted file mode 100644 index c31cb5fb..00000000 --- a/operator-k8s/src/crd/v1alpha/cluster.rs +++ /dev/null @@ -1,91 +0,0 @@ -// The `JsonSchema` and `CustomResource` macro generates codes that does not pass the clippy lint. -#![allow(clippy::str_to_string)] -#![allow(clippy::missing_docs_in_private_items)] - -#[cfg(test)] -use garde::Validate; -use k8s_openapi::api::core::v1::{Affinity, Container, PersistentVolumeClaim}; -use k8s_openapi::serde::{Deserialize, Serialize}; -use kube::CustomResource; -use schemars::JsonSchema; - -/// Xline cluster specification -#[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)] -#[cfg_attr(test, derive(Validate))] -#[kube( - group = "xlineoperator.xline.cloud", - version = "v1alpha", - kind = "XlineCluster", - singular = "xlinecluster", - plural = "xlineclusters", - struct = "Cluster", - namespaced, - status = "ClusterStatus", - shortname = "xc", - scale = r#"{"specReplicasPath":".spec.size", "statusReplicasPath":".status.available"}"#, - printcolumn = r#"{"name":"Size", "type":"string", "description":"The cluster size", "jsonPath":".spec.size"}"#, - printcolumn = r#"{"name":"Age", "type":"date", "description":"The cluster age", "jsonPath":".metadata.creationTimestamp"}"# -)] -pub(crate) struct ClusterSpec { - /// Size of the xline cluster, less than 3 is not allowed - #[cfg_attr(test, garde(range(min = 3)))] - #[schemars(range(min = 3))] - pub(crate) size: i32, - /// Xline container specification - #[cfg_attr(test, garde(skip))] - pub(crate) container: Container, - /// The affinity of the xline node - #[cfg_attr(test, garde(skip))] - #[serde(skip_serializing_if = "Option::is_none")] - pub(crate) affinity: Option, - /// The data PVC, if it is not specified, then use emptyDir instead - #[cfg_attr(test, garde(skip))] - #[serde(skip_serializing_if = "Option::is_none")] - pub(crate) data: Option, - /// Some user defined persistent volume claim templates - #[cfg_attr(test, garde(skip))] - #[serde(skip_serializing_if = "Option::is_none")] - pub(crate) pvcs: Option>, -} - -/// Xline cluster status -#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, Default)] -pub(crate) struct ClusterStatus { - /// The available nodes' number in the cluster - pub(crate) available: i32, -} - -#[cfg(test)] -mod test { - use garde::Validate; - use k8s_openapi::api::core::v1::Container; - - use super::ClusterSpec; - - #[test] - fn validation_ok() { - let ok = ClusterSpec { - size: 3, - container: Container::default(), - affinity: None, - pvcs: None, - data: None, - }; - assert!(Validate::validate(&ok, &()).is_ok()); - } - - #[test] - fn validation_bad_size() { - let bad_size = ClusterSpec { - size: 1, - container: Container::default(), - affinity: None, - pvcs: None, - data: None, - }; - assert_eq!( - Validate::validate(&bad_size, &()).unwrap_err().flatten()[0].0, - "value.size" - ); - } -} diff --git a/operator-k8s/src/crd/v1alpha/mod.rs b/operator-k8s/src/crd/v1alpha/mod.rs deleted file mode 100644 index b0a1a7d0..00000000 --- a/operator-k8s/src/crd/v1alpha/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -pub(crate) use cluster::Cluster; - -mod cluster; diff --git a/operator-k8s/src/crd/v1alpha1/mod.rs b/operator-k8s/src/crd/v1alpha1/mod.rs index c2343934..b0a1a7d0 100644 --- a/operator-k8s/src/crd/v1alpha1/mod.rs +++ b/operator-k8s/src/crd/v1alpha1/mod.rs @@ -1,5 +1,3 @@ -#![allow(unused)] // TODO: remove when this CRD is used - -pub(crate) use cluster::{BackupSpec, Cluster, ClusterSpec, StorageSpec}; +pub(crate) use cluster::Cluster; mod cluster; diff --git a/operator-k8s/src/manager/cluster.rs b/operator-k8s/src/manager/cluster.rs index 5d38ac7b..ee63d545 100644 --- a/operator-k8s/src/manager/cluster.rs +++ b/operator-k8s/src/manager/cluster.rs @@ -1,24 +1,24 @@ -#![allow(unused)] // remove when implemented - -use crate::consts::{ - ANNOTATION_INHERIT_LABELS_PREFIX, DEFAULT_SIDECAR_PORT, DEFAULT_XLINE_PORT, - LABEL_CLUSTER_COMPONENT, LABEL_CLUSTER_NAME, LABEL_OPERATOR_VERSION, SIDECAR_PORT_NAME, - XLINE_POD_NAME_ENV, XLINE_PORT_NAME, -}; -use crate::crd::v1alpha1::{BackupSpec, Cluster, ClusterSpec, StorageSpec}; +#![allow(unused)] // TODO remove use std::collections::BTreeMap; use std::sync::Arc; +use k8s_openapi::api::apps::v1::{StatefulSet, StatefulSetSpec}; use k8s_openapi::api::core::v1::{ - Container, ContainerPort, EnvVar, EnvVarSource, GRPCAction, ObjectFieldSelector, - PersistentVolumeClaim, PersistentVolumeClaimVolumeSource, Pod, PodSpec, PodTemplateSpec, Probe, - Service, ServicePort, ServiceSpec, Volume, VolumeMount, + Container, ContainerPort, EnvVar, EnvVarSource, ObjectFieldSelector, PersistentVolumeClaim, + PodSpec, PodTemplateSpec, Service, ServicePort, ServiceSpec, VolumeMount, }; -use k8s_openapi::apimachinery::pkg::apis::meta::v1::{ObjectMeta, OwnerReference}; +use k8s_openapi::apimachinery::pkg::apis::meta::v1::{LabelSelector, ObjectMeta, OwnerReference}; use kube::{Resource, ResourceExt}; use utils::consts::{DEFAULT_BACKUP_DIR, DEFAULT_DATA_DIR}; +use crate::consts::{ + ANNOTATION_INHERIT_LABELS_PREFIX, DEFAULT_SIDECAR_PORT, DEFAULT_XLINE_PORT, + LABEL_CLUSTER_COMPONENT, LABEL_CLUSTER_NAME, LABEL_OPERATOR_VERSION, SIDECAR_PORT_NAME, + XLINE_POD_NAME_ENV, XLINE_PORT_NAME, +}; +use crate::crd::v1alpha1::Cluster; + /// Read objects from `XlineCluster` pub(crate) struct Extractor<'a> { /// `XlineCluster` @@ -29,7 +29,7 @@ pub(crate) struct Extractor<'a> { #[derive(Copy, Clone)] pub(crate) enum Component { /// A xline node - Node, + Nodes, /// A service Service, /// A backup job @@ -40,7 +40,7 @@ impl Component { /// Get the component name fn label(&self) -> &str { match *self { - Component::Node => "node", + Component::Nodes => "nodes", Component::Service => "srv", Component::BackupJob => "job", } @@ -59,7 +59,7 @@ impl<'a> Extractor<'a> { /// If the `XlineCluster` does not specified the xline ports (a port with name 'xline') or /// the sidecar ports (a port with name 'sidecar'), the default port (xline: 2379, sidecar: 2380) /// will be used. - fn extract_ports(&self) -> (ContainerPort, ContainerPort, Vec) { + pub(crate) fn extract_ports(&self) -> (ContainerPort, ContainerPort, Vec) { // expose all the container's ports let mut xline_port = None; let mut sidecar_port = None; @@ -121,7 +121,7 @@ impl<'a> Extractor<'a> { /// Extract all PVC templates /// The PVC template is used to create PVC for every pod - fn extract_pvc_template(&self) -> Vec { + pub(crate) fn extract_pvc_template(&self) -> Vec { self.cluster .spec .backup @@ -163,7 +163,6 @@ impl<'a> Extractor<'a> { /// Extract owner reference #[allow(clippy::expect_used)] // it is ok because xlinecluster always populated from the apiserver fn extract_owner_ref(&self) -> OwnerReference { - // unwrap controller_owner_ref is always safe self.cluster .controller_owner_ref(&()) .expect("metadata doesn't have name or uid") @@ -171,7 +170,7 @@ impl<'a> Extractor<'a> { /// Extract name, namespace #[allow(clippy::expect_used)] // it is ok because xlinecluster has field validation - fn extract_id(&self) -> (&str, &str) { + pub(crate) fn extract_id(&self) -> (&str, &str) { let name = self .cluster .metadata @@ -235,17 +234,24 @@ impl Factory { format!("{cluster_name}-{}", component.label()) } + /// Get the selector labels + fn selector_labels(name: &str, component: Component) -> BTreeMap { + BTreeMap::from([ + (LABEL_CLUSTER_NAME.to_owned(), name.to_owned()), + ( + LABEL_CLUSTER_COMPONENT.to_owned(), + component.label().to_owned(), + ), + ]) + } + /// Get the general metadata fn general_metadata(&self, component: Component) -> ObjectMeta { let extractor = Extractor::new(self.cluster.as_ref()); let mut labels = extractor.extract_inherit_labels(); let (name, namespace) = extractor.extract_id(); let owner_ref = extractor.extract_owner_ref(); - _ = labels.insert(LABEL_CLUSTER_NAME.to_owned(), name.to_owned()); - _ = labels.insert( - LABEL_CLUSTER_COMPONENT.to_owned(), - component.label().to_owned(), - ); + labels.extend(Self::selector_labels(name, component)); _ = labels.insert( LABEL_OPERATOR_VERSION.to_owned(), env!("CARGO_PKG_VERSION").to_owned(), @@ -260,7 +266,7 @@ impl Factory { } /// Get the node headless service - fn node_service(&self) -> Service { + pub(crate) fn node_service(&self) -> Service { let extractor = Extractor::new(self.cluster.as_ref()); let (_, _, service_ports) = extractor.extract_ports(); let (name, _) = extractor.extract_id(); @@ -274,7 +280,7 @@ impl Factory { (LABEL_CLUSTER_NAME.to_owned(), name.to_owned()), ( LABEL_CLUSTER_COMPONENT.to_owned(), - Component::Node.label().to_owned(), + Component::Nodes.label().to_owned(), ), ] .into(), @@ -286,7 +292,6 @@ impl Factory { } /// Mount the additional volumes on the container - #[allow(clippy::unused_self)] fn mount_volume_on_container(&self, container: &mut Container) { let extractor = Extractor::new(self.cluster.as_ref()); let volume_mount = extractor.extract_additional_volume_mount(); @@ -297,14 +302,15 @@ impl Factory { } /// Set the entrypoint of the container - fn set_command(&self, container: &mut Container, size: usize) { + fn set_command(&self, container: &mut Container) { + let size = self.cluster.spec.size; let extractor = Extractor::new(self.cluster.as_ref()); let (name, namespace) = extractor.extract_id(); let (xline_port, _, _) = extractor.extract_ports(); let srv_name = Self::component_name(name, Component::Service); let mut members = vec![]; for i in 0..=size { - let node_name = format!("{}-{i}", Self::component_name(name, Component::Node)); + let node_name = format!("{}-{i}", Self::component_name(name, Component::Nodes)); members.push(format!( "{node_name}={node_name}.{srv_name}.{namespace}.svc.{}:{}", self.cluster_suffix, xline_port.container_port @@ -320,10 +326,10 @@ impl Factory { } /// Get the xline container - fn xline_container(&self, size: usize) -> Container { + fn xline_container(&self) -> Container { let mut container = self.cluster.spec.container.clone(); self.mount_volume_on_container(&mut container); - self.set_command(&mut container, size); + self.set_command(&mut container); // we need to set the env variable to get the pod name in the container container.env = Some(vec![EnvVar { name: XLINE_POD_NAME_ENV.to_owned(), @@ -340,12 +346,16 @@ impl Factory { } /// Get the node pod - fn pod_spec(&self, size: usize) -> PodTemplateSpec { + pub(crate) fn pod_spec(&self) -> PodTemplateSpec { let extractor = Extractor::new(self.cluster.as_ref()); let (name, _) = extractor.extract_id(); - let xline = self.xline_container(size); + let xline = self.xline_container(); + let labels = Self::selector_labels(name, Component::Nodes); PodTemplateSpec { - metadata: Some(self.general_metadata(Component::Node)), + metadata: Some(ObjectMeta { + labels: Some(labels), + ..ObjectMeta::default() + }), spec: Some(PodSpec { init_containers: Some(vec![]), containers: vec![xline], @@ -354,13 +364,34 @@ impl Factory { }), } } + + /// Get the statefulset + pub(crate) fn sts(&self) -> StatefulSet { + let size = self.cluster.spec.size; + let extractor = Extractor::new(self.cluster.as_ref()); + let (name, _) = extractor.extract_id(); + let labels = Self::selector_labels(name, Component::Nodes); + StatefulSet { + metadata: self.general_metadata(Component::Nodes), + spec: Some(StatefulSetSpec { + replicas: Some(size), + selector: LabelSelector { + match_expressions: None, + match_labels: Some(labels), + }, + service_name: Self::component_name(name, Component::Service), + volume_claim_templates: Some(extractor.extract_pvc_template()), + template: self.pod_spec(), + ..StatefulSetSpec::default() + }), + status: None, + } + } } #[cfg(test)] mod tests { use super::*; - use k8s_openapi::api::core::v1::{Affinity, NodeAffinity, PersistentVolumeClaimSpec}; - use kube::CustomResourceExt; static CLUSTER_1: &str = r#" apiVersion: xlineoperator.xline.cloud/v1alpha @@ -586,8 +617,8 @@ spec: #[test] fn factory_component_name_should_work() { assert_eq!( - Factory::component_name("my-xline-cluster", Component::Node), - "my-xline-cluster-node" + Factory::component_name("my-xline-cluster", Component::Nodes), + "my-xline-cluster-nodes" ); assert_eq!( Factory::component_name("my-xline-cluster", Component::Service), @@ -598,4 +629,79 @@ spec: "my-xline-cluster-job" ); } + + #[test] + fn factory_general_metadata_should_work() { + let cluster_1_metadata = r#" +labels: + app: my-xline-cluster + appNamespace: default + xlinecluster/component: nodes + xlinecluster/name: my-xline-cluster + xlinecluster/operator-version: 0.1.0 +name: my-xline-cluster-nodes +namespace: default +ownerReferences: +- apiVersion: xlineoperator.xline.cloud/v1alpha1 + controller: true + kind: XlineCluster + name: my-xline-cluster + uid: this-is-a-random-uid + "# + .trim(); + + let cluster_other_metadata = r#" +labels: + xlinecluster/component: nodes + xlinecluster/name: my-xline-cluster + xlinecluster/operator-version: 0.1.0 +name: my-xline-cluster-nodes +namespace: default +ownerReferences: +- apiVersion: xlineoperator.xline.cloud/v1alpha1 + controller: true + kind: XlineCluster + name: my-xline-cluster + uid: this-is-a-random-uid + "# + .trim(); + + for (cluster_raw, metadata_str) in [ + (CLUSTER_1, cluster_1_metadata), + (CLUSTER_2, cluster_other_metadata), + (CLUSTER_3, cluster_other_metadata), + (CLUSTER_4, cluster_other_metadata), + ] { + let mut cluster: Cluster = serde_yaml::from_str(cluster_raw).unwrap(); + after_apiserver(&mut cluster); + let factory = Factory::new(Arc::new(cluster), "cluster.local"); + let metadata = factory.general_metadata(Component::Nodes); + let outputs = serde_yaml::to_string(&metadata).unwrap(); + assert_eq!(outputs.trim(), metadata_str); + } + } + + #[test] + fn factory_node_service_should_work() { + let spec = r#" +spec: + ports: + - name: xline + port: 2379 + - name: sidecar + port: 2380 + selector: + xlinecluster/component: nodes + xlinecluster/name: my-xline-cluster + "# + .trim(); + for cluster_raw in [CLUSTER_1, CLUSTER_3, CLUSTER_4] { + let mut cluster: Cluster = serde_yaml::from_str(cluster_raw).unwrap(); + after_apiserver(&mut cluster); + let factory = Factory::new(Arc::new(cluster), "cluster.local"); + let service = factory.node_service(); + let outputs = serde_yaml::to_string(&service).unwrap(); + assert!(outputs.contains(spec)); + } + } } diff --git a/operator-k8s/src/manager/mod.rs b/operator-k8s/src/manager/mod.rs index d273d9a2..ed8a5d29 100644 --- a/operator-k8s/src/manager/mod.rs +++ b/operator-k8s/src/manager/mod.rs @@ -1,2 +1,2 @@ /// `XlineCluster` manager -mod cluster; +pub(crate) mod cluster; diff --git a/tests/e2e/cases/ci.sh b/tests/e2e/cases/ci.sh index 166b7b71..9e303b83 100644 --- a/tests/e2e/cases/ci.sh +++ b/tests/e2e/cases/ci.sh @@ -9,11 +9,12 @@ _TEST_CI_DNS_SUFFIX="cluster.local" _TEST_CI_NAMESPACE="default" _TEST_CI_XLINE_PORT="2379" _TEST_CI_LOG_SYNC_TIMEOUT=30 +_TEST_CI_START_SIZE=3 function test::ci::_mk_endpoints() { - local endpoints="${_TEST_CI_CLUSTER_NAME}-0.${_TEST_CI_CLUSTER_NAME}.${_TEST_CI_NAMESPACE}.svc.${_TEST_CI_DNS_SUFFIX}:${_TEST_CI_XLINE_PORT}" + local endpoints="${_TEST_CI_CLUSTER_NAME}-nodes-0.${_TEST_CI_CLUSTER_NAME}-srv.${_TEST_CI_NAMESPACE}.svc.${_TEST_CI_DNS_SUFFIX}:${_TEST_CI_XLINE_PORT}" for ((i = 1; i < $1; i++)); do - endpoints="${endpoints},${_TEST_CI_CLUSTER_NAME}-${i}.${_TEST_CI_CLUSTER_NAME}.${_TEST_CI_NAMESPACE}.svc.${_TEST_CI_DNS_SUFFIX}:${_TEST_CI_XLINE_PORT}" + endpoints="${endpoints},${_TEST_CI_CLUSTER_NAME}-nodes-${i}.${_TEST_CI_CLUSTER_NAME}-srv.${_TEST_CI_NAMESPACE}.svc.${_TEST_CI_DNS_SUFFIX}:${_TEST_CI_XLINE_PORT}" done echo "$endpoints" } @@ -39,9 +40,9 @@ function test::ci::_start() { k8s::kubectl wait --for=condition=available deployment/$_TEST_CI_OPERATOR_NAME --timeout=300s >/dev/null 2>&1 k8s::kubectl::wait_resource_creation crd xlineclusters.xlineoperator.xline.cloud k8s::kubectl apply -f "$(dirname "${BASH_SOURCE[0]}")/manifests/cluster.yml" >/dev/null 2>&1 - k8s::kubectl::wait_resource_creation sts $_TEST_CI_CLUSTER_NAME - k8s::kubectl wait --for=jsonpath='{.status.updatedReplicas}'=3 sts/$_TEST_CI_CLUSTER_NAME --timeout=300s >/dev/null 2>&1 - k8s::kubectl wait --for=jsonpath='{.status.readyReplicas}'=3 sts/$_TEST_CI_CLUSTER_NAME --timeout=300s >/dev/null 2>&1 + k8s::kubectl::wait_resource_creation sts "${_TEST_CI_CLUSTER_NAME}-nodes" + k8s::kubectl wait --for=jsonpath='{.status.updatedReplicas}'=$_TEST_CI_START_SIZE sts "${_TEST_CI_CLUSTER_NAME}-nodes" --timeout=300s >/dev/null 2>&1 + k8s::kubectl wait --for=jsonpath='{.status.readyReplicas}'=$_TEST_CI_START_SIZE sts "${_TEST_CI_CLUSTER_NAME}-nodes" --timeout=300s >/dev/null 2>&1 log::info "cluster started" } @@ -56,8 +57,8 @@ function test::ci::_teardown() { function test::ci::_scale_cluster() { log::info "scaling cluster to $1" k8s::kubectl scale xc $_TEST_CI_CLUSTER_NAME --replicas="$1" >/dev/null 2>&1 - k8s::kubectl wait --for=jsonpath='{.status.updatedReplicas}'="$1" sts/$_TEST_CI_CLUSTER_NAME --timeout=300s >/dev/null 2>&1 - k8s::kubectl wait --for=jsonpath='{.status.readyReplicas}'="$1" sts/$_TEST_CI_CLUSTER_NAME --timeout=300s >/dev/null 2>&1 + k8s::kubectl wait --for=jsonpath='{.status.updatedReplicas}'="$1" sts "${_TEST_CI_CLUSTER_NAME}-nodes" --timeout=300s >/dev/null 2>&1 + k8s::kubectl wait --for=jsonpath='{.status.readyReplicas}'="$1" sts "${_TEST_CI_CLUSTER_NAME}-nodes" --timeout=300s >/dev/null 2>&1 got=$(k8s::kubectl get xc $_TEST_CI_CLUSTER_NAME -o=jsonpath='{.spec.size}') if [ "$got" -ne "$1" ]; then echo "failed scale cluster" @@ -81,13 +82,13 @@ function test::ci::_chaos() { kill=$((RANDOM % max_kill + 1)) log::info "chaos: kill=$kill" for ((j = 0; j < kill; j++)); do - pod="${_TEST_CI_CLUSTER_NAME}-$((RANDOM % size))" + pod="${_TEST_CI_CLUSTER_NAME}-nodes-$((RANDOM % size))" log::info "chaos: kill pod=$pod" k8s::kubectl delete pod "$pod" --force --grace-period=0 2>/dev/null done test::ci::_etcdctl_expect "$endpoints" "put B $i" "OK" || return $? test::ci::_etcdctl_expect "$endpoints" "get B" "B\n$i" || return $? - k8s::kubectl wait --for=jsonpath='{.status.readyReplicas}'="$size" sts/$_TEST_CI_CLUSTER_NAME --timeout=300s >/dev/null 2>&1 + k8s::kubectl wait --for=jsonpath='{.status.readyReplicas}'="$size" sts "${_TEST_CI_CLUSTER_NAME}-nodes" --timeout=300s >/dev/null 2>&1 log::info "wait for log synchronization" && sleep $_TEST_CI_LOG_SYNC_TIMEOUT done } diff --git a/tests/e2e/cases/manifests/cluster.yml b/tests/e2e/cases/manifests/cluster.yml index 0193a1e7..b1403eb8 100644 --- a/tests/e2e/cases/manifests/cluster.yml +++ b/tests/e2e/cases/manifests/cluster.yml @@ -1,11 +1,11 @@ -apiVersion: xlineoperator.xline.cloud/v1alpha +apiVersion: xlineoperator.xline.cloud/v1alpha1 kind: XlineCluster metadata: name: my-xline-cluster spec: size: 3 container: - image: "datenlord/xline:latest" + image: "ghcr.io/xline-kv/xline:latest" imagePullPolicy: IfNotPresent # we will try to load image into cluster first. name: "my-xline" ports: diff --git a/tests/e2e/cases/manifests/operators.yml b/tests/e2e/cases/manifests/operators.yml index 85992bd9..0e845757 100644 --- a/tests/e2e/cases/manifests/operators.yml +++ b/tests/e2e/cases/manifests/operators.yml @@ -17,5 +17,10 @@ spec: spec: containers: - name: xline-operator - image: datenlord/xline-operator:latest + image: xline-kv/xline-operator:latest + command: + - xline-operator + args: + - --auto-migration + - --create-crd imagePullPolicy: Never diff --git a/tests/e2e/testenv/testenv.sh b/tests/e2e/testenv/testenv.sh index 3f251031..db51783b 100644 --- a/tests/e2e/testenv/testenv.sh +++ b/tests/e2e/testenv/testenv.sh @@ -15,11 +15,11 @@ function testenv::k8s::delete() { function testenv::k8s::load_images() { # xline image - xline_image="${XLINE_IMAGE:-datenlord/xline:latest}" + xline_image="ghcr.io/xline-kv/xline:latest" docker pull "$xline_image" >/dev/null testenv::k8s::kind::load_image "$xline_image" # xline operator image, this needs to be built first - testenv::k8s::kind::load_image datenlord/xline-operator:latest + testenv::k8s::kind::load_image xline-kv/xline-operator:latest # etcdctl image docker pull gcr.io/etcd-development/etcd:v3.5.5 >/dev/null testenv::k8s::kind::load_image gcr.io/etcd-development/etcd:v3.5.5