diff --git a/Cargo.lock b/Cargo.lock
index 99777274..6b3c1832 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1670,9 +1670,9 @@ dependencies = [
 
 [[package]]
 name = "serde_yaml"
-version = "0.9.22"
+version = "0.9.25"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "452e67b9c20c37fa79df53201dc03839651086ed9bbe92b3ca585ca9fdaa7d85"
+checksum = "1a49e178e4452f45cb61d0cd8cebc1b0fafd3e41929e996cef79aa3aca91f574"
 dependencies = [
  "indexmap",
  "itoa",
@@ -2421,6 +2421,7 @@ dependencies = [
  "schemars",
  "serde",
  "serde_json",
+ "serde_yaml",
  "thiserror",
  "tokio",
  "tracing",
diff --git a/operator-k8s/Cargo.toml b/operator-k8s/Cargo.toml
index 5f8d3823..2fcfe10a 100644
--- a/operator-k8s/Cargo.toml
+++ b/operator-k8s/Cargo.toml
@@ -39,3 +39,4 @@ event-listener = "2.5.3"
 
 [dev-dependencies]
 garde = { version = "0.11.2", default-features = false, features = ["derive", "pattern"] }
+serde_yaml = "0.9.25"
diff --git a/operator-k8s/src/consts.rs b/operator-k8s/src/consts.rs
index e1b3a96f..1403144f 100644
--- a/operator-k8s/src/consts.rs
+++ b/operator-k8s/src/consts.rs
@@ -28,3 +28,5 @@ pub(crate) const ANNOTATION_INHERIT_LABELS_PREFIX: &str =
 pub(crate) const LABEL_CLUSTER_NAME: &str = "xlinecluster/name";
 /// The label attach to subresources, indicate the component type of this subresource
 pub(crate) const LABEL_CLUSTER_COMPONENT: &str = "xlinecluster/component";
+/// Indicates the version of the operator that created this subresource
+pub(crate) const LABEL_OPERATOR_VERSION: &str = "xlinecluster/operator-version";
diff --git a/operator-k8s/src/crd/v1alpha1/cluster.rs b/operator-k8s/src/crd/v1alpha1/cluster.rs
index 3bac92ee..ed840f2f 100644
--- a/operator-k8s/src/crd/v1alpha1/cluster.rs
+++ b/operator-k8s/src/crd/v1alpha1/cluster.rs
@@ -89,6 +89,16 @@ pub(crate) enum StorageSpec {
     },
 }
 
+impl StorageSpec {
+    /// Get the PVC template if this storage spec is PVC-backed, `None` for S3
+    pub(crate) fn as_pvc(&self) -> Option<&PersistentVolumeClaim> {
+        match *self {
+            Self::Pvc { ref pvc } => Some(pvc),
+            Self::S3 { .. } => None,
+        }
+    }
+}
+
 /// Xline cluster backup S3 specification
 #[derive(Serialize, Deserialize, Clone, Debug, JsonSchema)]
 #[cfg_attr(test, derive(Validate))]
diff --git a/operator-k8s/src/crd/v1alpha1/mod.rs b/operator-k8s/src/crd/v1alpha1/mod.rs
index fc397c42..c2343934 100644
--- a/operator-k8s/src/crd/v1alpha1/mod.rs
+++ b/operator-k8s/src/crd/v1alpha1/mod.rs
@@ -1,6 +1,5 @@
 #![allow(unused)] // TODO: remove when this CRD is used
 
-pub(crate) use cluster::Cluster;
-pub(crate) use cluster::StorageSpec;
+pub(crate) use cluster::{BackupSpec, Cluster, ClusterSpec, StorageSpec};
 
 mod cluster;
diff --git a/operator-k8s/src/manager/cluster.rs b/operator-k8s/src/manager/cluster.rs
index 5947703b..5d38ac7b 100644
--- a/operator-k8s/src/manager/cluster.rs
+++ b/operator-k8s/src/manager/cluster.rs
@@ -2,10 +2,10 @@
 use crate::consts::{
     ANNOTATION_INHERIT_LABELS_PREFIX, DEFAULT_SIDECAR_PORT, DEFAULT_XLINE_PORT,
-    LABEL_CLUSTER_COMPONENT, LABEL_CLUSTER_NAME, SIDECAR_PORT_NAME, XLINE_POD_NAME_ENV,
-    XLINE_PORT_NAME,
+    LABEL_CLUSTER_COMPONENT, LABEL_CLUSTER_NAME, LABEL_OPERATOR_VERSION, SIDECAR_PORT_NAME,
+    XLINE_POD_NAME_ENV, XLINE_PORT_NAME,
 };
-use crate::crd::v1alpha1::{Cluster, StorageSpec};
+use crate::crd::v1alpha1::{BackupSpec, Cluster, ClusterSpec, StorageSpec};
 
 use std::collections::BTreeMap;
 use std::sync::Arc;
 
@@ -126,13 +126,7 @@
             .spec
             .backup
             .iter()
-            .filter_map(|spec| {
-                if let StorageSpec::Pvc { pvc } = spec.storage.clone() {
-                    Some(pvc)
-                } else {
-                    None
-                }
-            })
+            .filter_map(|spec| spec.storage.as_pvc().cloned())
             .chain(self.cluster.spec.data.iter().cloned())
             .chain(self.cluster.spec.pvcs.iter().flatten().cloned())
             .collect()
@@ -145,13 +139,7 @@
             .spec
             .backup
             .iter()
-            .filter_map(|spec| {
-                if let StorageSpec::Pvc { pvc } = spec.storage.clone() {
-                    Some(pvc)
-                } else {
-                    None
-                }
-            })
+            .filter_map(|spec| spec.storage.as_pvc().cloned())
             .map(|pvc| VolumeMount {
                 name: pvc.name_any(), // because the volume name is the same as pvc template name, we can use it in volume mount
                 mount_path: DEFAULT_BACKUP_DIR.to_owned(),
@@ -173,28 +161,30 @@
     }
 
     /// Extract owner reference
+    #[allow(clippy::expect_used)] // it is ok because xlinecluster is always populated from the apiserver
     fn extract_owner_ref(&self) -> OwnerReference {
         // unwrap controller_owner_ref is always safe
-        let Some(owner_ref) = self.cluster.controller_owner_ref(&()) else { unreachable!("kube-runtime has undergone some changes.") };
-        owner_ref
+        self.cluster
+            .controller_owner_ref(&())
+            .expect("metadata doesn't have name or uid")
     }
 
     /// Extract name, namespace
     #[allow(clippy::expect_used)] // it is ok because xlinecluster has field validation
     fn extract_id(&self) -> (&str, &str) {
-        let namespace = self
-            .cluster
-            .metadata
-            .namespace
-            .as_deref()
-            .expect("xlinecluster resource should have a namespace");
         let name = self
             .cluster
             .metadata
             .name
             .as_deref()
             .expect("xlinecluster resource should have a name");
-        (namespace, name)
+        let namespace = self
+            .cluster
+            .metadata
+            .namespace
+            .as_deref()
+            .expect("xlinecluster resource should have a namespace");
+        (name, namespace)
     }
 
     /// Extract inherit labels
@@ -251,16 +241,20 @@
         let mut labels = extractor.extract_inherit_labels();
         let (name, namespace) = extractor.extract_id();
         let owner_ref = extractor.extract_owner_ref();
-        let _ig = labels.insert(LABEL_CLUSTER_NAME.to_owned(), name.to_owned());
-        let __ig = labels.insert(
+        _ = labels.insert(LABEL_CLUSTER_NAME.to_owned(), name.to_owned());
+        _ = labels.insert(
             LABEL_CLUSTER_COMPONENT.to_owned(),
             component.label().to_owned(),
         );
+        _ = labels.insert(
+            LABEL_OPERATOR_VERSION.to_owned(),
+            env!("CARGO_PKG_VERSION").to_owned(),
+        );
         ObjectMeta {
-            labels: Some(labels),                              // it is used in selector
-            name: Some(Self::component_name(name, component)), // all subresources share the same name
+            labels: Some(labels), // it is used in selector
+            name: Some(Self::component_name(name, component)),
             namespace: Some(namespace.to_owned()), // all subresources share the same namespace
-            owner_references: Some(vec![owner_ref]), // allow k8s GC to automatically clean up itself
+            owner_references: Some(vec![owner_ref]), // allow k8s GC to automatically clean it up when `XlineCluster` is deleted
             ..ObjectMeta::default()
         }
     }
@@ -303,16 +297,13 @@
     }
 
     /// Set the entrypoint of the container
-    fn set_command(&self, container: &mut Container, index: usize) {
+    fn set_command(&self, container: &mut Container, size: usize) {
         let extractor = Extractor::new(self.cluster.as_ref());
         let (name, namespace) = extractor.extract_id();
         let (xline_port, _, _) = extractor.extract_ports();
         let srv_name = Self::component_name(name, Component::Service);
         let mut members = vec![];
-        // the node before this index has already been added to the members
-        // we use the members from 0 to index to build the initial cluster config for this node
-        // and then do membership change to update the cluster config
-        for i in 0..=index {
+        for i in 0..size {
             let node_name = format!("{}-{i}", Self::component_name(name, Component::Node));
             members.push(format!(
                 "{node_name}={node_name}.{srv_name}.{namespace}.svc.{}:{}",
@@ -329,10 +320,10 @@
     }
 
     /// Get the xline container
-    fn xline_container(&self, index: usize) -> Container {
+    fn xline_container(&self, size: usize) -> Container {
         let mut container = self.cluster.spec.container.clone();
         self.mount_volume_on_container(&mut container);
-        self.set_command(&mut container, index);
+        self.set_command(&mut container, size);
         // we need to set the env variable to get the pod name in the container
         container.env = Some(vec![EnvVar {
             name: XLINE_POD_NAME_ENV.to_owned(),
@@ -349,46 +340,262 @@
     }
 
     /// Get the node pod
-    fn node_pod(&self, index: usize) -> PodTemplateSpec {
+    fn pod_spec(&self, size: usize) -> PodTemplateSpec {
         let extractor = Extractor::new(self.cluster.as_ref());
         let (name, _) = extractor.extract_id();
-        let node_name = format!("{}-{index}", Self::component_name(name, Component::Node));
-        let xline = self.xline_container(index);
-        let volumes = extractor
-            .extract_pvc_template()
-            .into_iter()
-            .map(|pvc_template| Volume {
-                name: pvc_template.name_any(), // the volume name is the same as pvc template name
-                persistent_volume_claim: Some(PersistentVolumeClaimVolumeSource {
-                    claim_name: format!("{}-{}", pvc_template.name_any(), node_name), // the pvc detail name is template name + node name
-                    ..PersistentVolumeClaimVolumeSource::default()
-                }),
-                ..Volume::default()
-            })
-            .collect();
-        let mut meta = self.general_metadata(Component::Node);
-        meta.name = Some(node_name);
+        let xline = self.xline_container(size);
         PodTemplateSpec {
-            metadata: Some(meta),
+            metadata: Some(self.general_metadata(Component::Node)),
             spec: Some(PodSpec {
                 init_containers: Some(vec![]),
                 containers: vec![xline],
                 affinity: self.cluster.spec.affinity.clone(),
-                volumes: Some(volumes),
                 ..PodSpec::default()
             }),
         }
     }
+}
 
-    /// Get the pvc for a node pod
-    fn pvc(&self, index: usize) -> Vec<PersistentVolumeClaim> {
-        let extractor = Extractor::new(self.cluster.as_ref());
-        let mut pvcs = extractor.extract_pvc_template();
-        let (name, _) = extractor.extract_id();
-        let node_name = format!("{}-{index}", Self::component_name(name, Component::Node));
-        for pvc in &mut pvcs {
-            pvc.metadata.name = Some(format!("{}-{}", pvc.name_any(), node_name));
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use k8s_openapi::api::core::v1::{Affinity, NodeAffinity, PersistentVolumeClaimSpec};
+    use kube::CustomResourceExt;
+
+    static CLUSTER_1: &str = r#"
+apiVersion: xlineoperator.datenlord.io/v1alpha1
+kind: XlineCluster
+metadata:
+  name: my-xline-cluster
+  labels:
+    app: my-xline-cluster
+    appNamespace: default
+  annotations:
+    xlineoperator.datenlord.io/inherit-label-prefix: "app"
+spec:
+  size: 3
+  container:
+    image: "datenlord/xline"
+    name: "my-xline"
+    ports:
+      - containerPort: 2379
+        name: xline
+    "#;
+
+    static CLUSTER_2: &str = r#"
+apiVersion: xlineoperator.datenlord.io/v1alpha1
+kind: XlineCluster
+metadata:
+  name: my-xline-cluster
+spec:
+  size: 5
+  container:
+    image: "datenlord/xline"
+    name: "my-xline"
+    ports:
+      - containerPort: 3000
+        name: xline
+      - containerPort: 3001
+        name: sidecar
+  data:
+    metadata:
+      name: my-xline-cluster-data
+    spec:
+      accessModes: [ "ReadWriteOnce" ]
+      storageClassName: "my-storage-class"
+      resources:
+        requests:
+          storage: 1Gi
+    "#;
+
+    static CLUSTER_3: &str = r#"
+apiVersion: xlineoperator.datenlord.io/v1alpha1
+kind: XlineCluster
+metadata:
+  name: my-xline-cluster
+spec:
+  size: 3
+  backup:
+    cron: "*/15 * * * *"
+    pvc:
+      metadata:
+        name: backup-pvc
+      spec:
+        storageClassName: xline-backup
+        accessModes: [ "ReadWriteOnce" ]
+        resources:
+          requests:
+            storage: 1Gi
+  container:
+    image: "datenlord/xline"
+    name: "my-xline"
+    ports:
+      - containerPort: 2379
+        name: xline
+    "#;
+
+    static CLUSTER_4: &str = r#"
+apiVersion: xlineoperator.datenlord.io/v1alpha1
+kind: XlineCluster
+metadata:
+  name: my-xline-cluster
+spec:
+  size: 3
+  container:
+    image: "datenlord/xline"
+    name: "my-xline"
+    ports:
+      - containerPort: 2379
+        name: xline
+  pvcs:
+    - metadata:
+        name: xline-pvc
+      spec:
+        storageClassName: xline-backup
+        accessModes: [ "ReadWriteOnce" ]
+        resources:
+          requests:
+            storage: 1Gi
+    "#;
+
+    fn after_apiserver(cluster: &mut Cluster) {
+        cluster.metadata.namespace = Some("default".to_owned()); // use the default namespace if none is specified in the yaml
+        cluster.metadata.uid = Some("this-is-a-random-uid".to_owned());
+    }
+
+    #[test]
+    fn extract_ports_should_work() {
+        for (cluster_raw, xline, sidecar) in [
+            (CLUSTER_1, 2379, 2380),
+            (CLUSTER_2, 3000, 3001),
+            (CLUSTER_3, 2379, 2380),
+            (CLUSTER_4, 2379, 2380),
+        ] {
+            let mut cluster: Cluster = serde_yaml::from_str(cluster_raw).unwrap();
+            after_apiserver(&mut cluster);
+            let extractor = Extractor::new(&cluster);
+            let (xline_port, sidecar_port, service_ports) = extractor.extract_ports();
+            assert_eq!(xline_port.container_port, xline);
+            assert_eq!(sidecar_port.container_port, sidecar);
+            assert_eq!(service_ports.len(), 2);
+            assert_eq!(service_ports[0].name.as_deref(), Some(XLINE_PORT_NAME));
+            assert_eq!(service_ports[0].port, xline);
+            assert_eq!(service_ports[1].name.as_deref(), Some(SIDECAR_PORT_NAME));
+            assert_eq!(service_ports[1].port, sidecar);
+        }
+    }
+
+    #[test]
+    fn extract_id_should_work() {
+        for cluster_raw in [CLUSTER_1, CLUSTER_2, CLUSTER_3, CLUSTER_4] {
+            let mut cluster: Cluster = serde_yaml::from_str(cluster_raw).unwrap();
+            after_apiserver(&mut cluster);
+            let extractor = Extractor::new(&cluster);
+            let (name, namespace) = extractor.extract_id();
+            assert_eq!(name, "my-xline-cluster");
+            assert_eq!(namespace, "default");
+        }
+    }
+
+    #[test]
+    fn extract_pvc_should_work() {
+        let mut cluster: Cluster = serde_yaml::from_str(CLUSTER_1).unwrap();
+        after_apiserver(&mut cluster);
+        let extractor = Extractor::new(&cluster);
+        let pvcs = extractor.extract_pvc_template();
+        assert_eq!(pvcs.len(), 0);
+
+        let mut cluster: Cluster = serde_yaml::from_str(CLUSTER_2).unwrap();
+        after_apiserver(&mut cluster);
+        let extractor = Extractor::new(&cluster);
+        let pvcs = extractor.extract_pvc_template();
+        assert_eq!(pvcs.len(), 1);
+        assert_eq!(
+            pvcs[0].metadata.name.as_deref(),
+            Some("my-xline-cluster-data")
+        );
+
+        let mut cluster: Cluster = serde_yaml::from_str(CLUSTER_3).unwrap();
+        after_apiserver(&mut cluster);
+        let extractor = Extractor::new(&cluster);
+        let pvcs = extractor.extract_pvc_template();
+        assert_eq!(pvcs.len(), 1);
+        assert_eq!(pvcs[0].metadata.name.as_deref(), Some("backup-pvc"));
+
+        let mut cluster: Cluster = serde_yaml::from_str(CLUSTER_4).unwrap();
+        after_apiserver(&mut cluster);
+        let extractor = Extractor::new(&cluster);
+        let pvcs = extractor.extract_pvc_template();
+        assert_eq!(pvcs.len(), 1);
+        assert_eq!(pvcs[0].metadata.name.as_deref(), Some("xline-pvc"));
+    }
+
+    #[test]
+    fn extract_volume_mount_should_work() {
+        let mut cluster: Cluster = serde_yaml::from_str(CLUSTER_1).unwrap();
+        after_apiserver(&mut cluster);
+        let extractor = Extractor::new(&cluster);
+        let volume_mount = extractor.extract_additional_volume_mount();
+        assert_eq!(volume_mount.len(), 0);
+
+        let mut cluster: Cluster = serde_yaml::from_str(CLUSTER_2).unwrap();
+        after_apiserver(&mut cluster);
+        let extractor = Extractor::new(&cluster);
+        let volume_mount = extractor.extract_additional_volume_mount();
+        assert_eq!(volume_mount.len(), 1);
+        assert_eq!(volume_mount[0].name, "my-xline-cluster-data");
+        assert_eq!(volume_mount[0].mount_path, DEFAULT_DATA_DIR);
+
+        let mut cluster: Cluster = serde_yaml::from_str(CLUSTER_3).unwrap();
+        after_apiserver(&mut cluster);
+        let extractor = Extractor::new(&cluster);
+        let volume_mount = extractor.extract_additional_volume_mount();
+        assert_eq!(volume_mount.len(), 1);
+        assert_eq!(volume_mount[0].name, "backup-pvc");
+        assert_eq!(volume_mount[0].mount_path, DEFAULT_BACKUP_DIR);
+
+        let mut cluster: Cluster = serde_yaml::from_str(CLUSTER_4).unwrap();
+        after_apiserver(&mut cluster);
+        let extractor = Extractor::new(&cluster);
+        let volume_mount = extractor.extract_additional_volume_mount();
+        assert_eq!(volume_mount.len(), 0);
+    }
+
+    #[test]
+    fn extract_owner_ref_should_work() {
+        for cluster_raw in [CLUSTER_1, CLUSTER_2, CLUSTER_3, CLUSTER_4] {
+            let mut cluster: Cluster = serde_yaml::from_str(cluster_raw).unwrap();
+            after_apiserver(&mut cluster);
+            let extractor = Extractor::new(&cluster);
+            let owner_ref = extractor.extract_owner_ref();
+            assert_eq!(owner_ref.name, "my-xline-cluster");
         }
-        pvcs
+    }
+
+    #[test]
+    fn extract_inherit_labels_should_work() {
+        let mut cluster: Cluster = serde_yaml::from_str(CLUSTER_1).unwrap();
+        after_apiserver(&mut cluster);
+        let extractor = Extractor::new(&cluster);
+        let labels = extractor.extract_inherit_labels();
+        assert_eq!(labels.len(), 2);
+        assert_eq!(&labels["app"], "my-xline-cluster");
+        assert_eq!(&labels["appNamespace"], "default");
+    }
+
+    #[test]
+    fn factory_component_name_should_work() {
+        assert_eq!(
+            Factory::component_name("my-xline-cluster", Component::Node),
+            "my-xline-cluster-node"
+        );
+        assert_eq!(
+            Factory::component_name("my-xline-cluster", Component::Service),
+            "my-xline-cluster-srv"
+        );
+        assert_eq!(
+            Factory::component_name("my-xline-cluster", Component::BackupJob),
+            "my-xline-cluster-job"
+        );
     }
 }
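
Note: the two `filter_map` call sites in `manager/cluster.rs` above now go through the new `StorageSpec::as_pvc` helper instead of cloning the whole storage spec just to pattern-match on it. A minimal sketch of the intended call-site shape (the `backup_spec` binding here is hypothetical; the real call sites are the two iterator chains in the diff):

    // borrow the storage spec; clone only when it actually is the PVC variant
    let pvc: Option<PersistentVolumeClaim> = backup_spec.storage.as_pvc().cloned();

The new unit tests only need the package's dev-dependencies, so something like `cargo test -p operator-k8s` should run them (assuming the crate is named after its `operator-k8s` directory).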